nix-files/hosts/common/programs/blast-ugjka/blast-to-default

164 lines
5.4 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p blast-ugjka
# vim: set filetype=python :
import logging
import socket
import subprocess
from enum import Enum
logger = logging.getLogger(__name__)

# known DLNA receiver name -> the exact list of extra `blast` flags it must be launched with.
# a receiver that is not listed here is accepted with whatever flags are already in use.
DEVICE_MAP: dict[str, list[str]] = {
    "Theater TV": [],
    "Cuddlevision": ["-usewav"],
}
def get_ranked_ip_addrs() -> list[str]:
    """
    return the IP addresses most likely to be LAN addresses
    based on: <https://stackoverflow.com/a/1267524>

    candidates are the addresses the local hostname resolves to, plus the local
    address of a UDP socket "connected" to an outside host.
    the result is de-duplicated and sorted: addresses starting with "127" last,
    otherwise in plain string order.
    """
    _name, _aliases, static_addrs = socket.gethostbyname_ex(socket.gethostname())
    # the socket exists only to ask which local address it got bound to;
    # `with` guarantees it is closed again (it used to be leaked), even if connect() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        # NOTE(review): host "1" is kept exactly as it was -- presumably shorthand for
        # some arbitrary non-local address; confirm it routes as intended.
        s.connect(("1", 53))
        con_addr, _port = s.getsockname()
    return sorted({*static_addrs, con_addr}, key=lambda a: (a.startswith("127"), a))
class ParserState(Enum):
    """
    the section of blast's stdout that the parser is currently inside.
    """
    # entered on a "----------" separator line
    Break = "break"
    # entered on the "DLNA receivers" header
    Receiver = "receiver"
    # entered on the "Your LAN ip addresses" header
    Ips = "ip"
class Status(Enum):
    """
    the outcome of feeding one line of blast's output to the driver.
    """
    # nothing decided yet: keep reading lines
    Continue = "continue"
    # blast printed something the parser does not recognize
    Error = "error"
    # the selected receiver needs different flags: blast has to be restarted
    RedoWithFlags = "redo_with_flags"
    # the final prompt was answered
    Launched = "launched"
class BlastDriver:
    """
    drive one interactive `blast` subprocess: read the menus it prints on stdout,
    remember what was offered, and answer its prompts over stdin.
    """
    # which section of blast's output is being parsed (None until a header line is seen)
    parsing: ParserState | None = None
    # the last line sent to blast; blast echoes it back as "[<line>]"
    last_write: str | None = None

    def __init__(self, blast_flags: list[str] | None = None):
        """
        spawn `blast -source blast.monitor`, followed by any extra `blast_flags`.
        """
        # the default is None rather than `[]`: a mutable default list is shared between calls.
        flags = [] if blast_flags is None else list(blast_flags)
        self.ranked_ips = get_ranked_ip_addrs()
        self.blast = subprocess.Popen(
            ["blast", "-source", "blast.monitor"] + flags,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr is never read by this driver, so it must not be a pipe: once the
            # OS pipe buffer fills up, blast would block forever trying to write to it.
            stderr=subprocess.DEVNULL,
        )
        self.blast_flags = flags
        # menu entries parsed so far; the list index is the id blast printed for the entry
        self.receiver_names = []
        self.ips = []

    def writeline(self, line: str) -> None:
        """
        send `line` (newline-terminated) to blast's stdin and remember it in `last_write`.
        """
        logger.debug("[send] %s", line)
        self.blast.stdin.write(f"{line}\n".encode())
        self.blast.stdin.flush()
        self.last_write = line

    def readline(self) -> str:
        """
        read one line from blast's stdout, stripped of surrounding whitespace and of the
        cursor-control escape sequence. blocks until a line is available.
        returns "" once blast has closed its stdout.
        """
        raw = self.blast.stdout.readline()
        # receiver names come from the network: never let odd bytes raise UnicodeDecodeError.
        line = raw.decode('utf-8', errors='replace').strip()
        line = line.replace('\x1b[1A\x1b[K', '') #< escape codes
        logger.debug("[recv] %r", line)
        return line

    def set_state(self, state: ParserState):
        """
        record which section of blast's output is being parsed from here on.
        """
        logger.debug("[pars] %s", state)
        self.parsing = state

    def _feed_entry(self, line: str, entries: list[str]) -> tuple[Status, None]:
        """
        parse one "<id>: <value>" menu line and append <value> to `entries`.
        <id> must be the next index of `entries`; anything else (including the empty
        line seen at end-of-output) yields Status.Error instead of raising.
        """
        # partition, not split: <value> may itself contain ": "
        id_, sep, value = line.partition(": ")
        if not sep or id_ != str(len(entries)):
            logger.info("malformed menu entry (state=%s): %r", self.parsing, line)
            return Status.Error, None
        entries.append(value)
        return Status.Continue, None

    def feedline(self, line: str) -> tuple[Status, str | None]:
        """
        apply a line from blast's stdout to modify parser state.
        returns a status code (e.g. Status.Continue), and optionally a reply to send back to blast.
        """
        if line == "Loading...":
            return Status.Continue, None
        if line == "----------":
            self.set_state(ParserState.Break)
            return Status.Continue, None
        if line == "DLNA receivers":
            self.set_state(ParserState.Receiver)
            return Status.Continue, None
        if line == "Your LAN ip addresses":
            self.set_state(ParserState.Ips)
            return Status.Continue, None
        if line == "Select the DLNA device:":
            # only the single-receiver case is supported. this is an explicit check rather
            # than an `assert` (which `python -O` strips) so a surprising menu ends in
            # Status.Error instead of an exception.
            if len(self.receiver_names) != 1:
                logger.info("expected exactly one DLNA receiver, got: %r", self.receiver_names)
                return Status.Error, None
            name = self.receiver_names[0]
            if name in DEVICE_MAP and DEVICE_MAP[name] != self.blast_flags:
                return Status.RedoWithFlags, None
            return Status.Continue, "0"
        if line == "Select the lan IP address for the stream:":
            # answer with the menu index of the best-ranked address that blast offered
            for r in self.ranked_ips:
                if r in self.ips:
                    return Status.Launched, str(self.ips.index(r))
            # fallback: just guess the best IP
            return Status.Launched, "0"
        if self.parsing == ParserState.Receiver:
            return self._feed_entry(line, self.receiver_names)
        if self.parsing == ParserState.Ips:
            return self._feed_entry(line, self.ips)
        if line == f"[{self.last_write}]":
            # it's echoing to us what we wrote
            return Status.Continue, None
        # anything else -- blank lines and end-of-output included -- is an error
        logger.info("unrecognized output (state=%s): %r", self.parsing, line)
        return Status.Error, None

    def step(self) -> Status:
        """
        advance the interaction between us and blast.
        reads a line from blast, modifies internal state, maybe sends a reply.
        could block indefinitely.
        """
        line = self.readline()
        status, reply = self.feedline(line)
        if reply is not None:
            self.writeline(reply)
        return status
def _stop_blast(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """
    terminate `proc` and reap it, escalating to kill() if it has not exited after `timeout` seconds.
    """
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def try_blast(*args, **kwargs) -> BlastDriver | None:
    """
    launch blast (arguments are forwarded to `BlastDriver`) and answer its prompts.

    returns the driver once blast has been launched, or None if blast printed
    something unexpected. if the selected receiver is listed in DEVICE_MAP with
    different flags, blast is stopped and started again with those flags.
    """
    blast = BlastDriver(*args, **kwargs)
    try:
        status = Status.Continue
        while status == Status.Continue:
            status = blast.step()
    except BaseException:
        # don't leave a half-configured child behind if the dialogue blows up (or on ctrl-c)
        _stop_blast(blast.blast)
        raise

    if status == Status.RedoWithFlags:
        dev = blast.receiver_names[0]
        blast_flags = DEVICE_MAP[dev]
        logger.info("re-exec blast for %s with flags: %r", dev, blast_flags)
        # wait for the old instance to really exit before starting its replacement
        _stop_blast(blast.blast)
        return try_blast(blast_flags=blast_flags)
    if status == Status.Error:
        logger.info("blast error => terminating")
        _stop_blast(blast.blast)
        return None
    # successfully launched
    return blast
def main():
    """
    configure debug logging, launch blast, and block until it exits.
    does nothing further if blast could not be launched.
    """
    root_logger = logging.getLogger()
    logging.basicConfig()
    root_logger.setLevel(logging.DEBUG)

    driver = try_blast()
    if driver is None:
        return
    logger.info("waiting until blast exits")
    driver.blast.wait()


if __name__ == "__main__":
    main()