cellid: Add a DB importer from opencellid CSV format
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
"""CellId database resolver."""
|
||||
from .cellidresolver import CellIdResolver # noqa: F401
|
||||
from .cellidresolver import CellIdResolver, CellIdWriter # noqa: F401
|
||||
|
68
ols/resolver/cellid/cellid_import.py
Normal file
68
ols/resolver/cellid/cellid_import.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import argparse
|
||||
import csv
|
||||
import gzip
|
||||
import logging
|
||||
|
||||
from ols.resolver.cellid import CellIdWriter
|
||||
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='[%(asctime)s.%(msecs)03d] %(levelname)s: %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S')
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def int_or_none(s):
|
||||
try:
|
||||
return int(s)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def open_maybe_gzip(filename):
|
||||
with open(filename, 'rb') as f:
|
||||
is_gzip = (f.read(2) == b'\x1f\x8b')
|
||||
if is_gzip:
|
||||
return gzip.open(filename, 'rt')
|
||||
else:
|
||||
return open(filename, 'rt')
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description=(
|
||||
'Import selected rows from cell id export CSV file to an sqlite DB file'))
|
||||
parser.add_argument('CSVFILE')
|
||||
parser.add_argument('-o', '--output', dest='DBFILE', required=True)
|
||||
parser.add_argument('-c', '--mcc', dest='MCC', required=False)
|
||||
parser.add_argument('-n', '--mnc', dest='MNC', required=False)
|
||||
args = parser.parse_args()
|
||||
|
||||
db = CellIdWriter(args.DBFILE)
|
||||
|
||||
mcc = int_or_none(args.MCC)
|
||||
mnc = int_or_none(args.MNC)
|
||||
|
||||
log.info('Starting')
|
||||
rows = []
|
||||
with open_maybe_gzip(args.CSVFILE) as csvfile:
|
||||
has_header = csv.Sniffer().has_header(csvfile.read(1024))
|
||||
csvfile.seek(0)
|
||||
reader = csv.reader(csvfile, delimiter=',')
|
||||
if has_header:
|
||||
_ = next(reader)
|
||||
for ind, row in enumerate(reader):
|
||||
if (ind + 1) % 1e5 == 0:
|
||||
log.debug(f'On line {ind + 1}')
|
||||
if row is None:
|
||||
continue
|
||||
if mcc is not None and mcc != int(row[1]):
|
||||
continue
|
||||
if mnc is not None and mnc != int(row[2]):
|
||||
continue
|
||||
rows.append(row)
|
||||
|
||||
log.info(f"Processed {ind} rows.")
|
||||
log.info(f"Found {len(rows)} matching rows.")
|
||||
db.insert_or_replace_csvrows(rows)
|
||||
log.info('Done')
|
@@ -10,6 +10,7 @@ log = logging.getLogger(__name__)
|
||||
|
||||
DB_VERSION = 1
|
||||
|
||||
|
||||
class CellIdResolver(object):
|
||||
"""Resolve cell towers from db with columns in the cell id export format.
|
||||
|
||||
@@ -79,3 +80,69 @@ class CellIdResolver(object):
|
||||
|
||||
async def resolve_wifi(self, radio, mac, **kwargs) -> Optional[dict]:
|
||||
return None
|
||||
|
||||
|
||||
def convert_csvrow(r):
|
||||
"""Convert columns in opencellid format row and return necessary ones."""
|
||||
try:
|
||||
# Full row
|
||||
# row = (r[0], int(r[1]), int(r[2]), int(r[3]), int(r[4]), int_or_none(r[5]),
|
||||
# float(r[6]), float(r[7]), int(r[8]), int(r[9]), int(r[10]),
|
||||
# int(r[11]), int(r[12]), int_or_none(r[13]))
|
||||
# Selected items
|
||||
row = (r[0], int(r[1]), int(r[2]), int(r[3]), int(r[4]),
|
||||
float(r[6]), float(r[7]), int(r[8]), int(r[10]),
|
||||
int(r[11]), int(r[12]))
|
||||
return row
|
||||
except ValueError as e:
|
||||
log.error(f'Error converting row: {r}')
|
||||
raise e
|
||||
|
||||
|
||||
class CellIdWriter(object):
|
||||
"""Creates a CellId DB used by CellIdResolver."""
|
||||
|
||||
def __init__(self, dbfile) -> None:
|
||||
self.con = sqlite3.connect(dbfile)
|
||||
self.maybe_create_db()
|
||||
log.debug('Database version: ' + str(self.get_db_version()))
|
||||
self.con.row_factory = sqlite3.Row
|
||||
|
||||
def get_db_version(self):
|
||||
cur = self.con.cursor()
|
||||
return cur.execute('PRAGMA user_version').fetchone()[0]
|
||||
|
||||
def maybe_create_db(self):
|
||||
version = self.get_db_version()
|
||||
if version != 0 and version != DB_VERSION:
|
||||
log.error('Incompatible database version detected')
|
||||
raise ValueError
|
||||
|
||||
cur = self.con.cursor()
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS cell (
|
||||
radio TEXT NOT NULL,
|
||||
mcc INTEGER NOT NULL,
|
||||
net INTEGER NOT NULL,
|
||||
area INTEGER NOT NULL,
|
||||
cell INTEGER NOT NULL,
|
||||
lon REAL NOT NULL,
|
||||
lat REAL NOT NULL,
|
||||
range INTEGER,
|
||||
samples INTEGER,
|
||||
created INTEGER,
|
||||
updated INTEGER,
|
||||
PRIMARY KEY (radio, mcc, net, area, cell)
|
||||
);
|
||||
""")
|
||||
cur.execute(f'PRAGMA user_version = {DB_VERSION}')
|
||||
self.con.commit()
|
||||
|
||||
def insert_or_replace_rows(self, rows):
|
||||
self.con.executemany("""
|
||||
INSERT OR REPLACE INTO cell VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
|
||||
""", rows)
|
||||
self.con.commit()
|
||||
|
||||
def insert_or_replace_csvrows(self, csvrows):
|
||||
self.insert_or_replace_rows([convert_csvrow(r) for r in csvrows])
|
||||
|
Reference in New Issue
Block a user