eg25-control: cache the location assistance data

This commit is contained in:
Colin 2023-09-15 02:35:31 +00:00
parent a0c2ed38e6
commit 92451d1e28

View File

@ -61,6 +61,8 @@ POWER_ENDPOINT = "/sys/class/modem-power/modem-power/device/powered"
# also at xtrapath5 and xtrapath6 subdomains.
# the AGPS data here is an almanac good for 7 days.
AGPS_DATA_URI_BASE = "https://xtrapath4.izatcloud.net"
ON_DISK_TIME_FMT = '%Y/%m/%d,%H:%M:%S'
AGPS_CACHE_REFRESH_AFTER = datetime.timedelta(days=1)
class AgpsDataVariant:
# GNSS-AP-Note 1.4:
@ -104,12 +106,25 @@ class Executor:
def __init__(self, dry_run: bool = False):
self.dry_run = dry_run
def read_file(self, path: str, default: bytes) -> bytes:
try:
with open(path, 'rb') as f:
return f.read()
except FileNotFoundError as e:
logger.debug(f"failed to read {path}: {e}")
return default
@destructive
def write_file(self, path: str, data: bytes) -> None:
logger.debug(f"echo {data!r} > {path}")
with open(path, 'wb') as f:
f.write(data)
@destructive
def mkdir(self, path: str) -> None:
logger.debug(f"mkdir {path}")
os.makedirs(path, exist_ok=True)
@destructive(return_=b'')
def exec(self, cmd: list[str], check: bool = True) -> bytes:
logger.debug(" ".join(cmd))
@ -270,7 +285,7 @@ class Sequencer:
def _at_autogps(self, enable: AutoGps) -> str:
return self._at_structured_cmd("QGPSCFG", "autogps", enable)
def _get_assistance_data(self, variant: AgpsDataVariant) -> str | None:
def _download_assistance_data(self, variant: AgpsDataVariant) -> str | None:
try:
self.executor.exec(["curl", f"{self.AGPS_DATA_URI_BASE}/{variant}", "-o", variant])
return variant
@ -278,6 +293,54 @@ class Sequencer:
logger.warning(f"AGPS data download failed: {e}")
return None # TODO: could be smarter: return cached AGPS data?
def _cache_assistance_data(self, fresh_path: str) -> None:
'''call after successful upload to indicate that the data recently retrieved should be cached'''
if fresh_path.startswith("cache/"):
return # if the caller used cached data, avoid updating its timestamp
# TODO: we should use fs time for this
now = datetime.datetime.now().strftime(ON_DISK_TIME_FMT)
try:
self.executor.mkdir("cache")
self.executor.exec(["mv", fresh_path, "cache"])
self.executor.write_file(f"cache/{fresh_path}.ts", now.encode())
except subprocess.CalledProcessError as e:
logger.warning(f"failed to cache AGPS data: {e}")
def _get_cached_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, str | None]:
'''
returns:
- whether cached data is fresh (i.e. won't benefit from an update)
- a path to the cached AGPS data, or None if no valid cached entry
'''
last_cache_datestr = self.executor.read_file(f"cache/{variant}.ts", default=b'').decode('utf-8')
if not last_cache_datestr:
return False, None
try:
last_cache_date = datetime.datetime.strptime(last_cache_datestr, ON_DISK_TIME_FMT)
except ValueError as e:
logger.warning(f"failed to decode cache timestamp for {variant}: {e}")
return False, None
is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER
return is_fresh, f"cache/{variant}"
def _get_any_assistance_data(self, variant: AgpsDataVariant) -> str | None:
'''
checks if we have fresh cached data:
- if so, returns that.
- if not, *tries* to update it.
- if update succeeds, returns this new data
- if update fails, returns stale cached data (because it might still have *some* use/lifetime)
'''
is_fresh, cached_path = self._get_cached_assistance_data(variant)
if is_fresh: return cached_path
better_data = self._download_assistance_data(variant)
return better_data or cached_path
@log_scope("powering modem...", "modem powered")
def power_on(self) -> None:
self.executor.write_file(self.power_endpoint, b'1')
@ -285,6 +348,11 @@ class Sequencer:
logger.info("modem hasn't appeared: sleeping for 1s")
time.sleep(1) # wait for modem to appear
# set modem to use UTC time instead of local time.
# modemmanager sends CTZU=3 during init and that causes `AT+CCLK?` to return a timestamp that's off by 600+ days
# see: <https://gitlab.freedesktop.org/mobile-broadband/ModemManager/-/issues/360>
self._at_structured_cmd("CTZU", value="1")
@log_scope("halting modem...", "modem halted")
def power_off(self) -> None:
self.executor.write_file(self.power_endpoint, b'0')
@ -351,11 +419,6 @@ class Sequencer:
@log_scope("configuring gps...", "gps configured")
def enable_gps(self) -> None:
# set modem to use UTC time instead of local time.
# modemmanager sends CTZU=3 during init and that causes `AT+CCLK?` to return a timestamp that's off by 600+ days
# see: <https://gitlab.freedesktop.org/mobile-broadband/ModemManager/-/issues/360>
self._at_structured_cmd("CTZU", value="1")
# disable GNSS, because it's only configurable while offline
self._at_structured_cmd("QGPSEND", check=False)
# self._at_structured_cmd("QGPS", value="0")
@ -365,9 +428,11 @@ class Sequencer:
# now = datetime.datetime.now().strftime('%Y/%m/%d,%H:%M:%S') # UTC
# self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"")
locdata = self._get_assistance_data(AgpsDataVariant.gps_glonass_beidou)
locdata = self._get_any_assistance_data(AgpsDataVariant.gps_glonass_beidou)
if locdata:
self._mmcli([f"--location-inject-assistance-data={locdata}"])
ret = self._try_mmcli([f"--location-inject-assistance-data={locdata}"])
if ret is not None:
self._cache_assistance_data(locdata)
self._at_gnssconfig(GNSSConfig.gps_glonass_beidou_galileo)
self._at_odpcontrol(ODPControl.disable)