Colin
539d9e45a2
they require fundamentally different sandboxing approaches. the daemon *can't* always use bwrap if it wants to run as non-root. meanwhile the CLI tools would mostly *prefer* to run under bwrap. in the long term i'll maybe upstream the systemd sandboxing into nixpkgs, where there looks to be desire for it
602 lines
24 KiB
Plaintext
Executable File
602 lines
24 KiB
Plaintext
Executable File
#!/usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p curl -p modemmanager-split.mmcli
|
|
|
|
# this script should run after ModemManager.service is started.
|
|
# typical invocation is `eg25-control --power-on --enable-gps`.
|
|
# after running, the user may `cat /dev/ttyUSB1` to view NMEA-encoded GPS information.
|
|
# the script attempts to be idempotent, such that it may be run multiple times per boot.
|
|
#
|
|
# this script downloads assisted GPS (AGPS) data via the system's default gateway (i.e. WiFi)
|
|
# and shares that with the modem. this quickens the process of acquiring a GPS fix.
|
|
#
|
|
# the script may also configure other parts of the modem as `eg25-manager` does.
|
|
# these options are less tested: see `--help` for more.
|
|
#
|
|
# PREREQUISITES/DEPENDENCIES:
|
|
# this script expects to run on megi's kernel, with `CONFIG_MODEM_POWER=y`.
|
|
# ModemManager must be launched with the `--debug` flag, so that `mmcli --command=...` works.
|
|
#
|
|
# ModemManager, and by extension this script, REQUIRES A SIM CARD IN YOUR PHONE.
|
|
# the sim doesn't need to be "activated". you can buy a $1 SIM and never purchase
|
|
# service and that works; it's just needed for ModemManager to boot the modem.
|
|
# this isn't a fundamental requirement; if one did everything via serial instead of
|
|
# ModemManager the SIM would not be necessary for GPS.
|
|
#
|
|
# EXPECTATIONS/TIPS:
|
|
# - with the right environment, you may get a GPS fix in < 30s.
|
|
# - the fix is likely to have a *lot* of jitter, like 10+ meters.
|
|
# - indoors, you shouldn't expect to *ever* get a cold-start GPS fix.
|
|
# - maybe you'll track 1 satellite if lucky: enough to receive GPS time but not for a GPS fix.
|
|
# - get a fix outdoors, then walk indoors: GPS is smart enough to maintain a spotty fix.
|
|
# - outdoors in suburbia, a fix might take 10-20 minutes.
|
|
# - i have better luck *placing my phone on the roof of my car* than holding it in the air with my hand.
|
|
# - maybe a big metal plate opposite the sky acts as a dish/antenna?
|
|
# - in Seattle, i track several GLONASS and GPS sats: about an even split.
|
|
# - the GPS sats have better SNR.
|
|
# - modem seems to not show any BeiDou or Galileo sats even if i enable them.
|
|
#
|
|
# eg25 modem/GPS docs:
|
|
# [GNSS-AP-Note]: https://wiki.pine64.org/images/0/09/Quectel_EC2x%26EG9x%26EG2x-G%26EM05_Series_GNSS_Application_Note_V1.3.pdf
|
|
#
|
|
# most acronyms are defined inline, particularly near variable/class declarations.
|
|
# glossary, for those which aren't:
|
|
#
|
|
# Global Navigation Satellite Systems (GNSS):
|
|
# - GPS (US)
|
|
# - GLONASS (RU)
|
|
# - Galileo (EU)
|
|
# - BeiDou (CN)
|
|
# ^ these are all global systems, usable outside the country that owns them
|
|
|
|
|
|
import argparse
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
POWER_ENDPOINT = "/sys/class/modem-power/modem-power/device/powered"
|
|
GPS_STREAM_ENDPOINT = "/dev/ttyUSB1"
|
|
# GNSS-AP-Note 1.4:
|
|
# also at xtrapath5 and xtrapath6 subdomains.
|
|
# the AGPS data here is an almanac good for 7 days.
|
|
AGPS_DATA_URI_BASE = "https://xtrapath4.izatcloud.net"
|
|
ON_DISK_TIME_FMT = '%Y/%m/%d,%H:%M:%S'
|
|
AGPS_CACHE_REFRESH_AFTER = datetime.timedelta(days=1)
|
|
DEFAULT_AGPS_DATA_VARIANT = lambda: AgpsDataVariant.gps_glonass_beidou
|
|
|
|
class AgpsDataVariant:
|
|
# GNSS-AP-Note 1.4:
|
|
gps_glonass = "xtra2.bin"
|
|
gps_glonass_beidou = "xtra3grc.bin"
|
|
# N.B.: not supported by all Quectel modems
|
|
# on stock Pinephone, ModemManager gives "LOC service: general failure"
|
|
gps_glonass_beidou_galileo = "xtra3grcej.bin"
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def destructive(fn: callable = None, return_ = None):
|
|
""" decorate `fn` so that it becomes a no-op when --dry-run is active """
|
|
def wrapped(self, *args, **kwargs):
|
|
if self.dry_run:
|
|
fmt_args = ", ".join(
|
|
[repr(a) for a in args] +
|
|
[f"{k}={v}" for k,v in kwargs.items()]
|
|
)
|
|
logger.info(f"[dry run] {fn.__name__}({fmt_args})")
|
|
return return_
|
|
else:
|
|
return fn(self, *args, **kwargs)
|
|
if fn:
|
|
return wrapped
|
|
else:
|
|
return lambda fn: destructive(fn, return_=return_)
|
|
|
|
def log_scope(at_enter: str, at_exit: str):
|
|
""" decorate a function so that it logs at start and end """
|
|
def decorator(fn: callable):
|
|
def wrapped(*args, **kwargs):
|
|
logger.info(at_enter)
|
|
ret = fn(*args, **kwargs)
|
|
logger.info(at_exit)
|
|
return ret
|
|
return wrapped
|
|
return decorator
|
|
|
|
class Executor:
|
|
def __init__(self, dry_run: bool = False):
|
|
self.dry_run = dry_run
|
|
|
|
def read_file(self, path: str, default: bytes = b'') -> bytes:
|
|
try:
|
|
with open(path, 'rb') as f:
|
|
return f.read()
|
|
except FileNotFoundError as e:
|
|
logger.debug(f"failed to read {path}: {e}")
|
|
return default
|
|
|
|
def ctime(self, path: str):
|
|
try:
|
|
return datetime.datetime.fromtimestamp(os.stat(path).st_ctime)
|
|
except FileNotFoundError as _:
|
|
return None
|
|
|
|
@destructive
|
|
def write_file(self, path: str, data: bytes) -> None:
|
|
logger.debug(f"echo {data!r} > {path}")
|
|
with open(path, 'wb') as f:
|
|
f.write(data)
|
|
|
|
@destructive
|
|
def mkdir(self, path: str) -> None:
|
|
logger.debug(f"mkdir {path}")
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
@destructive
|
|
def mv(self, from_: str, to: str) -> None:
|
|
logger.debug(f"mv {from_} -> {to}")
|
|
os.rename(from_, to)
|
|
|
|
@destructive
|
|
def rm(self, p: str) -> None:
|
|
logger.debug(f"rm {p}")
|
|
os.remove(p)
|
|
|
|
@destructive(return_=b'')
|
|
def exec(self, cmd: list[str], check: bool = True) -> bytes:
|
|
logger.debug(" ".join(cmd))
|
|
res = subprocess.run(cmd, capture_output=True)
|
|
logger.debug(res.stdout)
|
|
if res.stderr:
|
|
logger.warning(res.stderr)
|
|
if check:
|
|
res.check_returncode()
|
|
return res.stdout
|
|
|
|
class Filesystem:
|
|
def __init__(self, executor: Executor, root: str):
|
|
self.executor = executor
|
|
self.root = root
|
|
|
|
def translate_out(self, rel_path) -> str:
|
|
""" given a path rooted in this filesystem, translate it to an absolute path for use outside the FS """
|
|
return os.path.join(self.root, rel_path)
|
|
|
|
## wrapped methods around Executor internals
|
|
def read_file(self, path: str, default: bytes = b'') -> bytes:
|
|
return self.executor.read_file(self.translate_out(path), default)
|
|
|
|
def ctime(self, path: str):
|
|
return self.executor.ctime(self.translate_out(path))
|
|
|
|
def write_file(self, path: str, data: bytes) -> None:
|
|
self.executor.write_file(self.translate_out(path), data)
|
|
|
|
def mkdir(self, path: str) -> None:
|
|
self.executor.mkdir(self.translate_out(path))
|
|
|
|
def mv(self, from_: str, to: str) -> None:
|
|
self.executor.mv(self.translate_out(from_), self.translate_out(to))
|
|
|
|
def rm(self, p: str) -> None:
|
|
self.executor.rm(self.translate_out(p))
|
|
|
|
class GNSSConfig:
|
|
# GNSS-AP-Note 2.2.7
|
|
# Supported GNSS constellations. GPS is always ON
|
|
# 0 GLONASS OFF/BeiDou OFF/Galileo OFF
|
|
# 1 GLONASS ON/BeiDou ON/Galileo ON
|
|
# 2 GLONASS ON/BeiDou ON/Galileo OFF
|
|
# 3 GLONASS ON/BeiDou OFF/Galileo ON
|
|
# 4 GLONASS ON/BeiDou OFF/Galileo OFF
|
|
# 5 GLONASS OFF/BeiDou ON/Galileo ON
|
|
# 6 GLONASS OFF/BeiDou OFF/Galileo ON
|
|
# 7 GLONASS OFF/BeiDou ON/Galileo OFF
|
|
gps = "0"
|
|
gps_glonass_beidou_galileo = "1"
|
|
gps_glonass_beidou = "2"
|
|
gps_glonass_galilego = "3"
|
|
gps_glonass = "4"
|
|
gps_beidou_galileo = "5"
|
|
gps_galileo = "6"
|
|
gps_beidou = "7"
|
|
|
|
class ODPControl:
|
|
# GNSS-AP-Note 2.2.8
|
|
# 0 Disable ODP
|
|
# 1 Low power mode
|
|
# 2 Ready mode
|
|
#
|
|
# ODP = "On-Demand Positioning"
|
|
# Low power mode:
|
|
# - low-frequency background GNSS tracking session
|
|
# - adjusts interval between 10m (when signal is good) - 60m (when signal is bad)
|
|
# Ready mode:
|
|
# - 1 Hz positioning
|
|
# - keeps GNSS ready so that when application demands position it's immediately ready
|
|
# - automatically stops positioning after 60s??
|
|
disable = "0"
|
|
lower_power_mode = "1"
|
|
ready_mode = "2"
|
|
|
|
class DPOEnable:
|
|
# GNSS-AP-Note 2.2.9
|
|
# 0 Disable DPO
|
|
# 1 Enable the DPO with dynamic duty cycle
|
|
#
|
|
# DPO = "Dynamic Power Optimization"
|
|
# automatically shuts off radio under certain conditions
|
|
# more info: <https://sixfab.com/wp-content/uploads/2018/09/Quectel_UC20_GNSS_AT_Commands_Manual_V1.1.pdf> 1.4.1
|
|
disable = "0"
|
|
enable = "1"
|
|
|
|
class GPSNMEAType:
|
|
# GNSS-AP-Note 2.2.3
|
|
# Output type of GPS NMEA sentences in ORed.
|
|
disable = 0
|
|
gpgga = 1
|
|
gprmc = 2
|
|
gpgsv = 4
|
|
gpgsa = 8
|
|
gpvtg = 16
|
|
all = 31
|
|
|
|
class GlonassNmeaType:
|
|
# GNSS-AP-Note 2.2.4
|
|
# Configure output type of GLONASS NMEA sentences in ORed
|
|
disable = 0
|
|
glgsv = 1
|
|
gngsa = 2
|
|
gngns = 4
|
|
all = 7
|
|
|
|
class GalileoNmeaType:
|
|
# GNSS-AP-Note 2.2.5
|
|
disable = 0
|
|
gagsv = 1
|
|
all = 1
|
|
|
|
class BeiDouNmeaType:
|
|
# GNSS-AP-Note 2.2.6
|
|
disable = 0
|
|
pqgsa = 1
|
|
pqgsv = 2
|
|
all = 3
|
|
|
|
class AutoGps:
|
|
# GNSS-AP-Note 2.2.12
|
|
# Enable/disable GNSS to run automatically after the module is powered on.
|
|
disable = "0"
|
|
enable = "1"
|
|
|
|
class Sequencer:
|
|
def __init__(self, executor: Executor, modem: str, power_endpoint: str, state_fs: Filesystem):
|
|
self.executor = executor
|
|
self.modem = modem
|
|
self.power_endpoint = power_endpoint
|
|
self.state_fs = state_fs
|
|
|
|
def _mmcli(self, args: list[str], check: bool = True) -> str:
|
|
return self.executor.exec(
|
|
["mmcli", "--modem", self.modem] + args,
|
|
check=check
|
|
).decode('utf-8')
|
|
|
|
def _try_mmcli(self, args: list[str]) -> str:
|
|
try:
|
|
return self._mmcli(args)
|
|
except subprocess.CalledProcessError:
|
|
return None
|
|
|
|
def _at_cmd(self, cmd: str, check: bool = True) -> str:
|
|
# this returns the mmcli output, which looks like:
|
|
# response: 'blah'
|
|
# i.e., quoted, and with a `response: ` prefix
|
|
#
|
|
# if you don't have mmcli, for some reason, an alternative would be `atinout`
|
|
return self._mmcli([f"--command=+{cmd}"], check=check)
|
|
|
|
def _at_structured_cmd(self, cmd: str, subcmd: str | None = None, value: str | None = None, check: bool = True) -> str:
|
|
if not subcmd and not value:
|
|
return self._at_cmd(cmd, check=check)
|
|
elif not subcmd and value:
|
|
return self._at_cmd(f"{cmd}={value}", check=check)
|
|
elif subcmd and not value:
|
|
return self._at_cmd(f"{cmd}=\"{subcmd}\"", check=check)
|
|
else:
|
|
return self._at_cmd(f"{cmd}=\"{subcmd}\",{value}", check=check)
|
|
|
|
def _at_gnssconfig(self, cfg: GNSSConfig, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "gnssconfig", cfg, **kwargs)
|
|
|
|
def _at_odpcontrol(self, control: ODPControl, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "odpcontrol", control, **kwargs)
|
|
|
|
def _at_dpoenable(self, enable: DPOEnable, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "dpoenable", enable, **kwargs)
|
|
|
|
def _at_gpsnmeatype(self, ty: GPSNMEAType, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "gpsnmeatype", str(ty), **kwargs)
|
|
|
|
def _at_glonassnmeatype(self, ty: GlonassNmeaType, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "glonassnmeatype", str(ty), **kwargs)
|
|
|
|
def _at_galileonmeatype(self, ty: GalileoNmeaType, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "galileonmeatype", str(ty), **kwargs)
|
|
|
|
def _at_beidounmeatype(self, ty: BeiDouNmeaType, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "beidounmeatype", str(ty), **kwargs)
|
|
|
|
def _at_autogps(self, enable: AutoGps, **kwargs) -> str:
|
|
return self._at_structured_cmd("QGPSCFG", "autogps", enable, **kwargs)
|
|
|
|
def _download_assistance_data(self, variant: AgpsDataVariant) -> str | None:
|
|
self.state_fs.mkdir("new")
|
|
out_path = f"new/{variant}"
|
|
try:
|
|
self.executor.exec([
|
|
"curl",
|
|
f"{AGPS_DATA_URI_BASE}/{variant}",
|
|
"-o",
|
|
self.state_fs.translate_out(out_path)
|
|
])
|
|
return out_path
|
|
except subprocess.CalledProcessError as e:
|
|
logger.warning(f"AGPS data download failed: {e}")
|
|
return None
|
|
|
|
def _mark_assistance_data(self, locdata_path: str, good: bool) -> None:
|
|
'''
|
|
call after attempted upload to indicate if locdata is good/bad.
|
|
locdata_path might be a cached locdata or a new locdata.
|
|
'''
|
|
variant = os.path.basename(locdata_path)
|
|
|
|
if good:
|
|
# N.B.: if locdata_path is something that exists in cache/,
|
|
# then this rename is a safe no-op.
|
|
self.state_fs.mkdir("cache")
|
|
self.state_fs.mv(locdata_path, f"cache/{variant}")
|
|
else:
|
|
self.state_fs.rm(locdata_path)
|
|
|
|
|
|
def _get_cached_assistance_data(self, variant: AgpsDataVariant, cache_dir: str = "cache") -> tuple[bool, str | None]:
|
|
'''
|
|
returns:
|
|
- whether cached data is fresh (i.e. won't benefit from an update)
|
|
- a path to the cached AGPS data, or None if no valid cached entry
|
|
'''
|
|
|
|
locdata_path = f"{cache_dir}/{variant}"
|
|
|
|
last_cache_date = self.state_fs.ctime(locdata_path)
|
|
if last_cache_date is None:
|
|
return False, None
|
|
|
|
is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER
|
|
return is_fresh, locdata_path
|
|
|
|
def _get_any_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, list[str]]:
|
|
'''
|
|
checks if we have fresh cached data.
|
|
- if not, tries to download fresh AGPS data
|
|
returns a list of location data paths to try, in order
|
|
- because new data might not necessarily be valid (download error).
|
|
- and stale data might still have *some* use.
|
|
'''
|
|
is_fresh_untested, untested_path = self._get_cached_assistance_data(variant, cache_dir="new")
|
|
is_fresh_cached, cached_path = self._get_cached_assistance_data(variant)
|
|
|
|
paths_to_try = []
|
|
if not (is_fresh_untested or is_fresh_cached):
|
|
better_data = self._download_assistance_data(variant)
|
|
if better_data:
|
|
untested_path = better_data
|
|
is_fresh_untested = True
|
|
|
|
if untested_path:
|
|
paths_to_try.append(untested_path)
|
|
if cached_path:
|
|
paths_to_try.append(cached_path)
|
|
|
|
return is_fresh_untested or is_fresh_cached, paths_to_try
|
|
|
|
|
|
@log_scope("powering modem...", "modem powered")
|
|
def power_on(self) -> None:
|
|
self.executor.write_file(self.power_endpoint, b'1')
|
|
while self._try_mmcli([]) is None:
|
|
logger.info("modem hasn't appeared: sleeping for 1s")
|
|
time.sleep(1) # wait for modem to appear
|
|
|
|
# set modem to use UTC time instead of local time.
|
|
# modemmanager sends CTZU=3 during init and that causes `AT+CCLK?` to return a timestamp that's off by 600+ days
|
|
# see: <https://gitlab.freedesktop.org/mobile-broadband/ModemManager/-/issues/360>
|
|
self._at_structured_cmd("CTZU", value="1")
|
|
|
|
@log_scope("halting modem...", "modem halted")
|
|
def power_off(self) -> None:
|
|
self.executor.write_file(self.power_endpoint, b'0')
|
|
while self._try_mmcli([]):
|
|
logger.info("modem still powered: sleeping for 1s")
|
|
time.sleep(1) # wait for modem to disappear
|
|
|
|
def at_check(self) -> None:
|
|
""" sanity check that the modem is listening for AT commands and responding reasonably """
|
|
hw = self._at_cmd("QGMR")
|
|
assert 'EG25GGBR07A08M2G' in hw or self.executor.dry_run, hw
|
|
|
|
def dump_debug_info(self) -> None:
|
|
logger.debug('checking if AGPS is enabled (1) or not (0)')
|
|
self._at_structured_cmd('QGPSXTRA?')
|
|
# see if the GPS assistance data is still within valid range
|
|
logger.debug('QGPSXTRADATA: <valid_duration_minutes>,<start_time_of_agps_data>')
|
|
self._at_structured_cmd('QGPSXTRADATA?')
|
|
logger.debug('checking what time the modem last synchronized with the network')
|
|
self._at_structured_cmd('QLTS')
|
|
logger.debug('checking what time the modem thinks it is (extrapolated from sync)')
|
|
self._at_structured_cmd('QLTS', value=1)
|
|
logger.debug('checking what time the modem thinks it is (from RTC)')
|
|
self._at_structured_cmd('CCLK?')
|
|
logger.debug('checking if nmea GPS source is enabled')
|
|
self._at_structured_cmd('QGPSCFG', 'nmeasrc')
|
|
logger.debug('checking if GPS is enabled (1) or not (0)')
|
|
self._at_structured_cmd('QGPS?')
|
|
logger.debug('checking if GPS has a fix. Error 516 if not')
|
|
self._at_structured_cmd('QGPSLOC', value='0', check=False)
|
|
logger.debug('dumping AGPS positioning mode bitfield')
|
|
self._at_structured_cmd('QGPSCFG', 'agpsposmode')
|
|
|
|
# N.B.: /dev/ttyUSB1 can block indefinitely on reads (particularly when competing with another consumer)
|
|
# logger.debug('dumping last modem GPS output, if available')
|
|
# gpsout = self.executor.read_file(GPS_STREAM_ENDPOINT)
|
|
# gpslines = gpsout.decode('utf-8').split('\n')[-12:]
|
|
# logger.debug('\n'.join(gpslines))
|
|
|
|
@log_scope("configuring audio...", "audio configured")
|
|
def enable_audio(self) -> None:
|
|
# cribbed from eg25-manager; i don't understand these
|
|
# QDAI call shouldn't be necessary if using Megi's FW:
|
|
# - <https://xnux.eu/devices/feature/modem-pp.html>
|
|
self._at_structured_cmd("QDAI", value="1,1,0,1,0,0,1,1")
|
|
# RI signaling using physical RI pin
|
|
self._at_structured_cmd("QCFG", "risignaltype", "\"physical\"")
|
|
# Enable VoLTE support
|
|
self._at_structured_cmd("QCFG", "ims", "1")
|
|
# Enable APREADY for PP 1.2
|
|
self._at_structured_cmd("QCFG", "apready", "1,0,500")
|
|
|
|
@log_scope("configuring urc...", "urc configured")
|
|
def enable_urc(self) -> None:
|
|
# cribbed from eg25-manager; i don't even know what URC is
|
|
# URC configuration for PP 1.2 (APREADY pin connected):
|
|
# * RING URC: normal pulse length
|
|
# * Incoming SMS URC: default pulse length
|
|
# * Other URC: default length
|
|
# * Report URCs on all ports (serial and USB) for FOSS firmware
|
|
# * Reporting of URCs without any delay
|
|
# * Configure URC pin to UART Ring Indicator
|
|
self._at_structured_cmd("QCFG", "urc/ri/ring", "\"pulse\",120,1000,5000,\"off\",1")
|
|
self._at_structured_cmd("QCFG", "urc/ri/smsincoming", "\"pulse\",120,1")
|
|
self._at_structured_cmd("QCFG", "urc/ri/other", "\"off\",1,1")
|
|
self._at_structured_cmd("QCFG", "urc/delay", "0")
|
|
self._at_structured_cmd("QCFG", "urc/cache", "0")
|
|
self._at_structured_cmd("QCFG", "urc/ri/pin", "uart_ri")
|
|
self._at_structured_cmd("QURCCFG", "urcport", "\"all\"")
|
|
|
|
@log_scope("ensuring on-disk AGPS...", "AGPS is available on-disk")
|
|
def ensure_agps_cache(self) -> None:
|
|
is_fresh, path = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
|
|
assert is_fresh, "failed to ensure on-disk AGPS data"
|
|
|
|
@log_scope("configuring gps...", "gps configured")
|
|
def enable_gps(self) -> None:
|
|
# disable GNSS, because it's only configurable while offline
|
|
self._at_structured_cmd("QGPSEND", check=False)
|
|
# self._at_structured_cmd("QGPS", value="0")
|
|
|
|
# XXX: ModemManager plugin sets QGPSXTRA=1
|
|
# self._at_structured_cmd("QGPSXTRA", value="1")
|
|
|
|
# now = datetime.datetime.now().strftime('%Y/%m/%d,%H:%M:%S') # UTC
|
|
# self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"")
|
|
is_fresh, locdatas = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
|
|
for locdata in locdatas:
|
|
ret = self._try_mmcli([f"--location-inject-assistance-data={self.state_fs.translate_out(locdata)}"])
|
|
is_success = ret is not None
|
|
self._mark_assistance_data(locdata, is_success)
|
|
if is_success:
|
|
break # no need to try any more locdatas
|
|
|
|
self._at_gnssconfig(GNSSConfig.gps_glonass_beidou_galileo)
|
|
self._at_odpcontrol(ODPControl.disable, check=False)
|
|
self._at_dpoenable(DPOEnable.disable, check=False) # N.B.: eg25-manager uses `DPOEnable.enable`
|
|
self._at_gpsnmeatype(GPSNMEAType.all)
|
|
self._at_glonassnmeatype(GlonassNmeaType.all)
|
|
self._at_galileonmeatype(GalileoNmeaType.all)
|
|
self._at_beidounmeatype(BeiDouNmeaType.all)
|
|
self._at_autogps(AutoGps.disable, check=False) #< don't start GPS on modem boot
|
|
# configure so GPS output is readable via /dev/ttyUSB1
|
|
# self._mmcli(["--location-enable-gps-unmanaged"])
|
|
# TODO: tune/document these QGPS values; a smarter setting here might reduce jitter?
|
|
self._at_structured_cmd("QGPS", value="1,255,1000,0,1")
|
|
|
|
@log_scope("halting gps...", "gps halted")
|
|
def disable_gps(self) -> None:
|
|
self._at_structured_cmd("QGPSEND", check=False)
|
|
|
|
@log_scope("configuring powersave...", "powersave configured")
|
|
def enable_powersave(self) -> None:
|
|
# Allow sleeping for power saving
|
|
self._at_structured_cmd("QSCLK", value="1")
|
|
# Disable fast poweroff for stability
|
|
self._at_structured_cmd("QCFG", "fast/poweroff", "0")
|
|
# Configure sleep and wake up pin levels to active low
|
|
self._at_structured_cmd("QCFG", "sleepind/level", "0")
|
|
self._at_structured_cmd("QCFG", "wakeupin/level", "0,0")
|
|
# Do not enter RAMDUMP mode, auto-reset instead
|
|
self._at_structured_cmd("QCFG", "ApRstLevel", "1")
|
|
self._at_structured_cmd("QCFG", "ModemRstLevel", "1")
|
|
|
|
|
|
def main():
|
|
logging.basicConfig()
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
|
|
parser = argparse.ArgumentParser(description="initialize the eg25 Pinephone modem for GPS tracking")
|
|
parser.add_argument('--modem', default='any', help='name of modem to configure (see mmcli --list-modems)')
|
|
parser.add_argument('--power-endpoint', default='/sys/class/modem-power/modem-power/device/powered', help='sysfs endpoint that can turn the modem on/off')
|
|
parser.add_argument('--state-dir', default='')
|
|
|
|
parser.add_argument("--dry-run", action='store_true', help="print commands instead of executing them")
|
|
parser.add_argument("--verbose", action='store_true', help="log each command before executing")
|
|
|
|
parser.add_argument('--power-on', action='store_true', help="enable power to the modem")
|
|
parser.add_argument('--enable-audio', action='store_true', help="configure audio for calling (?)")
|
|
parser.add_argument('--enable-urc', action='store_true', help="enable support for Unsolicited Return Codes (?)")
|
|
parser.add_argument('--ensure-agps-cache', action='store_true', help="ensure we have fresh assisted-GPS data available")
|
|
parser.add_argument('--enable-gps', action='store_true', help="enable the GPS and acquire tracking until asked to stop")
|
|
parser.add_argument('--enable-powersave', action='store_true', help="configure modem to sleep when possible")
|
|
parser.add_argument('--disable-gps', action='store_true', help="disable the GPS and stop any tracking")
|
|
parser.add_argument('--power-off', action='store_true', help="disable power to the modem")
|
|
parser.add_argument('--dump-debug-info', action='store_true', help="don't initialize anything, just dump debugging data")
|
|
|
|
args = parser.parse_args()
|
|
if args.verbose or args.dump_debug_info:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
state_dir = args.state_dir
|
|
if not state_dir:
|
|
state_dir = os.path.join(os.getenv("HOME", "/var/lib/eg25-control"), ".cache/eg25-control")
|
|
|
|
executor = Executor(dry_run=args.dry_run)
|
|
state_fs = Filesystem(executor, root=state_dir)
|
|
sequencer = Sequencer(executor, modem=args.modem, power_endpoint=args.power_endpoint, state_fs=state_fs)
|
|
|
|
if args.power_on:
|
|
sequencer.power_on()
|
|
if args.enable_audio:
|
|
sequencer.enable_audio()
|
|
if args.enable_urc:
|
|
sequencer.enable_urc()
|
|
if args.ensure_agps_cache:
|
|
sequencer.ensure_agps_cache() # N.B.: this will raise on failure, to get non-zero exit code
|
|
if args.enable_gps:
|
|
sequencer.enable_gps()
|
|
if args.enable_powersave:
|
|
sequencer.enable_powersave()
|
|
|
|
if args.disable_gps:
|
|
sequencer.disable_gps()
|
|
if args.power_off:
|
|
sequencer.power_off()
|
|
|
|
if args.dump_debug_info:
|
|
sequencer.dump_debug_info()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|