"""
Core XML Parsing of RSS documents. This is generally focused on RSS 2.0
"""
import requests
from xml.etree import ElementTree
from pprint import pprint
import csv
from pathlib import Path
from urllib.parse import urlparse
import datetime
from typing import NamedTuple, List
[docs]class ExpandedRSS(NamedTuple):
"""
Data expanded by the :func:`title_transform()` function.
Note that the names of the fields in this class will be the column
titles on saved CSV files. Any change here will be reflected in the files
created.
"""
title: str #: The title
link: str #: The link
description: str #: The description
pubDate: str #: The publication date
docket: str #: The parsed docket from the title
parties_title: str #: The parsed parties from the title
[docs]def xml_reader(url: str) -> List[SourceRSS]:
"""
Extract RSS data given a URL to read.
The root document is the ``<rss>`` tag which has a single ``<channel>`` tag.
The ``<channel>`` has some overall attributes, but contains a sequence
of ``<item>`` tags.
This will gather "title", "link", "description", and "pubDate" from each item
and build a :class:`SourceRSS` object.
It might be helpful to return the overall channel properties along with
the list of items.
:param url: URL to read.
:return: All of the SourceRSS from the channel of the feed, List[SourceRSS].
"""
items = []
response = requests.get(url)
rss = ElementTree.fromstring(response.content)
channel = rss.find('channel')
# Dump the overall channel properties
print("title", channel.findtext('title'))
print("link", channel.findtext('link'))
print("description", channel.findtext('description'))
print("last build date", channel.findtext('lastBuildDate'))
for item in channel.iter('item'):
item_row = SourceRSS(
title=item.findtext('title'),
link=item.findtext('link'),
description=item.findtext('description'),
pubDate=item.findtext('pubDate'),
)
items.append(item_row)
return items
[docs]def csv_dump(data: List[ExpandedRSS], output_path: Path) -> None:
"""
Save expanded data to a file, given the Path.
Note that the headers are the field names from the ExpandedRSS class definition.
This assures us that all fields will be written properly.
:param data: List of :class:`ExpandedRSS` items, built by :func:`title_transform`.
:param output_path: Path to which to write the file.
"""
with output_path.open('w', newline='') as output_file :
headings = list(ExpandedRSS._fields)
writer = csv.DictWriter(output_file, headings)
writer.writeheader()
for row in data:
writer.writerow(row._asdict())
[docs]def csv_load(input_path: Path) -> List[ExpandedRSS]:
"""
Recover expanded data from a file, given a Path.
Note that the headers **must be** the field names from the ExpandedRSS class definition.
If their isn't a trivial match, then this won't read properly.
:param input_path: Path from which to read the file.
:returns: List of ExpandedRSS objects used to compare previous day's feed
with today's feed.
"""
data = []
with input_path.open() as input_file:
reader = csv.DictReader(input_file)
for row in reader:
expanded_rss = ExpandedRSS(**row)
data.append(expanded_rss)
return data
[docs]def path_maker(url: str, now: datetime.datetime=None, format: str="%Y%m%d") -> Path:
"""
Builds a Path from today's date and the base name of the URL.
The default format is "%Y%m%d" to transform the date to a YYYYmmdd string.
An alternative can be ""%Y%W%w" to create a YYYYWWw string,
where WW is the week of the year and w is the day of the week.
>>> from rss_status import path_maker
>>> import datetime
>>> now = datetime.datetime(2018, 9, 10)
>>> str(path_maker("https://ecf.dcd.uscourts.gov/cgi-bin/rss_outside.pl", now))
'20180910/rss_outside'
:param url: An RSS-feed URL.
:param now: Optional date/time object. Defaults to datetime.datetime.now().
:return: A Path with the date string / base name from the URL.
"""
url_details = urlparse(url)
base_name = Path(url_details.path).stem
if not now:
now = datetime.datetime.now()
today_str = now.strftime(format)
return Path(today_str) / base_name
[docs]def find_yesterday(directory: Path, url: str, date_pattern: str='[0-9]*') -> Path:
"""
We need to search for the most recent previous entry. While we can hope for
dependably running this every day, that's a difficult thing to guarantee.
It's much more reliable to look for the most recent date which
contains files for a given channel. This means
Example. Here's two dates. One date has one channel, the other has two channels.
::
20180630/one_channel/daily.csv
20180630/one_channel/new.csv
20180630/one_channel/save.csv
20180701/one_channel/daily.csv
20180701/one_channel/new.csv
20180701/one_channel/save.csv
20180701/another_channel/daily.csv
20180701/another_channel/new.csv
20180701/another_channel/save.csv
If there's nothing available, returns None.
:param directory: The base directory to search
:param url: The full URL from which we can get the base name
:param date_pattern: Most of the time, the interesting filenames will begin with a digit
If the file name pattern is changed, however, this can be used to match dates,
and exclude non-date files that might be confusing.
:return: A Path with the date string / base name from the URL or None.
"""
url_details = urlparse(url)
base_name = Path(url_details.path).stem
candidates = list(directory.glob(f"{date_pattern}/{base_name}"))
if candidates:
return max(candidates)
[docs]def channel_processing(url: str, directory: Path=None, date: datetime.datetime=None):
"""
The daily process for a given channel.
Ideally there's a "yesterday" directory. Pragmatically, this may not exist.
We use :func:`find_yesterday` to track down the most recent file and
work with that. If there's no recent file, this is all new. Welcome.
:param url: The URL for the channel
:param directory: The working directory, default is the current working directory.
:param date: The date to assign to the files, by default, it's datetime.datetime.now.
"""
if directory is None:
directory = Path.cwd()
if date is None:
date = datetime.datetime.now()
yesterdays_path = find_yesterday(directory, url)
todays_path = path_maker(url)
todays_data = title_transform(xml_reader(url))
if yesterdays_path:
saved_data = csv_load(yesterdays_path / "save.csv")
else:
saved_data = []
new_data = set(todays_data) - set(saved_data)
all_data = set(todays_data) | set(saved_data)
todays_path.mkdir(parents=True, exist_ok=True)
csv_dump(todays_data, todays_path / "daily.csv")
csv_dump(new_data, todays_path / "new.csv")
csv_dump(all_data, todays_path / "save.csv")
def demo():
"""
This is downloads, enriches, and saves the daily files to the current
working directory.
"""
target_path = Path.cwd()
data1 = xml_reader("https://ecf.dcd.uscourts.gov/cgi-bin/rss_outside.pl")
data1_decomposed = title_transform(data1)
# pprint(data1_decomposed)
csv_dump(data1_decomposed, target_path / "file1.csv")
data2 = xml_reader("https://ecf.nyed.uscourts.gov/cgi-bin/readyDockets.pl")
data2_decomposed = title_transform(data2)
# pprint(data2_decomposed)
csv_dump(data2_decomposed, target_path / "file2.csv")
recovered = csv_load(target_path / "file2.csv")
assert set(recovered) == set(data2_decomposed), "Weird, recovering the file was a problem."
if __name__ == "__main__":
# demo()
for channel_url in (
"https://ecf.dcd.uscourts.gov/cgi-bin/rss_outside.pl",
"https://ecf.nyed.uscourts.gov/cgi-bin/readyDockets.pl",
# More channels here.
):
channel_processing(channel_url)