#!/usr/bin/env nix-shell #!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p hostname-debian -p iw -p wirelesstools # vim: set filetype=python : # common operations: # enable-clean # arp --dest-ip A.B.C.D # tcp --source-port N import argparse import logging import os import subprocess logger = logging.getLogger(__name__) def octet_to_hex(o: int) -> str: ''' format an octet as a two-character hex string (lowercase) ''' return '%02X' % o def get_ipaddrs() -> list['IpAddr']: ''' return the IP address of all known interfaces ''' # N.B.: --all-ip-addresses is provided only by hostname-debian package, not default hostname. # however `hostname --all-ip-addresses` could dispatch to either, since hostname is likely on the user's PATH. # so we have to try all the `hostname`s we can find. # TODO: maybe try nix-level @hostname@-type substitution? paths = os.getenv('PATH').split(':') addrs = [] for p in paths: try: addrs = subprocess.check_output([f'{p}/domainname', '--all-ip-addresses']).decode('utf-8').strip().split(' ') except: continue else: break logger.debug(f'get_ipaddrs got: {addrs}') return [ IpAddr(a) for a in addrs ] class Encodable: @staticmethod def get_octets(e: 'Encodable|None', len_: int) -> int|None: if e is None: return [None]*len_ octets = e.octets() assert len(octets) == len_, octets return octets @staticmethod def get_octet(e: 'Encodable|None', idx: int) -> int|None: if e is None: return None return e.octets()[idx] def octects(self) -> list[int|None]: raise NotImplementedError() def __str__(self) -> str: return ':'.join(octet_to_hex(b) if b is not None else '-' for b in self.octets()) class Port(Encodable): def __init__(self, port: int|str): self.port = int(port) if isinstance(port, str) else port def octets(self) -> list[int]: return [self.port // 256, self.port % 256] class IpAddr(Encodable): def __init__(self, addr: str): pieces = addr.split('.') self._octets = [int(p) if p else 0 for p in pieces] @staticmethod def parse_any(addr: str) -> list['IpAddr']: if addr == 'SELF': return get_ipaddrs() else: return [IpAddr(addr)] def octets(self) -> list[int]: return self._octets class MacAddr(Encodable): def __init__(self, addr: str): pieces = addr.lower().split(':') self._octets = [int(p, 16) if p else 0 for p in pieces] def octets(self) -> list[int]: return self._octets class EtherType: # ethertype: IPv4 = [ 0x08, 0x00 ] # 0x0800 ARP = [ 0x08, 0x06 ] # 0x0806 class IpProtocol: # https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers TCP = 0x06 UDP = 0x11 class EthernetFrame(Encodable): def __init__(self, ether_type: EtherType, payload: Encodable, dest_mac: MacAddr|None = None): self.ether_type = ether_type self.payload = payload self.dest_mac = dest_mac def octets(self) -> list[int|None]: dest_mac_ = Encodable.get_octets(self.dest_mac, 6) return [ # ethernet frame: ## dest MAC address (this should be the device's MAC, but i think that's implied?) dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5], ## src MAC address None, None, None, None, None, None, ## ethertype: self.ether_type[0], self.ether_type[1] ] + self.payload.octets() class ArpFrame(Encodable): def __init__(self, dest_ip: IpAddr|None, dest_mac: MacAddr|None): self.dest_ip = dest_ip self.dest_mac = dest_mac def octets(self) -> list[int|None]: dest_ip_ = Encodable.get_octets(self.dest_ip, 4) dest_mac_ = Encodable.get_octets(self.dest_mac, 6) return [ # ARP frame: ## hardware type None, None, ## protocol type. same coding as EtherType 0x08, 0x00, # 0x0800 = IPv4 ## hardware address length (i.e. MAC) 0x06, ## protocol address length (i.e. IP address) 0x04, ## operation 0x00, 0x01, # 0x0001 = request ## sender hardware address None, None, None, None, None, None, ## sender protocol address None, None, None, None, ## target hardware address ## caller is fine to leave this as "Don't Care" (None) because the packets we want to match ## are those mapping protocol addr -> hw addr. ## sometimes clients do include this field if they've seen the address before. ## otherwise clients use the broadcast mac, i.e. [ff::ff] dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5], ## target protocol address dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3], ] class IpFrame(Encodable): def __init__(self, proto: IpProtocol, payload: Encodable, dest_ip: IpAddr|None = None): self.proto = proto self.payload = payload self.dest_ip = dest_ip def octets(self) -> list[int|None]: dest_ip_ = Encodable.get_octets(self.dest_ip, 4) return [ # IP frame: ## Version, Internet Header Length 0x45, # should be 0x45, but fails to wake if i include this ## Differentiated Services Code Point (DSCP), Explicit Congestion Notification (ECN) None, ## total length None, None, ## identification None, None, ## flags, fragment offset None, None, ## Time-to-live None, ## protocol: self.proto, ## header checksum None, None, ## source IP addr None, None, None, None, ## dest IP addr dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3], ] + self.payload.octets() class TcpFrame(Encodable): def __init__( self, source_port: Port|None=None, dest_port: Port|None=None, ): self.source_port = source_port self.dest_port = dest_port def octets(self) -> list[int|None]: source_port_ = Encodable.get_octets(self.source_port, 2) dest_port_ = Encodable.get_octets(self.dest_port, 2) return [ # TCP frame: source_port_[0], source_port_[1], dest_port_[0], dest_port_[1], ## rest is Don't Care ] # UDP header is a subset of the TCP header # UdpFrame = TcpFrame def ips_from_str(ip: str|None = None) -> list[IpAddr|None]: ''' return all known IPs, or [ None ] if IP addr isn't known ''' ips = IpAddr.parse_any(ip) if ip is not None else [] return ips or [None] def build_arp(dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]: dest_ips = ips_from_str(dest_ip) dest_mac = MacAddr(dest_mac) if dest_mac is not None else None return [EthernetFrame(EtherType.ARP, ArpFrame(dest_ip=ip, dest_mac=dest_mac)) for ip in dest_ips] def build_tcp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]: source_port = Port(source_port) if source_port is not None else None dest_port = Port(dest_port) if dest_port is not None else None dest_ips = ips_from_str(dest_ip) dest_mac = MacAddr(dest_mac) if dest_mac is not None else None return [ EthernetFrame(EtherType.IPv4, dest_mac=dest_mac, payload=IpFrame(IpProtocol.TCP, dest_ip=ip, payload=TcpFrame(source_port=source_port, dest_port=dest_port) ) ) for ip in dest_ips ] def build_udp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]: source_port = Port(source_port) if source_port is not None else None dest_port = Port(dest_port) if dest_port is not None else None dest_ips = ips_from_str(dest_ip) dest_mac = MacAddr(dest_mac) if dest_mac is not None else None return [ EthernetFrame(EtherType.IPv4, dest_mac=dest_mac, payload=IpFrame(IpProtocol.UDP, dest_ip=ip, payload=UdpFrame(source_port=source_port, dest_port=dest_port) ) ) for ip in dest_ips ] def exec_with(executor, args: list[str]): logger.debug("invoking: {}".format(' '.join(args))) executor(args) def exec_real(args: list[str]): exec_with(subprocess.check_output, args) def exec_dry(args: list[str]): exec_with(lambda _: None, args) def main(): logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) parser = argparse.ArgumentParser(description="configure the RTL8723cs WiFi chip to wake the CPU on specific incoming packets") parser.add_argument('--dry-run', action='store_true') subparsers = parser.add_subparsers(help="type of match") enable_clean_parser = subparsers.add_parser('enable-clean', help="enable WOWLAN and wipe existing patterns") enable_clean_parser.set_defaults(type_='enable_clean') arp_parser = subparsers.add_parser('arp', help="wake on ARP request") arp_parser.set_defaults(type_='arp') arp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic") arp_parser.add_argument('--dest-mac', help="ab:cd:...") tcp_parser = subparsers.add_parser('tcp', help="wake on TCP packet") tcp_parser.set_defaults(type_='tcp') tcp_parser.add_argument('--source-port', type=int) tcp_parser.add_argument('--dest-port', type=int) tcp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic") tcp_parser.add_argument('--dest-mac', help="ab:cd:...") udp_parser = subparsers.add_parser('udp', help="wake on UDP packet") udp_parser.set_defaults(type_='udp') udp_parser.add_argument('--source-port', type=int) udp_parser.add_argument('--dest-port', type=int) udp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic") udp_parser.add_argument('--dest-mac', help="ab:cd:...") args = parser.parse_args() if args.dry_run: exec_ = exec_dry else: exec_ = exec_real if args.type_ == 'enable_clean': exec_(['iw', 'phy0', 'wowlan', 'enable', 'any']) exec_(['iwpriv', 'wlan0', 'wow_set_pattern', 'clean']) frames = [] if args.type_ == 'arp': frames = build_arp(dest_ip=args.dest_ip, dest_mac=args.dest_mac) if args.type_ == 'tcp': frames = build_tcp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac) if args.type_ == 'udp': frames = build_udp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac) for frame in frames: pattern = str(frame) exec_(['iwpriv', 'wlan0', 'wow_set_pattern', f'pattern={pattern}']) if __name__ == '__main__': main()