dyn-dns: obtain IP address via UPnP

This commit is contained in:
Colin 2023-05-26 22:39:32 +00:00
parent 5b80308074
commit 7e402ce974
4 changed files with 110 additions and 19 deletions

View File

@ -5,7 +5,8 @@ let
cfg = config.sane.services.dyn-dns;
getIp = pkgs.writeShellScript "dyn-dns-query-wan" ''
# preferred method and fallback
${pkgs.sane-scripts}/bin/sane-ip-check-router-wan || \
# OPNsense router broadcasts its UPnP endpoint every 30s
timeout 60 ${pkgs.sane-scripts}/bin/sane-ip-check-upnp || \
${pkgs.sane-scripts}/bin/sane-ip-check
'';
in

View File

@ -138,6 +138,11 @@ let
pname = "sane-date-math";
src = ./src;
};
ip-check-upnp = static-nix-shell.mkPython3Bin {
pname = "sane-ip-check-upnp";
src = ./src;
pkgs = [ "miniupnpc" ];
};
reclaim-boot-space = static-nix-shell.mkPython3Bin {
pname = "sane-reclaim-boot-space";
src = ./src;

View File

@ -1,18 +0,0 @@
#!/usr/bin/env bash
# query the WAN IP address OF MY ROUTER
# requires creds
passwd=$(sudo cat /run/secrets/router_passwd)
cookie=$(mktemp)
curlflags="curl --silent --insecure --cookie-jar $cookie --connect-timeout 5"
# authenticate
curl $curlflags \
--data "username=admin&password=$passwd" \
https://192.168.0.1
# query the WAN IP
ip=$(curl $curlflags \
-H "X-Requested-With: XMLHttpRequest" \
"https://192.168.0.1/cgi/cgi_action?Action=GetConnectionStatus" \
| jq -r .wan_status.ipaddr)
echo "$ip" | grep -P " *^\d+\.\d+\.\d+\.\d+ *$"
exit $?

View File

@ -0,0 +1,103 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p miniupnpc
# based on this minimal SSDP client: <https://gist.github.com/schlamar/2428250>
# best to run this with an external timeout. e.g.
# - `timeout 60 sane-ip-check-upnp`
import logging
import socket
import struct
import subprocess
import sys
logger = logging.getLogger(__name__)
MCAST_GRP = "239.255.255.250"
class SsdpResponse:
def __init__(self, headers: "Dict[str, str]"):
self.headers = headers
@staticmethod
def parse(msg: str) -> "Self":
headers = {}
for line in [m.strip() for m in msg.split("\r\n") if m.strip()]:
if ":" not in line: continue
sep_idx = line.find(":")
header, content = line[:sep_idx].strip(), line[sep_idx+1:].strip()
headers[header.upper()] = content
if headers:
return SsdpResponse(headers)
def is_rootdevice(self) -> bool:
return self.headers.get("NT", "").lower() == "upnp:rootdevice"
def location(self) -> str:
return self.headers.get("LOCATION")
def get_root_devices():
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
listener.bind(("", 1900))
logger.info("bound")
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
listener.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
root_descs = set()
while True:
packet, (host, src_port) = listener.recvfrom(2048)
logger.info(f"message from {host}")
# if host.endswith(".1"): # router
try:
msg = packet.decode("utf-8")
except:
logger.debug("failed to decode packet to string")
else:
logger.debug(msg)
resp = SsdpResponse.parse(msg)
if resp and resp.is_rootdevice():
root_desc = resp.location()
if root_desc and root_desc not in root_descs:
root_descs.add(root_desc)
logger.info(f"root desc: {root_desc}")
yield root_desc
def get_wan_from_location(location: str):
""" location = URI from the Location header, e.g. http://10.78.79.1:2189/rootDesc.xml """
# get connection [s]tatus
res = subprocess.run(["upnpc", "-u", location, "-s"], capture_output=True)
res.check_returncode()
status = res.stdout.decode("utf-8")
logger.info(f"got status: {status}")
for line in [l.strip() for l in status.split("\n")]:
sentinel = "ExternalIPAddress ="
if line.startswith(sentinel):
ip = line[len(sentinel):].strip()
return ip
def get_any_wan():
for location in get_root_devices():
wan = get_wan_from_location(location)
if wan:
return wan
if __name__ == '__main__':
logging.basicConfig()
for arg in sys.argv[1:]:
if arg == "-v":
logging.getLogger().setLevel(logging.INFO)
elif arg == "-vv":
logging.getLogger().setLevel(logging.DEBUG)
else:
raise RuntimeError(f"invalid CLI argument {arg!r}")
print(get_any_wan())