eg25-control: add --ensure-agps-cache operation

This commit is contained in:
Colin 2023-09-15 03:33:00 +00:00
parent 0f3f566d25
commit bd18a6871c

View File

@ -64,6 +64,7 @@ POWER_ENDPOINT = "/sys/class/modem-power/modem-power/device/powered"
AGPS_DATA_URI_BASE = "https://xtrapath4.izatcloud.net"
ON_DISK_TIME_FMT = '%Y/%m/%d,%H:%M:%S'
AGPS_CACHE_REFRESH_AFTER = datetime.timedelta(days=1)
DEFAULT_AGPS_DATA_VARIANT = lambda: AgpsDataVariant.gps_glonass_beidou
class AgpsDataVariant:
# GNSS-AP-Note 1.4:
@ -131,6 +132,11 @@ class Executor:
logger.debug(f"mv {from_} -> {to}")
os.rename(from_, to)
@destructive
def rm(self, p: str) -> None:
logger.debug(f"rm {p}")
os.remove(p)
@destructive(return_=b'')
def exec(self, cmd: list[str], check: bool = True) -> bytes:
logger.debug(" ".join(cmd))
@ -301,29 +307,35 @@ class Sequencer:
logger.warning(f"AGPS data download failed: {e}")
return None
def _cache_assistance_data(self, fresh_path: str) -> None:
'''call after successful upload to indicate that the data recently retrieved should be cached'''
if fresh_path.startswith("cache/"):
return # if the caller used cached data, avoid updating its timestamp
variant = os.path.basename(fresh_path)
def _mark_assistance_data(self, locdata_path: str, good: bool) -> None:
'''
call after attempted upload to indicate if locdata is good/bad.
locdata_path might be a cached locdata or a new locdata.
'''
variant = os.path.basename(locdata_path)
try:
timestamp = datetime.datetime.fromtimestamp(os.stat(fresh_path).st_mtime)
except FileNotFoundError as e:
logger.warning(f"failed to cache previously-downloaded assistance data: {e}")
timestamp = datetime.datetime.fromtimestamp(os.stat(locdata_path).st_mtime)
except FileNotFoundError as e: # main reason to handle this is so --dry-run works
logger.warning(f"failed to mark previously-downloaded assistance data: {e}")
return
self.executor.mkdir("cache")
self.executor.mv(fresh_path, f"cache/{variant}")
self.executor.write_file(f"cache/{variant}.ts", timestamp.encode())
if good:
# N.B.: if locdata_path is something that exists in cache/,
# then this rename is a safe no-op (no-op renames don't update ctime/mtime).
self.executor.mkdir("cache")
self.executor.mv(locdata_path, f"cache/{variant}")
self.executor.write_file(f"cache/{variant}.ts", timestamp.encode()) # TODO: why aren't we using fs-native timestamps here?
else:
self.executor.rm(locdata_path)
def _get_cached_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, str | None]:
def _get_cached_assistance_data(self, variant: AgpsDataVariant, cache_dir: str = "cache") -> tuple[bool, str | None]:
'''
returns:
- whether cached data is fresh (i.e. won't benefit from an update)
- a path to the cached AGPS data, or None if no valid cached entry
'''
last_cache_datestr = self.executor.read_file(f"cache/{variant}.ts", default=b'').decode('utf-8')
last_cache_datestr = self.executor.read_file(f"{cache_dir}/{variant}.ts", default=b'').decode('utf-8')
if not last_cache_datestr:
return False, None
@ -334,21 +346,32 @@ class Sequencer:
return False, None
is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER
return is_fresh, f"cache/{variant}"
return is_fresh, f"{cache_dir}/{variant}"
def _get_any_assistance_data(self, variant: AgpsDataVariant) -> str | None:
def _get_any_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, list[str]]:
'''
checks if we have fresh cached data:
- if so, returns that.
- if not, *tries* to update it.
- if update succeeds, returns this new data
- if update fails, returns stale cached data (because it might still have *some* use/lifetime)
checks if we have fresh cached data.
- if not, tries to download fresh AGPS data
returns a list of location data paths to try, in order
- because new data might not necessarily be valid (download error).
- and stale data might still have *some* use.
'''
is_fresh, cached_path = self._get_cached_assistance_data(variant)
if is_fresh: return cached_path
is_fresh_untested, untested_path = self._get_cached_assistance_data(variant, cache_dir="new")
is_fresh_cached, cached_path = self._get_cached_assistance_data(variant)
better_data = self._download_assistance_data(variant)
return better_data or cached_path
paths_to_try = []
if not (is_fresh_untested or is_fresh_cached):
better_data = self._download_assistance_data(variant)
if better_data:
untested_path = better_data
is_fresh_untested = True
if untested_path:
paths_to_try.append(untested_path)
if cached_path:
paths_to_try.append(cached_path)
return is_fresh_untested or is_fresh_cached, paths_to_try
@log_scope("powering modem...", "modem powered")
@ -427,6 +450,11 @@ class Sequencer:
self._at_structured_cmd("QCFG", "urc/ri/pin", "uart_ri")
self._at_structured_cmd("QURCCFG", "urcport", "\"all\"")
@log_scope("ensuring on-disk AGPS...", "AGPS is available on-disk")
def ensure_agps_cache(self) -> None:
is_fresh, path = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
assert is_fresh, "failed to ensure on-disk AGPS data"
@log_scope("configuring gps...", "gps configured")
def enable_gps(self) -> None:
# disable GNSS, because it's only configurable while offline
@ -438,11 +466,13 @@ class Sequencer:
# now = datetime.datetime.now().strftime('%Y/%m/%d,%H:%M:%S') # UTC
# self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"")
locdata = self._get_any_assistance_data(AgpsDataVariant.gps_glonass_beidou)
if locdata:
is_fresh, locdatas = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
for locdata in locdatas:
ret = self._try_mmcli([f"--location-inject-assistance-data={locdata}"])
if ret is not None:
self._cache_assistance_data(locdata)
is_success = ret is not None
self._mark_assistance_data(locdata, is_success)
if is_success:
break # no need to try any more locdatas
self._at_gnssconfig(GNSSConfig.gps_glonass_beidou_galileo)
self._at_odpcontrol(ODPControl.disable)
@ -489,6 +519,7 @@ def main():
parser.add_argument('--power-on', action='store_true', help="enable power to the modem")
parser.add_argument('--enable-audio', action='store_true', help="configure audio for calling (?)")
parser.add_argument('--enable-urc', action='store_true', help="enable support for Unsolicited Return Codes (?)")
parser.add_argument('--ensure-agps-cache', action='store_true', help="ensure we have fresh assisted-GPS data available")
parser.add_argument('--enable-gps', action='store_true', help="enable the GPS and acquire tracking until asked to stop")
parser.add_argument('--enable-powersave', action='store_true', help="configure modem to sleep when possible")
parser.add_argument('--disable-gps', action='store_true', help="disable the GPS and stop any tracking")
@ -508,6 +539,8 @@ def main():
sequencer.enable_audio()
if args.enable_urc:
sequencer.enable_urc()
if args.ensure_agps_cache:
sequencer.ensure_agps_cache() # N.B.: this will raise on failure, to get non-zero exit code
if args.enable_gps:
sequencer.enable_gps()
if args.enable_powersave: