From 3e8ad5b8993df0761e665bbaf2a9ad67849bc44f Mon Sep 17 00:00:00 2001 From: Colin Date: Sun, 22 Oct 2023 06:11:49 +0000 Subject: [PATCH] ntfy: implement a wrapper which converts ntfy subscriptions into a more specific wakeup signal --- hosts/by-name/servo/services/ntfy/default.nix | 2 +- hosts/by-name/servo/services/ntfy/ntfy-waiter | 126 ++++++++++++++++++ .../servo/services/ntfy/ntfy-waiter.nix | 59 ++++++++ 3 files changed, 186 insertions(+), 1 deletion(-) create mode 100755 hosts/by-name/servo/services/ntfy/ntfy-waiter create mode 100644 hosts/by-name/servo/services/ntfy/ntfy-waiter.nix diff --git a/hosts/by-name/servo/services/ntfy/default.nix b/hosts/by-name/servo/services/ntfy/default.nix index be2a468a..4843f780 100644 --- a/hosts/by-name/servo/services/ntfy/default.nix +++ b/hosts/by-name/servo/services/ntfy/default.nix @@ -3,6 +3,7 @@ { config, ... }: { imports = [ + ./ntfy-waiter.nix ./ntfy-sh.nix ]; sops.secrets."ntfy-sh-topic" = { @@ -10,5 +11,4 @@ owner = config.users.users.ntfy-sh.name; group = config.users.users.ntfy-sh.name; }; - } diff --git a/hosts/by-name/servo/services/ntfy/ntfy-waiter b/hosts/by-name/servo/services/ntfy/ntfy-waiter new file mode 100755 index 00000000..85b8a30c --- /dev/null +++ b/hosts/by-name/servo/services/ntfy/ntfy-waiter @@ -0,0 +1,126 @@ +#!/usr/bin/env nix-shell +#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p ntfy-sh + +import argparse +import logging +import socket +import subprocess +import sys +import threading +import time + +logger = logging.getLogger(__name__) + +LISTEN_QUEUE = 3 +WAKE_MESSAGE = b'notification\n' + +class Client: + def __init__(self, sock, addr_info, live_after: float): + self.live_after = live_after + self.sock = sock + self.addr_info = addr_info + logger.info(f"accepted conn from {addr_info}") + + def __cmp__(self, other: 'Client'): + return cmp(self.addr_info, other.addr_info) + + def maybe_notify(self, message: bytes) -> bool: + """ + returns false if we wanted to notify the client but failed (e.g. socket died). + true otherwise. + """ + if time.time() < self.live_after: + logger.debug(f"not notifying client because it's not ready: {self.addr_info}") + return True + + try: + self.sock.sendall(message) + except Exception as e: + logger.warning(f"failed to notify client {self.addr_info} {e}") + return False + else: + logger.info(f"successfully notified {self.addr_info}: {message}") + return True + +class Adapter: + def __init__(self, host: str, port: int, silence: int, topic: str): + self.host = host + self.port = port + self.silence = silence + self.topic = topic + self.clients = set() + + def listener_loop(self): + logger.info(f"listening for connections on {self.host}:{self.port}") + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind((self.host, self.port)) + s.listen(LISTEN_QUEUE) + while True: + conn, addr_info = s.accept() + self.clients.add(Client(conn, addr_info, live_after = time.time() + self.silence)) + + def notify_clients(self, message: bytes = WAKE_MESSAGE): + # notify every client, and drop any which have disconnected + # XXX: paranoia about me not knowing Python's guarantees for mutation during set iteration + clients = set(self.clients) + + dead_clients = [ + c for c in clients if not c.maybe_notify(message) + ] + for c in dead_clients: + self.clients.remove(c) + + def notify_loop(self): + logger.info("waiting for notification events") + ntfy_proc = subprocess.Popen( + [ + "ntfy", + "sub", + f"https://ntfy.uninsane.org/{self.topic}" + ], + stdout=subprocess.PIPE + ) + for line in iter(ntfy_proc.stdout.readline, b''): + logger.debug(f"received notification: {line}") + self.notify_clients() + +def get_topic() -> str: + return open('/run/secrets/ntfy-sh-topic', 'rt').read().strip() + +def run_forever(callable): + callable() + logger.error(f"{callable} unexpectedly returned") + sys.exit(1) + +def main(): + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + + parser = argparse.ArgumentParser(description="accept connections and notify the other end upon ntfy activity, with a guaranteed amount of silence") + parser.add_argument('--verbose', action='store_true') + parser.add_argument('--host', type=str, default='') + parser.add_argument('--port', type=int) + parser.add_argument('--silence', type=int, help="number of seconds to remain silent upon accepting a connection") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.INFO) + + adapter = Adapter(args.host, args.port, args.silence, get_topic()) + + listener_loop = threading.Thread(target=run_forever, name="listener_loop", args=(adapter.listener_loop,)) + notify_loop = threading.Thread(target=run_forever, name="notify_loop", args=(adapter.notify_loop,)) + + # TODO: this method of exiting seems to leave the listener behind, + # preventing anyone else from re-binding the port. + listener_loop.start() + notify_loop.start() + listener_loop.join() + notify_loop.join() + + +if __name__ == '__main__': + main() diff --git a/hosts/by-name/servo/services/ntfy/ntfy-waiter.nix b/hosts/by-name/servo/services/ntfy/ntfy-waiter.nix new file mode 100644 index 00000000..7ed9858e --- /dev/null +++ b/hosts/by-name/servo/services/ntfy/ntfy-waiter.nix @@ -0,0 +1,59 @@ +# service which adapts ntfy-sh into something suitable specifically for the Pinephone's +# wake-on-lan (WoL) feature. +# notably, it provides a mechanism by which the caller can be confident of an interval in which +# zero traffic will occur on the TCP connection, thus allowing it to enter sleep w/o fear of hitting +# race conditions in the Pinephone WoL feature. +{ config, lib, pkgs, ... }: +let + cfg = config.sane.ntfy-waiter; + portLow = 5550; + portHigh = 5559; + portRange = lib.range portLow portHigh; + numPorts = portHigh - portLow + 1; + mkService = port: let + silence = port - portLow; + in { + "ntfy-waiter-${builtins.toString silence}" = { + # TODO: run not as root (e.g. as ntfy-sh) + description = "wait for notification, with ${builtins.toString silence} seconds of guaranteed silence"; + serviceConfig = { + Type = "simple"; + Restart = "always"; + ExecStart = "${cfg.package}/bin/ntfy-waiter --port ${builtins.toString port} --silence ${builtins.toString silence}"; + }; + after = [ "network.target" ]; + wantedBy = [ "network.target" ]; + }; + }; +in +{ + options = with lib; { + sane.ntfy-waiter.enable = mkOption { + type = types.bool; + default = true; + }; + sane.ntfy-waiter.package = mkOption { + type = types.package; + default = pkgs.static-nix-shell.mkPython3Bin { + pname = "ntfy-waiter"; + src = ./.; + pkgs = [ "ntfy-sh" ]; + }; + description = '' + exposed to provide an attr-path by which one may build the package for manual testing. + ''; + }; + }; + + config = lib.mkIf cfg.enable { + sane.ports.ports = lib.mkMerge (lib.forEach portRange (port: { + "${builtins.toString port}" = { + protocol = [ "tcp" ]; + visibleTo.lan = true; + visibleTo.wan = true; + description = "colin-notification-waiter-${builtins.toString (port+1)}-of-${builtins.toString numPorts}"; + }; + })); + systemd.services = lib.mkMerge (builtins.map mkService portRange); + }; +}