Compare commits

3 Commits

Author SHA1 Message Date
5bb577d50f cellidresolver: cache lookup results in memory
it can take literally 5s or more on moby to synthesize cell results. caching will hopefully help that.
2024-07-01 07:45:05 +00:00
2caacd27a6 cellidresolver: synthesize a cell tower if no exact match is found in the database 2024-06-21 16:20:47 +00:00
810508aa7e cellid_import: fix TypeError when --mcc or --mnc are unspecified
`int(None)` (in the case these arguments are unspecified) gives a TypeError.

```
$ cellid-ols-import -o cell_towers.db cell_towers.csv
[2024-06-21 13:34:48.169] DEBUG: Database version: 1
Traceback (most recent call last):
  File "cellid-ols-import", line 9, in <module>
    sys.exit(main())
             ^^^^^^
  File "ols/resolver/cellid/cellid_import.py", line 43, in main
    mcc = int_or_none(args.MCC)
          ^^^^^^^^^^^^^^^^^^^^^
  File "ols/resolver/cellid/cellid_import.py", line 18, in int_or_none
    return int(s)
           ^^^^^^
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
```
2024-06-21 13:36:43 +00:00
2 changed files with 55 additions and 4 deletions

View File

@@ -16,7 +16,7 @@ log = logging.getLogger(__name__)
def int_or_none(s):
try:
return int(s)
except ValueError:
except (TypeError, ValueError):
return None

View File

@@ -2,6 +2,7 @@ import logging
import os.path
import sqlite3
from datetime import datetime
from functools import lru_cache
from typing import Optional
from ..base import ResolverBase
@@ -36,6 +37,7 @@ class CellIdResolver(ResolverBase):
cur = self.con.cursor()
return cur.execute('PRAGMA user_version').fetchone()[0]
@lru_cache
def search_network(self, radiotype, mcc, net, area, cell) -> Optional[dict]:
cur = self.con.cursor()
# radiotype is not required in observations
@@ -56,13 +58,59 @@ class CellIdResolver(ResolverBase):
else:
return None
def _resolve_cell(self, radiotype, opc, lac, cid) -> Optional[dict]:
@lru_cache
def search_nearby(self, radiotype, mcc, net, area, cell) -> Optional[dict]:
"""
synthesize a cell tower from towers whose `cell` id is close to the id of interest,
under the assumption that towers with similar cell ids have similar location.
this assumption is crudely validated, by aborting if the distance between sampled
towers is absurdly large.
"""
M_PER_DEG = 111319
NUM_SAMPLES = 4 #< take this many nearest towers
MAX_RANGE = 20000 #< if the uncertainty is > this many meters, just return `None`
cur = self.con.cursor()
# radiotype is not required in observations
radio = radiotype.upper() if radiotype is not None else 'GSM'
# TODO: replace this "fetchone" with a sampling
nets = cur.execute("""
SELECT * FROM cell WHERE mcc = ? AND net = ? AND area = ?
ORDER BY abs(cell - ?);
""", (mcc, net, area, cell)).fetchmany(NUM_SAMPLES)
if not nets or len(nets) != NUM_SAMPLES:
return None
synth = {}
for field in 'lat', 'lon', 'range':
# float fields
synth[field] = sum(n[field] for n in nets) / NUM_SAMPLES
for field in 'updated', 'created', 'samples':
# int fields
synth[field] = sum(n[field] for n in nets) // NUM_SAMPLES
# guess a range, based on the standard deviation of sampled tower locations.
# each degree of latitude is about 100km, and the range field is in meters.
sum_of_squares = 0
for n in nets:
sum_of_squares += ((n['lat'] - synth['lat']) * M_PER_DEG) ** 2 + ((n['lon'] - synth['lon']) * M_PER_DEG) ** 2
rms = (sum_of_squares / NUM_SAMPLES)**0.5
log.debug(f'synthesized cell tower from {NUM_SAMPLES} real towers with {rms:.3}m std. dev')
synth['range'] += rms
if synth['range'] > MAX_RANGE:
return None
return synth
def _resolve_cell(self, radiotype, opc, lac, cid, search_fn) -> Optional[dict]:
radio = radiotype.upper()
mcc, mnc = opc_to_mcc_mnc(opc)
if mcc is None:
log.debug(f'opc to mcc,mnc conversion failed for {radio} {opc}_{lac}_{cid}')
return None
netd = self.search_network(radio, mcc, mnc, lac, cid)
netd = search_fn(radio, mcc, mnc, lac, cid)
if netd is None:
log.debug(f'Resolving failed for {radio} {opc}_{lac}_{cid}')
return None
@@ -77,7 +125,10 @@ class CellIdResolver(ResolverBase):
}
async def resolve_cell(self, radio, opc, lac, cid, **kwargs) -> Optional[dict]:
return self._resolve_cell(radio, opc, lac, cid)
best_match = self._resolve_cell(radio, opc, lac, cid, self.search_network)
if best_match is None:
best_match = self._resolve_cell(radio, opc, lac, cid, self.search_nearby)
return best_match
async def resolve_wifi(self, radio, mac, **kwargs) -> Optional[dict]:
return None