diff --git a/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/index.md b/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/index.md index 5710f72..1b56fe3 100644 --- a/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/index.md +++ b/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/index.md @@ -48,7 +48,8 @@ enter... notification servers! [ipv4-header]: https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Header [tcp-header]: https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure -[wowlan-script]: TODO +[wowlan-script]: https://git.uninsane.org/colin/nix-files/src/commit/01de6f84cfc2291092a6809e61339e992f9b97b4/pkgs/additional/rtl8723cs-wowlan/rtl8723cs-wowlan +[wowlan-script-mirror]: rtl8723cs-wowlan.py ## Notification Servers @@ -220,7 +221,7 @@ anyway, put your phone to sleep, have someone send you a message that Synapse wo Prosody has [mod_cloud_notify][mod_cloud_notify] and [XEP-0357][XEP-0357], but at the time of writing, client support is wanting. easier is to hack it in server-side. -Prosody has a Lua-based module system, so we can just author our own `mod_ntfy_push`, drop it in the modules directory, and then import it: +Prosody has a Lua-based module system, so we can just author our own `mod_ntfy_push` (available [here](mod_ntfy_push) or [here](mod_ntfy_push_mirror)), drop it in the modules directory, and then import it: ```nix services.prosody.extraPluginsPath = [ ./folder/containing/mod_ntfy_push ]; @@ -228,14 +229,14 @@ services.prosody.extraModules = [ "ntfy_push" ]; services.prosody.extraConfig = '' ntfy_endpoint = "https://MY.NTFY.HOST/TEST_WOWLAN_TOPIC" ''; - -TODO: link `mod_ntfy_push.lua` here ``` i've only authored this module to alert me on jingle calls. one could presumably alert on DMs, MUC messages, or anything more particular by finding the right Prosody hooks (mod_cloud_notify may be an ok guide for that). [mod_cloud_notify]: https://modules.prosody.im/mod_cloud_notify [XEP-0357]: https://xmpp.org/extensions/xep-0357.html +[mod_ntfy_push]: https://git.uninsane.org/colin/nix-files/src/commit/01de6f84cfc2291092a6809e61339e992f9b97b4/hosts/by-name/servo/services/prosody/modules/mod_sane_ntfy.lua +[mod_ntfy_push_mirror]: mod_sane_notify.lua ### Pinephone Wowlan Race Condition @@ -250,7 +251,7 @@ there are other reasons you may want to not sleep for extended durations: NATs. in any case, longer sleep durations give diminishing returns. if you sleep for 600s, and then wake for 15s before going back to sleep, you've already captured 95% of the gains you could get from this sleep method. sleeping for an hour isn't going to get you that much more battery life. -[race-fix]: TODO +[race-fix]: https://git.uninsane.org/colin/nix-files/src/commit/01de6f84cfc2291092a6809e61339e992f9b97b4/hosts/by-name/servo/services/ntfy/ntfy-waiter [TCP-NAT]: https://datatracker.ietf.org/doc/html/rfc5382#section-5 [UDP-NAT]: https://www.rfc-editor.org/rfc/rfc4787#section-4.3 diff --git a/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/mod_sane_ntfy.lua b/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/mod_sane_ntfy.lua new file mode 100644 index 0000000..8ea3dab --- /dev/null +++ b/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/mod_sane_ntfy.lua @@ -0,0 +1,52 @@ +-- simple proof-of-concept Prosody module +-- module development guide: +-- module API docs: +-- +-- much of this code is lifted from Prosody's own `mod_cloud_notify` + +local jid = require"util.jid"; + +local ntfy = module:get_option_string("ntfy_binary", "ntfy"); +local ntfy_topic = module:get_option_string("ntfy_topic", "xmpp"); + +module:log("info", "initialized"); + +local function is_urgent(stanza) + if stanza.name == "message" then + if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then + return true, "jingle call"; + end + end +end + +local function publish_ntfy(message) + -- message should be the message to publish + local ntfy_url = string.format("https://ntfy.uninsane.org/%s", ntfy_topic) + local cmd = string.format("%s pub %q %q", ntfy, ntfy_url, message) + module.log("debug", "invoking ntfy: %s", cmd) + local success, reason, code = os.execute(cmd) + if not success then + module:log("warn", "ntfy failed: %s => %s %d", cmd, reason, code) + end +end + + +local function archive_message_added(event) + -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } + local stanza = event.stanza; + local to = stanza.attr.to; + to = to and jid.split(to) or event.origin.username; + + -- only notify if the stanza destination is the mam user we store it for + if event.for_user == to then + local is_urgent_stanza, urgent_reason = is_urgent(event.stanza); + + if is_urgent_stanza then + module:log("info", "urgent push for %s (%s)", to, urgent_reason); + publish_ntfy(urgent_reason) + end + end +end + + +module:hook("archive-message-added", archive_message_added); diff --git a/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/rtl8723cs-wowlan.py b/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/rtl8723cs-wowlan.py new file mode 100755 index 0000000..4d3f15f --- /dev/null +++ b/content/blog/DRAFT-2023-11-06-mobile-linux-push-notifications/rtl8723cs-wowlan.py @@ -0,0 +1,305 @@ +#!/usr/bin/env nix-shell +#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p hostname-debian -p iw -p wirelesstools +# vim: set filetype=python : + +# common operations: +# enable-clean +# arp --dest-ip A.B.C.D +# tcp --source-port N + +import argparse +import logging +import os +import subprocess + +logger = logging.getLogger(__name__) + +def octet_to_hex(o: int) -> str: + ''' format an octet as a two-character hex string (lowercase) ''' + return '%02X' % o + +def get_ipaddrs() -> list['IpAddr']: + ''' return the IP address of all known interfaces ''' + # N.B.: --all-ip-addresses is provided only by hostname-debian package, not default hostname. + # however `hostname --all-ip-addresses` could dispatch to either, since hostname is likely on the user's PATH. + # so we have to try all the `hostname`s we can find. + # TODO: maybe try nix-level @hostname@-type substitution? + paths = os.getenv('PATH').split(':') + addrs = [] + for p in paths: + try: + addrs = subprocess.check_output([f'{p}/domainname', '--all-ip-addresses']).decode('utf-8').strip().split(' ') + except: continue + else: + break + logger.debug(f'get_ipaddrs got: {addrs}') + return [ IpAddr(a) for a in addrs ] + + +class Encodable: + @staticmethod + def get_octets(e: 'Encodable|None', len_: int) -> int|None: + if e is None: return [None]*len_ + + octets = e.octets() + assert len(octets) == len_, octets + return octets + + @staticmethod + def get_octet(e: 'Encodable|None', idx: int) -> int|None: + if e is None: return None + return e.octets()[idx] + + def octects(self) -> list[int|None]: + raise NotImplementedError() + + def __str__(self) -> str: + return ':'.join(octet_to_hex(b) if b is not None else '-' for b in self.octets()) + +class Port(Encodable): + def __init__(self, port: int|str): + self.port = int(port) if isinstance(port, str) else port + def octets(self) -> list[int]: + return [self.port // 256, self.port % 256] + +class IpAddr(Encodable): + def __init__(self, addr: str): + pieces = addr.split('.') + self._octets = [int(p) if p else 0 for p in pieces] + + @staticmethod + def parse_any(addr: str) -> list['IpAddr']: + if addr == 'SELF': + return get_ipaddrs() + else: + return [IpAddr(addr)] + + def octets(self) -> list[int]: + return self._octets + +class MacAddr(Encodable): + def __init__(self, addr: str): + pieces = addr.lower().split(':') + self._octets = [int(p, 16) if p else 0 for p in pieces] + + def octets(self) -> list[int]: + return self._octets + +class EtherType: + # ethertype: + IPv4 = [ 0x08, 0x00 ] # 0x0800 + ARP = [ 0x08, 0x06 ] # 0x0806 + +class IpProtocol: + # https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers + TCP = 0x06 + UDP = 0x11 + +class EthernetFrame(Encodable): + def __init__(self, ether_type: EtherType, payload: Encodable, dest_mac: MacAddr|None = None): + self.ether_type = ether_type + self.payload = payload + self.dest_mac = dest_mac + + def octets(self) -> list[int|None]: + dest_mac_ = Encodable.get_octets(self.dest_mac, 6) + return [ + # ethernet frame: + ## dest MAC address (this should be the device's MAC, but i think that's implied?) + dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5], + ## src MAC address + None, None, None, None, None, None, + ## ethertype: + self.ether_type[0], self.ether_type[1] + ] + self.payload.octets() + +class ArpFrame(Encodable): + def __init__(self, dest_ip: IpAddr|None, dest_mac: MacAddr|None): + self.dest_ip = dest_ip + self.dest_mac = dest_mac + + def octets(self) -> list[int|None]: + dest_ip_ = Encodable.get_octets(self.dest_ip, 4) + dest_mac_ = Encodable.get_octets(self.dest_mac, 6) + return [ + # ARP frame: + ## hardware type + None, None, + ## protocol type. same coding as EtherType + 0x08, 0x00, # 0x0800 = IPv4 + ## hardware address length (i.e. MAC) + 0x06, + ## protocol address length (i.e. IP address) + 0x04, + ## operation + 0x00, 0x01, # 0x0001 = request + ## sender hardware address + None, None, None, None, None, None, + ## sender protocol address + None, None, None, None, + ## target hardware address + ## caller is fine to leave this as "Don't Care" (None) because the packets we want to match + ## are those mapping protocol addr -> hw addr. + ## sometimes clients do include this field if they've seen the address before. + ## otherwise clients use the broadcast mac, i.e. [ff::ff] + dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5], + ## target protocol address + dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3], + ] + +class IpFrame(Encodable): + def __init__(self, proto: IpProtocol, payload: Encodable, dest_ip: IpAddr|None = None): + self.proto = proto + self.payload = payload + self.dest_ip = dest_ip + + def octets(self) -> list[int|None]: + dest_ip_ = Encodable.get_octets(self.dest_ip, 4) + return [ + # IP frame: + ## Version, Internet Header Length + 0x45, # should be 0x45, but fails to wake if i include this + ## Differentiated Services Code Point (DSCP), Explicit Congestion Notification (ECN) + None, + ## total length + None, None, + ## identification + None, None, + ## flags, fragment offset + None, None, + ## Time-to-live + None, + ## protocol: + self.proto, + ## header checksum + None, None, + ## source IP addr + None, None, None, None, + ## dest IP addr + dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3], + ] + self.payload.octets() + +class TcpFrame(Encodable): + def __init__( + self, + source_port: Port|None=None, + dest_port: Port|None=None, + ): + self.source_port = source_port + self.dest_port = dest_port + + def octets(self) -> list[int|None]: + source_port_ = Encodable.get_octets(self.source_port, 2) + dest_port_ = Encodable.get_octets(self.dest_port, 2) + return [ + # TCP frame: + source_port_[0], source_port_[1], + dest_port_[0], dest_port_[1], + ## rest is Don't Care + ] + +# UDP header is a subset of the TCP header +# +UdpFrame = TcpFrame + +def ips_from_str(ip: str|None = None) -> list[IpAddr|None]: + ''' return all known IPs, or [ None ] if IP addr isn't known ''' + ips = IpAddr.parse_any(ip) if ip is not None else [] + return ips or [None] + +def build_arp(dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]: + dest_ips = ips_from_str(dest_ip) + dest_mac = MacAddr(dest_mac) if dest_mac is not None else None + return [EthernetFrame(EtherType.ARP, ArpFrame(dest_ip=ip, dest_mac=dest_mac)) for ip in dest_ips] + +def build_tcp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]: + source_port = Port(source_port) if source_port is not None else None + dest_port = Port(dest_port) if dest_port is not None else None + dest_ips = ips_from_str(dest_ip) + dest_mac = MacAddr(dest_mac) if dest_mac is not None else None + return [ + EthernetFrame(EtherType.IPv4, dest_mac=dest_mac, + payload=IpFrame(IpProtocol.TCP, dest_ip=ip, + payload=TcpFrame(source_port=source_port, dest_port=dest_port) + ) + ) for ip in dest_ips + ] + +def build_udp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]: + source_port = Port(source_port) if source_port is not None else None + dest_port = Port(dest_port) if dest_port is not None else None + dest_ips = ips_from_str(dest_ip) + dest_mac = MacAddr(dest_mac) if dest_mac is not None else None + return [ + EthernetFrame(EtherType.IPv4, dest_mac=dest_mac, + payload=IpFrame(IpProtocol.UDP, dest_ip=ip, + payload=UdpFrame(source_port=source_port, dest_port=dest_port) + ) + ) for ip in dest_ips + ] + +def exec_with(executor, args: list[str]): + logger.debug("invoking: {}".format(' '.join(args))) + executor(args) + +def exec_real(args: list[str]): + exec_with(subprocess.check_output, args) + +def exec_dry(args: list[str]): + exec_with(lambda _: None, args) + +def main(): + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + + parser = argparse.ArgumentParser(description="configure the RTL8723cs WiFi chip to wake the CPU on specific incoming packets") + parser.add_argument('--dry-run', action='store_true') + subparsers = parser.add_subparsers(help="type of match") + + enable_clean_parser = subparsers.add_parser('enable-clean', help="enable WOWLAN and wipe existing patterns") + enable_clean_parser.set_defaults(type_='enable_clean') + + arp_parser = subparsers.add_parser('arp', help="wake on ARP request") + arp_parser.set_defaults(type_='arp') + arp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic") + arp_parser.add_argument('--dest-mac', help="ab:cd:...") + + tcp_parser = subparsers.add_parser('tcp', help="wake on TCP packet") + tcp_parser.set_defaults(type_='tcp') + tcp_parser.add_argument('--source-port', type=int) + tcp_parser.add_argument('--dest-port', type=int) + tcp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic") + tcp_parser.add_argument('--dest-mac', help="ab:cd:...") + + udp_parser = subparsers.add_parser('udp', help="wake on UDP packet") + udp_parser.set_defaults(type_='udp') + udp_parser.add_argument('--source-port', type=int) + udp_parser.add_argument('--dest-port', type=int) + udp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic") + udp_parser.add_argument('--dest-mac', help="ab:cd:...") + + args = parser.parse_args() + + if args.dry_run: + exec_ = exec_dry + else: + exec_ = exec_real + + if args.type_ == 'enable_clean': + exec_(['iw', 'phy0', 'wowlan', 'enable', 'any']) + exec_(['iwpriv', 'wlan0', 'wow_set_pattern', 'clean']) + + frames = [] + if args.type_ == 'arp': + frames = build_arp(dest_ip=args.dest_ip, dest_mac=args.dest_mac) + if args.type_ == 'tcp': + frames = build_tcp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac) + if args.type_ == 'udp': + frames = build_udp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac) + + for frame in frames: + pattern = str(frame) + exec_(['iwpriv', 'wlan0', 'wow_set_pattern', f'pattern={pattern}']) + +if __name__ == '__main__': + main()