306 lines
12 KiB
Python
Executable File
306 lines
12 KiB
Python
Executable File
#!/usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p hostname-debian -p iw -p wirelesstools
|
|
# vim: set filetype=python :
|
|
|
|
# common operations:
|
|
# enable-clean
|
|
# arp --dest-ip A.B.C.D
|
|
# tcp --source-port N
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def octet_to_hex(o: int) -> str:
|
|
''' format an octet as a two-character hex string (lowercase) '''
|
|
return '%02X' % o
|
|
|
|
def get_ipaddrs() -> list['IpAddr']:
|
|
''' return the IP address of all known interfaces '''
|
|
# N.B.: --all-ip-addresses is provided only by hostname-debian package, not default hostname.
|
|
# however `hostname --all-ip-addresses` could dispatch to either, since hostname is likely on the user's PATH.
|
|
# so we have to try all the `hostname`s we can find.
|
|
# TODO: maybe try nix-level @hostname@-type substitution?
|
|
paths = os.getenv('PATH').split(':')
|
|
addrs = []
|
|
for p in paths:
|
|
try:
|
|
addrs = subprocess.check_output([f'{p}/domainname', '--all-ip-addresses']).decode('utf-8').strip().split(' ')
|
|
except: continue
|
|
else:
|
|
break
|
|
logger.debug(f'get_ipaddrs got: {addrs}')
|
|
return [ IpAddr(a) for a in addrs ]
|
|
|
|
|
|
class Encodable:
|
|
@staticmethod
|
|
def get_octets(e: 'Encodable|None', len_: int) -> int|None:
|
|
if e is None: return [None]*len_
|
|
|
|
octets = e.octets()
|
|
assert len(octets) == len_, octets
|
|
return octets
|
|
|
|
@staticmethod
|
|
def get_octet(e: 'Encodable|None', idx: int) -> int|None:
|
|
if e is None: return None
|
|
return e.octets()[idx]
|
|
|
|
def octects(self) -> list[int|None]:
|
|
raise NotImplementedError()
|
|
|
|
def __str__(self) -> str:
|
|
return ':'.join(octet_to_hex(b) if b is not None else '-' for b in self.octets())
|
|
|
|
class Port(Encodable):
|
|
def __init__(self, port: int|str):
|
|
self.port = int(port) if isinstance(port, str) else port
|
|
def octets(self) -> list[int]:
|
|
return [self.port // 256, self.port % 256]
|
|
|
|
class IpAddr(Encodable):
|
|
def __init__(self, addr: str):
|
|
pieces = addr.split('.')
|
|
self._octets = [int(p) if p else 0 for p in pieces]
|
|
|
|
@staticmethod
|
|
def parse_any(addr: str) -> list['IpAddr']:
|
|
if addr == 'SELF':
|
|
return get_ipaddrs()
|
|
else:
|
|
return [IpAddr(addr)]
|
|
|
|
def octets(self) -> list[int]:
|
|
return self._octets
|
|
|
|
class MacAddr(Encodable):
|
|
def __init__(self, addr: str):
|
|
pieces = addr.lower().split(':')
|
|
self._octets = [int(p, 16) if p else 0 for p in pieces]
|
|
|
|
def octets(self) -> list[int]:
|
|
return self._octets
|
|
|
|
class EtherType:
|
|
# ethertype: <https://en.wikipedia.org/wiki/EtherType#Values>
|
|
IPv4 = [ 0x08, 0x00 ] # 0x0800
|
|
ARP = [ 0x08, 0x06 ] # 0x0806
|
|
|
|
class IpProtocol:
|
|
# https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers
|
|
TCP = 0x06
|
|
UDP = 0x11
|
|
|
|
class EthernetFrame(Encodable):
|
|
def __init__(self, ether_type: EtherType, payload: Encodable, dest_mac: MacAddr|None = None):
|
|
self.ether_type = ether_type
|
|
self.payload = payload
|
|
self.dest_mac = dest_mac
|
|
|
|
def octets(self) -> list[int|None]:
|
|
dest_mac_ = Encodable.get_octets(self.dest_mac, 6)
|
|
return [
|
|
# ethernet frame: <https://en.wikipedia.org/wiki/Ethernet_frame#Structure>
|
|
## dest MAC address (this should be the device's MAC, but i think that's implied?)
|
|
dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5],
|
|
## src MAC address
|
|
None, None, None, None, None, None,
|
|
## ethertype: <https://en.wikipedia.org/wiki/EtherType#Values>
|
|
self.ether_type[0], self.ether_type[1]
|
|
] + self.payload.octets()
|
|
|
|
class ArpFrame(Encodable):
|
|
def __init__(self, dest_ip: IpAddr|None, dest_mac: MacAddr|None):
|
|
self.dest_ip = dest_ip
|
|
self.dest_mac = dest_mac
|
|
|
|
def octets(self) -> list[int|None]:
|
|
dest_ip_ = Encodable.get_octets(self.dest_ip, 4)
|
|
dest_mac_ = Encodable.get_octets(self.dest_mac, 6)
|
|
return [
|
|
# ARP frame: <https://en.wikipedia.org/wiki/Address_Resolution_Protocol#Packet_structure>
|
|
## hardware type
|
|
None, None,
|
|
## protocol type. same coding as EtherType
|
|
0x08, 0x00, # 0x0800 = IPv4
|
|
## hardware address length (i.e. MAC)
|
|
0x06,
|
|
## protocol address length (i.e. IP address)
|
|
0x04,
|
|
## operation
|
|
0x00, 0x01, # 0x0001 = request
|
|
## sender hardware address
|
|
None, None, None, None, None, None,
|
|
## sender protocol address
|
|
None, None, None, None,
|
|
## target hardware address
|
|
## caller is fine to leave this as "Don't Care" (None) because the packets we want to match
|
|
## are those mapping protocol addr -> hw addr.
|
|
## sometimes clients do include this field if they've seen the address before.
|
|
## otherwise clients use the broadcast mac, i.e. [ff::ff]
|
|
dest_mac_[0], dest_mac_[1], dest_mac_[2], dest_mac_[3], dest_mac_[4], dest_mac_[5],
|
|
## target protocol address
|
|
dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3],
|
|
]
|
|
|
|
class IpFrame(Encodable):
|
|
def __init__(self, proto: IpProtocol, payload: Encodable, dest_ip: IpAddr|None = None):
|
|
self.proto = proto
|
|
self.payload = payload
|
|
self.dest_ip = dest_ip
|
|
|
|
def octets(self) -> list[int|None]:
|
|
dest_ip_ = Encodable.get_octets(self.dest_ip, 4)
|
|
return [
|
|
# IP frame: <https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Header>
|
|
## Version, Internet Header Length
|
|
0x45, # should be 0x45, but fails to wake if i include this
|
|
## Differentiated Services Code Point (DSCP), Explicit Congestion Notification (ECN)
|
|
None,
|
|
## total length
|
|
None, None,
|
|
## identification
|
|
None, None,
|
|
## flags, fragment offset
|
|
None, None,
|
|
## Time-to-live
|
|
None,
|
|
## protocol: <https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers>
|
|
self.proto,
|
|
## header checksum
|
|
None, None,
|
|
## source IP addr
|
|
None, None, None, None,
|
|
## dest IP addr
|
|
dest_ip_[0], dest_ip_[1], dest_ip_[2], dest_ip_[3],
|
|
] + self.payload.octets()
|
|
|
|
class TcpFrame(Encodable):
|
|
def __init__(
|
|
self,
|
|
source_port: Port|None=None,
|
|
dest_port: Port|None=None,
|
|
):
|
|
self.source_port = source_port
|
|
self.dest_port = dest_port
|
|
|
|
def octets(self) -> list[int|None]:
|
|
source_port_ = Encodable.get_octets(self.source_port, 2)
|
|
dest_port_ = Encodable.get_octets(self.dest_port, 2)
|
|
return [
|
|
# TCP frame: <https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure>
|
|
source_port_[0], source_port_[1],
|
|
dest_port_[0], dest_port_[1],
|
|
## rest is Don't Care
|
|
]
|
|
|
|
# UDP header is a subset of the TCP header
|
|
# <https://en.wikipedia.org/wiki/User_Datagram_Protocol#UDP_datagram_structure>
|
|
UdpFrame = TcpFrame
|
|
|
|
def ips_from_str(ip: str|None = None) -> list[IpAddr|None]:
|
|
''' return all known IPs, or [ None ] if IP addr isn't known '''
|
|
ips = IpAddr.parse_any(ip) if ip is not None else []
|
|
return ips or [None]
|
|
|
|
def build_arp(dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]:
|
|
dest_ips = ips_from_str(dest_ip)
|
|
dest_mac = MacAddr(dest_mac) if dest_mac is not None else None
|
|
return [EthernetFrame(EtherType.ARP, ArpFrame(dest_ip=ip, dest_mac=dest_mac)) for ip in dest_ips]
|
|
|
|
def build_tcp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]:
|
|
source_port = Port(source_port) if source_port is not None else None
|
|
dest_port = Port(dest_port) if dest_port is not None else None
|
|
dest_ips = ips_from_str(dest_ip)
|
|
dest_mac = MacAddr(dest_mac) if dest_mac is not None else None
|
|
return [
|
|
EthernetFrame(EtherType.IPv4, dest_mac=dest_mac,
|
|
payload=IpFrame(IpProtocol.TCP, dest_ip=ip,
|
|
payload=TcpFrame(source_port=source_port, dest_port=dest_port)
|
|
)
|
|
) for ip in dest_ips
|
|
]
|
|
|
|
def build_udp(source_port: int|None = None, dest_port: int|None = None, dest_ip: str|None = None, dest_mac: str|None = None) -> list[EthernetFrame]:
|
|
source_port = Port(source_port) if source_port is not None else None
|
|
dest_port = Port(dest_port) if dest_port is not None else None
|
|
dest_ips = ips_from_str(dest_ip)
|
|
dest_mac = MacAddr(dest_mac) if dest_mac is not None else None
|
|
return [
|
|
EthernetFrame(EtherType.IPv4, dest_mac=dest_mac,
|
|
payload=IpFrame(IpProtocol.UDP, dest_ip=ip,
|
|
payload=UdpFrame(source_port=source_port, dest_port=dest_port)
|
|
)
|
|
) for ip in dest_ips
|
|
]
|
|
|
|
def exec_with(executor, args: list[str]):
|
|
logger.debug("invoking: {}".format(' '.join(args)))
|
|
executor(args)
|
|
|
|
def exec_real(args: list[str]):
|
|
exec_with(subprocess.check_output, args)
|
|
|
|
def exec_dry(args: list[str]):
|
|
exec_with(lambda _: None, args)
|
|
|
|
def main():
|
|
logging.basicConfig()
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
parser = argparse.ArgumentParser(description="configure the RTL8723cs WiFi chip to wake the CPU on specific incoming packets")
|
|
parser.add_argument('--dry-run', action='store_true')
|
|
subparsers = parser.add_subparsers(help="type of match")
|
|
|
|
enable_clean_parser = subparsers.add_parser('enable-clean', help="enable WOWLAN and wipe existing patterns")
|
|
enable_clean_parser.set_defaults(type_='enable_clean')
|
|
|
|
arp_parser = subparsers.add_parser('arp', help="wake on ARP request")
|
|
arp_parser.set_defaults(type_='arp')
|
|
arp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic")
|
|
arp_parser.add_argument('--dest-mac', help="ab:cd:...")
|
|
|
|
tcp_parser = subparsers.add_parser('tcp', help="wake on TCP packet")
|
|
tcp_parser.set_defaults(type_='tcp')
|
|
tcp_parser.add_argument('--source-port', type=int)
|
|
tcp_parser.add_argument('--dest-port', type=int)
|
|
tcp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic")
|
|
tcp_parser.add_argument('--dest-mac', help="ab:cd:...")
|
|
|
|
udp_parser = subparsers.add_parser('udp', help="wake on UDP packet")
|
|
udp_parser.set_defaults(type_='udp')
|
|
udp_parser.add_argument('--source-port', type=int)
|
|
udp_parser.add_argument('--dest-port', type=int)
|
|
udp_parser.add_argument('--dest-ip', help="a.b.c.d or the special 'SELF' for automatic")
|
|
udp_parser.add_argument('--dest-mac', help="ab:cd:...")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.dry_run:
|
|
exec_ = exec_dry
|
|
else:
|
|
exec_ = exec_real
|
|
|
|
if args.type_ == 'enable_clean':
|
|
exec_(['iw', 'phy0', 'wowlan', 'enable', 'any'])
|
|
exec_(['iwpriv', 'wlan0', 'wow_set_pattern', 'clean'])
|
|
|
|
frames = []
|
|
if args.type_ == 'arp':
|
|
frames = build_arp(dest_ip=args.dest_ip, dest_mac=args.dest_mac)
|
|
if args.type_ == 'tcp':
|
|
frames = build_tcp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac)
|
|
if args.type_ == 'udp':
|
|
frames = build_udp(source_port=args.source_port, dest_port=args.dest_port, dest_ip=args.dest_ip, dest_mac=args.dest_mac)
|
|
|
|
for frame in frames:
|
|
pattern = str(frame)
|
|
exec_(['iwpriv', 'wlan0', 'wow_set_pattern', f'pattern={pattern}'])
|
|
|
|
if __name__ == '__main__':
|
|
main()
|