cellidresolver: synthesize a cell tower if no exact match is found in the database

This commit is contained in:
2024-06-21 16:20:47 +00:00
parent 810508aa7e
commit 2caacd27a6

View File

@@ -56,13 +56,58 @@ class CellIdResolver(ResolverBase):
else:
return None
def _resolve_cell(self, radiotype, opc, lac, cid) -> Optional[dict]:
def search_nearby(self, radiotype, mcc, net, area, cell) -> Optional[dict]:
"""
synthesize a cell tower from towers whose `cell` id is close to the id of interest,
under the assumption that towers with similar cell ids have similar location.
this assumption is crudely validated, by aborting if the distance between sampled
towers is absurdly large.
"""
M_PER_DEG = 111319
NUM_SAMPLES = 4 #< take this many nearest towers
MAX_RANGE = 20000 #< if the uncertainty is > this many meters, just return `None`
cur = self.con.cursor()
# radiotype is not required in observations
radio = radiotype.upper() if radiotype is not None else 'GSM'
# TODO: replace this "fetchone" with a sampling
nets = cur.execute("""
SELECT * FROM cell WHERE mcc = ? AND net = ? AND area = ?
ORDER BY abs(cell - ?);
""", (mcc, net, area, cell)).fetchmany(NUM_SAMPLES)
if not nets or len(nets) != NUM_SAMPLES:
return None
synth = {}
for field in 'lat', 'lon', 'range':
# float fields
synth[field] = sum(n[field] for n in nets) / NUM_SAMPLES
for field in 'updated', 'created', 'samples':
# int fields
synth[field] = sum(n[field] for n in nets) // NUM_SAMPLES
# guess a range, based on the standard deviation of sampled tower locations.
# each degree of latitude is about 100km, and the range field is in meters.
sum_of_squares = 0
for n in nets:
sum_of_squares += ((n['lat'] - synth['lat']) * M_PER_DEG) ** 2 + ((n['lon'] - synth['lon']) * M_PER_DEG) ** 2
rms = (sum_of_squares / NUM_SAMPLES)**0.5
log.debug(f'synthesized cell tower from {NUM_SAMPLES} real towers with {rms:.3}m std. dev')
synth['range'] += rms
if synth['range'] > MAX_RANGE:
return None
return synth
def _resolve_cell(self, radiotype, opc, lac, cid, search_fn) -> Optional[dict]:
radio = radiotype.upper()
mcc, mnc = opc_to_mcc_mnc(opc)
if mcc is None:
log.debug(f'opc to mcc,mnc conversion failed for {radio} {opc}_{lac}_{cid}')
return None
netd = self.search_network(radio, mcc, mnc, lac, cid)
netd = search_fn(radio, mcc, mnc, lac, cid)
if netd is None:
log.debug(f'Resolving failed for {radio} {opc}_{lac}_{cid}')
return None
@@ -77,7 +122,10 @@ class CellIdResolver(ResolverBase):
}
async def resolve_cell(self, radio, opc, lac, cid, **kwargs) -> Optional[dict]:
return self._resolve_cell(radio, opc, lac, cid)
best_match = self._resolve_cell(radio, opc, lac, cid, self.search_network)
if best_match is None:
best_match = self._resolve_cell(radio, opc, lac, cid, self.search_nearby)
return best_match
async def resolve_wifi(self, radio, mac, **kwargs) -> Optional[dict]:
return None