nix-files/pkgs/additional/sane-cast/sane-cast

189 lines
5.7 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p go2tv
# vim: set filetype=python :
"""
cast media (local video or audio files) to a device on the same network
with some awareness of device-specific quirks
and a menu to select a device if there's more than one online
"""
from dataclasses import dataclass
from enum import Enum
import argparse
import logging
import os
import subprocess
import tempfile
logger = logging.getLogger(__name__)
class Compat(Enum):
Default = "default"
# RenameToMp4: ensure the name of the file sent ends in ".mp4".
# this does not *transcode*. it doesn't modify the contents of the file at all. it just changes the name.
# some devices are just so dumb that they'll only render a mkv file if it ends in .mp4.
RenameToMp4 = "RenameToMp4"
@dataclass
class Device:
# "model", or device name. e.g. "Theater TV".
model: str
compat: Compat | None = None
# URL: endpoint at which the device can be controlled. e.g. http://1.2.3.4:567/MediaRenderer.xml
url: str | None = None
def augmented(self, other: "Device") -> "Device":
return Device(
model = self.model or other.model,
compat = self.compat or other.compat,
url = self.url or other.url,
)
# ranked in order of preference
KNOWN_DEVICES = [
Device("Theater TV", Compat.RenameToMp4),
Device("[LG] webOS TV OLED55C9PUA", Compat.Default),
]
class Go2TvParser:
def __init__(self) -> None:
self.parsed_devices = []
self.partial_model = None
self.partial_url = None
def into_devices(self) -> list[Device]:
self.close_device()
return self.parsed_devices[:]
def feed_line(self, line: str) -> None:
sanitized = line \
.replace("\x1b[0m", "") \
.replace("\x1b[1m", "") \
.strip()
logger.debug(repr(sanitized))
if sanitized and all(c == "-" for c in sanitized):
self.close_device()
elif sanitized.startswith("Model:"):
self.partial_model = sanitized[len("Model:"):].strip()
elif sanitized.startswith("URL:"):
self.partial_url = sanitized[len("URL:"):].strip()
def close_device(self) -> None:
"""
if there's any device data parsed, move it into `parsed_devices`
"""
if self.partial_model is not None and self.partial_url:
self.parsed_devices.append(Device(
model=self.partial_model,
url=self.partial_url,
))
self.partial_model = None
self.partial_url = None
class Go2TvDriver:
visible_devices = None
def scan_devices(self) -> None:
go2tv = subprocess.Popen(
[ "go2tv", "-l" ],
stdout=subprocess.PIPE,
)
parser = Go2TvParser()
for line in iter(go2tv.stdout.readline, b''):
parser.feed_line(line.decode("utf-8"))
self.visible_devices = parser.into_devices()
def rank_devices(self) -> list[Device]:
ranked = []
for known in KNOWN_DEVICES:
for vis in self.visible_devices:
if vis.model == known.model:
ranked.append(known.augmented(vis))
for vis in self.visible_devices:
if not any(vis.model == r.model for r in ranked):
ranked.append(vis)
return ranked
def cast_to(self, dev: Device, media: str) -> None:
logger.info(f"casting to {dev.model} at {dev.url} with compat {dev.compat}")
if dev.compat == Compat.RenameToMp4:
if not media.endswith(".mp4"):
if media.startswith("http://") or media.startswith("https://"):
logger.info(f"ignoring compat requirement {dev.compat} for {media}")
else:
media = os.path.abspath(media)
# TODO: make sure this directory gets cleaned up!
dir_ = tempfile.mkdtemp(prefix="sane-cast-")
new_name = os.path.join(dir_, os.path.basename(media) + ".mp4")
os.symlink(media, new_name)
media = new_name
if media.startswith("http://") or media.startswith("https://"):
media_args = [ "-u", media ]
else:
media_args = [ "-v", media ]
cli_args = [ "go2tv", "-t", dev.url ] + media_args
logger.info(" ".join(cli_args))
os.execvp("go2tv", cli_args)
def choose_device(devices: list[Device]) -> Device | None:
if not devices:
logger.info("no devices found!")
return
if len(devices) == 1:
return devices[0]
dev = None
while dev is None:
# TODO: use a GUI menu like zenity?
print("choose a device:")
for i, d in enumerate(devices):
print(f"[{i + 1}] {d.model}")
print("[q] quit")
print("")
print("> ", end="")
choice = input()
if choice.strip() == "q":
return
try:
dev = devices[int(choice.strip()) - 1]
except:
print(f"invalid choice {choice!r}")
return dev
def main():
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser(description="cast media to a DLNA receiver in range")
parser.add_argument("--verbose", action="store_true", help="more logging")
parser.add_argument("media", help="file or URL to send to the DLNA device")
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
go2tv = Go2TvDriver()
go2tv.scan_devices()
devices = go2tv.rank_devices()
dev = choose_device(devices)
if dev:
go2tv.cast_to(dev, args.media)
if __name__ == "__main__":
main()