#!/usr/bin/env nix-shell #!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p rtl8723cs-wowlan -p util-linux # yeah, this isn't technically a hook, but the hook infrastructure isn't actually # restricted to stuff that starts with sxmo_hook_ ... # # this script is only called by sxmo_autosuspend, which is small, so if i wanted to # be more proper i could instead re-implement autosuspend + integrations. # # N.B.: if any wake locks are acquired between invocation of this script and the # rtcwake call below, suspend will fail -- even if those locks are released during # the same period. # # this is because the caller of this script writes /sys/power/wakeup_count, and the # kernel checks consistency with that during the actual suspend request. # see: # # for this reason, keep this script as short as possible. # # common sources of wakelocks (which one may wish to reduce) include: # - `sxmo_led.sh blink` (every 2s, by default) import argparse import logging import socket import subprocess import time logger = logging.getLogger(__name__) NTFY_HOST = 'uninsane.org' NTFY_PORT_BASE = 5550 # duration in seconds to sleep for SUSPEND_TIME = 300 # take care that WOWLAN_DELAY might include more than you think (e.g. time spent configuring wowlan pattern rules) WOWLAN_DELAY = 6 # SXMO LED blink frequency to set on resume BLINK_FREQ = 5 class Executor: def __init__(self, dry_run: bool = False): self.dry_run = dry_run def exec(self, cmd: list[str], sudo: bool = False, check: bool = True, wait: bool = True): if check: assert wait, "can't check_output without first waiting for process completion" if sudo: cmd = [ 'doas' ] + cmd logger.debug(" ".join(cmd)) if self.dry_run: return if wait: try: res = subprocess.run(cmd, capture_output=True) except Exception as e: if check: raise logger.warning(f"error invoking subprocess: {e}") return logger.debug(res.stdout) if res.stderr: logger.warning(res.stderr) if check: res.check_returncode() else: res = subprocess.Popen(cmd) return res def try_connect(self, dest, delay): logger.debug(f"opening socket to {dest} with timeout {delay}") if self.dry_run: return None try: return socket.create_connection(dest, delay) except Exception as e: logger.warning(f"failed to connect: {e}") class Suspender: def __init__(self, executor: Executor, wowlan_delay: float): self.executor = executor self.wowlan_delay = wowlan_delay self.ntfy_socket = None def ntfy_addr(self) -> (int, int|None, str|None): ''' returns (remote port, local port, local ip) ''' remote_port = NTFY_PORT_BASE + self.wowlan_delay local_ip, local_port = self.ntfy_socket.getsockname() if self.ntfy_socket is not None else (None, None) return remote_port, local_port, local_ip def open_ntfy_stream(self): self.ntfy_socket = self.executor.try_connect((NTFY_HOST, self.ntfy_addr()[0]), 0.5*self.wowlan_delay) def close_ntfy_stream(self): ''' call before exit to ensure socket is cleanly shut down and not leaked ''' try: self.ntfy_socket.shutdown(socket.SHUT_RDWR) self.ntfy_socket.close() except: pass # shutdown can error if the socket was already terminated (by the remote) def configure_wowlan(self): # TODO: don't do this wowlan stuff every single time. # - it's costly (can take like 1sec) # alternative is to introduce some layer of cache: # - do so in a way such that WiFi connection state changes invalidate the cache # - because wowlan enable w/o connection may well behave differently than w/ connection # - calculating IP addr from link, and then caching on the args we call our helper with may well suffice # and no need to invoke a subprocess here, when it's just python code calling other python code! self.executor.exec(['rtl8723cs-wowlan', 'enable-clean'], sudo=True, check=False) # wake on ssh self.executor.exec(['rtl8723cs-wowlan', 'tcp', '--dest-port', '22', '--dest-ip', 'SELF'], sudo=True, check=False) # wake on notification (ntfy/Universal Push) remote_port, local_port, local_ip = self.ntfy_addr() dest_port_args = ['--dest-port', str(local_port)] if local_port is not None else [] dest_ip_args = ['--dest-ip', local_ip] if local_ip is not None else ['--dest-ip', 'SELF'] self.executor.exec(['rtl8723cs-wowlan', 'tcp', '--source-port', str(remote_port)] + dest_port_args + dest_ip_args, sudo=True, check=False) # wake if someone doesn't know how to route to us, because that could obstruct the above # self.executor.exec(['rtl8723cs-wowlan', 'arp', '--dest-ip', 'SELF'], sudo=True, check=False) # specifically wake upon ARP request via the broadcast address. # peers don't usually go straight for broadcast, but rather try ARPing the mac they knew us on. # hence, waking on broadcast makes this a bit less racy (less likely to see broadcast ARP immediately upon suspend enter). # N.B.: wowlan also offloads arp, so this rule shouldn't be exercised except when things glitch. self.executor.exec(['rtl8723cs-wowlan', 'arp', '--dest-ip', 'SELF', '--dest-mac', 'ff:ff:ff:ff:ff:ff'], sudo=True, check=False) def suspend(self, duration: int, mode: str): logger.info(f"calling suspend for duration: {duration}") if mode == 'rtcwake': self.executor.exec(['rtcwake', '-m', 'mem', '-s', str(duration)], sudo=True, check=False) elif mode == 'sleep': time.sleep(duration) else: assert False, f"unknown suspend mode: {mode}" class SxmoApi: def __init__(self, executor: Executor): self.executor = executor def halt_services(self) -> None: res = self.executor.exec(['sxmo_jobs.sh', 'running', 'periodic_blink'], check=False) self.was_blinking = res and res.returncode == 0 if self.was_blinking: self.executor.exec(['sxmo_jobs.sh', 'stop', 'periodic_blink'], check=False) def resume_services(self) -> None: if self.was_blinking: # XXX: sxmo_jobs.sh is supposed to run the job in the background, but somehow it fails (blocks), only when invoked from Python. # oh well, just call it asynchronously (wait=False) self.executor.exec(['sxmo_jobs.sh', 'start', 'periodic_blink', 'sxmo_run_periodically.sh', str(BLINK_FREQ), 'sxmo_led.sh', 'blink', 'red', 'blue'], check=False, wait=False) def call_postwake_hook(self) -> None: self.executor.exec(['sxmo_hook_postwake.sh'], check=False) def main(): logging.basicConfig() logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser(description="suspend the pinephone to RAM, and configure wake triggers to make that appear more transparent") parser.add_argument("--dry-run", action='store_true', help="print commands instead of executing them") parser.add_argument("--verbose", action='store_true', help="log each command before executing") parser.add_argument("--duration", type=int, default=SUSPEND_TIME, help="maximum duration to sleep for, in seconds") parser.add_argument("--wowlan-delay", type=int, default=WOWLAN_DELAY, help="minimum number of seconds after entering sleep during which wowlan is racy") parser.add_argument("--suspend-mode", choices=['rtcwake', 'sleep'], default='rtcwake', help="how to sleep") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) suspend_time = args.duration wowlan_delay = args.wowlan_delay suspend_mode = args.suspend_mode executor = Executor(dry_run=args.dry_run) suspender = Suspender(executor, wowlan_delay=wowlan_delay) sxmo_api = SxmoApi(executor) sxmo_api.halt_services() suspender.open_ntfy_stream() suspender.configure_wowlan() time_start = time.time() # irq_start="$(cat /proc/interrupts | grep 'rtw_wifi_gpio_wakeup' | tr -s ' ' | xargs echo | cut -d' ' -f 2)" suspender.suspend(suspend_time, mode=suspend_mode) # irq_end="$(cat /proc/interrupts | grep 'rtw_wifi_gpio_wakeup' | tr -s ' ' | xargs echo | cut -d' ' -f 2)" time_spent = time.time() - time_start logger.info(f"suspended for {time_spent:.0f} seconds") suspender.close_ntfy_stream() sxmo_api.resume_services() sxmo_api.call_postwake_hook() if __name__ == '__main__': main()