diff --git a/pkgs/additional/rtl8723cs-wowlan/default.nix b/pkgs/additional/rtl8723cs-wowlan/default.nix new file mode 100644 index 00000000..e83e7b68 --- /dev/null +++ b/pkgs/additional/rtl8723cs-wowlan/default.nix @@ -0,0 +1,13 @@ +{ static-nix-shell +, iw +, wirelesstools +}: + +static-nix-shell.mkPython3Bin { + pname = "rtl8723cs_wowlan"; + src = ./.; + pkgs = { + inherit iw wirelesstools; + }; +} + diff --git a/pkgs/additional/rtl8723cs-wowlan/rtl8723cs_wowlan b/pkgs/additional/rtl8723cs-wowlan/rtl8723cs_wowlan new file mode 100755 index 00000000..319f7893 --- /dev/null +++ b/pkgs/additional/rtl8723cs-wowlan/rtl8723cs_wowlan @@ -0,0 +1,201 @@ +#!/usr/bin/env nix-shell +#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p iw -p wirelesstools +# vim: set filetype=python : + +# common operations: +# enable-clean +# arp --dest-ip A.B.C.D +# tcp --source-port N + +import argparse +import logging +import subprocess + +logger = logging.getLogger(__name__) + +def octet_to_hex(o: int) -> str: + ''' format an octet as a two-character hex string (lowercase) ''' + return '%02X' % o + +class Encodable: + @staticmethod + def get_octets(e: 'Encodable|None', len_: int) -> int|None: + if e is None: return [None]*len_ + + octets = e.octets() + assert len(octets) == len_, octets + return octets + + @staticmethod + def get_octet(e: 'Encodable|None', idx: int) -> int|None: + if e is None: return None + return e.octets()[idx] + + def octects(self) -> list[int|None]: + raise NotImplementedError() + + def __str__(self) -> str: + return ':'.join(octet_to_hex(b) if b is not None else '-' for b in self.octets()) + +class Port(Encodable): + def __init__(self, port: int|str): + self.port = int(port) if isinstance(port, str) else port + def octets(self) -> list[int]: + return [self.port // 256, self.port % 256] + +class IpAddr(Encodable): + def __init__(self, addr: str): + pieces = addr.split('.') + self._octets = [int(p) if p else 0 for p in pieces] + + def octets(self) -> list[int]: + return self._octets + +class EtherType: + # ethertype: + IPv4 = [ 0x08, 0x00 ] # 0x0800 + ARP = [ 0x08, 0x06 ] # 0x0806 + +class EthernetFrame(Encodable): + def __init__(self, ether_type: EtherType, payload: Encodable): + self.ether_type = ether_type + self.payload = payload + + def octets(self) -> list[int|None]: + return [ + # ethernet frame: + ## dest MAC address (this should be the device's MAC, but i think that's implied?) + None, None, None, None, None, None, + ## src MAC address + None, None, None, None, None, None, + ## ethertype: + self.ether_type[0], self.ether_type[1] + ] + self.payload.octets() + +class ArpFrame(Encodable): + def __init__(self, dest_ip: IpAddr|None): + self.dest_ip = dest_ip + + def octets(self) -> list[int|None]: + return [ + # ARP frame: + ## hardware type + None, None, + ## protocol type. same coding as EtherType + 0x08, 0x00, # 0x0800 = IPv4 + ## hardware address length (i.e. MAC) + 0x06, + ## protocol address length (i.e. IP address) + 0x04, + ## operation + 0x00, 0x01, # 0x0001 = request + ## sender hardware address + None, None, None, None, None, None, + ## sender protocol address + None, None, None, None, + ## target hardware address + ## this is left as "Don't Care" because the packets we want to match + ## are those mapping protocol addr -> hw addr. + ## sometimes clients do include this field if they've seen the address before though + None, None, None, None, None, None, + ## target protocol address + ] + Encodable.get_octets(self.dest_ip, 4) + +class TcpFrame(Encodable): + def __init__(self, source_port: Port|None=None, dest_port: Port|None=None): + self.source_port = source_port + self.dest_port = dest_port + + def octets(self) -> list[int|None]: + return [ + # IP frame: + ## Version, Internet Header Length. 0x45 = 69 decimal + None, # should be 69 (0x45), but fails to wake if i include this + ## Differentiated Services Code Point (DSCP), Explicit Congestion Notification (ECN) + None, + ## total length + None, None, + ## identification + None, None, + ## flags, fragment offset + None, None, + ## Time-to-live + None, + ## protocol: + 0x06, # 6 = TCP + ## header checksum + None, None, + ## source IP addr + None, None, None, None, + ## dest IP addr + None, None, None, None, + + # TCP frame: + Encodable.get_octet(self.source_port, 0), Encodable.get_octet(self.source_port, 1), + Encodable.get_octet(self.dest_port, 0), Encodable.get_octet(self.dest_port, 1), + ## rest is Don't Care + ] + + +def build_arp(dest_ip: str|None = None) -> EthernetFrame: + dest_ip = IpAddr(dest_ip) if dest_ip is not None else None + return EthernetFrame(EtherType.ARP, ArpFrame(dest_ip)) + +def build_tcp(source_port: int|None = None, dest_port: int|None = None) -> EthernetFrame: + source_port = Port(source_port) if source_port is not None else None + dest_port = Port(dest_port) if dest_port is not None else None + return EthernetFrame(EtherType.IPv4, TcpFrame(source_port=source_port, dest_port=dest_port)) + +def exec_with(executor, args: list[str]): + logger.debug("invoking: {}".format(' '.join(args))) + executor(args) + +def exec_real(args: list[str]): + exec_with(subprocess.check_output, args) + +def exec_dry(args: list[str]): + exec_with(lambda _: None, args) + +def main(): + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + + parser = argparse.ArgumentParser(description="configure the RTL8723cs WiFi chip to wake the CPU on specific incoming packets") + parser.add_argument('--dry-run', action='store_true') + subparsers = parser.add_subparsers(help="type of match") + + enable_clean_parser = subparsers.add_parser('enable-clean', help="enable WOWLAN and wipe existing patterns") + enable_clean_parser.set_defaults(type_='enable_clean') + + arp_parser = subparsers.add_parser('arp', help="wake on ARP request") + arp_parser.set_defaults(type_='arp') + arp_parser.add_argument('--dest-ip') + + tcp_parser = subparsers.add_parser('tcp', help="wake on TCP packet") + tcp_parser.set_defaults(type_='tcp') + tcp_parser.add_argument('--source-port', type=int) + tcp_parser.add_argument('--dest-port', type=int) + + args = parser.parse_args() + + if args.dry_run: + exec_ = exec_dry + else: + exec_ = exec_real + + if args.type_ == 'enable_clean': + exec_(['iw', 'phy0', 'wowlan', 'enable', 'any']) + exec_(['iwprv', 'wlan0', 'wow_set_pattern', 'clean']) + + frame = None + if args.type_ == 'arp': + frame = build_arp(dest_ip=args.dest_ip) + if args.type_ == 'tcp': + frame = build_tcp(source_port=args.source_port, dest_port=args.dest_port) + + if frame is not None: + pattern = str(frame) + exec_(['iwpriv', 'wlan0', 'wow_set_pattern', f'pattern={pattern}']) + +if __name__ == '__main__': + main() diff --git a/pkgs/default.nix b/pkgs/default.nix index 4b7544d0..6b42796c 100644 --- a/pkgs/default.nix +++ b/pkgs/default.nix @@ -52,6 +52,7 @@ let mx-sanebot = callPackage ./additional/mx-sanebot { }; phog = callPackage ./additional/phog { }; rtl8723cs-firmware = callPackage ./additional/rtl8723cs-firmware { }; + rtl8723cs-wowlan = callPackage ./additional/rtl8723cs-wowlan { }; sane-scripts = lib.recurseIntoAttrs (callPackage ./additional/sane-scripts { }); sane-weather = callPackage ./additional/sane-weather { }; static-nix-shell = callPackage ./additional/static-nix-shell { };