#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p ntfy-sh

import argparse
import logging
import os
import socket
import subprocess
import sys
import threading
import time

logger = logging.getLogger(__name__)

# backlog passed to `socket.listen`
LISTEN_QUEUE = 3
# payload written to every connected client when a notification arrives
WAKE_MESSAGE = b'notification\n'


class Client:
    """
    one accepted connection, plus the earliest time at which it may be notified.
    """

    def __init__(self, sock, addr_info, live_after: float):
        # live_after: unix timestamp (as from `time.time()`) before which
        # nothing is sent to this client.
        self.live_after = live_after
        self.sock = sock
        self.addr_info = addr_info

    def __cmp__(self, other: 'Client') -> int:
        """
        three-way compare by address: returns -1, 0 or 1.

        python 3 has no `cmp` builtin and never calls `__cmp__` implicitly,
        so this only serves explicit callers; it's spelled out by hand so that
        such a call doesn't die with a NameError.
        """
        mine, theirs = self.addr_info, other.addr_info
        return (mine > theirs) - (mine < theirs)

    def close(self) -> None:
        """
        close the underlying socket, ignoring errors from an already-dead socket.
        """
        try:
            self.sock.close()
        except OSError as e:
            logger.debug(f"error closing client {self.addr_info}: {e}")

    def try_notify(self, message: bytes) -> bool:
        """
        returns true if we send a packet to notify client.
        false otherwise (e.g. the socket is dead).

        blocks (sleeps) until `live_after` has passed before sending.
        """
        ttl = self.live_after - time.time()
        if ttl > 0:
            logger.debug(f"sleeping {ttl:.2f}s until client {self.addr_info} is ready to receive notification")
            time.sleep(ttl)

        try:
            self.sock.sendall(message)
        except Exception as e:
            logger.warning(f"failed to notify client {self.addr_info} {e}")
            return False
        else:
            logger.info(f"successfully notified {self.addr_info}: {message}")
            return True


class Adapter:
    """
    accepts TCP clients on (host, port) and writes WAKE_MESSAGE to each of them
    whenever `ntfy sub` prints a line for the configured topic.

    `listener_loop` and `notify_loop` are run on separate threads by the caller
    and both mutate `self.clients`.
    """

    def __init__(self, host: str, port: int, silence: int, topic: str):
        self.host = host
        self.port = port
        # seconds to wait after accepting a client before it may be notified
        self.silence = silence
        self.topic = topic
        self.clients = set()

    def log_clients(self):
        """
        debug-log the address of every client currently tracked.
        """
        clients_str = '\n'.join(f' {c.addr_info}' for c in self.clients)
        logger.debug(f"clients alive ({len(self.clients)}):\n{clients_str}")

    def add_client(self, client: Client):
        """
        track `client`, first dropping (and closing) any client at the same IP.
        """
        # it's a little bit risky to keep more than one client at the same IP address,
        # because it's possible a notification comes in and we ring the old connection,
        # even when the new connection says "don't ring yet".
        for c in set(self.clients):
            if c.addr_info[0] == client.addr_info[0]:
                logger.info(f"purging old client before adding new one at same address: {c.addr_info} -> {client.addr_info}")
                # `discard`, not `remove`: the notify thread may have dropped it already.
                self.clients.discard(c)
                # closing guarantees the old connection can't be rung anymore, even if
                # the notify thread still holds it in a snapshot; it also frees the fd.
                c.close()

        logger.info(f"accepted client at {client.addr_info}")
        self.clients.add(client)

    def listener_loop(self):
        """
        bind, listen, and accept clients forever. only returns by raising.
        """
        logger.info(f"listening for connections on {self.host}:{self.port}")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # allow re-binding the port right after a restart, instead of failing
            # while connections from the previous instance linger in TIME_WAIT.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(LISTEN_QUEUE)
            while True:
                conn, addr_info = s.accept()
                self.add_client(Client(conn, addr_info, live_after = time.time() + self.silence))

    def notify_clients(self, message: bytes = WAKE_MESSAGE):
        """
        send `message` to every tracked client; drop and close those which fail.
        """
        # notify every client, and drop any which have disconnected.
        # note that we notify based on age (oldest -> youngest)
        # because notifying young clients might entail sleeping until they're ready.
        clients = sorted(self.clients, key=lambda c: (c.live_after, c.addr_info))

        dead_clients = [c for c in clients if not c.try_notify(message)]
        for c in dead_clients:
            # `discard`, not `remove`: while we slept inside `try_notify` the listener
            # thread may have purged this client already (reconnect from the same IP),
            # and a KeyError here would take the whole process down.
            self.clients.discard(c)
            c.close()

        self.log_clients()

    def notify_loop(self):
        """
        run `ntfy sub` on the topic and notify all clients once per line it prints.
        returns when the subprocess closes its stdout.
        """
        logger.info("waiting for notification events")
        ntfy_proc = subprocess.Popen(
            [
                "ntfy",
                "sub",
                f"https://ntfy.uninsane.org/{self.topic}"
            ],
            stdout=subprocess.PIPE
        )
        for line in iter(ntfy_proc.stdout.readline, b''):
            logger.debug(f"received notification: {line}")
            self.notify_clients()


def get_topic() -> str:
    """
    read the ntfy topic name from the secrets file, stripped of whitespace.
    """
    with open('/run/secrets/ntfy-sh-topic', 'rt') as f:
        return f.read().strip()


def run_forever(callable):
    """
    run `callable`; if it ever raises or returns, log that and kill the process.
    intended as a thread target, so that one dead loop takes everything down.
    """
    try:
        callable()
    except Exception as e:
        logger.error(f"{callable} failed: {e}")
    else:
        logger.error(f"{callable} unexpectedly returned")

    # sys.exit(1)
    os._exit(1)  # sometimes `sys.exit()` doesn't actually exit...
def main():
    """
    parse the command line, then run the listener and notify loops on their own
    threads until one of them dies (`run_forever` then exits the whole process).
    """
    logging.basicConfig()

    parser = argparse.ArgumentParser(description="accept connections and notify the other end upon ntfy activity, with a guaranteed amount of silence")
    parser.add_argument('--verbose', action='store_true')
    parser.add_argument('--host', type=str, default='')
    # --port and --silence are required: left unset they'd be passed on as None
    # and only blow up later, with an opaque TypeError inside a worker thread.
    parser.add_argument('--port', type=int, required=True)
    parser.add_argument('--silence', type=int, required=True, help="number of seconds to remain silent upon accepting a connection")
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    adapter = Adapter(args.host, args.port, args.silence, get_topic())

    listener_loop = threading.Thread(target=run_forever, name="listener_loop", args=(adapter.listener_loop,))
    notify_loop = threading.Thread(target=run_forever, name="notify_loop", args=(adapter.notify_loop,))

    # TODO: this method of exiting seems to sometimes leave the listener behind (?)
    # preventing anyone else from re-binding the port.
    listener_loop.start()
    notify_loop.start()
    listener_loop.join()
    notify_loop.join()


if __name__ == '__main__':
    main()