local: Add local database implementation

Teemu Ikonen
2023-05-26 16:42:30 +03:00
parent 5e255f4608
commit 416bb5086f
8 changed files with 611 additions and 4 deletions

bin/recluster_local.py Executable file

@@ -0,0 +1,15 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Entry point script for running ols inside the source tree
import re
import sys
from pathlib import Path

# Run from local source dir
sys.path.insert(0, str(Path(sys.argv[0]).resolve().parent.parent))

from ols.resolver.local import recluster_main  # noqa: E402

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(recluster_main())


@@ -86,7 +86,7 @@ conffile_args = [
]
valid_locator_types = ['clustering', 'firstcell', 'm8b', 'strongestwifi', 'web']
-valid_resolver_types = ['cellid', 'wiglenet', 'wiglenetcache']
+valid_resolver_types = ['cellid', 'wiglenet', 'wiglenetcache', 'localdb']

class MissingConfigException(Exception):

@@ -183,7 +183,7 @@ def get_config():
    fix_datapath(conf, 'obsdb')
    for wconf in (b for b in conf.get('resolver', {}).values()
-                 if b.get('type', '') in ('wiglenet', 'cellid')):
+                 if b.get('type', '') in ('wiglenet', 'cellid', 'localdb')):
        fix_datapath(wconf, 'db')
    for wconf in (b for b in conf.get('locator', {}).values()
ols/constants.py

@@ -30,3 +30,6 @@ CELL_MAX_SIGNAL = {
CELLAREA_MIN_ACCURACY = 50000.0 # m
CELLAREA_MAX_ACCURACY = 500000.0 # m
CELLAREA_MAX_RADIUS = 500000.0 # m
TEMPORARY_BLOCKLIST_DURATION = 48 * 60 * 60 * 1e6 # 48 h in usec
MIN_CLUSTER_AGE = 60 * 1e6 # 60 sec

ols/resolver/local/__init__.py Normal file

@@ -0,0 +1,3 @@
"""Local resolver database."""
from .local import MIN_ACCURACY, get_localresolver, recluster_main # noqa: F401
from .localdb import LocalDB, TILE_ZOOM # noqa: F401

ols/resolver/local/local.py Normal file

@@ -0,0 +1,284 @@
import itertools
import logging
import sys
from datetime import datetime
from typing import Optional

import fastcluster
import numpy

from .localdb import Cluster, LocalDB, Location, TILE_ZOOM
from ..base import ResolverBase
from ...constants import (
    BLUE_MAX_RADIUS,
    CELLAREA_MAX_RADIUS,
    CELL_MAX_RADIUS,
    MIN_CLUSTER_AGE,
    TEMPORARY_BLOCKLIST_DURATION,
    WIFI_DEFAULT_SIGNAL,
    WIFI_MAX_RADIUS,
    WIFI_MIN_SIGNAL,
)
from ...obsdb import UpdaterBase
from ...utils import (
    circle_radius,
    cut_from_linkage,
    encode_cellarea,
    encode_cellid,
    encode_wifi,
    haversine,
    ichnaea_score,
    mcc_mnc_to_opc,
    tile_bbox,
    tile_to_latlon,
    webmercator_tile,
)

log = logging.getLogger(__name__)

MIN_ACCURACY = 150  # m

_dbfile_to_resolver = {}
def localdb_score(now, firstobs, lastobs, num_tiles):
    """Score a cluster with the ichnaea score, but with samples set to 2 * num_tiles."""
    samples = 2 * num_tiles
    return ichnaea_score(now, firstobs, lastobs, samples)


def get_localresolver(dbfile):
    """Return a cached LocalResolver for dbfile, creating it on first use."""
    if dbfile in _dbfile_to_resolver:
        return _dbfile_to_resolver[dbfile]
    else:
        resolver = LocalResolver(dbfile)
        _dbfile_to_resolver[dbfile] = resolver
        return resolver
class LocalResolver(ResolverBase, UpdaterBase):

    def __init__(self, dbfile):
        self.db = LocalDB(dbfile)

    def insert_locate(self, observation, latlon, accuracy):
        """Insert observation from a (possibly resolved) locate API call."""
        if accuracy is None or accuracy > MIN_ACCURACY:
            return
        if latlon is None or None in latlon:
            return
        # FIXME: Convert 'time' to an ms since epoch 'timestamp' in observation
        timestamp = int(observation['time'].timestamp() * 1e6)  # us from epoch
        for wifi in observation.get('wifiAccessPoints') or []:
            strength = wifi.get('signalStrength', WIFI_DEFAULT_SIGNAL)
            if strength > WIFI_MIN_SIGNAL:
                netid = encode_wifi(wifi['macAddress'])
                self._add_network(netid, timestamp, latlon, accuracy)
        for c in observation.get('cellTowers') or []:
            opc = mcc_mnc_to_opc(c['mobileCountryCode'], c['mobileNetworkCode'])
            cell_netid = encode_cellid(
                c['radioType'], opc, c['locationAreaCode'], c['cellId'])
            self._add_network(cell_netid, timestamp, latlon, accuracy)
            cellarea_id = encode_cellarea(
                c['radioType'], opc, c['locationAreaCode'])
            self._add_network(cellarea_id, timestamp, latlon, accuracy)
    def insert_submit(self, submission):
        """Insert observations from a submit API call."""
        for item in submission['items']:
            # Submission timestamp is in ms since epoch
            timestamp = item.get('timestamp',
                                 int(datetime.now().timestamp()) * 1e3)
            # FIXME: insert_locate should use ms since epoch timestamp
            # like in submit schema
            item['time'] = datetime.fromtimestamp(timestamp / 1.0e3)
            pos = item.get('position')
            if pos is None:
                log.warning("Submission without position")
                continue  # skip this item, but keep processing the rest
            # lat and lon are validated in the schema, accuracy is not
            latlon = (pos['latitude'], pos['longitude'])
            accuracy = pos.get('accuracy', 10e3)
            self.insert_locate(item, latlon, accuracy)
    def _rerun_clustering(self):
        netids = self.db.list_location_netids()
        log.info(f"Have {len(netids)} locations")
        for i, netid in enumerate(netids):
            if i % 100 == 0:
                log.info(f"Location {i}/{len(netids)}: {netid}")
            self._recluster(netid)

    def _cluster_from_locations(self,
                                locations: list[Location],
                                valid: bool = False) -> Optional[Cluster]:
        if not locations:
            return None
        nw, se = tile_bbox(locations[0]['x'], locations[0]['y'], TILE_ZOOM)
        lat_corr = (se[0] - nw[0]) / 2.0
        lon_corr = (se[1] - nw[1]) / 2.0
        latlons = [tile_to_latlon(e['x'], e['y'], TILE_ZOOM) for e in locations]
        center = (numpy.mean([e[0] for e in latlons]) + lat_corr,
                  numpy.mean([e[1] for e in latlons]) + lon_corr)
        bb_nw = max(e[0] for e in latlons), min(e[1] for e in latlons)
        bb_se = (min(e[0] for e in latlons) + 2.0 * lat_corr,
                 max(e[1] for e in latlons) + 2.0 * lon_corr)
        radius = circle_radius(center, bb_nw, bb_se)
        cluster: Cluster = {
            'netid': locations[0]['netid'],
            'valid': valid,
            'firstobs': min(e['firstobs'] for e in locations),
            'lastobs': max(e['lastobs'] for e in locations),
            'numtiles': len(locations),
            'lat': center[0],
            'lon': center[1],
            'radius': radius,
        }
        return cluster
    def _recluster(self, netid: str) -> None:
        """Recalculate clustering for netid and update the DB."""
        locations = self.db.find_locations(netid)
        length = len(locations)
        msg = f"Reclustering {netid}"
        self.db.delete_clusters(netid)
        if length < 1:
            log.debug(msg + ": no locations")
            return
        if length < 2:
            cluster = self._cluster_from_locations(locations, True)
            self.db.insert_cluster(cluster)
            # log.debug(msg + ": 1 location")
            return
        # TODO: Automatic max radius determination from data (discard outliers
        # etc.) for cell and cellarea nets
        if netid.startswith('w_'):
            max_cluster_radius = WIFI_MAX_RADIUS
        elif netid.startswith('b_'):
            max_cluster_radius = BLUE_MAX_RADIUS
        elif netid.startswith('c_'):
            max_cluster_radius = CELLAREA_MAX_RADIUS
        else:
            max_cluster_radius = CELL_MAX_RADIUS
        positions = [tile_to_latlon(loc['x'], loc['y'], TILE_ZOOM) for loc in locations]
        dist_matrix = numpy.zeros(length * (length - 1) // 2, dtype=numpy.double)
        for i, (a, b) in enumerate(itertools.combinations(positions, 2)):
            dist_matrix[i] = haversine(a, b)
        if dist_matrix.max() < max_cluster_radius:  # 1 cluster
            cluster = self._cluster_from_locations(locations, True)
            self.db.insert_cluster(cluster)
            # log.debug(msg + f": 1 cluster with {length} locations")
            return
        # More than 1 cluster
        link_matrix = fastcluster.linkage(dist_matrix, method="complete")
        cut = cut_from_linkage(link_matrix, max_cluster_radius)
        n_clusters = max(cut) + 1
        # A netid whose observations cluster into multiple groups is either
        # 1) mobile, so there will be no valid clusters,
        # 2) mostly stable, with one large observation cluster and one or more
        #    very small, non-recent clusters, likely caused by observation
        #    errors, or
        # 3) moved, but stable after moving, i.e. it has more than one large
        #    cluster, clearly separated in observation time.
        # Being moved and valid requires that i) there is no
        # overlap in observation time range (firstobs to lastobs) between the
        # cluster with the latest observation and other valid clusters and that ii)
        # there is a multi-day (TEMPORARY_BLOCKLIST_DURATION) range of observations
        # in the latest cluster.
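        # Worked example (not from the source, values chosen for illustration):
        # with (firstobs, lastobs) ranges A = (t0, t0 + 100 h),
        # B = (t0 - 400 h, t0 - 300 h) and C = (t0 - 1 h, t0 + 1 h),
        # A holds the latest observation and spans > 48 h, but C is also valid
        # (its 2 h span exceeds MIN_CLUSTER_AGE) and overlaps A's range,
        # so have_valid below evaluates to False and no cluster is marked valid.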
        groups = [[] for _ in range(n_clusters)]
        for i, c in enumerate(cut):
            groups[c].append(locations[i])
        is_valid = []
        tranges = []
        lasttime = 0
        lastind = -1
        for i, group in enumerate(groups):
            mintime = min(e['firstobs'] for e in group)
            maxtime = max(e['lastobs'] for e in group)
            is_valid.append((maxtime - mintime) > MIN_CLUSTER_AGE)
            tranges.append((mintime, maxtime))
            if maxtime > lasttime:
                lasttime = maxtime
                lastind = i
        have_valid = (
            (lastind >= 0)
            # Cluster with latest observation has been observed long enough
            and ((tranges[lastind][1] - tranges[lastind][0])
                 > TEMPORARY_BLOCKLIST_DURATION)
            # Other valid clusters do not overlap in observation range
            and (max((tranges[i][1] for i in range(len(tranges))
                      if i != lastind and is_valid[i]),
                     default=0) < tranges[lastind][0]))
        # FIXME: Should maybe count the number of recent clusters and invalidate
        # if there are too many of them.
        # TODO: Don't store the oldest clusters, if there are many newer clusters.
        # Also delete the old locations from the DB.
        for i, group in enumerate(groups):
            cluster = self._cluster_from_locations(
                group, valid=(i == lastind and have_valid))
            self.db.insert_cluster(cluster)
        log.debug(msg + f": {n_clusters} clusters, valid = {have_valid}")
    def _add_network(self,
                     netid: str,
                     timestamp: int,  # usec from epoch
                     latlon: tuple[float, float],
                     accuracy: float) -> None:
        tile = webmercator_tile(latlon[0], latlon[1], TILE_ZOOM)
        location: Location = {
            "netid": netid,
            "x": tile.x,
            "y": tile.y,
            "firstobs": timestamp,
            "lastobs": timestamp,
        }
        if self.db.insert_location(location):
            log.debug(f"Reclustering {netid}")
            self._recluster(netid)

    def _resolve_netid(self, netid) -> Optional[dict]:
        cluster = self.db.find_best_cluster(netid)
        if cluster is None:
            return None
        now = int(datetime.now().timestamp() * 1e6)
        return {
            'latlon': (cluster['lat'], cluster['lon']),
            'radius': cluster['radius'],
            'age': now - cluster['lastobs'],
            'score': localdb_score(
                now, cluster['firstobs'], cluster['lastobs'], cluster['numtiles']),
        }

    async def resolve_wifi(self, mac, **kwargs) -> Optional[dict]:
        return self._resolve_netid(encode_wifi(mac))

    async def resolve_cell(self, tec, opc, lac, cid, **kwargs) -> Optional[dict]:
        cmodel = self._resolve_netid(encode_cellid(tec, opc, lac, cid))
        return (cmodel if cmodel is not None
                else self._resolve_netid(encode_cellarea(tec, opc, lac)))


def recluster_main():
    dbfile = sys.argv[1]
    lres = LocalResolver(dbfile)
    lres._rerun_clustering()
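
A minimal usage sketch of the resolver (not part of this commit; the DB path
and MAC address are hypothetical, and the -60 dBm strength is assumed to pass
the WIFI_MIN_SIGNAL threshold):

    import asyncio
    from datetime import datetime
    from ols.resolver.local import get_localresolver

    resolver = get_localresolver('/tmp/local.db')
    obs = {'time': datetime.now(),
           'wifiAccessPoints': [{'macAddress': '00:11:22:33:44:55',
                                 'signalStrength': -60}]}
    # Accuracy must be <= MIN_ACCURACY (150 m) for the observation to be stored
    resolver.insert_locate(obs, (60.17, 24.94), 50.0)
    # A single stored tile yields one valid cluster, so this returns a dict
    # with 'latlon', 'radius', 'age' and 'score'
    print(asyncio.run(resolver.resolve_wifi('00:11:22:33:44:55')))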

ols/resolver/local/localdb.py Normal file

@@ -0,0 +1,208 @@
import logging
import sqlite3
from typing import Optional

from typing_extensions import TypedDict

VERSION = 1

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# Zoom level for location tiles.
# z = 21 gives a ~27 m tile diagonal at the equator and ~9 m at lat = 70.0.
# Compare with WIFI_MIN_RADIUS = 10 m, i.e. 20 m diameter.
TILE_ZOOM = 21
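# Arithmetic check (a sketch, not from the source): the equator is ~40075 km
# around, so a z = 21 tile side is 40075e3 / 2**21 ~= 19.1 m and the diagonal
# 19.1 * sqrt(2) ~= 27 m; at lat = 70 the side scales by cos(70 deg) ~= 0.342,
# giving a ~9.2 m diagonal.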
Cluster = TypedDict('Cluster', {
    'netid': str,
    'valid': bool,
    'firstobs': int,  # usec from epoch
    'lastobs': int,  # usec from epoch
    'numtiles': int,
    'lat': float,
    'lon': float,
    'radius': float,
})

Location = TypedDict('Location', {
    'netid': str,
    'x': int,  # tile x
    'y': int,  # tile y, z is TILE_ZOOM
    'firstobs': int,  # usec from epoch
    'lastobs': int,  # usec from epoch
})
class LocalDB(object):

    def __init__(self, filename) -> None:
        self.con = sqlite3.connect(filename)
        self.con.row_factory = sqlite3.Row
        self.maybe_create()
        log.debug('Database file: ' + filename)
        log.debug('Database version: ' + str(self.get_version()))

    def get_version(self) -> int:
        cur = self.con.cursor()
        return cur.execute('PRAGMA user_version').fetchone()[0]

    def maybe_create(self):
        """Create the DB for local observations with location, if it does not exist."""
        version = self.get_version()
        if version != 0 and version != VERSION:
            log.error('Wrong version of the database detected, exiting')
            raise ValueError
        cur = self.con.cursor()
        # FIXME: Cellarea searches (the (rtype, opc, lac) tuple, see ichnaea) would
        # need either a separate netid type (like 'c_' + f'{rtype}_{opc}_{lac}') or
        # storing cell tuple items separately for cell locations and clusters,
        # so that the cell area could be efficiently extracted from cell rows.
        cur.execute("""
            CREATE TABLE IF NOT EXISTS cluster (
                -- Sqlite creates a rowid automatically: rowid INTEGER
                netid TEXT,        -- 'w_' + mac for wifi, rtype_opc_lac_cid for cell
                valid INTEGER,     -- 1 for valid cluster, only one valid per netid allowed
                firstobs INTEGER,  -- usec from epoch
                lastobs INTEGER,   -- usec from epoch
                -- numobs INTEGER, -- number of observations in this cluster
                numtiles INTEGER,  -- number of distinct tiles, numobs proxy in scoring
                lat REAL,          -- calculated from locations
                lon REAL,
                radius REAL        -- meters
                -- score INTEGER,  -- ichnaea_score can be computed from other cols
            );
        """)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS location (
                -- Sqlite creates a rowid automatically: rowid INTEGER
                netid TEXT,        -- mac for wifi/bt, opc_lac_cid tuple for cell
                x INTEGER,         -- tile x
                y INTEGER,         -- tile y, z is TILE_ZOOM
                firstobs INTEGER,  -- usec from epoch
                lastobs INTEGER,   -- usec from epoch
                -- numobs,         -- number of observations in this tile
                PRIMARY KEY (netid, x, y)
            );
        """)
        cur.execute(f'PRAGMA user_version = {VERSION}')
        self.con.commit()

    def find_best_cluster(self, netid) -> Optional[Cluster]:
        cur = self.con.cursor()
        cs = cur.execute("""
            SELECT * FROM cluster WHERE netid = ? AND valid = 1;
        """, (netid,)).fetchall()
        clusters = [Cluster(e) for e in cs]
        if len(clusters) > 1:
            log.error("More than one valid cluster in DB!")
        return clusters[0] if clusters else None
    def find_clusters(self, netid: str) -> list[Cluster]:
        cur = self.con.cursor()
        cs = cur.execute("""
            SELECT * FROM cluster WHERE netid = ?;
        """, (netid,)).fetchall()
        return [Cluster(e) for e in cs]

    def delete_clusters(self, netid: str) -> None:
        """Delete all clusters with netid from the DB."""
        cur = self.con.cursor()
        cur.execute("""
            DELETE FROM cluster WHERE netid = ?;
        """, (netid,))
        self.con.commit()

    def insert_cluster(self, cluster: Optional[Cluster]) -> None:
        """Insert cluster into the DB; does not check for duplicates."""
        if cluster is None:
            return
        cur = self.con.cursor()
        cur.execute("""
            INSERT INTO cluster VALUES(
                :netid,
                :valid,
                :firstobs,
                :lastobs,
                :numtiles,
                :lat,
                :lon,
                :radius
            )
        """, cluster)
        self.con.commit()

    def list_location_netids(self) -> list[str]:
        cur = self.con.cursor()
        netids = cur.execute("""
            SELECT DISTINCT(netid) FROM location;
        """)
        return [n[0] for n in netids.fetchall()]

    def list_valid_cluster_netids(self) -> list[str]:
        cur = self.con.cursor()
        netids = cur.execute("""
            SELECT DISTINCT(netid) FROM cluster WHERE valid = 1;
        """)
        return [n[0] for n in netids.fetchall()]

    def list_invalid_cluster_netids(self) -> list[str]:
        cur = self.con.cursor()
        netids = cur.execute("""
            SELECT netid FROM cluster
            EXCEPT
            SELECT netid FROM cluster WHERE valid = 1;
        """)
        return [n[0] for n in netids.fetchall()]

    def find_locations(self, netid) -> list[Location]:
        cur = self.con.cursor()
        locs = cur.execute("""
            SELECT * FROM location WHERE netid = ?;
        """, (netid,)).fetchall()
        return [Location(e) for e in locs]

    def delete_location(self, location: Location) -> None:
        pass  # TODO: not implemented yet
    def insert_location(self, location: Location) -> bool:
        """Insert or update a location tile, tell if clustering needs updating.

        If the location tile for the netid does not exist in the DB, create
        a new one and return True.
        If it exists, update the timestamps firstobs and lastobs and return
        False.
        """
        # FIXME: This should also update the 'lastobs' timestamp of the
        # corresponding cluster
        cur = self.con.cursor()
        oldloc = cur.execute("""
            SELECT rowid, * FROM location WHERE netid = ? AND x = ? AND y = ?;
        """, (location["netid"], location["x"], location["y"])).fetchone()
        if oldloc is None:
            log.debug("INSERT location")
            cur.execute("""
                INSERT INTO location VALUES(
                    :netid,
                    :x,
                    :y,
                    :firstobs,
                    :lastobs
                )
            """, location)
            update_needed = True
        else:
            log.debug("UPDATE location")
            cur.execute("""
                UPDATE location
                SET firstobs = ?, lastobs = ?
                WHERE rowid = ?
            """, (min(location["firstobs"], oldloc["firstobs"]),
                  max(location["lastobs"], oldloc["lastobs"]),
                  oldloc["rowid"]))
            update_needed = False
        self.con.commit()
        return update_needed
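
A minimal sketch of the insert_location contract (not part of this commit; the
netid string and tile coordinates are made up, and ':memory:' stands in for a
real database file):

    from ols.resolver.local import LocalDB

    db = LocalDB(':memory:')
    loc = {'netid': 'w_001122334455', 'x': 1187269, 'y': 594514,
           'firstobs': 1000000, 'lastobs': 1000000}
    db.insert_location(loc)   # new (netid, x, y) tile -> True, recluster needed
    db.insert_location(dict(loc, firstobs=2000000, lastobs=2000000))
    # same tile -> False; the stored range widens to (1000000, 2000000)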


@@ -180,6 +180,10 @@ def main():
        from .resolver.wiglenet import WiglenetCacheOnlyResolver
        wconf = conf['resolver'][name]
        return WiglenetCacheOnlyResolver(wconf['db'])
    elif rtype == 'localdb':
        from .resolver.local import get_localresolver
        lconf = conf['resolver'][name]
        return get_localresolver(lconf['db'])
    else:
        log.error(f"Unknown resolver type '{rtype}'")
        return None
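
The dispatch expects a resolver block like the following sketch (the section
name 'local0' and the path are hypothetical; the on-disk config format is not
shown in this commit, but get_config's fix_datapath rewrites the 'db' entry):

    conf = {'resolver': {'local0': {'type': 'localdb',
                                    'db': '/var/lib/ols/local.db'}}}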

ols/utils.py

@@ -4,6 +4,79 @@ from typing import Optional

import mercantile  # type: ignore


def clusters_from_linkage(links):
    """Return cluster memberships as a list of lists for each non-singleton cluster.

    Linkage matrix links is from fastcluster or scipy.cluster.hierarchy.linkage.
    For example, a clustering of 4 elements gives a linkage matrix with 3 rows,
    i.e. the full linkage has 7 clusters, for example
    [[0], [1], [2], [3], [0, 1], [2, 3], [0, 1, 2, 3]]. The first 4 clusters
    are the trivial singleton clusters which are omitted, so the returned
    value is [[0, 1], [2, 3], [0, 1, 2, 3]].
    """
    length = int(links[-1, -1])  # number of singleton clusters

    def is_singleton(n):
        """Return True if index points to a singleton cluster."""
        return n < length

    clusters = [[n] for n in range(length)] + [
        [int(links[m, 0]), int(links[m, 1])] for m in range(links.shape[0])]
    all_resolved = False
    while not all_resolved:
        all_resolved = True
        for ci in range(len(clusters)):
            if not all(is_singleton(index) for index in clusters[ci]):
                all_resolved = False
                newcluster = []
                for n in clusters[ci]:
                    if is_singleton(n):
                        newcluster.append(n)
                    else:
                        newcluster.extend(clusters[n])
                clusters[ci] = newcluster
    return clusters[length:]  # omit singletons
def cut_from_linkage(links, level):
    """Given a linkage matrix and a height level, return flat clusters.

    Linkage matrix is from fastcluster or scipy.cluster.hierarchy.linkage.
    Like scipy.cluster.hierarchy.cut_tree with height == level.
    """
    assert level >= 0.0
    length = int(links[-1, -1])  # number of singleton clusters
    clusters = clusters_from_linkage(links)
    levels = [0.0] * length + [links[m, 2] for m in range(links.shape[0])]
    included = [length + len(clusters) - 1]  # last cluster with all elements
    all_resolved = False
    while not all_resolved:
        all_resolved = True
        new_included = []
        for cind in included:
            if levels[cind] < level:
                new_included.append(cind)
            else:
                new_included.extend(int(x) for x in links[cind - length, 0:2])
                all_resolved = False
        included = new_included
    out = [-1] * length
    for i, cind in enumerate(included):
        elems = clusters[cind - length] if cind >= length else [cind]
        for eind in elems:
            out[eind] = i
    assert all(e >= 0 for e in out)
    return out
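
# Worked example for both functions (a sketch, values chosen for illustration;
# assumes a numpy array as returned by fastcluster): four elements where 0-1
# and 2-3 merge at height 1.0 and the two pairs merge at 9.0:
#   links = numpy.array([[0, 1, 1.0, 2], [2, 3, 1.0, 2], [4, 5, 9.0, 4]])
#   clusters_from_linkage(links)  ->  [[0, 1], [2, 3], [0, 1, 2, 3]]
#   cut_from_linkage(links, 2.0)  ->  [0, 0, 1, 1]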
# Haversine function nicked from:
# https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
def haversine(p1, p2):
@@ -96,6 +169,17 @@ def webmercator_tile(lat, lon, z):
    return mercantile.tile(lon, lat, z)


def tile_to_latlon(x, y, z):
    lnglat = mercantile.ul(x, y, z)
    return lnglat[1], lnglat[0]


def tile_bbox(x, y, z):
    """Return northwest and southeast corners of a tile."""
    b = mercantile.bounds((x, y, z))
    return (b.north, b.west), (b.south, b.east)


def min_or_none(seq: list[Optional[int]]) -> Optional[int]:
    if any(x is None for x in seq):
        return None
@@ -132,5 +216,11 @@ def mcc_mnc_to_opc(mcc, mnc):
    return f'{mcc:03}{mnc:02}'


-def encode_cellid(radiotype, opc, lac, cid, **kwargs):
-    return f'{radiotype}_{opc}_{lac}_{cid}'
+def encode_cellid(tec, opc, lac, cid, **kwargs):
+    return f'{tec.upper()}_{opc}_{lac}_{cid}'


def encode_cellarea(tec, opc, lac, **kwargs):
    return f'c_{tec.upper()}_{opc}_{lac}'


def encode_wifi(mac):
    return f'w_{mac.lower()}'