#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p ntfy-sh
"""
Accept TCP connections and write a short wake message to every connected
client each time `ntfy sub` prints a line for the configured topic.

A freshly accepted client is left alone for `--silence` seconds before it
becomes eligible for wake messages.
"""

import argparse
import logging
import socket
import subprocess
import sys
import threading
import time

logger = logging.getLogger(__name__)

LISTEN_QUEUE = 3
WAKE_MESSAGE = b'notification\n'


class Client:
    """
    one accepted connection, plus the timestamp before which it must not be notified.

    instances keep the default identity-based equality/hash, so every accepted
    connection is a distinct member of `Adapter.clients`.
    """

    def __init__(self, sock, addr_info, live_after: float):
        self.live_after = live_after
        self.sock = sock
        self.addr_info = addr_info
        logger.info(f"accepted conn from {addr_info}")

    def __cmp__(self, other: 'Client'):
        """
        three-way comparison of the peer addresses: returns -1, 0 or 1.
        Python 3 has no `cmp` builtin (and never calls `__cmp__` implicitly),
        so the result is computed by hand for anyone calling this directly.
        """
        mine, theirs = self.addr_info, other.addr_info
        return (mine > theirs) - (mine < theirs)

    def close(self) -> None:
        """ release the underlying socket; errors while closing are only logged. """
        try:
            self.sock.close()
        except OSError as e:
            logger.debug(f"error closing socket for {self.addr_info}: {e}")

    def maybe_notify(self, message: bytes) -> bool:
        """
        returns false if we wanted to notify the client but failed (e.g. socket died).
        true otherwise.
        """
        if time.time() < self.live_after:
            logger.debug(f"not notifying client because it's not ready: {self.addr_info}")
            return True

        try:
            self.sock.sendall(message)
        except OSError as e:
            # covers broken pipe, connection reset, closed descriptor, timeouts, ...
            logger.warning(f"failed to notify client {self.addr_info} {e}")
            return False
        else:
            logger.info(f"successfully notified {self.addr_info}: {message}")
            return True


class Adapter:
    """
    owns the set of connected clients and the two loops that feed it:
    `listener_loop` adds clients, `notify_loop` wakes (and prunes) them.
    """

    def __init__(self, host: str, port: int, silence: int, topic: str):
        self.host = host
        self.port = port
        self.silence = silence
        self.topic = topic
        self.clients = set()
        # the running `ntfy sub` child, once notify_loop has spawned it
        self.ntfy_proc = None

    def listener_loop(self):
        """
        bind to host:port and accept connections forever, registering each as a
        Client which becomes notifiable `silence` seconds after it connected.
        """
        logger.info(f"listening for connections on {self.host}:{self.port}")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # allow a restarted instance to re-bind the port straight away
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(LISTEN_QUEUE)

            while True:
                conn, addr_info = s.accept()
                self.clients.add(Client(conn, addr_info, live_after=time.time() + self.silence))

    def notify_clients(self, message: bytes = WAKE_MESSAGE):
        """ notify every client, then close and drop any which have disconnected. """
        # iterate over a snapshot: listener_loop adds to self.clients from another
        # thread, and we remove from it below.
        snapshot = set(self.clients)
        dead_clients = [c for c in snapshot if not c.maybe_notify(message)]
        for c in dead_clients:
            c.close()  # don't leak the file descriptor of a dropped client
            self.clients.discard(c)

    def notify_loop(self):
        """
        spawn `ntfy sub` for our topic and wake all clients once per line it prints.
        returns when ntfy closes its stdout; the child is always cleaned up on the way out.
        """
        logger.info("waiting for notification events")
        ntfy_proc = subprocess.Popen(
            ["ntfy", "sub", f"https://ntfy.uninsane.org/{self.topic}"],
            stdout=subprocess.PIPE,
        )
        self.ntfy_proc = ntfy_proc
        try:
            for line in iter(ntfy_proc.stdout.readline, b''):
                logger.debug(f"received notification: {line}")
                self.notify_clients()
        finally:
            # don't leave the pipe open or the child running/unreaped behind us
            ntfy_proc.stdout.close()
            ntfy_proc.terminate()
            ntfy_proc.wait()

    def shutdown(self) -> None:
        """ ask the `ntfy sub` child (if one is running) to exit. """
        proc = self.ntfy_proc
        if proc is not None and proc.poll() is None:
            proc.terminate()


def get_topic() -> str:
    """ read the ntfy topic name from the secrets file, stripped of surrounding whitespace. """
    with open('/run/secrets/ntfy-sh-topic', 'rt') as f:
        return f.read().strip()


def run_forever(callable, done=None):
    """
    run `callable`, which is expected to never return. if it does return, log
    an error and raise SystemExit(1). if it raises, the exception propagates.

    `done`, if given, is a threading.Event which is set as soon as `callable`
    stops for either reason. SystemExit raised in a worker thread only ends that
    thread, so whoever owns the process must watch `done` to actually exit.
    """
    try:
        callable()
        logger.error(f"{callable} unexpectedly returned")
    finally:
        if done is not None:
            done.set()
    sys.exit(1)


def main():
    logging.basicConfig()

    parser = argparse.ArgumentParser(description="accept connections and notify the other end upon ntfy activity, with a guaranteed amount of silence")
    parser.add_argument('--verbose', action='store_true')
    parser.add_argument('--host', type=str, default='')
    # without a port there is nothing to bind to: fail at parse time instead of inside a thread
    parser.add_argument('--port', type=int, required=True)
    parser.add_argument('--silence', type=int, default=0, help="number of seconds to remain silent upon accepting a connection")
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    adapter = Adapter(args.host, args.port, args.silence, get_topic())

    # both loops are meant to run forever; if either one stops, the whole process
    # should exit. the workers are daemon threads so that the surviving loop
    # (e.g. the listener holding the port) cannot keep the process alive.
    stopped = threading.Event()
    workers = [
        threading.Thread(target=run_forever, name="listener_loop", args=(adapter.listener_loop, stopped), daemon=True),
        threading.Thread(target=run_forever, name="notify_loop", args=(adapter.notify_loop, stopped), daemon=True),
    ]
    for worker in workers:
        worker.start()

    try:
        stopped.wait()
    finally:
        adapter.shutdown()
    sys.exit(1)


if __name__ == '__main__':
    main()