diff --git a/pkgs/additional/eg25-control/eg25-control b/pkgs/additional/eg25-control/eg25-control index 7bb47dac..114f5625 100755 --- a/pkgs/additional/eg25-control/eg25-control +++ b/pkgs/additional/eg25-control/eg25-control @@ -64,6 +64,7 @@ POWER_ENDPOINT = "/sys/class/modem-power/modem-power/device/powered" AGPS_DATA_URI_BASE = "https://xtrapath4.izatcloud.net" ON_DISK_TIME_FMT = '%Y/%m/%d,%H:%M:%S' AGPS_CACHE_REFRESH_AFTER = datetime.timedelta(days=1) +DEFAULT_AGPS_DATA_VARIANT = lambda: AgpsDataVariant.gps_glonass_beidou class AgpsDataVariant: # GNSS-AP-Note 1.4: @@ -131,6 +132,11 @@ class Executor: logger.debug(f"mv {from_} -> {to}") os.rename(from_, to) + @destructive + def rm(self, p: str) -> None: + logger.debug(f"rm {p}") + os.remove(p) + @destructive(return_=b'') def exec(self, cmd: list[str], check: bool = True) -> bytes: logger.debug(" ".join(cmd)) @@ -301,29 +307,35 @@ class Sequencer: logger.warning(f"AGPS data download failed: {e}") return None - def _cache_assistance_data(self, fresh_path: str) -> None: - '''call after successful upload to indicate that the data recently retrieved should be cached''' - if fresh_path.startswith("cache/"): - return # if the caller used cached data, avoid updating its timestamp - - variant = os.path.basename(fresh_path) + def _mark_assistance_data(self, locdata_path: str, good: bool) -> None: + ''' + call after attempted upload to indicate if locdata is good/bad. + locdata_path might be a cached locdata or a new locdata. + ''' + variant = os.path.basename(locdata_path) try: - timestamp = datetime.datetime.fromtimestamp(os.stat(fresh_path).st_mtime) - except FileNotFoundError as e: - logger.warning(f"failed to cache previously-downloaded assistance data: {e}") + timestamp = datetime.datetime.fromtimestamp(os.stat(locdata_path).st_mtime) + except FileNotFoundError as e: # main reason to handle this is so --dry-run works + logger.warning(f"failed to mark previously-downloaded assistance data: {e}") return - self.executor.mkdir("cache") - self.executor.mv(fresh_path, f"cache/{variant}") - self.executor.write_file(f"cache/{variant}.ts", timestamp.encode()) + if good: + # N.B.: if locdata_path is something that exists in cache/, + # then this rename is a safe no-op (no-op renames don't update ctime/mtime). + self.executor.mkdir("cache") + self.executor.mv(locdata_path, f"cache/{variant}") + self.executor.write_file(f"cache/{variant}.ts", timestamp.encode()) # TODO: why aren't we using fs-native timestamps here? + else: + self.executor.rm(locdata_path) - def _get_cached_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, str | None]: + + def _get_cached_assistance_data(self, variant: AgpsDataVariant, cache_dir: str = "cache") -> tuple[bool, str | None]: ''' returns: - whether cached data is fresh (i.e. won't benefit from an update) - a path to the cached AGPS data, or None if no valid cached entry ''' - last_cache_datestr = self.executor.read_file(f"cache/{variant}.ts", default=b'').decode('utf-8') + last_cache_datestr = self.executor.read_file(f"{cache_dir}/{variant}.ts", default=b'').decode('utf-8') if not last_cache_datestr: return False, None @@ -334,21 +346,32 @@ class Sequencer: return False, None is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER - return is_fresh, f"cache/{variant}" + return is_fresh, f"{cache_dir}/{variant}" - def _get_any_assistance_data(self, variant: AgpsDataVariant) -> str | None: + def _get_any_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, list[str]]: ''' - checks if we have fresh cached data: - - if so, returns that. - - if not, *tries* to update it. - - if update succeeds, returns this new data - - if update fails, returns stale cached data (because it might still have *some* use/lifetime) + checks if we have fresh cached data. + - if not, tries to download fresh AGPS data + returns a list of location data paths to try, in order + - because new data might not necessarily be valid (download error). + - and stale data might still have *some* use. ''' - is_fresh, cached_path = self._get_cached_assistance_data(variant) - if is_fresh: return cached_path + is_fresh_untested, untested_path = self._get_cached_assistance_data(variant, cache_dir="new") + is_fresh_cached, cached_path = self._get_cached_assistance_data(variant) - better_data = self._download_assistance_data(variant) - return better_data or cached_path + paths_to_try = [] + if not (is_fresh_untested or is_fresh_cached): + better_data = self._download_assistance_data(variant) + if better_data: + untested_path = better_data + is_fresh_untested = True + + if untested_path: + paths_to_try.append(untested_path) + if cached_path: + paths_to_try.append(cached_path) + + return is_fresh_untested or is_fresh_cached, paths_to_try @log_scope("powering modem...", "modem powered") @@ -427,6 +450,11 @@ class Sequencer: self._at_structured_cmd("QCFG", "urc/ri/pin", "uart_ri") self._at_structured_cmd("QURCCFG", "urcport", "\"all\"") + @log_scope("ensuring on-disk AGPS...", "AGPS is available on-disk") + def ensure_agps_cache(self) -> None: + is_fresh, path = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT()) + assert is_fresh, "failed to ensure on-disk AGPS data" + @log_scope("configuring gps...", "gps configured") def enable_gps(self) -> None: # disable GNSS, because it's only configurable while offline @@ -438,11 +466,13 @@ class Sequencer: # now = datetime.datetime.now().strftime('%Y/%m/%d,%H:%M:%S') # UTC # self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"") - locdata = self._get_any_assistance_data(AgpsDataVariant.gps_glonass_beidou) - if locdata: + is_fresh, locdatas = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT()) + for locdata in locdatas: ret = self._try_mmcli([f"--location-inject-assistance-data={locdata}"]) - if ret is not None: - self._cache_assistance_data(locdata) + is_success = ret is not None + self._mark_assistance_data(locdata, is_success) + if is_success: + break # no need to try any more locdatas self._at_gnssconfig(GNSSConfig.gps_glonass_beidou_galileo) self._at_odpcontrol(ODPControl.disable) @@ -489,6 +519,7 @@ def main(): parser.add_argument('--power-on', action='store_true', help="enable power to the modem") parser.add_argument('--enable-audio', action='store_true', help="configure audio for calling (?)") parser.add_argument('--enable-urc', action='store_true', help="enable support for Unsolicited Return Codes (?)") + parser.add_argument('--ensure-agps-cache', action='store_true', help="ensure we have fresh assisted-GPS data available") parser.add_argument('--enable-gps', action='store_true', help="enable the GPS and acquire tracking until asked to stop") parser.add_argument('--enable-powersave', action='store_true', help="configure modem to sleep when possible") parser.add_argument('--disable-gps', action='store_true', help="disable the GPS and stop any tracking") @@ -508,6 +539,8 @@ def main(): sequencer.enable_audio() if args.enable_urc: sequencer.enable_urc() + if args.ensure_agps_cache: + sequencer.ensure_agps_cache() # N.B.: this will raise on failure, to get non-zero exit code if args.enable_gps: sequencer.enable_gps() if args.enable_powersave: