204 lines
6.5 KiB
Python
Executable File
204 lines
6.5 KiB
Python
Executable File
#!/usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p go2tv
|
|
# vim: set filetype=python :
|
|
"""
|
|
cast media (local video or audio files) to a device on the same network
|
|
with some awareness of device-specific quirks
|
|
and a menu to select a device if there's more than one online
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
|
|
# module-level logger, named after this module; the classes and functions below all log through it.
logger = logging.getLogger(__name__)
|
|
|
|
class Compat(Enum):
    """ device-specific workaround to apply before casting a file """

    # Default: no special handling.
    Default = "default"

    # RenameToMp4: make sure the file name sent to the device ends in ".mp4".
    # this is *not* a transcode: the file contents are left untouched, only
    # the name changes. some devices are dumb enough that they'll only render
    # a mkv file if its name ends in .mp4.
    RenameToMp4 = "RenameToMp4"
|
|
|
|
@dataclass
class Device:
    """ a cast target, described by its name, its compat quirk, and its control URL """

    # "model", or device name. e.g. "Theater TV".
    model: str
    # workaround to apply when casting to this device, if one is known.
    compat: Compat | None = None
    # URL: endpoint at which the device can be controlled. e.g. http://1.2.3.4:567/MediaRenderer.xml
    url: str | None = None

    def augmented(self, other: "Device") -> "Device":
        """
        return a new Device whose fields come from `self`,
        falling back to `other` for any field that is falsy on `self`.
        """
        model = self.model or other.model
        compat = self.compat or other.compat
        url = self.url or other.url
        return Device(model=model, compat=compat, url=url)

    def matches(self, filter: str) -> bool:
        """ return True if the filter string occurs inside the identifier for this device """
        keep = "abcdefghijklmnopqrstuvwxyz0123456789"

        def normalize(text: str) -> str:
            # lowercase, then drop everything but ASCII letters and digits,
            # so that e.g. "theater-tv" still matches "Theater TV".
            return "".join(ch for ch in text.lower() if ch in keep)

        needle = normalize(filter)
        haystack = normalize(self.model)
        return needle in haystack
|
|
|
|
# devices we know about, and the quirk each one needs.
# ranked in order of preference: most preferred first.
KNOWN_DEVICES = [
    Device(model="Theater TV", compat=Compat.RenameToMp4),
    Device(model="Cuddlevision", compat=Compat.Default),
]
|
|
|
|
class Go2TvParser:
    """
    line-at-a-time parser for a device listing.

    feed it lines with `feed_line`; "Model:" and "URL:" lines fill in the
    device currently being read, and a line made only of dashes finishes it.
    call `into_devices` at the end to collect the result.
    """

    def __init__(self) -> None:
        # devices that have been completely parsed so far
        self.parsed_devices = []
        # fields of the device currently being read (None until seen)
        self.partial_model = None
        self.partial_url = None

    def into_devices(self) -> list[Device]:
        """ finish any device still in progress and return a copy of all parsed devices """
        self.close_device()
        return list(self.parsed_devices)

    def feed_line(self, line: str) -> None:
        """ consume one line of output, updating the in-progress device as needed """
        sanitized = line
        # drop the "reset" and "bold" terminal escape codes before inspecting the text
        for escape in ("\x1b[0m", "\x1b[1m"):
            sanitized = sanitized.replace(escape, "")
        sanitized = sanitized.strip()

        logger.debug(repr(sanitized))

        # a non-empty line consisting solely of "-" separates devices
        if set(sanitized) == {"-"}:
            self.close_device()
            return

        if sanitized.startswith("Model:"):
            self.partial_model = sanitized.removeprefix("Model:").strip()
            return

        if sanitized.startswith("URL:"):
            self.partial_url = sanitized.removeprefix("URL:").strip()

    def close_device(self) -> None:
        """
        if there's any device data parsed, move it into `parsed_devices`.
        a device is only recorded when it has both a model and a non-empty URL;
        either way the in-progress fields are reset.
        """
        has_model = self.partial_model is not None
        if has_model and self.partial_url:
            finished = Device(
                model=self.partial_model,
                url=self.partial_url,
            )
            self.parsed_devices.append(finished)

        self.partial_model = None
        self.partial_url = None
|
|
|
|
class Go2TvDriver:
    """ drives the `go2tv` command-line tool: device discovery and casting """

    def scan_devices(self) -> list[Device]:
        """
        run `go2tv -l` and return the devices parsed from its output.

        the child process is always reaped and its stdout pipe closed before
        returning. a non-zero exit status is logged but not treated as fatal:
        whatever was parsed is still returned.
        """
        parser = Go2TvParser()
        # the context manager closes the pipe and waits for the child on exit,
        # so neither a zombie process nor an open file descriptor is left behind.
        with subprocess.Popen(
            ["go2tv", "-l"],
            stdout=subprocess.PIPE,
        ) as go2tv:
            for raw_line in go2tv.stdout:
                # a stray non-UTF-8 byte shouldn't abort the whole scan
                parser.feed_line(raw_line.decode("utf-8", errors="replace"))

        if go2tv.returncode:
            logger.warning(f"go2tv -l exited with status {go2tv.returncode}")

        return parser.into_devices()

    def cast_to(self, dev: Device, media: str) -> None:
        """
        cast `media` (a local path or an http(s) URL) to `dev`.

        this replaces the current process with `go2tv` (via `os.execvp`), so it
        does not return on success.

        if the device needs `Compat.RenameToMp4` and `media` is a local file whose
        name doesn't end in ".mp4", a symlink with ".mp4" appended is created in a
        fresh temporary directory and cast instead. for URLs the compat
        requirement is ignored, since there is no local file to rename.
        """
        logger.info(f"casting to {dev.model} at {dev.url} with compat {dev.compat}")

        is_url = media.startswith(("http://", "https://"))

        if dev.compat == Compat.RenameToMp4 and not media.endswith(".mp4"):
            if is_url:
                logger.info(f"ignoring compat requirement {dev.compat} for {media}")
            else:
                # the symlink must point at an absolute path, since it lives in another directory
                media = os.path.abspath(media)
                # TODO: make sure this directory gets cleaned up!
                # (it can't be done here: execvp below replaces this process.)
                dir_ = tempfile.mkdtemp(prefix="sane-cast-")
                new_name = os.path.join(dir_, os.path.basename(media) + ".mp4")
                os.symlink(media, new_name)
                media = new_name

        # go2tv takes URLs via -u and local files via -v
        media_args = ["-u", media] if is_url else ["-v", media]

        cli_args = ["go2tv", "-t", dev.url] + media_args
        logger.info(" ".join(cli_args))

        os.execvp("go2tv", cli_args)
|
|
|
|
def filter_devices(devices: list[Device], filter: str) -> list[Device]:
    """ return the devices for which `Device.matches(filter)` is true, in their original order """
    kept = []
    for candidate in devices:
        if candidate.matches(filter):
            kept.append(candidate)
    return kept
|
|
|
|
|
|
def rank_devices(devices: list[Device]) -> list[Device]:
    """
    order `devices` by preference and attach known per-device quirks.

    devices whose model exactly equals a KNOWN_DEVICES entry come first, in
    KNOWN_DEVICES order, each merged with that entry via `Device.augmented`
    (the known entry's fields win; the discovered device fills in the rest).
    every other device follows, in its original order.

    devices that merely share a model name are all kept, whether or not the
    model is a known one.
    """
    ranked = [
        known.augmented(vis)
        for known in KNOWN_DEVICES
        for vis in devices
        if vis.model == known.model
    ]

    # compare against the *known* models rather than against what has been
    # ranked so far: otherwise a second unknown device with the same model
    # name as an earlier one would be silently dropped.
    known_models = {known.model for known in KNOWN_DEVICES}
    ranked.extend(vis for vis in devices if vis.model not in known_models)

    return ranked
|
|
|
|
|
|
def ask_device(devices: "list[Device]") -> "Device | None":
    """
    interactively ask on the terminal which of `devices` to use.

    prints a 1-based menu and reads a choice from stdin, repeating until the
    answer is valid.

    returns the chosen device, or None if the user enters "q" or stdin reaches
    end-of-file.
    """
    while True:
        # TODO: use a GUI menu like zenity?
        print("choose a device:")
        for number, d in enumerate(devices, start=1):
            print(f"[{number}] {d.model}")
        print("[q] quit")
        print("")
        print("> ", end="")

        try:
            choice = input()
        except EOFError:
            # stdin closed (e.g. ctrl+D, or not running interactively): same as quitting
            return None

        answer = choice.strip()
        if answer == "q":
            return None

        try:
            number = int(answer)
        except ValueError:
            number = 0  # not a number: falls outside the valid range below

        # explicit range check: indexing directly would let "0" or a negative
        # number wrap around and silently pick a device from the end of the list.
        if 1 <= number <= len(devices):
            return devices[number - 1]

        print(f"invalid choice {choice!r}")
|
|
|
|
def get_default_device(devices: list[Device]) -> Device | None:
    """
    return the device to use without asking, if the choice is unambiguous.

    that's only the case when exactly one device is available; with none
    (which is also logged) or with several, the result is None.
    """
    if not devices:
        logger.info("no devices found!")
        return None

    if len(devices) != 1:
        return None

    (only,) = devices
    return only
|
|
|
|
def main():
    """
    command-line entry point: find DLNA receivers, pick one, and cast to it.

    devices are scanned, optionally narrowed by `--device`, then ranked.
    if exactly one remains it is used directly (unless `--always-ask`);
    otherwise the user is asked to choose. if no device remains, nothing is
    cast and no menu is shown.
    """
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser(description="cast media to a DLNA receiver in range")
    parser.add_argument("--verbose", action="store_true", help="more logging")
    parser.add_argument("--always-ask", action="store_true", help="always ask which device to cast to, regardless how many are available")
    parser.add_argument("--device", help="filter devices based on if this string is contained in their name (case-insensitive)")
    parser.add_argument("media", help="file or URL to send to the DLNA device")

    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    go2tv = Go2TvDriver()
    devices = go2tv.scan_devices()

    if args.device:
        devices = filter_devices(devices, args.device)
    devices = rank_devices(devices)

    dev = get_default_device(devices)
    if not devices:
        # get_default_device has already logged that nothing was found.
        # don't fall through to a menu that would have no choices to offer.
        return

    if dev is None or args.always_ask:
        dev = ask_device(devices)
    if dev:
        go2tv.cast_to(dev, args.media)
|
|
|
|
# only run the CLI when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|