164 lines
5.4 KiB
Python
Executable File
164 lines
5.4 KiB
Python
Executable File
#!/usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p blast-ugjka
|
|
# vim: set filetype=python :
|
|
|
|
import logging
|
|
import socket
|
|
import subprocess
|
|
|
|
from enum import Enum
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# map from known devices -> required flags
|
|
DEVICE_MAP = {
|
|
"Theater TV": [],
|
|
"Cuddlevision": [ "-usewav" ],
|
|
}
|
|
|
|
def get_ranked_ip_addrs():
|
|
"""
|
|
return the IP addresses most likely to be LAN addresses
|
|
based on: <https://stackoverflow.com/a/1267524>
|
|
"""
|
|
_name, _aliases, static_addrs = socket.gethostbyname_ex(socket.gethostname())
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
s.connect(("1", 53))
|
|
con_addr, _port = s.getsockname()
|
|
return sorted(set(static_addrs + [ con_addr ]), key=lambda a: (a.startswith("127"), a))
|
|
|
|
|
|
class ParserState(Enum):
|
|
Break = "break"
|
|
Receiver = "receiver"
|
|
Ips = "ip"
|
|
|
|
class Status(Enum):
|
|
Continue = "continue"
|
|
Error = "error"
|
|
RedoWithFlags = "redo_with_flags"
|
|
Launched = "launched"
|
|
|
|
class BlastDriver:
|
|
parsing: ParserState | None = None
|
|
last_write: str | None = None
|
|
def __init__(self, blast_flags: list[str] = []):
|
|
self.ranked_ips = get_ranked_ip_addrs()
|
|
self.blast = subprocess.Popen(
|
|
["blast", "-source", "blast.monitor"] + blast_flags,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
self.blast_flags = list(blast_flags)
|
|
self.receiver_names = []
|
|
self.ips = []
|
|
|
|
def writeline(self, line: str) -> None:
|
|
logger.debug("[send] %s", line)
|
|
self.blast.stdin.write(f"{line}\n".encode())
|
|
self.blast.stdin.flush()
|
|
self.last_write = line
|
|
|
|
def readline(self) -> str:
|
|
line = self.blast.stdout.readline().decode('utf-8').strip()
|
|
line = line.replace('\x1b[1A\x1b[K', '') #< escape codes
|
|
logger.debug("[recv] %r", line)
|
|
return line
|
|
|
|
def set_state(self, state: ParserState):
|
|
logger.debug("[pars] %s", state)
|
|
self.parsing = state
|
|
|
|
def feedline(self, line: str) -> (Status, str|None):
|
|
"""
|
|
apply a line from blast's stdout to modify parser state.
|
|
returns a status code (e.g. Status.Continue), and optionally a reply to send back to blast.
|
|
"""
|
|
if line == "Loading...":
|
|
return Status.Continue, None
|
|
elif line == "----------":
|
|
self.set_state(ParserState.Break)
|
|
return Status.Continue, None
|
|
elif line == "DLNA receivers":
|
|
self.set_state(ParserState.Receiver)
|
|
return Status.Continue, None
|
|
elif line == "Your LAN ip addresses":
|
|
self.set_state(ParserState.Ips)
|
|
return Status.Continue, None
|
|
elif line == "Select the DLNA device:":
|
|
assert len(self.receiver_names) == 1, self.receiver_names
|
|
name = self.receiver_names[0]
|
|
if name in DEVICE_MAP and DEVICE_MAP[name] != self.blast_flags:
|
|
return Status.RedoWithFlags, None
|
|
return Status.Continue, "0"
|
|
elif line == "Select the lan IP address for the stream:":
|
|
for r in self.ranked_ips:
|
|
if r in self.ips:
|
|
return Status.Launched, str(self.ips.index(r))
|
|
# fallback: just guess the best IP
|
|
return Status.Launched, "0"
|
|
elif self.parsing == ParserState.Receiver:
|
|
id_, name = line.split(": ")
|
|
assert id_ == str(len(self.receiver_names)), (id_, self.receiver_names)
|
|
self.receiver_names.append(name)
|
|
return Status.Continue, None
|
|
elif self.parsing == ParserState.Ips:
|
|
id_, ip = line.split(": ")
|
|
assert id_ == str(len(self.ips)), (id_, self.ips)
|
|
self.ips.append(ip)
|
|
return Status.Continue, None
|
|
elif line == f"[{self.last_write}]":
|
|
# it's echoing to us what we wrote
|
|
return Status.Continue, None
|
|
# elif line == "":
|
|
# return Status.Continue, None
|
|
else:
|
|
logger.info("unrecognized output (state=%s): %r", self.parsing, line)
|
|
return Status.Error, None
|
|
|
|
def step(self) -> Status:
|
|
"""
|
|
advance the interaction between us and blast.
|
|
reads a line from blast, modifies internal state, maybe sends a reply.
|
|
could block indefinitely.
|
|
"""
|
|
line = self.readline()
|
|
status, reply = self.feedline(line)
|
|
if reply is not None:
|
|
self.writeline(reply)
|
|
return status
|
|
|
|
def try_blast(*args, **kwargs) -> BlastDriver | None:
|
|
blast = BlastDriver(*args, **kwargs)
|
|
status = Status.Continue
|
|
while status == Status.Continue:
|
|
status = blast.step()
|
|
|
|
if status == Status.RedoWithFlags:
|
|
dev = blast.receiver_names[0]
|
|
blast_flags = DEVICE_MAP[dev]
|
|
logger.info("re-exec blast for %s with flags: %r", dev, blast_flags)
|
|
blast.blast.terminate()
|
|
return try_blast(blast_flags=blast_flags)
|
|
elif status == Status.Error:
|
|
logger.info("blast error => terminating")
|
|
blast.blast.terminate()
|
|
else:
|
|
# successfully launched
|
|
return blast
|
|
|
|
|
|
def main():
|
|
logging.basicConfig()
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
blast = try_blast()
|
|
|
|
if blast is not None:
|
|
logger.info("waiting until blast exits")
|
|
blast.blast.wait()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|