feeds/update-feed: add podcastindex support
also drop unused `site-name`, `site-url` from the saved feeds. podcastindex doesn't provide exactly those, and they've shown to be rather useless so far. TODO: remove these entries from existing feeds.
This commit is contained in:
@@ -24,6 +24,6 @@ lib.recurseIntoAttrs (lib.makeScope newScope (self: with self; {
|
||||
update-feed = static-nix-shell.mkPython3 {
|
||||
pname = "update-feed";
|
||||
srcRoot = ./.;
|
||||
pkgs = [ "feedsearch-crawler" ];
|
||||
pkgs = [ "feedsearch-crawler" "podcastindex-db" ];
|
||||
};
|
||||
}))
|
||||
|
@@ -2,15 +2,31 @@
|
||||
#!nix-shell -i python3 -p feedsearch-crawler -p podcastindex-db -p python3
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Iterator
|
||||
from typing import Any, IO, Iterator
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import feedsearch_crawler as fsc
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def logitems(context: str):
|
||||
"""
|
||||
decorator which wraps any function that returns an iterator.
|
||||
the wrapper function yields each item of the wrapped function,
|
||||
but logs each item as it does so.
|
||||
"""
|
||||
def wrapper(f):
|
||||
def wrapped(*args, **kwargs):
|
||||
for item in f(*args, **kwargs):
|
||||
logger.info(f"{context}: {item}")
|
||||
yield item
|
||||
return wrapped
|
||||
return wrapper
|
||||
|
||||
@dataclass(order=True)
|
||||
class Feed:
|
||||
# content_type
|
||||
@@ -22,8 +38,8 @@ class Feed:
|
||||
# is_push
|
||||
# last_updated
|
||||
# self_url
|
||||
site_name: str | None = None # not used
|
||||
site_url: str | None = None # not used
|
||||
# site_name
|
||||
# site_url
|
||||
title: str | None = None # used by <hosts/common/feeds.nix> (and others)
|
||||
url: str | None = None # used by <hosts/common/feeds.nix> (and many others)
|
||||
velocity: float | None = None # used by <hosts/common/feeds.nix>
|
||||
@@ -31,6 +47,7 @@ class Feed:
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
def clean(value: str | None) -> str | None:
|
||||
# TODO: clean HTML formatting like `<div> ... </div>`
|
||||
replacements = {
|
||||
"\u2013": "-",
|
||||
"\u2019": "'",
|
||||
@@ -45,7 +62,6 @@ class Feed:
|
||||
|
||||
# clean up characters for better printability
|
||||
self.title = clean(self.title)
|
||||
self.site_name = clean(self.site_name)
|
||||
self.description = clean(self.description)
|
||||
|
||||
def from_dict(d: dict[str, Any]) -> 'Self':
|
||||
@@ -57,8 +73,6 @@ class Feed:
|
||||
return Feed(
|
||||
description = d.get("description"),
|
||||
is_podcast = d.get("is_podcast"),
|
||||
site_name = d.get("site_name"),
|
||||
site_url = d.get("site_url"),
|
||||
title = d.get("title"),
|
||||
url = d.get("url"),
|
||||
velocity = d.get("velocity"),
|
||||
@@ -68,8 +82,6 @@ class Feed:
|
||||
return dict(
|
||||
description=self.description,
|
||||
is_podcast=self.is_podcast,
|
||||
site_name=self.site_name,
|
||||
site_url=self.site_url,
|
||||
title=self.title,
|
||||
url=self.url,
|
||||
velocity=self.velocity,
|
||||
@@ -78,18 +90,79 @@ class Feed:
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.to_dict(), sort_keys=True, indent=2)
|
||||
|
||||
class PodcastIndex:
|
||||
def __init__(self):
|
||||
self.db = PodcastIndex.locate_db()
|
||||
|
||||
@staticmethod
|
||||
def locate_db() -> IO[str] | None:
|
||||
for d in os.environ.get("XDG_DATA_DIRS", "").split(":"):
|
||||
try:
|
||||
return open(os.path.join(d, "podcastindex", "podcastindex_feeds.csv"), "r")
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return None
|
||||
|
||||
def search(self, query: str) -> Feed | None:
|
||||
for r in csv.DictReader(self.db):
|
||||
if query in r['link']:
|
||||
yield Feed(
|
||||
description=r.get("description"),
|
||||
is_podcast=True,
|
||||
title=r.get("title"),
|
||||
url=r.get("url"),
|
||||
# TODO: it's unclear what this field means
|
||||
velocity=float(r.get("updateFrequency") or "0"),
|
||||
# all fields:
|
||||
# - category1 ... category10
|
||||
# - chash
|
||||
# - contentType
|
||||
# - createdOn
|
||||
# - dead
|
||||
# - description
|
||||
# - episodeCount
|
||||
# - explicit
|
||||
# - generator
|
||||
# - host
|
||||
# - id
|
||||
# - imageUrl
|
||||
# - itunesAuthor
|
||||
# - itunesId
|
||||
# - itunesOwnerName
|
||||
# - itunesType
|
||||
# - language
|
||||
# - lastHttpStatus
|
||||
# - lastUpdate
|
||||
# - link
|
||||
# - newestEnclosureDuration
|
||||
# - newestEnclosureUrl
|
||||
# - newestItemPubdate
|
||||
# - oldestItemPubdate
|
||||
# - originalUrl
|
||||
# - podcastGuid
|
||||
# - popularityScore
|
||||
# - priority
|
||||
# - title
|
||||
# - updateFrequency
|
||||
# - url
|
||||
)
|
||||
|
||||
PODCAST_INDEX = PodcastIndex()
|
||||
|
||||
class Locator:
|
||||
def __init__(self, url: str, prefer_podcast: bool) -> None:
|
||||
self.url = url
|
||||
def __init__(self, url: str, prefer_podcast: bool, eager: bool) -> None:
|
||||
self.url = url #< schemeless and www.-less
|
||||
self.prefer_podcast = prefer_podcast
|
||||
self.eager = eager
|
||||
self.feeds = []
|
||||
|
||||
def locate_best(self, feeds: list[Feed] | None=None) -> Feed | None:
|
||||
if feeds is None:
|
||||
feeds = list(self.locate_all())
|
||||
def is_match(self, feed: Feed) -> bool:
|
||||
return feed.is_podcast == self.prefer_podcast \
|
||||
and (feed.url and self.url in feed.url)
|
||||
|
||||
def choose_best(self, feeds: list[Feed]) -> Feed | None:
|
||||
feeds = sorted(feeds, key=lambda f: (
|
||||
(f.is_podcast == self.prefer_podcast),
|
||||
self.is_match(f),
|
||||
-f.velocity if f.velocity is not None else 0, #< prefer higher-velocity sources
|
||||
f.url, #< prefer shorter URLs
|
||||
len(f.title) if f.title is not None else 1000, #< prefer shorter titles
|
||||
@@ -97,28 +170,62 @@ class Locator:
|
||||
))
|
||||
return feeds[0] if len(feeds) else None
|
||||
|
||||
def locate_all(self) -> Iterator[Feed]:
|
||||
uris = [
|
||||
f"https://{self.url}",
|
||||
f"http://{self.url}",
|
||||
f"https://www.{self.url}",
|
||||
f"http://www.{self.url}",
|
||||
]
|
||||
seen = []
|
||||
for uri in uris:
|
||||
for feed in self.locate_feedsearch(uri):
|
||||
if feed not in seen:
|
||||
seen.append(feed)
|
||||
yield feed
|
||||
def locate_best(self) -> Feed | None:
|
||||
feeds = []
|
||||
def eager_match():
|
||||
if not self.eager:
|
||||
return None
|
||||
matches = [f for f in feeds if self.is_match(f)]
|
||||
if len(matches) == 1:
|
||||
logger.info(f"eagerly matched {matches[0]}")
|
||||
return matches[0]
|
||||
|
||||
def locate_feedsearch(self, uri: str) -> Iterator[Feed]:
|
||||
sources = [
|
||||
self.locate_podcastindex,
|
||||
lambda: self.locate_feedsearch_once(f"https://{self.url}"),
|
||||
lambda: self.locate_feedsearch_once(f"http://{self.url}"),
|
||||
lambda: self.locate_feedsearch_once(f"https://www.{self.url}"),
|
||||
lambda: self.locate_feedsearch_once(f"http://www.{self.url}"),
|
||||
]
|
||||
|
||||
for f in sources:
|
||||
feeds.extend(f())
|
||||
if eager_match():
|
||||
return eager_match()
|
||||
|
||||
logger.info("no eager match: choosing from all candidate feeds")
|
||||
return self.choose_best(feeds)
|
||||
|
||||
|
||||
@logitems("discovered feed via feedsearch")
|
||||
def locate_feedsearch_once(self, uri: str) -> Iterator[Feed]:
|
||||
scheme, _separator, url = uri.partition("://")
|
||||
assert scheme and url, f"failed to partition ${uri!r}"
|
||||
url = fsc.crawler.coerce_url(url, default_scheme=scheme)
|
||||
print(f"trying {url}")
|
||||
feeds = fsc.search(url, total_timeout=180, request_timeout=90, max_content_length=100*1024*1024)
|
||||
# all fields:
|
||||
# - content_type
|
||||
# - description
|
||||
# - favicon
|
||||
# - favicon_data_uri # embedded favicon
|
||||
# - hubs # PubSub hubs
|
||||
# - is_podcast
|
||||
# - is_push
|
||||
# - last_updated
|
||||
# - self_url
|
||||
# - site_name
|
||||
# - site_url
|
||||
# - title
|
||||
# - url
|
||||
# - velocity
|
||||
# - version
|
||||
return (Feed.from_dict(i.serialize()) for i in fsc.sort_urls(feeds))
|
||||
|
||||
@logitems("discovered feed via podcastindex")
|
||||
def locate_podcastindex(self) -> Iterator[Feed]:
|
||||
yield from PODCAST_INDEX.search(self.url)
|
||||
|
||||
def try_load_existing_feed(path_: str) -> Feed:
|
||||
try:
|
||||
f = open(path_, "r")
|
||||
@@ -139,30 +246,33 @@ def select_feed(feeds: list[dict], prefer_podcast: bool) -> dict:
|
||||
|
||||
def main():
|
||||
logging.basicConfig()
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
logger.debug("logging enabled")
|
||||
|
||||
parser = argparse.ArgumentParser(usage=__doc__)
|
||||
parser.add_argument("url", help="where to start searching for a feed")
|
||||
parser.add_argument("output", help="where to save extracted feed data (should end in .json)")
|
||||
parser.add_argument('--podcast', help="if multiple feeds are found, prefer the podcast feed over any text/image feed", action='store_true')
|
||||
parser.add_argument("--podcast", help="if multiple feeds are found, prefer the podcast feed over any text/image feed", action="store_true")
|
||||
parser.add_argument("--verbose", help="show more info about feed discovery", action="store_true")
|
||||
parser.add_argument("--exhaustive", help="disable eager matching and force an exhaustive search", action="store_true")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
url, json_path = args.url, args.output
|
||||
url = args.url
|
||||
json_path = args.output
|
||||
eager = not args.exhaustive
|
||||
|
||||
if args.verbose:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
existing_data = try_load_existing_feed(json_path)
|
||||
prefer_podcast = args.podcast or (existing_data.is_podcast or False)
|
||||
locator = Locator(url, prefer_podcast=prefer_podcast)
|
||||
|
||||
all_feeds = list(locator.locate_all())
|
||||
for feed in all_feeds:
|
||||
print(feed.to_json())
|
||||
locator = Locator(url, prefer_podcast=prefer_podcast, eager=eager)
|
||||
|
||||
# save the best feed to disk
|
||||
keep = locator.locate_best(all_feeds)
|
||||
feed = locator.locate_best()
|
||||
with open(json_path, "w") as out:
|
||||
out.write(keep.to_json())
|
||||
out.write(feed.to_json())
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
Reference in New Issue
Block a user