#!/usr/bin/env nix-shell #!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p blast-ugjka # vim: set filetype=python : import ctypes import logging import os import signal import socket import subprocess from enum import Enum logger = logging.getLogger(__name__) # map from known devices -> required flags DEVICE_MAP = { "Theater TV": [], "[LG] webOS TV OLED55C9PUA": [ "-usewav" ], } def set_pdeathsig(sig=signal.SIGTERM): """ helper function to ensure once parent process exits, its children processes will automatically die. see: see: """ libc = ctypes.CDLL("libc.so.6") return libc.prctl(1, sig) MY_PID = None def reap_children(sig=None, frame=None): global MY_PID # reset SIGTERM handler to avoid recursing signal.signal(signal.SIGTERM, signal.Handlers.SIG_DFL) logger.info("killing all children (of pid %d)", MY_PID) os.killpg(MY_PID, signal.SIGTERM) def reap_on_exit(): """ catch when the parent exits, and map that to SIGTERM for this process. when this process receives SIGTERM, also terminate all descendent processes. this is done because: 1. mpv invokes this, but (potentially) via the sandbox wrapper. 2. when mpv exits, it `SIGKILL`s that sandbox wrapper. 3. bwrap does not pass SIGKILL or SIGTERM to its child. 4. hence, we neither receive that signal NOR can we pass it on simply by killing our immediate children (since any bwrap'd children wouldn't pass that signal on...) really, the proper fix would be on mpv's side: - mpv should create a new process group when it launches a command, and kill that process group on exit. or fix this in the sandbox wrapper: - why *doesn't* bwrap forward the signals? - there's --die-with-parent, but i can't apply that *system wide* and expect reasonably behavior """ global MY_PID MY_PID = os.getpid() # create a new process group, pgid = gid os.setpgid(MY_PID, MY_PID) set_pdeathsig(signal.SIGTERM) signal.signal(signal.SIGTERM, reap_children) def get_ranked_ip_addrs(): """ return the IP addresses most likely to be LAN addresses based on: """ _name, _aliases, static_addrs = socket.gethostbyname_ex(socket.gethostname()) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("1", 53)) con_addr, _port = s.getsockname() return sorted(set(static_addrs + [ con_addr ]), key=lambda a: (a.startswith("127"), a)) class ParserState(Enum): Break = "break" Receiver = "receiver" Ips = "ip" class Status(Enum): Continue = "continue" Error = "error" RedoWithFlags = "redo_with_flags" Launched = "launched" class BlastDriver: parsing: ParserState | None = None last_write: str | None = None def __init__(self, blast_flags: list[str] = []): self.ranked_ips = get_ranked_ip_addrs() self.blast = subprocess.Popen( ["blast", "-source", "blast.monitor"] + blast_flags, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, # this pdeathsig isn't necessary; seems it might result in leaked pulse outputs # preexec_fn=set_pdeathsig ) self.blast_flags = list(blast_flags) self.receiver_names = [] self.ips = [] def writeline(self, line: str) -> None: logger.debug("[send] %s", line) self.blast.stdin.write(f"{line}\n".encode()) self.blast.stdin.flush() self.last_write = line def readline(self) -> str: line = self.blast.stdout.readline().decode('utf-8').strip() line = line.replace('\x1b[1A\x1b[K', '') #< escape codes logger.debug("[recv] %r", line) return line def set_state(self, state: ParserState): logger.debug("[pars] %s", state) self.parsing = state def feedline(self, line: str) -> (Status, str|None): """ apply a line from blast's stdout to modify parser state. returns a status code (e.g. Status.Continue), and optionally a reply to send back to blast. """ if line == "Loading...": return Status.Continue, None elif line == "----------": self.set_state(ParserState.Break) return Status.Continue, None elif line == "DLNA receivers": self.set_state(ParserState.Receiver) return Status.Continue, None elif line == "Your LAN ip addresses": self.set_state(ParserState.Ips) return Status.Continue, None elif line == "Select the DLNA device:": assert len(self.receiver_names) == 1, self.receiver_names name = self.receiver_names[0] if name in DEVICE_MAP and DEVICE_MAP[name] != self.blast_flags: return Status.RedoWithFlags, None return Status.Continue, "0" elif line == "Select the lan IP address for the stream:": for r in self.ranked_ips: if r in self.ips: return Status.Launched, str(self.ips.index(r)) # fallback: just guess the best IP return Status.Launched, "0" elif self.parsing == ParserState.Receiver: id_, name = line.split(": ") assert id_ == str(len(self.receiver_names)), (id_, self.receiver_names) self.receiver_names.append(name) return Status.Continue, None elif self.parsing == ParserState.Ips: id_, ip = line.split(": ") assert id_ == str(len(self.ips)), (id_, self.ips) self.ips.append(ip) return Status.Continue, None elif line == f"[{self.last_write}]": # it's echoing to us what we wrote return Status.Continue, None # elif line == "": # return Status.Continue, None else: logger.info("unrecognized output (state=%s): %r", self.parsing, line) return Status.Error, None def step(self) -> Status: """ advance the interaction between us and blast. reads a line from blast, modifies internal state, maybe sends a reply. could block indefinitely. """ line = self.readline() status, reply = self.feedline(line) if reply is not None: self.writeline(reply) return status def try_blast(*args, **kwargs) -> BlastDriver | None: blast = BlastDriver(*args, **kwargs) status = Status.Continue while status == Status.Continue: status = blast.step() if status == Status.RedoWithFlags: dev = blast.receiver_names[0] blast_flags = DEVICE_MAP[dev] logger.info("re-exec blast for %s with flags: %r", dev, blast_flags) blast.blast.terminate() return try_blast(blast_flags=blast_flags) elif status == Status.Error: logger.info("blast error => terminating") blast.blast.terminate() else: # successfully launched return blast def main(): logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) reap_on_exit() blast = try_blast() if blast is not None: logger.info("waiting until blast exits") blast.blast.wait() reap_children() if __name__ == "__main__": main()