nix-files/pkgs/additional/eg25-control/eg25-control

557 lines
23 KiB
Plaintext
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p curl -p modemmanager
# this script should run after ModemManager.service is started.
# typical invocation is `eg25-control --power-on --enable-gps`.
# after running, the user may `cat /dev/ttyUSB1` to view NMEA-encoded GPS information.
# the script attempts to be idempotent, such that it may be run multiple times per boot.
#
# this script downloads assisted GPS (AGPS) data via the system's default gateway (i.e. WiFi)
# and shares that with the modem. this quickens the process of acquiring a GPS fix.
#
# the script may also configure other parts of the modem as `eg25-manager` does.
# these options are less tested: see `--help` for more.
#
# PREREQUISITES/DEPENDENCIES:
# this script expects to run on megi's kernel, with `CONFIG_MODEM_POWER=y`.
# ModemManager must be launched with the `--debug` flag, so that `mmcli --command=...` works.
#
# ModemManager, and by extension this script, REQUIRES A SIM CARD IN YOUR PHONE.
# the sim doesn't need to be "activated". you can buy a $1 SIM and never purchase
# service and that works; it's just needed for ModemManager to boot the modem.
# this isn't a fundamental requirement; if one did everything via serial instead of
# ModemManager the SIM would not be necessary for GPS.
#
# EXPECTATIONS/TIPS:
# - with the right environment, you may get a GPS fix in < 30s.
# - the fix is likely to have a *lot* of jitter, like 10+ meters.
# - indoors, you shouldn't expect to *ever* get a cold-start GPS fix.
# - maybe you'll track 1 satellite if lucky: enough to receive GPS time but not for a GPS fix.
# - get a fix outdoors, then walk indoors: GPS is smart enough to maintain a spotty fix.
# - outdoors in suburbia, a fix might take 10-20 minutes.
# - i have better luck *placing my phone on the roof of my car* than holding it in the air with my hand.
# - maybe a big metal plate opposite the sky acts as a dish/antenna?
# - in Seattle, i track several GLONASS and GPS sats: about an even split.
# - the GPS sats have better SNR.
# - modem seems to not show any BeiDou or Galileo sats even if i enable them.
#
# eg25 modem/GPS docs:
# [GNSS-AP-Note]: https://wiki.pine64.org/images/0/09/Quectel_EC2x%26EG9x%26EG2x-G%26EM05_Series_GNSS_Application_Note_V1.3.pdf
#
# most acronyms are defined inline, particularly near variable/class declarations.
# glossary, for those which aren't:
#
# Global Navigation Satellite Systems (GNSS):
# - GPS (US)
# - GLONASS (RU)
# - Galileo (EU)
# - BeiDou (CN)
# ^ these are all global systems, usable outside the country that owns them
import argparse
import datetime
import logging
import os
import subprocess
import sys
import time
POWER_ENDPOINT = "/sys/class/modem-power/modem-power/device/powered"
GPS_STREAM_ENDPOINT = "/dev/ttyUSB1"
# GNSS-AP-Note 1.4:
# also at xtrapath5 and xtrapath6 subdomains.
# the AGPS data here is an almanac good for 7 days.
AGPS_DATA_URI_BASE = "https://xtrapath4.izatcloud.net"
ON_DISK_TIME_FMT = '%Y/%m/%d,%H:%M:%S'
AGPS_CACHE_REFRESH_AFTER = datetime.timedelta(days=1)
DEFAULT_AGPS_DATA_VARIANT = lambda: AgpsDataVariant.gps_glonass_beidou
class AgpsDataVariant:
# GNSS-AP-Note 1.4:
gps_glonass = "xtra2.bin"
gps_glonass_beidou = "xtra3grc.bin"
# N.B.: not supported by all Quectel modems
# on stock Pinephone, ModemManager gives "LOC service: general failure"
gps_glonass_beidou_galileo = "xtra3grcej.bin"
logger = logging.getLogger(__name__)
def destructive(fn: callable = None, return_ = None):
""" decorate `fn` so that it becomes a no-op when --dry-run is active """
def wrapped(self, *args, **kwargs):
if self.dry_run:
fmt_args = ", ".join(
[repr(a) for a in args] +
[f"{k}={v}" for k,v in kwargs.items()]
)
logger.info(f"[dry run] {fn.__name__}({fmt_args})")
return return_
else:
return fn(self, *args, **kwargs)
if fn:
return wrapped
else:
return lambda fn: destructive(fn, return_=return_)
def log_scope(at_enter: str, at_exit: str):
""" decorate a function so that it logs at start and end """
def decorator(fn: callable):
def wrapped(*args, **kwargs):
logger.info(at_enter)
ret = fn(*args, **kwargs)
logger.info(at_exit)
return ret
return wrapped
return decorator
class Executor:
def __init__(self, dry_run: bool = False):
self.dry_run = dry_run
def read_file(self, path: str, default: bytes = b'') -> bytes:
try:
with open(path, 'rb') as f:
return f.read()
except FileNotFoundError as e:
logger.debug(f"failed to read {path}: {e}")
return default
@destructive
def write_file(self, path: str, data: bytes) -> None:
logger.debug(f"echo {data!r} > {path}")
with open(path, 'wb') as f:
f.write(data)
@destructive
def mkdir(self, path: str) -> None:
logger.debug(f"mkdir {path}")
os.makedirs(path, exist_ok=True)
@destructive
def mv(self, from_: str, to: str) -> None:
logger.debug(f"mv {from_} -> {to}")
os.rename(from_, to)
@destructive
def rm(self, p: str) -> None:
logger.debug(f"rm {p}")
os.remove(p)
@destructive(return_=b'')
def exec(self, cmd: list[str], check: bool = True) -> bytes:
logger.debug(" ".join(cmd))
res = subprocess.run(cmd, capture_output=True)
logger.debug(res.stdout)
if res.stderr:
logger.warning(res.stderr)
if check:
res.check_returncode()
return res.stdout
class GNSSConfig:
# GNSS-AP-Note 2.2.7
# Supported GNSS constellations. GPS is always ON
# 0 GLONASS OFF/BeiDou OFF/Galileo OFF
# 1 GLONASS ON/BeiDou ON/Galileo ON
# 2 GLONASS ON/BeiDou ON/Galileo OFF
# 3 GLONASS ON/BeiDou OFF/Galileo ON
# 4 GLONASS ON/BeiDou OFF/Galileo OFF
# 5 GLONASS OFF/BeiDou ON/Galileo ON
# 6 GLONASS OFF/BeiDou OFF/Galileo ON
# 7 GLONASS OFF/BeiDou ON/Galileo OFF
gps = "0"
gps_glonass_beidou_galileo = "1"
gps_glonass_beidou = "2"
gps_glonass_galilego = "3"
gps_glonass = "4"
gps_beidou_galileo = "5"
gps_galileo = "6"
gps_beidou = "7"
class ODPControl:
# GNSS-AP-Note 2.2.8
# 0 Disable ODP
# 1 Low power mode
# 2 Ready mode
#
# ODP = "On-Demand Positioning"
# Low power mode:
# - low-frequency background GNSS tracking session
# - adjusts interval between 10m (when signal is good) - 60m (when signal is bad)
# Ready mode:
# - 1 Hz positioning
# - keeps GNSS ready so that when application demands position it's immediately ready
# - automatically stops positioning after 60s??
disable = "0"
lower_power_mode = "1"
ready_mode = "2"
class DPOEnable:
# GNSS-AP-Note 2.2.9
# 0 Disable DPO
# 1 Enable the DPO with dynamic duty cycle
#
# DPO = "Dynamic Power Optimization"
# automatically shuts off radio under certain conditions
# more info: <https://sixfab.com/wp-content/uploads/2018/09/Quectel_UC20_GNSS_AT_Commands_Manual_V1.1.pdf> 1.4.1
disable = "0"
enable = "1"
class GPSNMEAType:
# GNSS-AP-Note 2.2.3
# Output type of GPS NMEA sentences in ORed.
disable = 0
gpgga = 1
gprmc = 2
gpgsv = 4
gpgsa = 8
gpvtg = 16
all = 31
class GlonassNmeaType:
# GNSS-AP-Note 2.2.4
# Configure output type of GLONASS NMEA sentences in ORed
disable = 0
glgsv = 1
gngsa = 2
gngns = 4
all = 7
class GalileoNmeaType:
# GNSS-AP-Note 2.2.5
disable = 0
gagsv = 1
all = 1
class BeiDouNmeaType:
# GNSS-AP-Note 2.2.6
disable = 0
pqgsa = 1
pqgsv = 2
all = 3
class AutoGps:
# GNSS-AP-Note 2.2.12
# Enable/disable GNSS to run automatically after the module is powered on.
disable = "0"
enable = "1"
class Sequencer:
def __init__(self, executor: Executor, modem: str, power_endpoint: str):
self.executor = executor
self.modem = modem
self.power_endpoint = power_endpoint
def _mmcli(self, args: list[str], check: bool = True) -> str:
return self.executor.exec(
["mmcli", "--modem", self.modem] + args,
check=check
).decode('utf-8')
def _try_mmcli(self, args: list[str]) -> str:
try:
return self._mmcli(args)
except subprocess.CalledProcessError:
return None
def _at_cmd(self, cmd: str, check: bool = True) -> str:
# this returns the mmcli output, which looks like:
# response: 'blah'
# i.e., quoted, and with a `response: ` prefix
#
# if you don't have mmcli, for some reason, an alternative would be `atinout`
return self._mmcli([f"--command=+{cmd}"], check=check)
def _at_structured_cmd(self, cmd: str, subcmd: str | None = None, value: str | None = None, check: bool = True) -> str:
if not subcmd and not value:
return self._at_cmd(cmd, check=check)
elif not subcmd and value:
return self._at_cmd(f"{cmd}={value}", check=check)
elif subcmd and not value:
return self._at_cmd(f"{cmd}=\"{subcmd}\"", check=check)
else:
return self._at_cmd(f"{cmd}=\"{subcmd}\",{value}", check=check)
def _at_gnssconfig(self, cfg: GNSSConfig, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "gnssconfig", cfg, **kwargs)
def _at_odpcontrol(self, control: ODPControl, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "odpcontrol", control, **kwargs)
def _at_dpoenable(self, enable: DPOEnable, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "dpoenable", enable, **kwargs)
def _at_gpsnmeatype(self, ty: GPSNMEAType, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "gpsnmeatype", str(ty), **kwargs)
def _at_glonassnmeatype(self, ty: GlonassNmeaType, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "glonassnmeatype", str(ty), **kwargs)
def _at_galileonmeatype(self, ty: GalileoNmeaType, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "galileonmeatype", str(ty), **kwargs)
def _at_beidounmeatype(self, ty: BeiDouNmeaType, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "beidounmeatype", str(ty), **kwargs)
def _at_autogps(self, enable: AutoGps, **kwargs) -> str:
return self._at_structured_cmd("QGPSCFG", "autogps", enable, **kwargs)
def _download_assistance_data(self, variant: AgpsDataVariant) -> str | None:
self.executor.mkdir("new")
out_path = f"new/{variant}"
try:
self.executor.exec(["curl", f"{AGPS_DATA_URI_BASE}/{variant}", "-o", out_path])
return out_path
except subprocess.CalledProcessError as e:
logger.warning(f"AGPS data download failed: {e}")
return None
def _mark_assistance_data(self, locdata_path: str, good: bool) -> None:
'''
call after attempted upload to indicate if locdata is good/bad.
locdata_path might be a cached locdata or a new locdata.
'''
variant = os.path.basename(locdata_path)
if good:
# N.B.: if locdata_path is something that exists in cache/,
# then this rename is a safe no-op.
self.executor.mkdir("cache")
self.executor.mv(locdata_path, f"cache/{variant}")
else:
self.executor.rm(locdata_path)
def _get_cached_assistance_data(self, variant: AgpsDataVariant, cache_dir: str = "cache") -> tuple[bool, str | None]:
'''
returns:
- whether cached data is fresh (i.e. won't benefit from an update)
- a path to the cached AGPS data, or None if no valid cached entry
'''
locdata_path = f"{cache_dir}/{variant}"
try:
last_cache_date = datetime.datetime.fromtimestamp(os.stat(locdata_path).st_ctime)
except FileNotFoundError as e:
return False, None
is_fresh = datetime.datetime.now() - last_cache_date < AGPS_CACHE_REFRESH_AFTER
return is_fresh, locdata_path
def _get_any_assistance_data(self, variant: AgpsDataVariant) -> tuple[bool, list[str]]:
'''
checks if we have fresh cached data.
- if not, tries to download fresh AGPS data
returns a list of location data paths to try, in order
- because new data might not necessarily be valid (download error).
- and stale data might still have *some* use.
'''
is_fresh_untested, untested_path = self._get_cached_assistance_data(variant, cache_dir="new")
is_fresh_cached, cached_path = self._get_cached_assistance_data(variant)
paths_to_try = []
if not (is_fresh_untested or is_fresh_cached):
better_data = self._download_assistance_data(variant)
if better_data:
untested_path = better_data
is_fresh_untested = True
if untested_path:
paths_to_try.append(untested_path)
if cached_path:
paths_to_try.append(cached_path)
return is_fresh_untested or is_fresh_cached, paths_to_try
@log_scope("powering modem...", "modem powered")
def power_on(self) -> None:
self.executor.write_file(self.power_endpoint, b'1')
while self._try_mmcli([]) is None:
logger.info("modem hasn't appeared: sleeping for 1s")
time.sleep(1) # wait for modem to appear
# set modem to use UTC time instead of local time.
# modemmanager sends CTZU=3 during init and that causes `AT+CCLK?` to return a timestamp that's off by 600+ days
# see: <https://gitlab.freedesktop.org/mobile-broadband/ModemManager/-/issues/360>
self._at_structured_cmd("CTZU", value="1")
@log_scope("halting modem...", "modem halted")
def power_off(self) -> None:
self.executor.write_file(self.power_endpoint, b'0')
while self._try_mmcli([]):
logger.info("modem still powered: sleeping for 1s")
time.sleep(1) # wait for modem to disappear
def at_check(self) -> None:
""" sanity check that the modem is listening for AT commands and responding reasonably """
hw = self._at_cmd("QGMR")
assert 'EG25GGBR07A08M2G' in hw or self.executor.dry_run, hw
def dump_debug_info(self) -> None:
logger.debug('checking if AGPS is enabled (1) or not (0)')
self._at_structured_cmd('QGPSXTRA?')
# see if the GPS assistance data is still within valid range
logger.debug('QGPSXTRADATA: <valid_duration_minutes>,<start_time_of_agps_data>')
self._at_structured_cmd('QGPSXTRADATA?')
logger.debug('checking what time the modem last synchronized with the network')
self._at_structured_cmd('QLTS')
logger.debug('checking what time the modem thinks it is (extrapolated from sync)')
self._at_structured_cmd('QLTS', value=1)
logger.debug('checking what time the modem thinks it is (from RTC)')
self._at_structured_cmd('CCLK?')
logger.debug('checking if nmea GPS source is enabled')
self._at_structured_cmd('QGPSCFG', 'nmeasrc')
logger.debug('checking if GPS is enabled (1) or not (0)')
self._at_structured_cmd('QGPS?')
logger.debug('checking if GPS has a fix. Error 516 if not')
self._at_structured_cmd('QGPSLOC', value='0', check=False)
logger.debug('dumping AGPS positioning mode bitfield')
self._at_structured_cmd('QGPSCFG', 'agpsposmode')
# N.B.: /dev/ttyUSB1 can block indefinitely on reads (particularly when competing with another consumer)
# logger.debug('dumping last modem GPS output, if available')
# gpsout = self.executor.read_file(GPS_STREAM_ENDPOINT)
# gpslines = gpsout.decode('utf-8').split('\n')[-12:]
# logger.debug('\n'.join(gpslines))
@log_scope("configuring audio...", "audio configured")
def enable_audio(self) -> None:
# cribbed from eg25-manager; i don't understand these
# QDAI call shouldn't be necessary if using Megi's FW:
# - <https://xnux.eu/devices/feature/modem-pp.html>
self._at_structured_cmd("QDAI", value="1,1,0,1,0,0,1,1")
# RI signaling using physical RI pin
self._at_structured_cmd("QCFG", "risignaltype", "\"physical\"")
# Enable VoLTE support
self._at_structured_cmd("QCFG", "ims", "1")
# Enable APREADY for PP 1.2
self._at_structured_cmd("QCFG", "apready", "1,0,500")
@log_scope("configuring urc...", "urc configured")
def enable_urc(self) -> None:
# cribbed from eg25-manager; i don't even know what URC is
# URC configuration for PP 1.2 (APREADY pin connected):
# * RING URC: normal pulse length
# * Incoming SMS URC: default pulse length
# * Other URC: default length
# * Report URCs on all ports (serial and USB) for FOSS firmware
# * Reporting of URCs without any delay
# * Configure URC pin to UART Ring Indicator
self._at_structured_cmd("QCFG", "urc/ri/ring", "\"pulse\",120,1000,5000,\"off\",1")
self._at_structured_cmd("QCFG", "urc/ri/smsincoming", "\"pulse\",120,1")
self._at_structured_cmd("QCFG", "urc/ri/other", "\"off\",1,1")
self._at_structured_cmd("QCFG", "urc/delay", "0")
self._at_structured_cmd("QCFG", "urc/cache", "0")
self._at_structured_cmd("QCFG", "urc/ri/pin", "uart_ri")
self._at_structured_cmd("QURCCFG", "urcport", "\"all\"")
@log_scope("ensuring on-disk AGPS...", "AGPS is available on-disk")
def ensure_agps_cache(self) -> None:
is_fresh, path = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
assert is_fresh, "failed to ensure on-disk AGPS data"
@log_scope("configuring gps...", "gps configured")
def enable_gps(self) -> None:
# disable GNSS, because it's only configurable while offline
self._at_structured_cmd("QGPSEND", check=False)
# self._at_structured_cmd("QGPS", value="0")
# XXX: ModemManager plugin sets QGPSXTRA=1
# self._at_structured_cmd("QGPSXTRA", value="1")
# now = datetime.datetime.now().strftime('%Y/%m/%d,%H:%M:%S') # UTC
# self._at_structured_cmd("QGPSXTRATIME", value=f"0,\"{now}\"")
is_fresh, locdatas = self._get_any_assistance_data(DEFAULT_AGPS_DATA_VARIANT())
for locdata in locdatas:
ret = self._try_mmcli([f"--location-inject-assistance-data={locdata}"])
is_success = ret is not None
self._mark_assistance_data(locdata, is_success)
if is_success:
break # no need to try any more locdatas
self._at_gnssconfig(GNSSConfig.gps_glonass_beidou_galileo)
self._at_odpcontrol(ODPControl.disable, check=False)
self._at_dpoenable(DPOEnable.disable, check=False) # N.B.: eg25-manager uses `DPOEnable.enable`
self._at_gpsnmeatype(GPSNMEAType.all)
self._at_glonassnmeatype(GlonassNmeaType.all)
self._at_galileonmeatype(GalileoNmeaType.all)
self._at_beidounmeatype(BeiDouNmeaType.all)
self._at_autogps(AutoGps.disable, check=False) #< don't start GPS on modem boot
# configure so GPS output is readable via /dev/ttyUSB1
# self._mmcli(["--location-enable-gps-unmanaged"])
# TODO: tune/document these QGPS values; a smarter setting here might reduce jitter?
self._at_structured_cmd("QGPS", value="1,255,1000,0,1")
@log_scope("halting gps...", "gps halted")
def disable_gps(self) -> None:
self._at_structured_cmd("QGPSEND", check=False)
@log_scope("configuring powersave...", "powersave configured")
def enable_powersave(self) -> None:
# Allow sleeping for power saving
self._at_structured_cmd("QSCLK", value="1")
# Disable fast poweroff for stability
self._at_structured_cmd("QCFG", "fast/poweroff", "0")
# Configure sleep and wake up pin levels to active low
self._at_structured_cmd("QCFG", "sleepind/level", "0")
self._at_structured_cmd("QCFG", "wakeupin/level", "0,0")
# Do not enter RAMDUMP mode, auto-reset instead
self._at_structured_cmd("QCFG", "ApRstLevel", "1")
self._at_structured_cmd("QCFG", "ModemRstLevel", "1")
def main():
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser(description="initialize the eg25 Pinephone modem for GPS tracking")
parser.add_argument('--modem', default='any', help='name of modem to configure (see mmcli --list-modems)')
parser.add_argument('--power-endpoint', default='/sys/class/modem-power/modem-power/device/powered', help='sysfs endpoint that can turn the modem on/off')
parser.add_argument("--dry-run", action='store_true', help="print commands instead of executing them")
parser.add_argument("--verbose", action='store_true', help="log each command before executing")
parser.add_argument('--power-on', action='store_true', help="enable power to the modem")
parser.add_argument('--enable-audio', action='store_true', help="configure audio for calling (?)")
parser.add_argument('--enable-urc', action='store_true', help="enable support for Unsolicited Return Codes (?)")
parser.add_argument('--ensure-agps-cache', action='store_true', help="ensure we have fresh assisted-GPS data available")
parser.add_argument('--enable-gps', action='store_true', help="enable the GPS and acquire tracking until asked to stop")
parser.add_argument('--enable-powersave', action='store_true', help="configure modem to sleep when possible")
parser.add_argument('--disable-gps', action='store_true', help="disable the GPS and stop any tracking")
parser.add_argument('--power-off', action='store_true', help="disable power to the modem")
parser.add_argument('--dump-debug-info', action='store_true', help="don't initialize anything, just dump debugging data")
args = parser.parse_args()
if args.verbose or args.dump_debug_info:
logging.getLogger().setLevel(logging.DEBUG)
executor = Executor(args.dry_run)
sequencer = Sequencer(executor, modem=args.modem, power_endpoint=args.power_endpoint)
if args.power_on:
sequencer.power_on()
if args.enable_audio:
sequencer.enable_audio()
if args.enable_urc:
sequencer.enable_urc()
if args.ensure_agps_cache:
sequencer.ensure_agps_cache() # N.B.: this will raise on failure, to get non-zero exit code
if args.enable_gps:
sequencer.enable_gps()
if args.enable_powersave:
sequencer.enable_powersave()
if args.disable_gps:
sequencer.disable_gps()
if args.power_off:
sequencer.power_off()
if args.dump_debug_info:
sequencer.dump_debug_info()
if __name__ == '__main__':
main()