|
|
|
@@ -2,6 +2,7 @@ import logging
|
|
|
|
|
import os.path
|
|
|
|
|
import sqlite3
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from functools import lru_cache
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
from ..base import ResolverBase
|
|
|
|
@@ -36,6 +37,7 @@ class CellIdResolver(ResolverBase):
|
|
|
|
|
cur = self.con.cursor()
|
|
|
|
|
return cur.execute('PRAGMA user_version').fetchone()[0]
|
|
|
|
|
|
|
|
|
|
@lru_cache
|
|
|
|
|
def search_network(self, radiotype, mcc, net, area, cell) -> Optional[dict]:
|
|
|
|
|
cur = self.con.cursor()
|
|
|
|
|
# radiotype is not required in observations
|
|
|
|
@@ -56,13 +58,59 @@ class CellIdResolver(ResolverBase):
|
|
|
|
|
else:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _resolve_cell(self, radiotype, opc, lac, cid) -> Optional[dict]:
|
|
|
|
|
@lru_cache
|
|
|
|
|
def search_nearby(self, radiotype, mcc, net, area, cell) -> Optional[dict]:
|
|
|
|
|
"""
|
|
|
|
|
synthesize a cell tower from towers whose `cell` id is close to the id of interest,
|
|
|
|
|
under the assumption that towers with similar cell ids have similar location.
|
|
|
|
|
this assumption is crudely validated, by aborting if the distance between sampled
|
|
|
|
|
towers is absurdly large.
|
|
|
|
|
"""
|
|
|
|
|
M_PER_DEG = 111319
|
|
|
|
|
NUM_SAMPLES = 4 #< take this many nearest towers
|
|
|
|
|
MAX_RANGE = 20000 #< if the uncertainty is > this many meters, just return `None`
|
|
|
|
|
cur = self.con.cursor()
|
|
|
|
|
# radiotype is not required in observations
|
|
|
|
|
radio = radiotype.upper() if radiotype is not None else 'GSM'
|
|
|
|
|
|
|
|
|
|
# TODO: replace this "fetchone" with a sampling
|
|
|
|
|
nets = cur.execute("""
|
|
|
|
|
SELECT * FROM cell WHERE mcc = ? AND net = ? AND area = ?
|
|
|
|
|
ORDER BY abs(cell - ?);
|
|
|
|
|
""", (mcc, net, area, cell)).fetchmany(NUM_SAMPLES)
|
|
|
|
|
|
|
|
|
|
if not nets or len(nets) != NUM_SAMPLES:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
synth = {}
|
|
|
|
|
for field in 'lat', 'lon', 'range':
|
|
|
|
|
# float fields
|
|
|
|
|
synth[field] = sum(n[field] for n in nets) / NUM_SAMPLES
|
|
|
|
|
for field in 'updated', 'created', 'samples':
|
|
|
|
|
# int fields
|
|
|
|
|
synth[field] = sum(n[field] for n in nets) // NUM_SAMPLES
|
|
|
|
|
|
|
|
|
|
# guess a range, based on the standard deviation of sampled tower locations.
|
|
|
|
|
# each degree of latitude is about 100km, and the range field is in meters.
|
|
|
|
|
sum_of_squares = 0
|
|
|
|
|
for n in nets:
|
|
|
|
|
sum_of_squares += ((n['lat'] - synth['lat']) * M_PER_DEG) ** 2 + ((n['lon'] - synth['lon']) * M_PER_DEG) ** 2
|
|
|
|
|
rms = (sum_of_squares / NUM_SAMPLES)**0.5
|
|
|
|
|
log.debug(f'synthesized cell tower from {NUM_SAMPLES} real towers with {rms:.3}m std. dev')
|
|
|
|
|
synth['range'] += rms
|
|
|
|
|
|
|
|
|
|
if synth['range'] > MAX_RANGE:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
return synth
|
|
|
|
|
|
|
|
|
|
def _resolve_cell(self, radiotype, opc, lac, cid, search_fn) -> Optional[dict]:
|
|
|
|
|
radio = radiotype.upper()
|
|
|
|
|
mcc, mnc = opc_to_mcc_mnc(opc)
|
|
|
|
|
if mcc is None:
|
|
|
|
|
log.debug(f'opc to mcc,mnc conversion failed for {radio} {opc}_{lac}_{cid}')
|
|
|
|
|
return None
|
|
|
|
|
netd = self.search_network(radio, mcc, mnc, lac, cid)
|
|
|
|
|
netd = search_fn(radio, mcc, mnc, lac, cid)
|
|
|
|
|
if netd is None:
|
|
|
|
|
log.debug(f'Resolving failed for {radio} {opc}_{lac}_{cid}')
|
|
|
|
|
return None
|
|
|
|
@@ -77,7 +125,10 @@ class CellIdResolver(ResolverBase):
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async def resolve_cell(self, radio, opc, lac, cid, **kwargs) -> Optional[dict]:
|
|
|
|
|
return self._resolve_cell(radio, opc, lac, cid)
|
|
|
|
|
best_match = self._resolve_cell(radio, opc, lac, cid, self.search_network)
|
|
|
|
|
if best_match is None:
|
|
|
|
|
best_match = self._resolve_cell(radio, opc, lac, cid, self.search_nearby)
|
|
|
|
|
return best_match
|
|
|
|
|
|
|
|
|
|
async def resolve_wifi(self, radio, mac, **kwargs) -> Optional[dict]:
|
|
|
|
|
return None
|
|
|
|
|