nix-files/pkgs/additional/sane-scripts/src/sane-bt-search

250 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ps.natsort ps.requests ])"
# vim: set filetype=python :
"""
usage: sane-bt-search [options] <query_string>
searches Jackett for torrent files matching the title.
returns select results and magnet links.
options:
--full display all results
--help show this help message and exit
--manga show only manga results
--json output one json document instead of a human-readable table
--top=<n> show the <n> top rated torrents (default: 5)
--verbose show more information, useful for debugging/development
"""
# about Jackett
# - source: <https://github.com/Jackett/Jackett>
# - can be queried via APIs:
# - Torznab: <https://torznab.github.io/spec-1.3-draft/index.html>
# - TorrentPotato: <https://github.com/RuudBurger/CouchPotatoServer/wiki/Couchpotato-torrent-provider>
# - its own JSON-based API
from dataclasses import dataclass
from datetime import datetime
import logging
import json
import natsort
import requests
import sys
import time
# base URL of the Jackett instance every API call is sent to
SERVICE = "https://jackett.uninsane.org"

# API method name -> URL path (relative to SERVICE)
ENDPOINTS = {
    "results": "api/v2.0/indexers/all/results",
    # "results_torznab": "api/v2.0/indexers/all/results/torznab",
}

# naive 1970-01-01 timestamp, referenced by parse_time as its fallback value
epoch = datetime(year=1970, month=1, day=1)

logger = logging.getLogger(name=__name__)
class BadCliArgs(Exception):
    """
    raised for unusable command-line arguments.
    the exception message always ends with the module docstring (the usage text);
    when `msg` is given (and non-empty) it is placed in front, separated by a blank line.
    """
    def __init__(self, msg: str = None):
        # bare `__doc__` inside a method resolves to the *module* docstring
        text = f"{msg}\n\n{__doc__}" if msg else __doc__
        super().__init__(text)
def try_parse_time(t: str):
try:
return datetime.fromisoformat(t)
except ValueError: pass
if len(t) > len('YYYY-MM-DD'):
# sometimes these timestamps are encoded with e.g. too many digits in the milliseconds field.
# so just keep chomping until we get something that parses as a timestamp
return try_parse_time(t[:-1])
def parse_time(t: str) -> datetime:
    """
    parse `t` into a timezone-aware datetime (local timezone).
    if `t` cannot be parsed at all, fall back to `epoch`.
    """
    parsed = try_parse_time(t)
    if parsed is None:
        # apply the fallback *before* .astimezone(): calling a method on the
        # None returned for unparseable input would raise AttributeError.
        parsed = epoch
    # convert the fallback too, so every returned value is timezone-aware and
    # results stay mutually comparable (naive vs. aware comparison raises TypeError).
    return parsed.astimezone()
# category names that clean_cat discards
DROP_CATS = set(("dvd", "hd", "misc", "other", "sd"))
# category names that mark a result as manga (see Torrent.is_manga)
MANGA_CATS = set(("books", "comics", "ebook"))
# every category name is_cat treats as "recognized"
KNOWN_CATS = frozenset(MANGA_CATS | {"anime", "audio", "movies", "tv", "xxx"})
def clean_cat(c: str) -> str | None:
    """
    return `c` unchanged, or None if `c` is one of the DROP_CATS.
    """
    return None if c in DROP_CATS else c
def is_cat(cats: list[str], wanted_cats: list[str], default: bool = False) -> bool:
    """
    return True if any of the `cats` is in `wanted_cats`.
    in the event that no category in `cats` is recognized (i.e. none is in KNOWN_CATS),
    return `default` instead.
    """
    for cat in cats:
        if cat in KNOWN_CATS:
            break
    else:
        # loop finished without finding a single known category
        return default

    for cat in cats:
        if cat in wanted_cats:
            return True
    return False
@dataclass(eq=True, order=True, unsafe_hash=True)
class Torrent:
seeders: int
pub_date: datetime
size: int
tracker: str
title: str
magnet: str | None
http_dl_uri: str | None # probably a .torrent file but it COULD be a referral to a magnet:// URI
tracker_uri: str | None
categories: frozenset[str] # human-friendly list of categories, lowercase. e.g. ["Books", "Anime"]
def __str__(self) -> str:
cats = "/".join(self.categories) if self.categories else "?"
rows = []
rows.append(f"{self.seeders}[S]\t{cats}\t{self.tracker}\t{self.pub_date}\t{self.mib}M\t{self.title}")
if self.tracker_uri:
rows.append(f"\t{self.tracker_uri}")
rows.append(f"\t{self.dl_uri}")
return "\n".join(rows)
@property
def dl_uri(self) -> str:
return self.magnet or self.http_dl_uri
@property
def mib(self) -> int:
return int(round(self.size / 1024 / 1024))
@staticmethod
def from_dict(d: dict) -> 'Torrent':
logger.debug(f"Torrent.from_dict: fields: { ' '.join(d.keys()) }")
for k, v in d.items():
if k not in ("CategoryDesc", "Seeders", "PublishDate", "Size", "Tracker", "Title", "MagnetUri", "Guid", "Link", "Details") and \
v != None and v != "" and v != [] and v != {}:
logger.debug(f" {k} = {v}")
seeders = d.get("Seeders")
pub_date = d.get("PublishDate")
size = d.get("Size")
tracker = d.get("Tracker")
title = d.get("Title")
magnet = d.get("MagnetUri") or d.get("Guid")
http_dl_uri = d.get("Link")
tracker_uri = d.get("Details")
categories = d.get("CategoryDesc", "").replace("/", ",").split(",")
categories = (c.strip().lower() for c in categories)
categories = frozenset(clean_cat(c) for c in categories if clean_cat(c))
if magnet and not magnet.startswith("magnet:"):
logger.info(f"invalid magnet: {magnet}")
magnet = None
if seeders is not None and pub_date is not None and title is not None and (magnet is not None or http_dl_uri is not None):
pub_date = parse_time(pub_date)
return Torrent(seeders, pub_date, size, tracker, title, magnet, http_dl_uri, tracker_uri, categories=categories)
def to_dict(self) -> dict:
# N.B.: not all fields: needs to be kept in sync with consumers like mx-sanebot
return dict(
seeders=self.seeders,
pub_date=self.pub_date.strftime("%Y-%m-%d"),
size=self.size,
tracker=self.tracker,
title=self.title,
magnet=self.magnet,
)
def is_manga(self, default: bool = False) -> bool:
return is_cat(self.categories, MANGA_CATS, default)
class Client:
    """
    minimal client for the Jackett JSON API at SERVICE.
    """
    def __init__(self):
        # `with` so the secret file's handle is closed right away
        with open("/run/secrets/jackett_apikey") as f:
            self.apikey = f.read().strip()

    def api_call(self, method: str, params: dict) -> dict:
        """
        GET the endpoint registered under `method` in ENDPOINTS and return the decoded JSON body.
        `params` is not modified; the api key and a timestamp parameter are added to a copy.
        raises KeyError for an unknown `method`, and requests.HTTPError for a 4xx/5xx response.
        """
        endpoint = ENDPOINTS[method]
        url = f"{SERVICE}/{endpoint}"
        params = params.copy()
        # NOTE(review): the `_` timestamp is presumably a cache-buster -- confirm
        params.update(apikey=self.apikey, _=str(int(time.time())))
        resp = requests.get(url, params=params)
        # fail loudly on an HTTP error instead of trying to decode an error page as JSON
        resp.raise_for_status()
        return resp.json()

    def query(self, q: str) -> list[Torrent]:
        """
        search for `q` and return the usable results, de-duplicated and sorted
        in descending Torrent order.
        """
        api_res = self.api_call("results", dict(Query=q))
        torrents = set()
        for r in api_res["Results"]:
            t = Torrent.from_dict(r)
            if t is not None:
                torrents.add(t)
        return sorted(torrents, reverse=True)
def filter_results(results: list[Torrent], full: bool, top: int, manga: bool) -> list[Torrent]:
    """
    narrow the complete query result down according to the CLI options:
    - `manga`: keep only the entries whose `is_manga(default=True)` is true.
    - `full`: when false, keep only the first `top` entries.
    """
    kept = results
    if manga:
        kept = list(filter(lambda t: t.is_manga(default=True), kept))
    return kept if full else kept[:top]
def parse_args(args: list[str]) -> dict:
    """
    parse CLI arguments into an options dict, pre-populated with defaults.
    - `--name=value` sets options[name] to the string `value`
      (only the first `=` separates name from value).
    - `--name` sets options[name] to True.
    - anything else is appended (space-separated) to options["query"].
    unknown option names are stored as-is; rejecting them is left to the caller.
    `args` itself is not modified.
    """
    options = dict(
        full=False,
        help=False,
        json=False,
        query="",
        top="5",
        verbose=False,
        manga=False,
    )
    for arg in args:
        if arg.startswith('--'):
            # partition splits on the first '=' only, so values may contain '='
            name, sep, val = arg[2:].partition('=')
            options[name] = val if sep else True
        elif options["query"]:
            options["query"] = options["query"] + " " + arg
        else:
            options["query"] = arg
    return options
def main(args: list[str]):
    """
    CLI entry point: parse `args`, query Jackett, and print the filtered results
    either as one JSON document (`--json`) or as human-readable text.
    raises BadCliArgs for unknown options, for an unusable `--top` value,
    and (carrying just the usage text) for `--help`.
    """
    logging.basicConfig()
    options = parse_args(args)
    full = options.pop("full")
    show_help = options.pop("help")
    # N.B.: must not be named `json`, else it shadows the json module used below
    as_json = options.pop("json")
    query = options.pop("query")
    raw_top = options.pop("top")
    verbose = options.pop("verbose")
    manga = options.pop("manga")

    # everything known has been popped; whatever is left is unrecognized
    if options != {}:
        raise BadCliArgs(f"unexpected options: {options}")
    if show_help:
        raise BadCliArgs()
    try:
        top = int(raw_top)
    except ValueError as e:
        raise BadCliArgs(f"invalid value for --top: {raw_top!r}") from e
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    client = Client()
    results = client.query(query)
    # count before filtering so the user sees how many matches there were in total
    num_results = len(results)
    results = filter_results(results, full, top, manga)
    if as_json:
        dumpable = [t.to_dict() for t in results]
        print(json.dumps(dumpable))
    else:
        print(f"found {num_results} result(s)")
        for r in results:
            print(r)


if __name__ == "__main__":
    main(sys.argv[1:])