From ad4631a5a8f1d09d427704585d91765891eed7eb Mon Sep 17 00:00:00 2001 From: Colin Date: Fri, 7 Jul 2023 07:08:17 +0000 Subject: [PATCH] sane-bt-search: add (limited) ability to search by category --- .../sane-scripts/src/sane-bt-search | 57 ++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/pkgs/additional/sane-scripts/src/sane-bt-search b/pkgs/additional/sane-scripts/src/sane-bt-search index 79f9237b0..bf05ecbe5 100755 --- a/pkgs/additional/sane-scripts/src/sane-bt-search +++ b/pkgs/additional/sane-scripts/src/sane-bt-search @@ -64,6 +64,23 @@ def parse_time(t: str) -> datetime: return try_parse_time(t).astimezone() or epoch +DROP_CATS = { "dvd", "hd", "misc", "other", "sd" } +MANGA_CATS = { "books", "comics", "ebook" } +KNOWN_CATS = frozenset(list(MANGA_CATS) + ["anime", "audio", "movies", "tv", "xxx"]) +def clean_cat(c: str) -> str | None: + if c in DROP_CATS: return None + return c + +def is_cat(cats: list[str], wanted_cats: list[str], default: bool = False) -> bool: + """ + return True if any of the `cats` is in `wanted_cats`. + in the event there no category is recognized, assume `default` + """ + if not any(c in KNOWN_CATS for c in cats): + return default + else: + return any(c in wanted_cats for c in cats) + @dataclass(eq=True, order=True, unsafe_hash=True) class Torrent: seeders: int @@ -74,10 +91,12 @@ class Torrent: magnet: "Optional[str]" http_dl_uri: "Optional[str]" # probably a .torrent file but it COULD be a referral to a magnet:// URI tracker_uri: "Optional[str]" + categories: frozenset[str] # human-friendly list of categories, lowercase. e.g. ["Books", "Anime"] def __str__(self) -> str: + cats = "/".join(self.categories) if self.categories else "?" rows = [] - rows.append(f"{self.seeders}[S]\t{self.pub_date}\t{self.mib}M\t{self.tracker}\t{self.title}") + rows.append(f"{self.seeders}[S]\t{cats}\t{self.tracker}\t{self.pub_date}\t{self.mib}M\t{self.title}") if self.tracker_uri: rows.append(f"\t{self.tracker_uri}") rows.append(f"\t{self.dl_uri}") @@ -95,7 +114,7 @@ class Torrent: def from_dict(d: dict) -> 'Torrent': logger.debug(f"Torrent.from_dict: fields: { ' '.join(d.keys()) }") for k, v in d.items(): - if k not in ("Seeders", "PublishDate", "Size", "Tracker", "Title", "MagnetUri", "Guid", "Link") and \ + if k not in ("CategoryDesc", "Seeders", "PublishDate", "Size", "Tracker", "Title", "MagnetUri", "Guid", "Link", "Details") and \ v != None and v != "" and v != [] and v != {}: logger.debug(f" {k} = {v}") @@ -107,6 +126,9 @@ class Torrent: magnet = d.get("MagnetUri") or d.get("Guid") http_dl_uri = d.get("Link") tracker_uri = d.get("Details") + categories = d.get("CategoryDesc", "").replace("/", ",").split(",") + categories = (c.strip().lower() for c in categories) + categories = frozenset(clean_cat(c) for c in categories if clean_cat(c)) if magnet and not magnet.startswith("magnet:"): logger.info(f"invalid magnet: {magnet}") @@ -114,7 +136,7 @@ class Torrent: if seeders is not None and pub_date is not None and title is not None and (magnet is not None or http_dl_uri is not None): pub_date = parse_time(pub_date) - return Torrent(seeders, pub_date, size, tracker, title, magnet, http_dl_uri, tracker_uri) + return Torrent(seeders, pub_date, size, tracker, title, magnet, http_dl_uri, tracker_uri, categories=categories) def to_dict(self) -> dict: # N.B.: not all fields: needs to be kept in sync with consumers like mx-sanebot @@ -127,6 +149,9 @@ class Torrent: magnet=self.magnet, ) + def is_manga(self, default: bool = False) -> bool: + return is_cat(self.categories, MANGA_CATS, default) + class Client: def __init__(self): self.apikey = open("/run/secrets/jackett_apikey").read().strip() @@ -149,6 +174,17 @@ class Client: return sorted(torrents, reverse=True) + +def filter_results(results: list[Torrent], full: bool, top: int, manga: bool) -> list[Torrent]: + """ + take the complete query and filter further based on CLI options + """ + if manga: + results = [t for t in results if t.is_manga(default=True)] + if not full: + results = results[:top] + return results + def parse_args(args: list) -> dict: options = dict( full=False, @@ -157,6 +193,7 @@ def parse_args(args: list) -> dict: query="", top="5", verbose=False, + manga=False, ) while args: arg = args[0] @@ -182,6 +219,7 @@ def main(args: list): query = options.pop("query") top = options.pop("top") verbose = options.pop("verbose") + manga = options.pop("manga") if options != {}: raise BadCliArgs(f"unexpected options: {options}") @@ -191,15 +229,20 @@ def main(args: list): if verbose: logging.getLogger().setLevel(logging.DEBUG) + num_listings = 1000 if full else int(top) client = Client() - res = client.query(query) + results = client.query(query) + num_results = len(results) + + results = filter_results(results, full, top, manga) + if json: - dumpable = [t.to_dict() for t in res[:num_listings]] + dumpable = [t.to_dict() for t in results] print(json.dumps(dumpable)) else: - print(f"found {len(res)} result(s)") - for r in res[:num_listings]: + print(f"found {num_results} result(s)") + for r in results: print(r) if __name__ == "__main__":