nix-files/hosts/by-name/servo/services/ntfy/ntfy-waiter

127 lines
4.1 KiB
Plaintext
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p ntfy-sh
import argparse
import logging
import socket
import subprocess
import sys
import threading
import time
# module-wide logger; the root logger's handler and level are configured in main()
logger = logging.getLogger(__name__)
# backlog value passed to socket.listen() for the listening socket
LISTEN_QUEUE = 3
# default payload sent to every connected client when a notification arrives
WAKE_MESSAGE = b'notification\n'
class Client:
    """One accepted connection that can be woken when a notification arrives.

    Attributes:
        sock: the connected socket returned by ``accept()``.
        addr_info: the peer address returned by ``accept()``.
        live_after: ``time.time()`` timestamp before which notifications
            are deliberately withheld from this client.
    """

    def __init__(self, sock, addr_info, live_after: float):
        self.live_after = live_after
        self.sock = sock
        self.addr_info = addr_info
        logger.info(f"accepted conn from {addr_info}")

    def __cmp__(self, other: 'Client'):
        """Three-way compare two clients by peer address.

        Returns -1, 0 or 1. The Python 2 ``cmp`` builtin no longer exists,
        so the result is computed by hand. Python 3 never invokes
        ``__cmp__`` implicitly: equality and hashing (and therefore set
        membership) remain identity-based.
        """
        mine, theirs = self.addr_info, other.addr_info
        return (mine > theirs) - (mine < theirs)

    def maybe_notify(self, message: bytes) -> bool:
        """
        returns false if we wanted to notify the client but failed (e.g. socket died).
        true otherwise.
        """
        if time.time() < self.live_after:
            # still inside the silent period: skip, but keep the client alive
            logger.debug(f"not notifying client because it's not ready: {self.addr_info}")
            return True

        try:
            self.sock.sendall(message)
        except Exception as e:
            # deliberately broad: any send failure means this client is unusable
            logger.warning(f"failed to notify client {self.addr_info} {e}")
            # release the descriptor now instead of waiting for garbage collection
            try:
                self.sock.close()
            except OSError:
                pass
            return False
        else:
            logger.info(f"successfully notified {self.addr_info}: {message}")
            return True
class Adapter:
    """Bridge between an ``ntfy sub`` subscription and connected TCP clients.

    ``listener_loop`` accepts connections and records them as clients;
    ``notify_loop`` reads lines from the ``ntfy`` subprocess and sends a wake
    message to every recorded client for each line received.

    Attributes:
        host: address the listening socket binds to.
        port: TCP port the listening socket binds to.
        silence: seconds after accepting a client during which it is not notified.
        topic: ntfy topic name appended to the subscription URL.
        clients: set of currently tracked ``Client`` objects.
    """

    def __init__(self, host: str, port: int, silence: int, topic: str):
        self.host = host
        self.port = port
        self.silence = silence
        self.topic = topic
        self.clients = set()

    def listener_loop(self):
        """Accept connections forever, registering each one as a client."""
        logger.info(f"listening for connections on {self.host}:{self.port}")
        # the context manager closes the listening socket if the loop ever dies
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # allow a restarted instance to bind the port again right away
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(LISTEN_QUEUE)
            while True:
                conn, addr_info = s.accept()
                self.clients.add(
                    Client(conn, addr_info, live_after=time.time() + self.silence)
                )

    def notify_clients(self, message: bytes = WAKE_MESSAGE):
        """Send ``message`` to every client and forget those that failed."""
        # iterate over a snapshot: the listener thread may add clients meanwhile
        snapshot = set(self.clients)
        dead_clients = [c for c in snapshot if not c.maybe_notify(message)]
        for c in dead_clients:
            # discard() tolerates a client that is already gone
            self.clients.discard(c)

    def notify_loop(self):
        """Run ``ntfy sub`` and wake all clients for every line it prints.

        Returns when the subprocess closes its stdout.
        """
        logger.info("waiting for notification events")
        # the context manager closes the pipe and reaps the child on exit
        with subprocess.Popen(
            [
                "ntfy",
                "sub",
                f"https://ntfy.uninsane.org/{self.topic}"
            ],
            stdout=subprocess.PIPE
        ) as ntfy_proc:
            # readline() returns b'' at EOF, which ends the iteration
            for line in iter(ntfy_proc.stdout.readline, b''):
                logger.debug(f"received notification: {line}")
                self.notify_clients()
        logger.warning(f"ntfy exited with status {ntfy_proc.returncode}")
def get_topic(path: str = '/run/secrets/ntfy-sh-topic') -> str:
    """Return the ntfy topic stored in the file at ``path``.

    Args:
        path: file holding the topic; defaults to the secrets file this
            script has always read.

    Returns:
        The file contents with leading and trailing whitespace stripped.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    # the context manager closes the handle promptly
    with open(path, 'rt') as f:
        return f.read().strip()
def run_forever(callable):
    """Run ``callable`` and terminate the whole process if it ever stops.

    Intended as a thread target for loops that should never finish. If the
    callable returns or raises, the event is logged and the process exits
    with status 1.

    Args:
        callable: zero-argument function expected to run indefinitely.
    """
    # local import: os is not among the file's top-level imports
    import os

    try:
        callable()
    except Exception:
        logger.exception(f"{callable} unexpectedly raised")
    else:
        logger.error(f"{callable} unexpectedly returned")

    # sys.exit() in a non-main thread only raises SystemExit in that thread,
    # leaving the process and its other threads running; os._exit() ends the
    # process itself. Flush log handlers first, since os._exit() skips cleanup.
    logging.shutdown()
    os._exit(1)
def main():
    """Parse the command line and run the listener and notify loops.

    ``--port`` is required and ``--silence`` defaults to 0, so a missing
    option is reported by argparse up front instead of surfacing later as a
    ``TypeError`` inside one of the worker threads.
    """
    logging.basicConfig()

    parser = argparse.ArgumentParser(description="accept connections and notify the other end upon ntfy activity, with a guaranteed amount of silence")
    parser.add_argument('--verbose', action='store_true')
    parser.add_argument('--host', type=str, default='')
    parser.add_argument('--port', type=int, required=True)
    parser.add_argument('--silence', type=int, default=0, help="number of seconds to remain silent upon accepting a connection")
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    adapter = Adapter(args.host, args.port, args.silence, get_topic())

    listener_loop = threading.Thread(target=run_forever, name="listener_loop", args=(adapter.listener_loop,))
    notify_loop = threading.Thread(target=run_forever, name="notify_loop", args=(adapter.notify_loop,))

    # TODO: this method of exiting seems to leave the listener behind,
    # preventing anyone else from re-binding the port.
    listener_loop.start()
    notify_loop.start()
    listener_loop.join()
    notify_loop.join()


# run the adapter only when executed as a script
if __name__ == '__main__':
    main()