nix-files/hosts/common/programs/blast-ugjka/blast-to-default
2024-03-12 12:06:57 +00:00

217 lines
7.4 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p blast-ugjka
# vim: set filetype=python :
import ctypes
import logging
import os
import signal
import socket
import subprocess
from enum import Enum
# module-level logger shared by every function in this script
logger = logging.getLogger(__name__)

# map from known devices -> required flags.
# keys are receiver names, compared verbatim against what blast lists;
# values are the extra command-line flags blast has to be started with for that receiver.
DEVICE_MAP: dict[str, list[str]] = {
    "Theater TV": [],
    "[LG] webOS TV OLED55C9PUA": ["-usewav"],
}
def set_pdeathsig(sig=signal.SIGTERM):
    """
    ask the kernel to send `sig` to the calling process as soon as its parent dies.
    suitable as a `preexec_fn`, so that children don't outlive the process which spawned them.

    returns whatever `prctl` returns.

    see: <https://stackoverflow.com/a/43152455>
    see: <https://www.man7.org/linux/man-pages/man2/prctl.2.html>
    """
    # prctl option number for PR_SET_PDEATHSIG (see the man page linked above)
    PR_SET_PDEATHSIG = 1
    prctl = ctypes.CDLL("libc.so.6").prctl
    return prctl(PR_SET_PDEATHSIG, sig)
# pid of this process, recorded by reap_on_exit(); it is also used as the process group id.
# stays None until reap_on_exit() has run.
MY_PID = None


def reap_children(sig=None, frame=None):
    """
    send SIGTERM to the whole process group identified by MY_PID.

    accepts (and ignores) `sig` and `frame` so that it can be installed as a signal handler,
    while still being callable with no arguments.
    """
    # go back to the default SIGTERM disposition before signalling the group,
    # to avoid recursing into this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    pgid = MY_PID
    logger.info("killing all children (of pid %d)", pgid)
    os.killpg(pgid, signal.SIGTERM)
def reap_on_exit():
    """
    arrange for this process, and everything it spawns, to be terminated when the parent exits.

    the parent's death is mapped to SIGTERM for this process,
    and SIGTERM is in turn handled by terminating our entire process group.

    why it's done this way:
    1. mpv invokes this, but (potentially) via the sandbox wrapper.
    2. when mpv exits, it `SIGKILL`s that sandbox wrapper.
    3. bwrap does not pass SIGKILL or SIGTERM to its child.
    4. hence, we neither receive that signal NOR can we pass it on simply by killing our immediate children
       (since any bwrap'd children wouldn't pass that signal on...)

    really, the proper fix would be on mpv's side:
    - mpv should create a new process group when it launches a command, and kill that process group on exit.

    or fix this in the sandbox wrapper:
    - why *doesn't* bwrap forward the signals?
    - there's --die-with-parent, but i can't apply that *system wide* and expect reasonable behavior
      <https://github.com/containers/bubblewrap/issues/529>
    """
    global MY_PID
    pid = os.getpid()
    MY_PID = pid

    # create a new process group whose id is our own pid (pgid == pid)
    os.setpgid(pid, pid)

    # parent death -> SIGTERM for us -> SIGTERM for the whole group
    set_pdeathsig(signal.SIGTERM)
    signal.signal(signal.SIGTERM, reap_children)
def get_ranked_ip_addrs():
    """
    return the IP addresses most likely to be LAN addresses, most likely first.

    candidates are the addresses our own hostname resolves to,
    plus the local address of a UDP socket connected towards an outside address.
    the result is de-duplicated and sorted so that addresses starting with "127" come last;
    within each group the order is plain string order.

    raises whatever the underlying socket calls raise (e.g. if the hostname doesn't resolve).

    based on: <https://stackoverflow.com/a/1267524>
    """
    _name, _aliases, static_addrs = socket.gethostbyname_ex(socket.gethostname())

    # the socket exists only to learn which local address it gets bound to;
    # `with` guarantees it is closed again, even if connect() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("1", 53))
        con_addr, _port = s.getsockname()

    return sorted(set(static_addrs + [con_addr]), key=lambda a: (a.startswith("127"), a))
class ParserState(Enum):
    """
    which section of blast's output the parser is currently inside of.
    """
    # entered on a "----------" separator line
    Break = "break"
    # entered on the "DLNA receivers" header
    Receiver = "receiver"
    # entered on the "Your LAN ip addresses" header
    Ips = "ip"
class Status(Enum):
    """
    outcome of feeding one line of blast's output to the driver.
    """
    # nothing decisive yet: keep reading lines
    Continue = "continue"
    # blast printed something the driver doesn't understand
    Error = "error"
    # the selected receiver needs flags other than the ones blast was started with
    RedoWithFlags = "redo_with_flags"
    # the final prompt was reached; the reply to it completes the setup
    Launched = "launched"
class BlastDriver:
    """
    launches `blast` as a subprocess and answers its interactive prompts:
    it reads blast's stdout one line at a time, tracks which receivers and IP addresses
    blast offers, and writes the chosen menu indices back to blast's stdin.
    """
    # section of blast's output currently being parsed; None until a section header is seen
    parsing: ParserState | None = None
    # the last line sent to blast, used to recognize blast echoing it back
    last_write: str | None = None

    def __init__(self, blast_flags: list[str] | None = None):
        """
        start blast with the given extra command-line flags (none by default).
        """
        # copy, so that neither a shared default nor the caller's list is aliased
        blast_flags = [] if blast_flags is None else list(blast_flags)
        self.ranked_ips = get_ranked_ip_addrs()
        # NOTE(review): stderr is piped but never read, and stdout is only read until launch;
        # if blast writes enough to either pipe it could block on a full pipe buffer -- confirm.
        self.blast = subprocess.Popen(
            ["blast", "-source", "blast.monitor"] + blast_flags,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # this pdeathsig isn't necessary; seems it might result in leaked pulse outputs
            # preexec_fn=set_pdeathsig
        )
        self.blast_flags = blast_flags
        # receiver names / IP addresses in the order blast listed them (list index == menu id)
        self.receiver_names = []
        self.ips = []

    def writeline(self, line: str) -> None:
        """
        send one line to blast's stdin, and remember it so that its echo can be recognized.
        """
        logger.debug("[send] %s", line)
        self.blast.stdin.write(f"{line}\n".encode())
        self.blast.stdin.flush()
        self.last_write = line

    def readline(self) -> str:
        """
        read one line from blast's stdout, stripped of surrounding whitespace
        and of the escape sequence blast uses to overwrite its previous line.
        yields "" once blast's stdout hits EOF.
        """
        line = self.blast.stdout.readline().decode('utf-8').strip()
        line = line.replace('\x1b[1A\x1b[K', '')  #< escape codes
        logger.debug("[recv] %r", line)
        return line

    def set_state(self, state: ParserState):
        """
        record (and log) which section of blast's output is being parsed.
        """
        logger.debug("[pars] %s", state)
        self.parsing = state

    def _record_entry(self, line: str, entries: list[str]) -> tuple[Status, str | None]:
        """
        parse a "<id>: <value>" menu line and append the value to `entries`.

        only the first ": " separates id from value, so the value may itself contain ": ".
        a line with no separator at all is reported as Status.Error.
        raises AssertionError if the id isn't the next index of `entries`.
        """
        id_, sep, value = line.partition(": ")
        if not sep:
            logger.info("unrecognized output (state=%s): %r", self.parsing, line)
            return Status.Error, None
        assert id_ == str(len(entries)), (id_, entries)
        entries.append(value)
        return Status.Continue, None

    def feedline(self, line: str) -> tuple[Status, str | None]:
        """
        apply a line from blast's stdout to modify parser state.
        returns a status code (e.g. Status.Continue), and optionally a reply to send back to blast.

        raises AssertionError if blast found anything other than exactly one receiver,
        or if the menu ids it prints aren't consecutive from 0.
        """
        if line == "Loading...":
            return Status.Continue, None
        elif line == "----------":
            self.set_state(ParserState.Break)
            return Status.Continue, None
        elif line == "DLNA receivers":
            self.set_state(ParserState.Receiver)
            return Status.Continue, None
        elif line == "Your LAN ip addresses":
            self.set_state(ParserState.Ips)
            return Status.Continue, None
        elif line == "Select the DLNA device:":
            # only the single-receiver case is handled
            assert len(self.receiver_names) == 1, self.receiver_names
            name = self.receiver_names[0]
            if name in DEVICE_MAP and DEVICE_MAP[name] != self.blast_flags:
                # known device, but blast is running with the wrong flags for it
                return Status.RedoWithFlags, None
            return Status.Continue, "0"
        elif line == "Select the lan IP address for the stream:":
            # pick our best-ranked address among those blast offers
            for r in self.ranked_ips:
                if r in self.ips:
                    return Status.Launched, str(self.ips.index(r))
            # fallback: just guess the best IP
            return Status.Launched, "0"
        elif self.parsing == ParserState.Receiver:
            return self._record_entry(line, self.receiver_names)
        elif self.parsing == ParserState.Ips:
            return self._record_entry(line, self.ips)
        elif line == f"[{self.last_write}]":
            # it's echoing to us what we wrote
            return Status.Continue, None
        else:
            # note: an empty line -- which is also what readline() yields at EOF --
            # is not special-cased, so it ends up here.
            logger.info("unrecognized output (state=%s): %r", self.parsing, line)
            return Status.Error, None

    def step(self) -> Status:
        """
        advance the interaction between us and blast.
        reads a line from blast, modifies internal state, maybe sends a reply.
        could block indefinitely.
        """
        line = self.readline()
        status, reply = self.feedline(line)
        if reply is not None:
            self.writeline(reply)
        return status
def _stop_blast(driver, timeout=5.0):
    """
    terminate the blast subprocess owned by `driver`, then wait (at most `timeout` seconds) for it to exit.
    waiting reaps the process, and keeps a replacement blast from starting while the old one is still shutting down.
    """
    proc = driver.blast
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # don't escalate: carry on, as if we had never waited
        logger.warning("blast (pid %d) still running %.1fs after terminate", proc.pid, timeout)


def try_blast(*args, **kwargs) -> BlastDriver | None:
    """
    start blast and drive it through its prompts until it is streaming.

    all arguments are forwarded to BlastDriver.
    if the discovered receiver requires different flags than blast was started with,
    that blast is stopped and a new one is started with the flags from DEVICE_MAP.

    returns the driver of the launched blast, or None if blast printed something unexpected
    (in which case it has been told to terminate).
    """
    blast = BlastDriver(*args, **kwargs)
    status = Status.Continue
    while status == Status.Continue:
        status = blast.step()

    if status == Status.RedoWithFlags:
        dev = blast.receiver_names[0]
        blast_flags = DEVICE_MAP[dev]
        logger.info("re-exec blast for %s with flags: %r", dev, blast_flags)
        _stop_blast(blast)
        return try_blast(blast_flags=blast_flags)
    elif status == Status.Error:
        logger.info("blast error => terminating")
        _stop_blast(blast)
        return None
    else:
        # successfully launched
        return blast
def main():
    """
    script entry point: launch blast against the default receiver and stay alive for as long as it runs.
    finishes by signalling our whole process group (see reap_children).
    """
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # must come before anything is spawned, so that children land in our process group
    reap_on_exit()

    driver = try_blast()
    if driver is not None:
        logger.info("waiting until blast exits")
        driver.blast.wait()

    # blast is gone (or never launched): clean up whatever else is left in the group
    reap_children()


if __name__ == "__main__":
    main()