diff --git a/pkgs/additional/eg25-control/eg25-control b/pkgs/additional/eg25-control/eg25-control index 93b4df1a..9d8eaa95 100755 --- a/pkgs/additional/eg25-control/eg25-control +++ b/pkgs/additional/eg25-control/eg25-control @@ -61,6 +61,8 @@ POWER_ENDPOINT = "/sys/class/modem-power/modem-power/device/powered" # also at xtrapath5 and xtrapath6 subdomains. # the AGPS data here is an almanac good for 7 days. AGPS_DATA_URI_BASE = "https://xtrapath4.izatcloud.net" +ON_DISK_TIME_FMT = '%Y/%m/%d,%H:%M:%S' +AGPS_CACHE_REFRESH_AFTER = datetime.timedelta(days=1) class AgpsDataVariant: # GNSS-AP-Note 1.4: @@ -104,12 +106,25 @@ class Executor: def __init__(self, dry_run: bool = False): self.dry_run = dry_run + def read_file(self, path: str, default: bytes) -> bytes: + try: + with open(path, 'rb') as f: + return f.read() + except FileNotFoundError as e: + logger.debug(f"failed to read {path}: {e}") + return default + @destructive def write_file(self, path: str, data: bytes) -> None: logger.debug(f"echo {data!r} > {path}") with open(path, 'wb') as f: f.write(data) + @destructive + def mkdir(self, path: str) -> None: + logger.debug(f"mkdir {path}") + os.makedirs(path, exist_ok=True) + @destructive(return_=b'') def exec(self, cmd: list[str], check: bool = True) -> bytes: logger.debug(" ".join(cmd)) @@ -270,7 +285,7 @@ class Sequencer: def _at_autogps(self, enable: AutoGps) -> str: return self._at_structured_cmd("QGPSCFG", "autogps", enable) - def _get_assistance_data(self, variant: AgpsDataVariant) -> str | None: + def _download_assistance_data(self, variant: AgpsDataVariant) -> str | None: try: self.executor.exec(["curl", f"{self.AGPS_DATA_URI_BASE}/{variant}", "-o", variant]) return variant @@ -278,6 +293,54 @@ class Sequencer: logger.warning(f"AGPS data download failed: {e}") return None # TODO: could be smarter: return cached AGPS data? + def _cache_assistance_data(self, fresh_path: str) -> None: + '''call after successful upload to indicate that the data recently retrieved should be cached''' + if fresh_path.startswith("cache/"): + return # if the caller used cached data, avoid updating its timestamp + + # TODO: we should use fs time for this + now = datetime.datetime.now().strftime(ON_DISK_TIME_FMT) + try: + self.executor.mkdir("cache") + self.executor.exec(["mv", fresh_path, "cache"]) + self.executor.write_file(f"cache/{fresh_path}.ts", now.encode()) + except subprocess.CalledProcessError as e: + logger.warning(f"failed to cache AGPS data: {e}") + + def _get_cached_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, str | None]: + ''' + returns: + - whether cached data is fresh (i.e. won't benefit from an update) + - a path to the cached AGPS data, or None if no valid cached entry + ''' + last_cache_datestr = self.executor.read_file(f"cache/{variant}.ts", default=b'').decode('utf-8') + if not last_cache_datestr: + return False, None + + try: + last_cache_date = datetime.datetime.strptime(last_cache_datestr, ON_DISK_TIME_FMT) + except ValueError as e: + logger.warning(f"failed to decode cache timestamp for {variant}: {e}") + return False, None + + is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER + return is_fresh, f"cache/{variant}" + + def _get_any_assistance_data(self, variant: AgpsDataVariant) -> str | None: + ''' + checks if we have fresh cached data: + - if so, returns that. + - if not, *tries* to update it. + - if update succeeds, returns this new data + - if update fails, returns stale cached data (because it might still have *some* use/lifetime) + ''' + is_fresh, cached_path = self._get_cached_assistance_data(variant) + if is_fresh: return cached_path + + better_data = self._download_assistance_data(variant) + return better_data or cached_path + + @log_scope("powering modem...", "modem powered") def power_on(self) -> None: self.executor.write_file(self.power_endpoint, b'1') @@ -285,6 +348,11 @@ class Sequencer: logger.info("modem hasn't appeared: sleeping for 1s") time.sleep(1) # wait for modem to appear + # set modem to use UTC time instead of local time. + # modemmanager sends CTZU=3 during init and that causes `AT+CCLK?` to return a timestamp that's off by 600+ days + # see: + self._at_structured_cmd("CTZU", value="1") + @log_scope("halting modem...", "modem halted") def power_off(self) -> None: self.executor.write_file(self.power_endpoint, b'0') @@ -351,11 +419,6 @@ class Sequencer: @log_scope("configuring gps...", "gps configured") def enable_gps(self) -> None: - # set modem to use UTC time instead of local time. - # modemmanager sends CTZU=3 during init and that causes `AT+CCLK?` to return a timestamp that's off by 600+ days - # see: - self._at_structured_cmd("CTZU", value="1") - # disable GNSS, because it's only configurable while offline self._at_structured_cmd("QGPSEND", check=False) # self._at_structured_cmd("QGPS", value="0") @@ -365,9 +428,11 @@ class Sequencer: # now = datetime.datetime.now().strftime('%Y/%m/%d,%H:%M:%S') # UTC # self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"") - locdata = self._get_assistance_data(AgpsDataVariant.gps_glonass_beidou) + locdata = self._get_any_assistance_data(AgpsDataVariant.gps_glonass_beidou) if locdata: - self._mmcli([f"--location-inject-assistance-data={locdata}"]) + ret = self._try_mmcli([f"--location-inject-assistance-data={locdata}"]) + if ret is not None: + self._cache_assistance_data(locdata) self._at_gnssconfig(GNSSConfig.gps_glonass_beidou_galileo) self._at_odpcontrol(ODPControl.disable)