diff --git a/pkgs/by-name/feeds/package.nix b/pkgs/by-name/feeds/package.nix
index 6dd48c658..148d54bca 100644
--- a/pkgs/by-name/feeds/package.nix
+++ b/pkgs/by-name/feeds/package.nix
@@ -24,6 +24,6 @@ lib.recurseIntoAttrs (lib.makeScope newScope (self: with self; {
   update-feed = static-nix-shell.mkPython3 {
     pname = "update-feed";
     srcRoot = ./.;
-    pkgs = [ "feedsearch-crawler" ];
+    pkgs = [ "feedsearch-crawler" "podcastindex-db" ];
   };
 }))
diff --git a/pkgs/by-name/feeds/update-feed b/pkgs/by-name/feeds/update-feed
index a3734a1f1..fac2f423e 100755
--- a/pkgs/by-name/feeds/update-feed
+++ b/pkgs/by-name/feeds/update-feed
@@ -2,15 +2,31 @@
 #!nix-shell -i python3 -p feedsearch-crawler -p podcastindex-db -p python3

 from dataclasses import dataclass
-from typing import Any, Iterator
+from typing import Any, IO, Iterator
 import argparse
+import csv
 import feedsearch_crawler as fsc
 import json
 import logging
+import os

 logger = logging.getLogger(__name__)


+def logitems(context: str):
+    """
+    decorator which wraps any function that returns an iterator.
+    the wrapper function yields each item of the wrapped function,
+    but logs each item as it does so.
+    """
+    def wrapper(f):
+        def wrapped(*args, **kwargs):
+            for item in f(*args, **kwargs):
+                logger.info(f"{context}: {item}")
+                yield item
+        return wrapped
+    return wrapper
+
 @dataclass(order=True)
 class Feed:
     # content_type
@@ -22,8 +38,8 @@ class Feed:
     # is_push
     # last_updated
     # self_url
-    site_name: str | None = None # not used
-    site_url: str | None = None # not used
+    # site_name
+    # site_url
     title: str | None = None # used by (and others)
     url: str | None = None # used by (and many others)
     velocity: float | None = None # used by
@@ -31,6 +47,7 @@ class Feed:

     def __post_init__(self) -> None:
         def clean(value: str | None) -> str | None:
+            # TODO: clean HTML formatting like `<br/>...`
             replacements = {
                 "\u2013": "-",
                 "\u2019": "'",
@@ -45,7 +62,6 @@ class Feed:

         # clean up characters for better printability
         self.title = clean(self.title)
-        self.site_name = clean(self.site_name)
         self.description = clean(self.description)

     def from_dict(d: dict[str, Any]) -> 'Self':
@@ -57,8 +73,6 @@ class Feed:
         return Feed(
             description = d.get("description"),
             is_podcast = d.get("is_podcast"),
-            site_name = d.get("site_name"),
-            site_url = d.get("site_url"),
             title = d.get("title"),
             url = d.get("url"),
             velocity = d.get("velocity"),
@@ -68,8 +82,6 @@ class Feed:
         return dict(
             description=self.description,
             is_podcast=self.is_podcast,
-            site_name=self.site_name,
-            site_url=self.site_url,
             title=self.title,
             url=self.url,
             velocity=self.velocity,
@@ -78,18 +90,79 @@ class Feed:

     def to_json(self) -> str:
         return json.dumps(self.to_dict(), sort_keys=True, indent=2)

+class PodcastIndex:
+    def __init__(self):
+        self.db = PodcastIndex.locate_db()
+
+    @staticmethod
+    def locate_db() -> IO[str] | None:
+        for d in os.environ.get("XDG_DATA_DIRS", "").split(":"):
+            try:
+                return open(os.path.join(d, "podcastindex", "podcastindex_feeds.csv"), "r")
+            except FileNotFoundError:
+                pass
+        return None
+
+    def search(self, query: str) -> Iterator[Feed]:
+        # `self.db or []` makes this a silent no-op when no db was located
+        for r in csv.DictReader(self.db or []):
+            if query in r['link']:
+                yield Feed(
+                    description=r.get("description"),
+                    is_podcast=True,
+                    title=r.get("title"),
+                    url=r.get("url"),
+                    # TODO: it's unclear what this field means
+                    velocity=float(r.get("updateFrequency") or "0"),
+                    # all fields:
+                    # - category1 ... category10
+                    # - chash
+                    # - contentType
+                    # - createdOn
+                    # - dead
+                    # - description
+                    # - episodeCount
+                    # - explicit
+                    # - generator
+                    # - host
+                    # - id
+                    # - imageUrl
+                    # - itunesAuthor
+                    # - itunesId
+                    # - itunesOwnerName
+                    # - itunesType
+                    # - language
+                    # - lastHttpStatus
+                    # - lastUpdate
+                    # - link
+                    # - newestEnclosureDuration
+                    # - newestEnclosureUrl
+                    # - newestItemPubdate
+                    # - oldestItemPubdate
+                    # - originalUrl
+                    # - podcastGuid
+                    # - popularityScore
+                    # - priority
+                    # - title
+                    # - updateFrequency
+                    # - url
+                )
+
+PODCAST_INDEX = PodcastIndex()
+
 class Locator:
-    def __init__(self, url: str, prefer_podcast: bool) -> None:
-        self.url = url
+    def __init__(self, url: str, prefer_podcast: bool, eager: bool) -> None:
+        self.url = url #< schemeless and www.-less
         self.prefer_podcast = prefer_podcast
+        self.eager = eager
         self.feeds = []

-    def locate_best(self, feeds: list[Feed] | None=None) -> Feed | None:
-        if feeds is None:
-            feeds = list(self.locate_all())
+    def is_match(self, feed: Feed) -> bool:
+        return feed.is_podcast == self.prefer_podcast \
+            and bool(feed.url and self.url in feed.url)

+    def choose_best(self, feeds: list[Feed]) -> Feed | None:
         feeds = sorted(feeds, key=lambda f: (
-            (f.is_podcast == self.prefer_podcast),
+            not self.is_match(f), #< prefer matching feeds
             -f.velocity if f.velocity is not None else 0, #< prefer higher-velocity sources
             f.url, #< prefer shorter URLs
             len(f.title) if f.title is not None else 1000, #< prefer shorter titles
         ))
         return feeds[0] if len(feeds) else None
@@ -97,28 +170,62 @@ class Locator:

-    def locate_all(self) -> Iterator[Feed]:
-        uris = [
-            f"https://{self.url}",
-            f"http://{self.url}",
-            f"https://www.{self.url}",
-            f"http://www.{self.url}",
-        ]
-        seen = []
-        for uri in uris:
-            for feed in self.locate_feedsearch(uri):
-                if feed not in seen:
-                    seen.append(feed)
-                    yield feed
+    def locate_best(self) -> Feed | None:
+        feeds = []
+        def eager_match():
+            if not self.eager:
+                return None
+            matches = [f for f in feeds if self.is_match(f)]
+            if len(matches) == 1:
+                logger.info(f"eagerly matched {matches[0]}")
+                return matches[0]

-    def locate_feedsearch(self, uri: str) -> Iterator[Feed]:
+        sources = [
+            self.locate_podcastindex,
+            lambda: self.locate_feedsearch_once(f"https://{self.url}"),
+            lambda: self.locate_feedsearch_once(f"http://{self.url}"),
+            lambda: self.locate_feedsearch_once(f"https://www.{self.url}"),
+            lambda: self.locate_feedsearch_once(f"http://www.{self.url}"),
+        ]
+
+        for f in sources:
+            feeds.extend(f())
+            if match := eager_match():
+                return match
+
+        logger.info("no eager match: choosing from all candidate feeds")
+        return self.choose_best(feeds)
+
+    @logitems("discovered feed via feedsearch")
+    def locate_feedsearch_once(self, uri: str) -> Iterator[Feed]:
         scheme, _separator, url = uri.partition("://")
         assert scheme and url, f"failed to partition ${uri!r}"
         url = fsc.crawler.coerce_url(url, default_scheme=scheme)
         print(f"trying {url}")
         feeds = fsc.search(url, total_timeout=180, request_timeout=90, max_content_length=100*1024*1024)
+        # all fields:
+        # - content_type
+        # - description
+        # - favicon
+        # - favicon_data_uri # embedded favicon
+        # - hubs # PubSub hubs
+        # - is_podcast
+        # - is_push
+        # - last_updated
+        # - self_url
+        # - site_name
+        # - site_url
+        # - title
+        # - url
+        # - velocity
+        # - version
         return (Feed.from_dict(i.serialize()) for i in fsc.sort_urls(feeds))

+    @logitems("discovered feed via podcastindex")
+    def locate_podcastindex(self) -> Iterator[Feed]:
+        yield from PODCAST_INDEX.search(self.url)
+
 def try_load_existing_feed(path_: str) -> Feed:
     try:
         f = open(path_, "r")
@@ -139,30 +246,33 @@ def select_feed(feeds: list[dict], prefer_podcast: bool) -> dict:
 def main():
     logging.basicConfig()
-    logging.getLogger().setLevel(logging.DEBUG)
+    logging.getLogger().setLevel(logging.INFO)
     logger.debug("logging enabled")

     parser = argparse.ArgumentParser(usage=__doc__)
     parser.add_argument("url", help="where to start searching for a feed")
     parser.add_argument("output", help="where to save extracted feed data (should end in .json)")
-    parser.add_argument('--podcast', help="if multiple feeds are found, prefer the podcast feed over any text/image feed", action='store_true')
+    parser.add_argument("--podcast", help="if multiple feeds are found, prefer the podcast feed over any text/image feed", action="store_true")
+    parser.add_argument("--verbose", help="show more info about feed discovery", action="store_true")
+    parser.add_argument("--exhaustive", help="disable eager matching and force an exhaustive search", action="store_true")
     args = parser.parse_args()
-    url, json_path = args.url, args.output
+    url = args.url
+    json_path = args.output
+    eager = not args.exhaustive
+
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)

     existing_data = try_load_existing_feed(json_path)
     prefer_podcast = args.podcast or (existing_data.is_podcast or False)
-    locator = Locator(url, prefer_podcast=prefer_podcast)
-
-    all_feeds = list(locator.locate_all())
-    for feed in all_feeds:
-        print(feed.to_json())
+    locator = Locator(url, prefer_podcast=prefer_podcast, eager=eager)

     # save the best feed to disk
-    keep = locator.locate_best(all_feeds)
+    feed = locator.locate_best()
     with open(json_path, "w") as out:
-        out.write(feed.to_json())
+        out.write(feed.to_json())

 if __name__ == '__main__':
     main()
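
Example invocation (the site and output path here are hypothetical; the flags are the ones main() defines above):

    ./pkgs/by-name/feeds/update-feed example.com feeds/example.json --podcast --verbose

With eager matching enabled (the default), the sources are consulted in order, podcastindex first, and locate_best returns as soon as exactly one discovered feed matches the requested URL and podcast preference; passing --exhaustive forces every source to be crawled before choose_best ranks the full candidate list. Note that PodcastIndex.locate_db only finds the database when some entry of XDG_DATA_DIRS contains podcastindex/podcastindex_feeds.csv; otherwise search() yields nothing and discovery falls through to the feedsearch crawls.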