wowlan: define a script which can set the patterns at runtime

this will be a little easier to debug on the device itself
This commit is contained in:
Colin 2023-10-10 08:00:25 +00:00
parent 114df5efab
commit 29dde0240b
3 changed files with 215 additions and 0 deletions

View File

@ -0,0 +1,13 @@
{ static-nix-shell
, iw
, wirelesstools
}:
static-nix-shell.mkPython3Bin {
pname = "rtl8723cs_wowlan";
src = ./.;
pkgs = {
inherit iw wirelesstools;
};
}

View File

@ -0,0 +1,201 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p iw -p wirelesstools
# vim: set filetype=python :
# common operations:
# enable-clean
# arp --dest-ip A.B.C.D
# tcp --source-port N
import argparse
import logging
import subprocess
logger = logging.getLogger(__name__)
def octet_to_hex(o: int) -> str:
''' format an octet as a two-character hex string (lowercase) '''
return '%02X' % o
class Encodable:
@staticmethod
def get_octets(e: 'Encodable|None', len_: int) -> int|None:
if e is None: return [None]*len_
octets = e.octets()
assert len(octets) == len_, octets
return octets
@staticmethod
def get_octet(e: 'Encodable|None', idx: int) -> int|None:
if e is None: return None
return e.octets()[idx]
def octects(self) -> list[int|None]:
raise NotImplementedError()
def __str__(self) -> str:
return ':'.join(octet_to_hex(b) if b is not None else '-' for b in self.octets())
class Port(Encodable):
def __init__(self, port: int|str):
self.port = int(port) if isinstance(port, str) else port
def octets(self) -> list[int]:
return [self.port // 256, self.port % 256]
class IpAddr(Encodable):
def __init__(self, addr: str):
pieces = addr.split('.')
self._octets = [int(p) if p else 0 for p in pieces]
def octets(self) -> list[int]:
return self._octets
class EtherType:
# ethertype: <https://en.wikipedia.org/wiki/EtherType#Values>
IPv4 = [ 0x08, 0x00 ] # 0x0800
ARP = [ 0x08, 0x06 ] # 0x0806
class EthernetFrame(Encodable):
def __init__(self, ether_type: EtherType, payload: Encodable):
self.ether_type = ether_type
self.payload = payload
def octets(self) -> list[int|None]:
return [
# ethernet frame: <https://en.wikipedia.org/wiki/Ethernet_frame#Structure>
## dest MAC address (this should be the device's MAC, but i think that's implied?)
None, None, None, None, None, None,
## src MAC address
None, None, None, None, None, None,
## ethertype: <https://en.wikipedia.org/wiki/EtherType#Values>
self.ether_type[0], self.ether_type[1]
] + self.payload.octets()
class ArpFrame(Encodable):
def __init__(self, dest_ip: IpAddr|None):
self.dest_ip = dest_ip
def octets(self) -> list[int|None]:
return [
# ARP frame: <https://en.wikipedia.org/wiki/Address_Resolution_Protocol#Packet_structure>
## hardware type
None, None,
## protocol type. same coding as EtherType
0x08, 0x00, # 0x0800 = IPv4
## hardware address length (i.e. MAC)
0x06,
## protocol address length (i.e. IP address)
0x04,
## operation
0x00, 0x01, # 0x0001 = request
## sender hardware address
None, None, None, None, None, None,
## sender protocol address
None, None, None, None,
## target hardware address
## this is left as "Don't Care" because the packets we want to match
## are those mapping protocol addr -> hw addr.
## sometimes clients do include this field if they've seen the address before though
None, None, None, None, None, None,
## target protocol address
] + Encodable.get_octets(self.dest_ip, 4)
class TcpFrame(Encodable):
def __init__(self, source_port: Port|None=None, dest_port: Port|None=None):
self.source_port = source_port
self.dest_port = dest_port
def octets(self) -> list[int|None]:
return [
# IP frame: <https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Header>
## Version, Internet Header Length. 0x45 = 69 decimal
None, # should be 69 (0x45), but fails to wake if i include this
## Differentiated Services Code Point (DSCP), Explicit Congestion Notification (ECN)
None,
## total length
None, None,
## identification
None, None,
## flags, fragment offset
None, None,
## Time-to-live
None,
## protocol: <https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers>
0x06, # 6 = TCP
## header checksum
None, None,
## source IP addr
None, None, None, None,
## dest IP addr
None, None, None, None,
# TCP frame: <https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure>
Encodable.get_octet(self.source_port, 0), Encodable.get_octet(self.source_port, 1),
Encodable.get_octet(self.dest_port, 0), Encodable.get_octet(self.dest_port, 1),
## rest is Don't Care
]
def build_arp(dest_ip: str|None = None) -> EthernetFrame:
dest_ip = IpAddr(dest_ip) if dest_ip is not None else None
return EthernetFrame(EtherType.ARP, ArpFrame(dest_ip))
def build_tcp(source_port: int|None = None, dest_port: int|None = None) -> EthernetFrame:
source_port = Port(source_port) if source_port is not None else None
dest_port = Port(dest_port) if dest_port is not None else None
return EthernetFrame(EtherType.IPv4, TcpFrame(source_port=source_port, dest_port=dest_port))
def exec_with(executor, args: list[str]):
logger.debug("invoking: {}".format(' '.join(args)))
executor(args)
def exec_real(args: list[str]):
exec_with(subprocess.check_output, args)
def exec_dry(args: list[str]):
exec_with(lambda _: None, args)
def main():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
parser = argparse.ArgumentParser(description="configure the RTL8723cs WiFi chip to wake the CPU on specific incoming packets")
parser.add_argument('--dry-run', action='store_true')
subparsers = parser.add_subparsers(help="type of match")
enable_clean_parser = subparsers.add_parser('enable-clean', help="enable WOWLAN and wipe existing patterns")
enable_clean_parser.set_defaults(type_='enable_clean')
arp_parser = subparsers.add_parser('arp', help="wake on ARP request")
arp_parser.set_defaults(type_='arp')
arp_parser.add_argument('--dest-ip')
tcp_parser = subparsers.add_parser('tcp', help="wake on TCP packet")
tcp_parser.set_defaults(type_='tcp')
tcp_parser.add_argument('--source-port', type=int)
tcp_parser.add_argument('--dest-port', type=int)
args = parser.parse_args()
if args.dry_run:
exec_ = exec_dry
else:
exec_ = exec_real
if args.type_ == 'enable_clean':
exec_(['iw', 'phy0', 'wowlan', 'enable', 'any'])
exec_(['iwprv', 'wlan0', 'wow_set_pattern', 'clean'])
frame = None
if args.type_ == 'arp':
frame = build_arp(dest_ip=args.dest_ip)
if args.type_ == 'tcp':
frame = build_tcp(source_port=args.source_port, dest_port=args.dest_port)
if frame is not None:
pattern = str(frame)
exec_(['iwpriv', 'wlan0', 'wow_set_pattern', f'pattern={pattern}'])
if __name__ == '__main__':
main()

View File

@ -52,6 +52,7 @@ let
mx-sanebot = callPackage ./additional/mx-sanebot { };
phog = callPackage ./additional/phog { };
rtl8723cs-firmware = callPackage ./additional/rtl8723cs-firmware { };
rtl8723cs-wowlan = callPackage ./additional/rtl8723cs-wowlan { };
sane-scripts = lib.recurseIntoAttrs (callPackage ./additional/sane-scripts { });
sane-weather = callPackage ./additional/sane-weather { };
static-nix-shell = callPackage ./additional/static-nix-shell { };