eg25-control: split out a Phy abstraction behind which to hide megi's modem-power API

This commit is contained in:
2024-09-20 03:07:09 +00:00
parent 8559de949e
commit c81a6f51e2

View File

@@ -280,11 +280,36 @@ class AutoGps:
disable = "0"
enable = "1"
class Phy:
"""
abstract base class for interfaceing with the modem on the physical layer.
i.e. powering on/off, and potentially gpio-level signalling.
"""
def power_on(self) -> None:
raise NotImplementedError()
def power_off(self) -> None:
raise NotImplementedError()
class MegiPhy(Phy):
"""
Phy implementation built on megi's modem-power driver (requires megi kernel patches)
"""
def __init__(self, executor: Executor, power_endpoint: str):
self.executor = executor
self.power_endpoint = power_endpoint
def power_on(self) -> None:
self.executor.write_file(self.power_endpoint, b'1')
def power_off(self) -> None:
self.executor.write_file(self.power_endpoint, b'0')
class Sequencer:
def __init__(self, executor: Executor, modem: str, power_endpoint: str, state_fs: Filesystem):
def __init__(self, executor: Executor, modem: str, modem_phy: Phy, state_fs: Filesystem):
self.executor = executor
self.modem = modem
self.power_endpoint = power_endpoint
self.modem_phy = modem_phy
self.state_fs = state_fs
def _mmcli(self, args: list[str], check: bool = True) -> str:
@@ -416,7 +441,7 @@ class Sequencer:
@log_scope("powering modem...", "modem powered")
def power_on(self) -> None:
self.executor.write_file(self.power_endpoint, b'1')
self.modem_phy.power_on()
while self._try_mmcli([]) is None:
logger.info("modem hasn't appeared: sleeping for 1s")
time.sleep(1) # wait for modem to appear
@@ -428,7 +453,7 @@ class Sequencer:
@log_scope("halting modem...", "modem halted")
def power_off(self) -> None:
self.executor.write_file(self.power_endpoint, b'0')
self.modem_phy.power_off()
while self._try_mmcli([]):
logger.info("modem still powered: sleeping for 1s")
time.sleep(1) # wait for modem to disappear
@@ -583,7 +608,7 @@ def main():
executor = Executor(dry_run=args.dry_run)
state_fs = Filesystem(executor, root=state_dir)
sequencer = Sequencer(executor, modem=args.modem, power_endpoint=args.power_endpoint, state_fs=state_fs)
sequencer = Sequencer(executor, modem=args.modem, modem_phy=MegiPhy(executor, args.power_endpoint), state_fs=state_fs)
if args.power_on:
sequencer.power_on()