wob-audio: fix, by finishing the port to pipewire
also rewrote it in Python because bash can't do floating point math
This commit is contained in:
@@ -8,10 +8,10 @@
|
|||||||
#
|
#
|
||||||
{ config, lib, pkgs, ... }:
|
{ config, lib, pkgs, ... }:
|
||||||
let
|
let
|
||||||
wob-audio = pkgs.static-nix-shell.mkBash {
|
wob-audio = pkgs.static-nix-shell.mkPython3Bin {
|
||||||
pname = "wob-audio";
|
pname = "wob-audio";
|
||||||
srcRoot = ./.;
|
srcRoot = ./.;
|
||||||
pkgs = [ "coreutils" "gnugrep" "gnused" "wireplumber" ];
|
pkgs = [ "wireplumber" ];
|
||||||
};
|
};
|
||||||
cfg = config.sane.programs.wob;
|
cfg = config.sane.programs.wob;
|
||||||
in
|
in
|
||||||
@@ -98,9 +98,6 @@ in
|
|||||||
];
|
];
|
||||||
|
|
||||||
suggestedPrograms = [
|
suggestedPrograms = [
|
||||||
# "coreutils"
|
|
||||||
"gnugrep"
|
|
||||||
"gnused"
|
|
||||||
"wireplumber" #< TODO: replace with just the one binary we need.
|
"wireplumber" #< TODO: replace with just the one binary we need.
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@@ -1,49 +1,97 @@
|
|||||||
#!/usr/bin/env nix-shell
|
#!/usr/bin/env nix-shell
|
||||||
#!nix-shell -i bash -p coreutils -p gnugrep -p gnused -p wireplumber
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p wireplumber
|
||||||
|
# vim: set filetype=python :
|
||||||
|
|
||||||
set -e
|
import logging
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
|
||||||
debug() {
|
logger = logging.getLogger(__name__)
|
||||||
printf "$@" >&2
|
|
||||||
printf "\n" >&2
|
|
||||||
}
|
|
||||||
verbose() {
|
|
||||||
if [ "$WOB_VERBOSE" = "1" ]; then
|
|
||||||
debug "$@"
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
volget() {
|
class PwMonConsumer:
|
||||||
# returns audio volume, as an integer (0 - 100)
|
last_volume: float | None = None
|
||||||
volLine="$(wpctl get-volume @DEFAULT_AUDIO_SINK@)"
|
last_mute: bool | None = None
|
||||||
if echo "$volLine" | grep -q '\[MUTED\]'; then
|
last_effective_volume: float | None = None
|
||||||
verbose "muted"
|
|
||||||
echo "0"
|
|
||||||
else
|
|
||||||
vol="$(echo "$vol" | cut -d' ' -f2)"
|
|
||||||
echo "$(( $vol * 100 ))"
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
notify_volume_change() {
|
# parser state:
|
||||||
verbose "notify_volume_change"
|
in_changed: bool = False
|
||||||
vol="$(volget)"
|
in_node: bool = False
|
||||||
verbose "got volume: %d -> %d" "$lastvol" "$vol"
|
in_vol: bool = False
|
||||||
if [ "$vol" != "$lastvol" ]; then
|
in_mute: bool = False
|
||||||
debug "notify wob: %d -> %d" "$lastvol" "$vol"
|
|
||||||
printf "%s\n" "$vol" > "$XDG_RUNTIME_DIR/$WOBSOCK_NAME"
|
|
||||||
fi
|
|
||||||
lastvol="$vol"
|
|
||||||
}
|
|
||||||
|
|
||||||
pactl subscribe | while read -r line; do
|
def feed_line(self, line: str) -> float | None:
|
||||||
verbose "pactl says: %s" "$line"
|
""" consume a line and *maybe* return the volume """
|
||||||
case "$line" in
|
|
||||||
"Event 'change' on sink "*)
|
logger.debug("got pw-mon line: %s", line.rstrip())
|
||||||
notify_volume_change
|
line = line.strip()
|
||||||
;;
|
|
||||||
"Event 'change' on source "*)
|
if line.startswith("changed:"):
|
||||||
# microphone volume changed. ignore.
|
self.in_changed = True
|
||||||
;;
|
elif line.startswith("added:") or line.startswith("removed:"):
|
||||||
esac
|
self.in_changed = False
|
||||||
done
|
elif line.startswith("type: "):
|
||||||
|
self.in_node = line.startswith("type: PipeWire:Interface:Node")
|
||||||
|
logger.debug("parsed `type:` %s %d", line, self.in_node)
|
||||||
|
elif line.startswith("Prop: "):
|
||||||
|
# which of the *Volumes params we read is unclear.
|
||||||
|
# alternative to this is to just detect the change, and then cal wpctl get-volume @DEFAULT_AUDIO_SINK@
|
||||||
|
self.in_vol = line.startswith("Prop: key Spa:Pod:Object:Param:Props:channelVolumes")
|
||||||
|
self.in_mute = line.startswith("Prop: key Spa:Pod:Object:Param:Props:softMute")
|
||||||
|
logger.debug("parsed `Prop:` %s %d", line, self.in_vol)
|
||||||
|
elif line.startswith("Float ") and self.in_changed and self.in_node and self.in_vol:
|
||||||
|
value = float(line[len("Float "):])
|
||||||
|
self.feed_volume(value)
|
||||||
|
elif line.startswith("Bool ") and self.in_changed and self.in_node and self.in_mute:
|
||||||
|
value = line[len("Bool "):] == "true"
|
||||||
|
self.feed_mute(value)
|
||||||
|
|
||||||
|
def feed_volume(self, new: float) -> None:
|
||||||
|
logger.debug("feed volume: %f -> %f", self.last_volume or 0.0, new)
|
||||||
|
self.last_volume = new
|
||||||
|
self.check_effective_volume()
|
||||||
|
|
||||||
|
def feed_mute(self, new: bool) -> None:
|
||||||
|
logger.debug("feed mute: %d -> %d", self.last_mute or False, new)
|
||||||
|
self.last_mute = new
|
||||||
|
self.check_effective_volume()
|
||||||
|
|
||||||
|
def check_effective_volume(self) -> None:
|
||||||
|
eff_volume = 0.0 if self.last_mute else self.last_volume
|
||||||
|
if eff_volume != self.last_effective_volume:
|
||||||
|
logger.info("new effective volume: %f", eff_volume)
|
||||||
|
self.on_new_volume(eff_volume)
|
||||||
|
self.last_effective_volume = eff_volume
|
||||||
|
|
||||||
|
|
||||||
|
class PwMonWobConsumer(PwMonConsumer):
|
||||||
|
def __init__(self):
|
||||||
|
self.sock_name = os.path.join(
|
||||||
|
os.environ.get("XDG_RUNTIME_DIR", "/run"),
|
||||||
|
os.environ.get("WOBSOCK_NAME", "wob.sock"),
|
||||||
|
)
|
||||||
|
self.wob_sock = open(self.sock_name, "w")
|
||||||
|
|
||||||
|
def on_new_volume(self, vol: float) -> None:
|
||||||
|
# pipewire volume is between 0 and 3.375.
|
||||||
|
# wob is between 0 - 1, or 1 - 2 if overdriven.
|
||||||
|
# idk where vol ** (1/3) comes from, but it precisely matches what wpctl shows,
|
||||||
|
# getting the range to 0.00 - 1.50 with precise 0.05 increments.
|
||||||
|
int_vol = int(round(vol ** 0.3333 * 100))
|
||||||
|
logger.info("writing to %s: %d", self.sock_name, int_vol)
|
||||||
|
self.wob_sock.write(f"{int_vol}\n")
|
||||||
|
self.wob_sock.flush()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
logging.basicConfig()
|
||||||
|
logging.getLogger().setLevel(logging.INFO)
|
||||||
|
if os.environ.get("WOB_VERBOSE", "") == "1":
|
||||||
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
consumer = PwMonWobConsumer()
|
||||||
|
proc = subprocess.Popen(["pw-mon"], stdout=subprocess.PIPE)
|
||||||
|
for line in iter(proc.stdout.readline, b''):
|
||||||
|
consumer.feed_line(line.decode("utf-8"))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
Reference in New Issue
Block a user