sane-bt-search: add (limited) ability to search by category

This commit is contained in:
2023-07-07 07:08:17 +00:00
parent 799cbccdbe
commit ad4631a5a8

View File

@@ -64,6 +64,23 @@ def parse_time(t: str) -> datetime:
return try_parse_time(t).astimezone() or epoch
# Categories too generic to carry any filtering signal; stripped by clean_cat.
DROP_CATS = { "dvd", "hd", "misc", "other", "sd" }
# Categories that indicate a manga/book-style release.
MANGA_CATS = { "books", "comics", "ebook" }
# Every category this tool understands; anything outside this set is treated
# as "unrecognized" by is_cat. Built with set union rather than a list
# round-trip (list(...) + [...]).
KNOWN_CATS = frozenset(MANGA_CATS | {"anime", "audio", "movies", "tv", "xxx"})
def clean_cat(c: str) -> str | None:
    """Return the category unchanged, or None when it carries no useful signal."""
    return None if c in DROP_CATS else c
def is_cat(cats: list[str], wanted_cats: list[str], default: bool = False) -> bool:
    """
    Report whether any category in `cats` appears in `wanted_cats`.

    If none of the `cats` is a recognized category at all (per KNOWN_CATS),
    fall back to `default` instead of answering False outright.
    """
    recognized = any(c in KNOWN_CATS for c in cats)
    if not recognized:
        return default
    return any(c in wanted_cats for c in cats)
@dataclass(eq=True, order=True, unsafe_hash=True)
class Torrent:
seeders: int
@@ -74,10 +91,12 @@ class Torrent:
magnet: "Optional[str]"
http_dl_uri: "Optional[str]" # probably a .torrent file but it COULD be a referral to a magnet:// URI
tracker_uri: "Optional[str]"
categories: frozenset[str] # human-friendly list of categories, lowercase. e.g. ["Books", "Anime"]
def __str__(self) -> str:
cats = "/".join(self.categories) if self.categories else "?"
rows = []
rows.append(f"{self.seeders}[S]\t{self.pub_date}\t{self.mib}M\t{self.tracker}\t{self.title}")
rows.append(f"{self.seeders}[S]\t{cats}\t{self.tracker}\t{self.pub_date}\t{self.mib}M\t{self.title}")
if self.tracker_uri:
rows.append(f"\t{self.tracker_uri}")
rows.append(f"\t{self.dl_uri}")
@@ -95,7 +114,7 @@ class Torrent:
def from_dict(d: dict) -> 'Torrent':
logger.debug(f"Torrent.from_dict: fields: { ' '.join(d.keys()) }")
for k, v in d.items():
if k not in ("Seeders", "PublishDate", "Size", "Tracker", "Title", "MagnetUri", "Guid", "Link") and \
if k not in ("CategoryDesc", "Seeders", "PublishDate", "Size", "Tracker", "Title", "MagnetUri", "Guid", "Link", "Details") and \
v != None and v != "" and v != [] and v != {}:
logger.debug(f" {k} = {v}")
@@ -107,6 +126,9 @@ class Torrent:
magnet = d.get("MagnetUri") or d.get("Guid")
http_dl_uri = d.get("Link")
tracker_uri = d.get("Details")
categories = d.get("CategoryDesc", "").replace("/", ",").split(",")
categories = (c.strip().lower() for c in categories)
categories = frozenset(clean_cat(c) for c in categories if clean_cat(c))
if magnet and not magnet.startswith("magnet:"):
logger.info(f"invalid magnet: {magnet}")
@@ -114,7 +136,7 @@ class Torrent:
if seeders is not None and pub_date is not None and title is not None and (magnet is not None or http_dl_uri is not None):
pub_date = parse_time(pub_date)
return Torrent(seeders, pub_date, size, tracker, title, magnet, http_dl_uri, tracker_uri)
return Torrent(seeders, pub_date, size, tracker, title, magnet, http_dl_uri, tracker_uri, categories=categories)
def to_dict(self) -> dict:
# N.B.: not all fields: needs to be kept in sync with consumers like mx-sanebot
@@ -127,6 +149,9 @@ class Torrent:
magnet=self.magnet,
)
def is_manga(self, default: bool = False) -> bool:
    """Return True when any of this torrent's categories is in MANGA_CATS;
    if no category is recognized at all, return `default` (see is_cat)."""
    return is_cat(self.categories, MANGA_CATS, default)
class Client:
def __init__(self):
    """Load the Jackett API key from the local secrets file."""
    # Use a context manager so the secrets-file handle is closed promptly
    # instead of leaking until garbage collection.
    with open("/run/secrets/jackett_apikey") as f:
        self.apikey = f.read().strip()
@@ -149,6 +174,17 @@ class Client:
return sorted(torrents, reverse=True)
def filter_results(results: "list[Torrent]", full: bool, top: int, manga: bool) -> "list[Torrent]":
    """
    Take the complete query results and filter further based on CLI options.

    - `manga`: keep only torrents whose categories look like manga/books;
      torrents with no recognized category are kept (default=True).
    - `full`: when False, truncate to the first `top` results.

    NOTE: `top` may arrive as a string straight from CLI parsing (parse_args
    defaults it to "5" and main passes it through unconverted), so coerce it
    before slicing — slicing a list with a str raises TypeError.
    """
    if manga:
        results = [t for t in results if t.is_manga(default=True)]
    if not full:
        results = results[:int(top)]
    return results
def parse_args(args: list) -> dict:
options = dict(
full=False,
@@ -157,6 +193,7 @@ def parse_args(args: list) -> dict:
query="",
top="5",
verbose=False,
manga=False,
)
while args:
arg = args[0]
@@ -182,6 +219,7 @@ def main(args: list):
query = options.pop("query")
top = options.pop("top")
verbose = options.pop("verbose")
manga = options.pop("manga")
if options != {}:
raise BadCliArgs(f"unexpected options: {options}")
@@ -191,15 +229,20 @@ def main(args: list):
if verbose:
logging.getLogger().setLevel(logging.DEBUG)
num_listings = 1000 if full else int(top)
client = Client()
res = client.query(query)
results = client.query(query)
num_results = len(results)
results = filter_results(results, full, top, manga)
if json:
dumpable = [t.to_dict() for t in res[:num_listings]]
dumpable = [t.to_dict() for t in results]
print(json.dumps(dumpable))
else:
print(f"found {len(res)} result(s)")
for r in res[:num_listings]:
print(f"found {num_results} result(s)")
for r in results:
print(r)
if __name__ == "__main__":