122 lines
3.4 KiB
Python
122 lines
3.4 KiB
Python
# flake8: noqa
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import json
|
|
from pprint import pp
|
|
import tempfile
|
|
from pathlib import Path
|
|
import argparse
|
|
import httpx
|
|
import dns.zone
|
|
import dns.rdtypes
|
|
|
|
SOPS_BIN = "@sops@"
|
|
DNS_SECRETS_FILE = "@dns_secrets_file@"
|
|
data_str = "@data@"
|
|
DATA = json.loads(data_str)
|
|
|
|
secrets_json = subprocess.check_output([SOPS_BIN, "-d", DNS_SECRETS_FILE])
|
|
secrets = json.loads(secrets_json)
|
|
|
|
AUTH_ID = secrets["auth_id"]
|
|
AUTH_PASSWORD = secrets["auth_password"]
|
|
|
|
BASE_URL = "https://api.cloudns.net"
|
|
|
|
def req(path:str, **kwargs):
|
|
auth_params = {
|
|
"auth-id": AUTH_ID,
|
|
"auth-password": AUTH_PASSWORD,
|
|
}
|
|
|
|
params = { k.replace("_","-"): v for k, v in kwargs.items() }
|
|
|
|
return httpx.get(BASE_URL + path, params={**auth_params, **params}).json()
|
|
|
|
def textify(z:dns.zone.Zone) -> str:
|
|
for node in z.nodes.values():
|
|
node.rdatasets.sort(key = lambda rrd: (rrd.rdclass, rrd.rdtype, rrd.covers, rrd.ttl))
|
|
return z.to_text(sorted = True, relativize = True, nl = "\n", want_comments = False, want_origin = True)
|
|
|
|
def set_soa_serial(zone:dns.zone.Zone, serial:int):
|
|
soa = zone.find_rdataset(zone.origin, 'SOA')
|
|
old_soa = soa[0]
|
|
|
|
new_soa = dns.rdtypes.ANY.SOA.SOA(
|
|
old_soa.rdclass,
|
|
old_soa.rdtype,
|
|
old_soa.mname,
|
|
old_soa.rname,
|
|
serial,
|
|
old_soa.refresh,
|
|
old_soa.retry,
|
|
old_soa.expire,
|
|
old_soa.minimum
|
|
)
|
|
|
|
soa.clear()
|
|
soa.add(new_soa)
|
|
|
|
def display_and_maybe_update(origin: str, update: bool):
|
|
desired_zone = dns.zone.from_text(DATA[origin], origin = origin)
|
|
|
|
res = req("/dns/records-export.json", domain_name = origin)
|
|
current_zone_str = res["zone"]
|
|
current_zone = dns.zone.from_text(current_zone_str, origin = origin)
|
|
|
|
assert(desired_zone.rdclass == current_zone.rdclass)
|
|
assert(desired_zone.origin == current_zone.origin)
|
|
|
|
# cloudns makes its own serial, we can't change it.
|
|
# set desired serial to match current serial
|
|
current_serial = current_zone.get_soa().serial
|
|
set_soa_serial(desired_zone, current_serial)
|
|
|
|
current_text = textify(current_zone)
|
|
desired_text = textify(desired_zone)
|
|
if current_text == desired_text:
|
|
print("No difference")
|
|
return
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
current = Path(f"{tmpdir}/current-zone.bind")
|
|
current.write_text(current_text)
|
|
desired = Path(f"{tmpdir}/desired-zone.bind")
|
|
desired.write_text(desired_text)
|
|
os.system(f"git diff --no-index {current} {desired}")
|
|
|
|
if not update:
|
|
return
|
|
user_input = input("Do you want to continue? (y/n): ").strip().lower()
|
|
|
|
if user_input != 'y':
|
|
print("Abort.")
|
|
sys.exit(1)
|
|
|
|
res = req("/dns/records-import.json", domain_name = origin, format = "bind", content = desired_text, delete_existing_records = 1)
|
|
pp(res)
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--domain")
|
|
parser.add_argument("--all-domains", action="store_true")
|
|
parser.add_argument("--update", action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
all_domains = bool(args.all_domains)
|
|
update = bool(args.update)
|
|
|
|
assert((args.domain is not None) != all_domains)
|
|
|
|
if all_domains:
|
|
assert(args.domain is None)
|
|
domains = DATA.keys()
|
|
else:
|
|
assert(args.domain is not None)
|
|
domains = [args.domain]
|
|
|
|
for domain in domains:
|
|
print(domain)
|
|
print("---------")
|
|
display_and_maybe_update(origin=domain, update=update)
|