blast-ugjka: introduce a helper blast-to-default
program
This commit is contained in:
147
hosts/common/programs/blast-ugjka/blast-to-default
Executable file
147
hosts/common/programs/blast-ugjka/blast-to-default
Executable file
@@ -0,0 +1,147 @@
|
||||
#!/usr/bin/env nix-shell
|
||||
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p blast-ugjka
|
||||
# vim: set filetype=python :
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import subprocess
|
||||
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# map from known devices -> required flags
|
||||
DEVICE_MAP = {
|
||||
"Theater TV": [],
|
||||
"[LG] webOS TV OLED55C9PUA": [ "-usewav" ],
|
||||
}
|
||||
|
||||
def get_ranked_ip_addrs():
|
||||
"""
|
||||
return the IP addresses most likely to be LAN addresses
|
||||
based on: <https://stackoverflow.com/a/1267524>
|
||||
"""
|
||||
_name, _aliases, static_addrs = socket.gethostbyname_ex(socket.gethostname())
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("1", 53))
|
||||
con_addr, _port = s.getsockname()
|
||||
return sorted(set(static_addrs + [ con_addr ]), key=lambda a: (a.startswith("127"), a))
|
||||
|
||||
|
||||
class ParserState(Enum):
|
||||
Break = "break"
|
||||
Receiver = "receiver"
|
||||
Ips = "ip"
|
||||
|
||||
class Status(Enum):
|
||||
Continue = "continue"
|
||||
Error = "error"
|
||||
RedoWithFlags = "redo_with_flags"
|
||||
|
||||
class BlastDriver:
|
||||
parsing: ParserState | None = None
|
||||
last_write: str | None = None
|
||||
def __init__(self, blast_flags: list[str] = []):
|
||||
self.ranked_ips = get_ranked_ip_addrs()
|
||||
self.blast = subprocess.Popen(["blast", "-source", "blast.monitor"] + blast_flags, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
self.blast_flags = list(blast_flags)
|
||||
self.receiver_names = []
|
||||
self.ips = []
|
||||
|
||||
def writeline(self, line: str) -> None:
|
||||
logger.debug("[send] %s", line)
|
||||
self.blast.stdin.write(f"{line}\n".encode())
|
||||
self.blast.stdin.flush()
|
||||
self.last_write = line
|
||||
|
||||
def readline(self) -> str:
|
||||
line = self.blast.stdout.readline().decode('utf-8').strip()
|
||||
line = line.replace('\x1b[1A\x1b[K', '') #< escape codes
|
||||
logger.debug("[recv] %r", line)
|
||||
return line
|
||||
|
||||
def set_state(self, state: ParserState):
|
||||
logger.debug("[pars] %s", state)
|
||||
self.parsing = state
|
||||
|
||||
def feedline(self, line: str) -> (Status, str|None):
|
||||
"""
|
||||
apply a line from blast's stdout to modify parser state.
|
||||
returns a status code (e.g. Status.Continue), and optionally a reply to send back to blast.
|
||||
"""
|
||||
if line == "Loading...":
|
||||
return Status.Continue, None
|
||||
elif line == "----------":
|
||||
self.set_state(ParserState.Break)
|
||||
return Status.Continue, None
|
||||
elif line == "DLNA receivers":
|
||||
self.set_state(ParserState.Receiver)
|
||||
return Status.Continue, None
|
||||
elif line == "Your LAN ip addresses":
|
||||
self.set_state(ParserState.Ips)
|
||||
return Status.Continue, None
|
||||
elif line == "Select the DLNA device:":
|
||||
assert len(self.receiver_names) == 1, self.receiver_names
|
||||
name = self.receiver_names[0]
|
||||
if name in DEVICE_MAP and DEVICE_MAP[name] != self.blast_flags:
|
||||
return Status.RedoWithFlags, None
|
||||
return Status.Continue, "0"
|
||||
elif line == "Select the lan IP address for the stream:":
|
||||
for r in self.ranked_ips:
|
||||
if r in self.ips:
|
||||
return Status.Continue, str(self.ips.index(r))
|
||||
# fallback: just guess the best IP
|
||||
return Status.Continue, "0"
|
||||
elif self.parsing == ParserState.Receiver:
|
||||
id_, name = line.split(": ")
|
||||
assert id_ == str(len(self.receiver_names)), (id_, self.receiver_names)
|
||||
self.receiver_names.append(name)
|
||||
return Status.Continue, None
|
||||
elif self.parsing == ParserState.Ips:
|
||||
id_, ip = line.split(": ")
|
||||
assert id_ == str(len(self.ips)), (id_, self.ips)
|
||||
self.ips.append(ip)
|
||||
return Status.Continue, None
|
||||
elif line == f"[{self.last_write}]":
|
||||
# it's echoing to us what we wrote
|
||||
return Status.Continue, None
|
||||
# elif line == "":
|
||||
# return Status.Continue, None
|
||||
else:
|
||||
logger.info("unrecognized output (state=%s): %r", self.parsing, line)
|
||||
return Status.Error, None
|
||||
|
||||
def step(self) -> Status:
|
||||
"""
|
||||
advance the interaction between us and blast.
|
||||
reads a line from blast, modifies internal state, maybe sends a reply.
|
||||
could block indefinitely.
|
||||
"""
|
||||
line = self.readline()
|
||||
status, reply = self.feedline(line)
|
||||
if reply is not None:
|
||||
self.writeline(reply)
|
||||
return status
|
||||
|
||||
def try_blast(*args, **kwargs):
|
||||
blast = BlastDriver(*args, **kwargs)
|
||||
status = Status.Continue
|
||||
while status == Status.Continue:
|
||||
status = blast.step()
|
||||
|
||||
if status == Status.RedoWithFlags:
|
||||
dev = blast.receiver_names[0]
|
||||
blast_flags = DEVICE_MAP[dev]
|
||||
logger.info("re-exec blast for %s with flags: %r", dev, blast_flags)
|
||||
blast.blast.terminate()
|
||||
try_blast(blast_flags=blast_flags)
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig()
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
try_blast()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@@ -3,9 +3,10 @@
|
||||
# - can cast the default sink, or create a new one "blast.monitor"
|
||||
# and either assign that to default or assign apps to it.
|
||||
# compatibility:
|
||||
# - `blast -usewav` is likely to give best results.
|
||||
# - there is no single invocation which will be compatible with all known devices.
|
||||
# - sony tv:
|
||||
# - `blast` (default): WORKS
|
||||
# - `-usewav`: FAILS
|
||||
# - LG TV:
|
||||
# - `-usewav`: WORKS!
|
||||
# - `-useaac`: FAILS
|
||||
@@ -17,7 +18,7 @@
|
||||
# - `-nochunked`: FAILS
|
||||
# - `-format "ogg" -mime 'audio/x-opus+ogg'`: FAILS
|
||||
# - `-mime audio/ac3 -format ac3`: FAILS
|
||||
{ config, lib, ... }:
|
||||
{ config, lib, pkgs, ... }:
|
||||
let
|
||||
cfg = config.sane.programs.blast-ugjka;
|
||||
in
|
||||
@@ -27,5 +28,19 @@ in
|
||||
sandbox.whitelistAudio = true;
|
||||
sandbox.net = "clearnet";
|
||||
};
|
||||
|
||||
sane.programs.blast-to-default = {
|
||||
# helper to deal with blast's interactive CLI
|
||||
packageUnwrapped = pkgs.static-nix-shell.mkPython3Bin {
|
||||
pname = "blast-to-default";
|
||||
pkgs = [ "blast-ugjka" ];
|
||||
srcRoot = ./.;
|
||||
};
|
||||
sandbox.method = "bwrap";
|
||||
sandbox.whitelistAudio = true;
|
||||
sandbox.net = "clearnet";
|
||||
suggestedPrograms = [ "blast-ugjka" ];
|
||||
};
|
||||
|
||||
networking.firewall.allowedTCPPorts = lib.mkIf cfg.enabled [ 9000 ];
|
||||
}
|
@@ -10,7 +10,7 @@
|
||||
./assorted.nix
|
||||
./audacity.nix
|
||||
./bemenu.nix
|
||||
./blast-ugjka.nix
|
||||
./blast-ugjka
|
||||
./bonsai.nix
|
||||
./brave.nix
|
||||
./bubblewrap.nix
|
||||
|
Reference in New Issue
Block a user