eg25-control: fix --enable-gps and --ensure-agps commands

these were failing due to pathing changes from systemd -> s6
This commit is contained in:
Colin 2024-05-23 02:35:01 +00:00
parent d9922f8aa8
commit e6a8f5bae8
2 changed files with 61 additions and 32 deletions

View File

@ -22,6 +22,8 @@ in
cleanupCommand = "eg25-control --disable-gps --dump-debug-info --verbose";
depends = [ "eg25-control-powered" ];
};
persist.byStore.plaintext = [ ".cache/eg25-control" ]; #< for cached agps data
};
# TODO: port to s6
@ -38,9 +40,7 @@ in
ExecStart = "${cfg.package}/bin/eg25-control --ensure-agps-cache --verbose";
Restart = "no";
User = "eg25-control";
WorkingDirectory = "/var/lib/eg25-control";
StateDirectory = "eg25-control";
User = "colin";
};
startAt = "hourly"; # this is a bit more than necessary, but idk systemd calendar syntax
after = [ "network-online.target" "nss-lookup.target" ];
@ -48,26 +48,10 @@ in
# wantedBy = [ "network-online.target" ]; # auto-start immediately after boot
};
users = lib.mkIf cfg.enabled {
groups.eg25-control = {};
users.eg25-control = {
group = "eg25-control";
isSystemUser = true;
home = "/var/lib/eg25-control";
extraGroups = [
"dialout" # required to read /dev/ttyUSB1
"networkmanager" # required to authenticate with mmcli
];
};
};
sane.persist.sys.byStore.plaintext = lib.mkIf cfg.enabled [
# to persist agps data, i think.
{ user = "eg25-control"; group = "eg25-control"; path = "/var/lib/eg25-control"; }
];
services.udev.extraRules = let
chmod = "${pkgs.coreutils}/bin/chmod";
chown = "${pkgs.coreutils}/bin/chown";
in ''
in lib.optionalString cfg.enabled ''
# make Modem controllable by user
DRIVER=="modem-power", RUN+="${chmod} g+w /sys%p/powered", RUN+="${chown} :networkmanager /sys%p/powered"
'';

View File

@ -117,6 +117,12 @@ class Executor:
logger.debug(f"failed to read {path}: {e}")
return default
def ctime(self, path: str):
try:
return datetime.datetime.fromtimestamp(os.stat(path).st_ctime)
except FileNotFoundError as _:
return None
@destructive
def write_file(self, path: str, data: bytes) -> None:
logger.debug(f"echo {data!r} > {path}")
@ -149,6 +155,34 @@ class Executor:
res.check_returncode()
return res.stdout
class Filesystem:
def __init__(self, executor: Executor, root: str):
self.executor = executor
self.root = root
def translate_out(self, rel_path) -> str:
""" given a path rooted in this filesystem, translate it to an absolute path for use outside the FS """
return os.path.join(self.root, rel_path)
## wrapped methods around Executor internals
def read_file(self, path: str, default: bytes = b'') -> bytes:
return self.executor.read_file(self.translate_out(path), default)
def ctime(self, path: str):
return self.executor.ctime(self.translate_out(path))
def write_file(self, path: str, data: bytes) -> None:
self.executor.write_file(self.translate_out(path), data)
def mkdir(self, path: str) -> None:
self.executor.mkdir(self.translate_out(path))
def mv(self, from_: str, to: str) -> None:
self.executor.mv(self.translate_out(from_), self.translate_out(to))
def rm(self, p: str) -> None:
self.executor.rm(self.translate_out(p))
class GNSSConfig:
# GNSS-AP-Note 2.2.7
# Supported GNSS constellations. GPS is always ON
@ -238,10 +272,11 @@ class AutoGps:
enable = "1"
class Sequencer:
def __init__(self, executor: Executor, modem: str, power_endpoint: str):
def __init__(self, executor: Executor, modem: str, power_endpoint: str, state_fs: Filesystem):
self.executor = executor
self.modem = modem
self.power_endpoint = power_endpoint
self.state_fs = state_fs
def _mmcli(self, args: list[str], check: bool = True) -> str:
return self.executor.exec(
@ -298,10 +333,15 @@ class Sequencer:
return self._at_structured_cmd("QGPSCFG", "autogps", enable, **kwargs)
def _download_assistance_data(self, variant: AgpsDataVariant) -> str | None:
self.executor.mkdir("new")
self.state_fs.mkdir("new")
out_path = f"new/{variant}"
try:
self.executor.exec(["curl", f"{AGPS_DATA_URI_BASE}/{variant}", "-o", out_path])
self.executor.exec([
"curl",
f"{AGPS_DATA_URI_BASE}/{variant}",
"-o",
self.state_fs.translate_out(out_path)
])
return out_path
except subprocess.CalledProcessError as e:
logger.warning(f"AGPS data download failed: {e}")
@ -317,10 +357,10 @@ class Sequencer:
if good:
# N.B.: if locdata_path is something that exists in cache/,
# then this rename is a safe no-op.
self.executor.mkdir("cache")
self.executor.mv(locdata_path, f"cache/{variant}")
self.state_fs.mkdir("cache")
self.state_fs.mv(locdata_path, f"cache/{variant}")
else:
self.executor.rm(locdata_path)
self.state_fs.rm(locdata_path)
def _get_cached_assistance_data(self, variant: AgpsDataVariant, cache_dir: str = "cache") -> tuple[bool, str | None]:
@ -332,9 +372,8 @@ class Sequencer:
locdata_path = f"{cache_dir}/{variant}"
try:
last_cache_date = datetime.datetime.fromtimestamp(os.stat(locdata_path).st_ctime)
except FileNotFoundError as e:
last_cache_date = self.state_fs.ctime(locdata_path)
if last_cache_date is None:
return False, None
is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER
@ -466,7 +505,7 @@ class Sequencer:
# self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"")
is_fresh, locdatas = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
for locdata in locdatas:
ret = self._try_mmcli([f"--location-inject-assistance-data={locdata}"])
ret = self._try_mmcli([f"--location-inject-assistance-data={self.state_fs.translate_out(locdata)}"])
is_success = ret is not None
self._mark_assistance_data(locdata, is_success)
if is_success:
@ -510,6 +549,7 @@ def main():
parser = argparse.ArgumentParser(description="initialize the eg25 Pinephone modem for GPS tracking")
parser.add_argument('--modem', default='any', help='name of modem to configure (see mmcli --list-modems)')
parser.add_argument('--power-endpoint', default='/sys/class/modem-power/modem-power/device/powered', help='sysfs endpoint that can turn the modem on/off')
parser.add_argument('--state-dir', default='')
parser.add_argument("--dry-run", action='store_true', help="print commands instead of executing them")
parser.add_argument("--verbose", action='store_true', help="log each command before executing")
@ -528,8 +568,13 @@ def main():
if args.verbose or args.dump_debug_info:
logging.getLogger().setLevel(logging.DEBUG)
executor = Executor(args.dry_run)
sequencer = Sequencer(executor, modem=args.modem, power_endpoint=args.power_endpoint)
state_dir = args.state_dir
if not state_dir:
state_dir = os.path.join(os.getenv("HOME", "/var/lib/eg25-control"), ".cache/eg25-control")
executor = Executor(dry_run=args.dry_run)
state_fs = Filesystem(executor, root=state_dir)
sequencer = Sequencer(executor, modem=args.modem, power_endpoint=args.power_endpoint, state_fs=state_fs)
if args.power_on:
sequencer.power_on()