mobile-linux-push-notifications: add and mirror all my referenced scripts/config

This commit is contained in:
Colin 2023-12-09 13:26:20 +00:00
parent 83afdc0e47
commit 167e811c1d
3 changed files with 363 additions and 5 deletions

View File

@ -48,7 +48,8 @@ enter... notification servers!
[ipv4-header]: https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Header
[tcp-header]: https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure
[wowlan-script]: TODO
[wowlan-script]: https://git.uninsane.org/colin/nix-files/src/commit/01de6f84cfc2291092a6809e61339e992f9b97b4/pkgs/additional/rtl8723cs-wowlan/rtl8723cs-wowlan
[wowlan-script-mirror]: rtl8723cs-wowlan.py
## Notification Servers
@ -220,7 +221,7 @@ anyway, put your phone to sleep, have someone send you a message that Synapse wo
Prosody has [mod_cloud_notify][mod_cloud_notify] and [XEP-0357][XEP-0357], but at the time of writing, client support is wanting. easier is to hack it in server-side.
Prosody has a Lua-based module system, so we can just author our own `mod_ntfy_push`, drop it in the modules directory, and then import it:
Prosody has a Lua-based module system, so we can just author our own `mod_ntfy_push` (available [here](mod_ntfy_push) or [here](mod_ntfy_push_mirror)), drop it in the modules directory, and then import it:
```nix
services.prosody.extraPluginsPath = [ ./folder/containing/mod_ntfy_push ];
@ -228,14 +229,14 @@ services.prosody.extraModules = [ "ntfy_push" ];
services.prosody.extraConfig = ''
ntfy_endpoint = "https://MY.NTFY.HOST/TEST_WOWLAN_TOPIC"
'';
TODO: link `mod_ntfy_push.lua` here
```
i've only authored this module to alert me on jingle calls. one could presumably alert on DMs, MUC messages, or anything more particular by finding the right Prosody hooks (mod_cloud_notify may be an ok guide for that).
[mod_cloud_notify]: https://modules.prosody.im/mod_cloud_notify
[XEP-0357]: https://xmpp.org/extensions/xep-0357.html
[mod_ntfy_push]: https://git.uninsane.org/colin/nix-files/src/commit/01de6f84cfc2291092a6809e61339e992f9b97b4/hosts/by-name/servo/services/prosody/modules/mod_sane_ntfy.lua
[mod_ntfy_push_mirror]: mod_sane_notify.lua
### Pinephone Wowlan Race Condition
@ -250,7 +251,7 @@ there are other reasons you may want to not sleep for extended durations: NATs.
in any case, longer sleep durations give diminishing returns. if you sleep for 600s, and then wake for 15s before going back to sleep, you've already captured 95% of the gains you could get from this sleep method. sleeping for an hour isn't going to get you that much more battery life.
[race-fix]: TODO
[race-fix]: https://git.uninsane.org/colin/nix-files/src/commit/01de6f84cfc2291092a6809e61339e992f9b97b4/hosts/by-name/servo/services/ntfy/ntfy-waiter
[TCP-NAT]: https://datatracker.ietf.org/doc/html/rfc5382#section-5
[UDP-NAT]: https://www.rfc-editor.org/rfc/rfc4787#section-4.3

View File

@ -0,0 +1,52 @@
-- simple proof-of-concept Prosody module
-- module development guide: <https://prosody.im/doc/developers/modules>
-- module API docs: <https://prosody.im/doc/developers/moduleapi>
--
-- much of this code is lifted from Prosody's own `mod_cloud_notify`
local jid = require"util.jid";
local ntfy = module:get_option_string("ntfy_binary", "ntfy");
local ntfy_topic = module:get_option_string("ntfy_topic", "xmpp");
module:log("info", "initialized");
local function is_urgent(stanza)
if stanza.name == "message" then
if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then
return true, "jingle call";
end
end
end
local function publish_ntfy(message)
-- message should be the message to publish
local ntfy_url = string.format("https://ntfy.uninsane.org/%s", ntfy_topic)
local cmd = string.format("%s pub %q %q", ntfy, ntfy_url, message)
module.log("debug", "invoking ntfy: %s", cmd)
local success, reason, code = os.execute(cmd)
if not success then
module:log("warn", "ntfy failed: %s => %s %d", cmd, reason, code)
end
end
local function archive_message_added(event)
-- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
local stanza = event.stanza;
local to = stanza.attr.to;
to = to and jid.split(to) or event.origin.username;
-- only notify if the stanza destination is the mam user we store it for
if event.for_user == to then
local is_urgent_stanza, urgent_reason = is_urgent(event.stanza);
if is_urgent_stanza then
module:log("info", "urgent push for %s (%s)", to, urgent_reason);
publish_ntfy(urgent_reason)
end
end
end
module:hook("archive-message-added", archive_message_added);

View File

@ -0,0 +1,305 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p hostname-debian -p iw -p wirelesstools
# vim: set filetype=python :
# common operations:
# enable-clean
# arp --dest-ip A.B.C.D
# tcp --source-port N
import argparse
import logging
import os
import subprocess
logger = logging.getLogger(__name__)
def octet_to_hex(o: int) -> str:
''' format an octet as a two-character hex string (lowercase) '''
return '%02X' % o
def get_ipaddrs() -> list['IpAddr']:
''' return the IP address of all known interfaces '''
# N.B.: --all-ip-addresses is provided only by hostname-debian package, not default hostname.
# however `hostname --all-ip-addresses` could dispatch to either, since hostname is likely on the user's PATH.
# so we have to try all the `hostname`s we can find.
# TODO: maybe try nix-level @hostname@-type substitution?
paths = os.getenv('PATH').split(':')
addrs = []
for p in paths:
try:
addrs = subprocess.check_output([f'{p}/domainname', '--all-ip-addresses']).decode('utf-8').strip().split(' ')
except: continue
else:
break
logger.debug(f'get_ipaddrs got: {addrs}')
return [ IpAddr(a) for a in addrs ]
class Encodable:
@staticmethod
def get_octets(e: 'Encodable|None', len_: int) -> int|None:
if e is None: return [None]*len_
octets = e.octets()
assert len(octets) == len_, octets
return octets
@staticmethod
def get_octet(e: 'Encodable|None', idx: int) -> int|None:
if e is None: return None
return e.octets()[idx]
def octects(self) -> list[int|None]:
raise NotImplementedError()
def __str__(self) -> str:
return ':'.join(octet_to_hex(b) if b is not None else '-' for b in self.octets())
class Port(Encodable):
def __init__(self, port: int|str):
self.port = int(port) if isinstance(port, str) else port
def octets(self) -> list[int]:
return [self.port // 256, self.port % 256]
class IpAddr(Encodable):
def __init__(self, addr: str):
pieces = addr.split('.')
self._octets = [int(p) if p else 0 for p in pieces]
@staticmethod
def parse_any(addr: str) -> list['IpAddr']:
if addr == 'SELF':
return get_ipaddrs()
else:
return [IpAddr(addr)]
def octets(self) -> list[int]:
return self._octets
class MacAddr(Encodable):
def __init__(self, addr: str):
pieces = addr.lower().split(':')
self._octets = [int(p, 16) if p else 0 for p in pieces]
def octets(self) -> list[int]:
return self._octets
class EtherType:
# ethertype: <https://en.wikipedia.org/wiki/EtherType#Values>
IPv4 = [ 0x08, 0x00 ] # 0x0800
ARP = [ 0x08, 0x06 ] # 0x0806
class IpProtocol:
# https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers
TCP = 0x06
UDP = 0x11
class EthernetFrame(Encodable):
def __init__(self, ether_type: EtherType, payload: Encodable, dest_mac: MacAddr|None = None):
self.ether_type = ether_type
self.payload = payload
self.dest_mac = dest_mac
def octets(self) -> list[int|None]:
dest_mac_ = Encodable.get_octets(self.dest_mac, 6)
return [
# ethernet frame: <https://en.wikipedia.org/wiki/Ethernet_frame#Structure>
## dest MAC address (this should be the device's MAC, but i think that's implied?)
dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5],
## src MAC address
None, None, None, None, None, None,
## ethertype: <https://en.wikipedia.org/wiki/EtherType#Values>
self.ether_type[0], self.ether_type[1]
] + self.payload.octets()
class ArpFrame(Encodable):
def __init__(self, dest_ip: IpAddr|None, dest_mac: MacAddr|None):
self.dest_ip = dest_ip
self.dest_mac = dest_mac
def octets(self) -> list[int|None]:
dest_ip_ = Encodable.get_octets(self.dest_ip, 4)
dest_mac_ = Encodable.get_octets(self.dest_mac, 6)
return [
# ARP frame: <https://en.wikipedia.org/wiki/Address_Resolution_Protocol#Packet_structure>
## hardware type
None, None,
## protocol type. same coding as EtherType
0x08, 0x00, # 0x0800 = IPv4
## hardware address length (i.e. MAC)
0x06,
## protocol address length (i.e. IP address)
0x04,
## operation
0x00, 0x01, # 0x0001 = request
## sender hardware address
None, None, None, None, None, None,
## sender protocol address
None, None, None, None,
## target hardware address
## caller is fine to leave this as "Don't Care" (None) because the packets we want to match
## are those mapping protocol addr -> hw addr.
## sometimes clients do include this field if they've seen the address before.
## otherwise clients use the broadcast mac, i.e. [ff::ff]
dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5],
## target protocol address
dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3],
]
class IpFrame(Encodable):
def __init__(self, proto: IpProtocol, payload: Encodable, dest_ip: IpAddr|None = None):
self.proto = proto
self.payload = payload
self.dest_ip = dest_ip
def octets(self) -> list[int|None]:
dest_ip_ = Encodable.get_octets(self.dest_ip, 4)
return [
# IP frame: <https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Header>
## Version, Internet Header Length
0x45, # should be 0x45, but fails to wake if i include this
## Differentiated Services Code Point (DSCP), Explicit Congestion Notification (ECN)
None,
## total length
None, None,
## identification
None, None,
## flags, fragment offset
None, None,
## Time-to-live
None,
## protocol: <https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers>
self.proto,
## header checksum
None, None,
## source IP addr
None, None, None, None,
## dest IP addr
dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3],
] + self.payload.octets()
class TcpFrame(Encodable):
def __init__(
self,
source_port: Port|None=None,
dest_port: Port|None=None,
):
self.source_port = source_port
self.dest_port = dest_port
def octets(self) -> list[int|None]:
source_port_ = Encodable.get_octets(self.source_port, 2)
dest_port_ = Encodable.get_octets(self.dest_port, 2)
return [
# TCP frame: <https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure>
source_port_[0], source_port_[1],
dest_port_[0], dest_port_[1],
## rest is Don't Care
]
# UDP header is a subset of the TCP header
# <https://en.wikipedia.org/wiki/User_Datagram_Protocol#UDP_datagram_structure>
UdpFrame = TcpFrame
def ips_from_str(ip: str|None = None) -> list[IpAddr|None]:
''' return all known IPs, or [ None ] if IP addr isn't known '''
ips = IpAddr.parse_any(ip) if ip is not None else []
return ips or [None]
def build_arp(dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]:
dest_ips = ips_from_str(dest_ip)
dest_mac = MacAddr(dest_mac) if dest_mac is not None else None
return [EthernetFrame(EtherType.ARP, ArpFrame(dest_ip=ip, dest_mac=dest_mac)) for ip in dest_ips]
def build_tcp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]:
source_port = Port(source_port) if source_port is not None else None
dest_port = Port(dest_port) if dest_port is not None else None
dest_ips = ips_from_str(dest_ip)
dest_mac = MacAddr(dest_mac) if dest_mac is not None else None
return [
EthernetFrame(EtherType.IPv4, dest_mac=dest_mac,
payload=IpFrame(IpProtocol.TCP, dest_ip=ip,
payload=TcpFrame(source_port=source_port, dest_port=dest_port)
)
) for ip in dest_ips
]
def build_udp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]:
source_port = Port(source_port) if source_port is not None else None
dest_port = Port(dest_port) if dest_port is not None else None
dest_ips = ips_from_str(dest_ip)
dest_mac = MacAddr(dest_mac) if dest_mac is not None else None
return [
EthernetFrame(EtherType.IPv4, dest_mac=dest_mac,
payload=IpFrame(IpProtocol.UDP, dest_ip=ip,
payload=UdpFrame(source_port=source_port, dest_port=dest_port)
)
) for ip in dest_ips
]
def exec_with(executor, args: list[str]):
logger.debug("invoking: {}".format(' '.join(args)))
executor(args)
def exec_real(args: list[str]):
exec_with(subprocess.check_output, args)
def exec_dry(args: list[str]):
exec_with(lambda _: None, args)
def main():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
parser = argparse.ArgumentParser(description="configure the RTL8723cs WiFi chip to wake the CPU on specific incoming packets")
parser.add_argument('--dry-run', action='store_true')
subparsers = parser.add_subparsers(help="type of match")
enable_clean_parser = subparsers.add_parser('enable-clean', help="enable WOWLAN and wipe existing patterns")
enable_clean_parser.set_defaults(type_='enable_clean')
arp_parser = subparsers.add_parser('arp', help="wake on ARP request")
arp_parser.set_defaults(type_='arp')
arp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic")
arp_parser.add_argument('--dest-mac', help="ab:cd:...")
tcp_parser = subparsers.add_parser('tcp', help="wake on TCP packet")
tcp_parser.set_defaults(type_='tcp')
tcp_parser.add_argument('--source-port', type=int)
tcp_parser.add_argument('--dest-port', type=int)
tcp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic")
tcp_parser.add_argument('--dest-mac', help="ab:cd:...")
udp_parser = subparsers.add_parser('udp', help="wake on UDP packet")
udp_parser.set_defaults(type_='udp')
udp_parser.add_argument('--source-port', type=int)
udp_parser.add_argument('--dest-port', type=int)
udp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic")
udp_parser.add_argument('--dest-mac', help="ab:cd:...")
args = parser.parse_args()
if args.dry_run:
exec_ = exec_dry
else:
exec_ = exec_real
if args.type_ == 'enable_clean':
exec_(['iw', 'phy0', 'wowlan', 'enable', 'any'])
exec_(['iwpriv', 'wlan0', 'wow_set_pattern', 'clean'])
frames = []
if args.type_ == 'arp':
frames = build_arp(dest_ip=args.dest_ip, dest_mac=args.dest_mac)
if args.type_ == 'tcp':
frames = build_tcp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac)
if args.type_ == 'udp':
frames = build_udp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac)
for frame in frames:
pattern = str(frame)
exec_(['iwpriv', 'wlan0', 'wow_set_pattern', f'pattern={pattern}'])
if __name__ == '__main__':
main()