152 lines
5.2 KiB
Plaintext
Executable File
152 lines
5.2 KiB
Plaintext
Executable File
#!/usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p ntfy-sh
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
# module-wide logger; the root logger's handler and level are set up in main().
logger = logging.getLogger(__name__)

# backlog handed to `socket.listen()` by the listener.
LISTEN_QUEUE = 3
# default payload written to every connected client when a notification arrives.
WAKE_MESSAGE = b'notification\n'
|
|
|
|
class Client:
    """
    one accepted connection, plus the earliest time at which it may be notified.

    attributes:
    - sock: connected socket-like object; only `sendall` is called on it here.
    - addr_info: peer address, as handed over by whoever accepted the connection.
    - live_after: unix timestamp (seconds); no notification is sent before this.
    """
    def __init__(self, sock, addr_info, live_after: float):
        self.live_after = live_after
        self.sock = sock
        self.addr_info = addr_info

    def __cmp__(self, other: 'Client') -> int:
        """
        three-way comparison by peer address: returns -1, 0 or 1.

        python 3 has no `cmp()` builtin and never calls `__cmp__` implicitly,
        so this only matters when invoked explicitly.
        """
        mine, theirs = self.addr_info, other.addr_info
        # equivalent of the python 2 `cmp(mine, theirs)`.
        return (mine > theirs) - (mine < theirs)

    def try_notify(self, message: bytes) -> bool:
        """
        returns true if we send a packet to notify client.
        false otherwise (e.g. the socket is dead).

        blocks (sleeps) until `live_after` has passed before sending anything.
        """
        ttl = self.live_after - time.time()
        if ttl > 0:
            logger.debug(f"sleeping {ttl:.2f}s until client {self.addr_info} is ready to receive notification")
            time.sleep(ttl)

        try:
            self.sock.sendall(message)
        except Exception as e:
            # deliberately broad: any failure to send just means "this client is dead".
            logger.warning(f"failed to notify client {self.addr_info} {e}")
            return False
        else:
            logger.info(f"successfully notified {self.addr_info}: {message}")
            return True
|
|
|
|
class Adapter:
    """
    accepts TCP clients and writes a wake message to each of them whenever
    the `ntfy sub` subprocess prints a line.

    `listener_loop` and `notify_loop` are meant to run on separate threads,
    so every access to `self.clients` goes through `self._clients_lock`.

    attributes:
    - host, port: address the listener binds to.
    - silence: seconds a freshly accepted client must be left alone.
    - topic: ntfy topic appended to the subscription URL.
    - clients: set of currently tracked `Client` objects.
    """
    def __init__(self, host: str, port: int, silence: int, topic: str):
        self.host = host
        self.port = port
        self.silence = silence
        self.topic = topic
        self.clients = set()
        # guards `clients`, which the listener and the notifier both mutate.
        self._clients_lock = threading.Lock()

    @staticmethod
    def _close_client(client: Client):
        # best-effort close of a client we no longer track; the peer may already be gone.
        try:
            client.sock.close()
        except OSError as e:
            logger.debug(f"error closing socket for {client.addr_info}: {e}")

    def log_clients(self):
        """
        debug-logs the address of every client currently tracked.
        """
        # snapshot under the lock so a concurrent add/remove can't break iteration.
        with self._clients_lock:
            addrs = [c.addr_info for c in self.clients]
        clients_str = '\n'.join(f' {a}' for a in addrs)
        logger.debug(f"clients alive ({len(addrs)}):\n{clients_str}")

    def add_client(self, client: Client):
        """
        starts tracking `client`, first dropping any tracked client at the same IP.
        """
        # it's a little bit risky to keep more than one client at the same IP address,
        # because it's possible a notification comes in and we ring the old connection,
        # even when the new connection says "don't ring yet".
        with self._clients_lock:
            stale = [c for c in self.clients if c.addr_info[0] == client.addr_info[0]]
            for c in stale:
                logger.info(f"purging old client before adding new one at same address: {c.addr_info} -> {client.addr_info}")
                self.clients.remove(c)

            logger.info(f"accepted client at {client.addr_info}")
            self.clients.add(client)

        # close outside the lock; a purged connection must not be rung later.
        for c in stale:
            self._close_client(c)

    def listener_loop(self):
        """
        binds `host:port` and registers every accepted connection as a client
        that stays silent for `silence` seconds. never returns normally.
        """
        logger.info(f"listening for connections on {self.host}:{self.port}")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # let a restarted instance re-bind the port without waiting for
            # lingering connections from the previous instance to expire.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(LISTEN_QUEUE)
            while True:
                conn, addr_info = s.accept()
                self.add_client(Client(conn, addr_info, live_after=time.time() + self.silence))

    def notify_clients(self, message: bytes = WAKE_MESSAGE):
        """
        sends `message` to every tracked client and stops tracking those that fail.
        """
        # notify every client, and drop any which have disconnected.
        # note that we notify based on age (oldest -> youngest)
        # because notifying young clients might entail sleeping until they're ready.
        with self._clients_lock:
            clients = sorted(self.clients, key=lambda c: (c.live_after, c.addr_info))

        # try_notify may sleep, so it runs without the lock held:
        # otherwise the listener could not register new clients meanwhile.
        dead_clients = [c for c in clients if not c.try_notify(message)]

        with self._clients_lock:
            for c in dead_clients:
                # discard, not remove: add_client may have purged it while we were sending.
                self.clients.discard(c)
        for c in dead_clients:
            self._close_client(c)

        self.log_clients()

    def notify_loop(self):
        """
        spawns `ntfy sub` for the configured topic and notifies all clients
        once per line it prints. returns when the subprocess closes its stdout.
        """
        logger.info("waiting for notification events")
        ntfy_proc = subprocess.Popen(
            [
                "ntfy",
                "sub",
                f"https://ntfy.uninsane.org/{self.topic}"
            ],
            stdout=subprocess.PIPE
        )
        # readline returns b'' only at EOF, which ends the iteration.
        for line in iter(ntfy_proc.stdout.readline, b''):
            logger.debug(f"received notification: {line}")
            self.notify_clients()
|
|
|
|
def get_topic(path: str = '/run/secrets/ntfy-sh-topic') -> str:
    """
    returns the contents of the file at `path`, with surrounding whitespace
    stripped; main() uses the result as the ntfy topic.

    raises OSError if the file can't be opened or read.
    """
    # `with` closes the handle promptly instead of leaving it to the garbage collector.
    with open(path, 'rt') as f:
        return f.read().strip()
|
|
|
|
def run_forever(callable):
    """
    runs `callable()` and kills the whole process (exit status 1) as soon as
    it stops, whether it raised or simply returned.

    intended as a thread target for loops that should never end.
    """
    try:
        callable()
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which the bare message alone would lose.
        logger.exception(f"{callable} failed: {e}")
    else:
        logger.error(f"{callable} unexpectedly returned")
    # `sys.exit()` only raises SystemExit in the calling thread, so from a worker
    # thread it doesn't actually exit the process; `os._exit` does.
    os._exit(1)
|
|
|
|
def main():
    """
    command-line entry point: parses arguments, builds the Adapter and runs its
    listener and notifier loops on two threads until either one stops.
    """
    logging.basicConfig()

    parser = argparse.ArgumentParser(description="accept connections and notify the other end upon ntfy activity, with a guaranteed amount of silence")
    parser.add_argument('--verbose', action='store_true')
    parser.add_argument('--host', type=str, default='')
    # both are required: left as None they only fail later, inside the worker
    # threads, with an unhelpful TypeError.
    parser.add_argument('--port', type=int, required=True)
    parser.add_argument('--silence', type=int, required=True, help="number of seconds to remain silent upon accepting a connection")

    args = parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    adapter = Adapter(args.host, args.port, args.silence, get_topic())

    listener_loop = threading.Thread(target=run_forever, name="listener_loop", args=(adapter.listener_loop,))
    notify_loop = threading.Thread(target=run_forever, name="notify_loop", args=(adapter.notify_loop,))

    # TODO: this method of exiting seems to sometimes leave the listener behind (?)
    # preventing anyone else from re-binding the port.
    listener_loop.start()
    notify_loop.start()
    listener_loop.join()
    notify_loop.join()
|
|
|
|
|
|
# only run when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
|