sane-script to forward a list of ports via UPnP

This commit is contained in:
Colin 2023-05-27 09:57:09 +00:00
parent c1ddddddc0
commit 3c40fa6982
5 changed files with 217 additions and 87 deletions

View File

@ -90,11 +90,17 @@ let
};
};
# remove python scripts (we package them further below)
patchPhase = builtins.concatStringsSep
"\n"
(lib.mapAttrsToList (name: pkg: "rm ${pkg.pname}") py-scripts)
;
patchPhase =
let
rmPy = builtins.concatStringsSep
"\n"
(lib.mapAttrsToList (name: pkg: "rm ${pkg.pname}") py-scripts)
;
in ''
# remove python library files, and python binaries (those are packaged further below)
rm -rf lib/
${rmPy}
'';
installPhase = ''
mkdir -p $out/bin
@ -142,6 +148,19 @@ let
pname = "sane-ip-check-upnp";
src = ./src;
pkgs = [ "miniupnpc" ];
postInstall = ''
mkdir -p $out/bin/lib
cp -R lib/* $out/bin/lib/
'';
};
ip-port-forward = static-nix-shell.mkPython3Bin {
pname = "sane-ip-port-forward";
src = ./src;
pkgs = [ "miniupnpc" ];
postInstall = ''
mkdir -p $out/bin/lib
cp -R lib/* $out/bin/lib/
'';
};
reclaim-boot-space = static-nix-shell.mkPython3Bin {
pname = "sane-reclaim-boot-space";

View File

@ -0,0 +1,110 @@
# based on this minimal SSDP client: <https://gist.github.com/schlamar/2428250>
import logging
import socket
import struct
import subprocess
logger = logging.getLogger(__name__)
MCAST_GRP = "239.255.255.250"
class SsdpResponse:
def __init__(self, headers: "Dict[str, str]"):
self.headers = headers
@staticmethod
def parse(msg: str) -> "Self":
headers = {}
for line in [m.strip() for m in msg.split("\r\n") if m.strip()]:
if ":" not in line: continue
sep_idx = line.find(":")
header, content = line[:sep_idx].strip(), line[sep_idx+1:].strip()
headers[header.upper()] = content
if headers:
return SsdpResponse(headers)
def is_rootdevice(self) -> bool:
return self.headers.get("NT", "").lower() == "upnp:rootdevice"
def location(self) -> str:
return self.headers.get("LOCATION")
def get_root_devices():
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
listener.bind(("", 1900))
logger.info("bound")
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
listener.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
root_descs = set()
while True:
packet, (host, src_port) = listener.recvfrom(2048)
logger.info(f"message from {host}")
# if host.endswith(".1"): # router
try:
msg = packet.decode("utf-8")
except:
logger.debug("failed to decode packet to string")
else:
logger.debug(msg)
resp = SsdpResponse.parse(msg)
if resp and resp.is_rootdevice():
root_desc = resp.location()
if root_desc and root_desc not in root_descs:
root_descs.add(root_desc)
logger.info(f"root desc: {root_desc}")
yield root_desc
def get_wan_from_location(location: str):
""" location = URI from the Location header, e.g. http://10.78.79.1:2189/rootDesc.xml """
# get connection [s]tatus
res = subprocess.run(["upnpc", "-u", location, "-s"], capture_output=True)
res.check_returncode()
status = res.stdout.decode("utf-8")
logger.info(f"got status: {status}")
for line in [l.strip() for l in status.split("\n")]:
sentinel = "ExternalIPAddress ="
if line.startswith(sentinel):
ip = line[len(sentinel):].strip()
return ip
def get_any_wan():
""" return (location, WAN IP) for the first device seen which has a WAN IP """
for location in get_root_devices():
wan = get_wan_from_location(location)
if wan:
return location, wan
def get_lan_ip() -> str:
ips = subprocess.check_output(["hostname", "-i"]).decode("utf-8").strip().split(" ")
ips = [i for i in ips if i.startswith("10.") or i.startswith("192.168.")]
assert len(ips) == 1, ips
return ips[0]
def forward_port(root_device: str, proto: str, port: int, reason: str, duration: int = 86400, lan_ip: str = None):
lan_ip = lan_ip or get_lan_ip()
args = [
"upnpc",
"-u", root_device,
"-e", reason,
"-a", lan_ip,
str(port),
str(port),
proto,
str(duration),
]
logger.debug(f"running: {args!r}")
stdout = subprocess.check_output(args).decode("utf-8")
logger.info(stdout)

View File

@ -1,94 +1,17 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p miniupnpc
# based on this minimal SSDP client: <https://gist.github.com/schlamar/2428250>
# best to run this with an external timeout. e.g.
# - `timeout 60 sane-ip-check-upnp`
import logging
import socket
import struct
import subprocess
import os
import sys
logger = logging.getLogger(__name__)
d = os.path.dirname(__file__)
sys.path.insert(0, d)
MCAST_GRP = "239.255.255.250"
class SsdpResponse:
def __init__(self, headers: "Dict[str, str]"):
self.headers = headers
@staticmethod
def parse(msg: str) -> "Self":
headers = {}
for line in [m.strip() for m in msg.split("\r\n") if m.strip()]:
if ":" not in line: continue
sep_idx = line.find(":")
header, content = line[:sep_idx].strip(), line[sep_idx+1:].strip()
headers[header.upper()] = content
if headers:
return SsdpResponse(headers)
def is_rootdevice(self) -> bool:
return self.headers.get("NT", "").lower() == "upnp:rootdevice"
def location(self) -> str:
return self.headers.get("LOCATION")
def get_root_devices():
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
listener.bind(("", 1900))
logger.info("bound")
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
listener.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
root_descs = set()
while True:
packet, (host, src_port) = listener.recvfrom(2048)
logger.info(f"message from {host}")
# if host.endswith(".1"): # router
try:
msg = packet.decode("utf-8")
except:
logger.debug("failed to decode packet to string")
else:
logger.debug(msg)
resp = SsdpResponse.parse(msg)
if resp and resp.is_rootdevice():
root_desc = resp.location()
if root_desc and root_desc not in root_descs:
root_descs.add(root_desc)
logger.info(f"root desc: {root_desc}")
yield root_desc
def get_wan_from_location(location: str):
""" location = URI from the Location header, e.g. http://10.78.79.1:2189/rootDesc.xml """
# get connection [s]tatus
res = subprocess.run(["upnpc", "-u", location, "-s"], capture_output=True)
res.check_returncode()
status = res.stdout.decode("utf-8")
logger.info(f"got status: {status}")
for line in [l.strip() for l in status.split("\n")]:
sentinel = "ExternalIPAddress ="
if line.startswith(sentinel):
ip = line[len(sentinel):].strip()
return ip
def get_any_wan():
for location in get_root_devices():
wan = get_wan_from_location(location)
if wan:
return wan
from lib.sane_ssdp import get_any_wan
if __name__ == '__main__':
logging.basicConfig()
@ -100,4 +23,5 @@ if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
else:
raise RuntimeError(f"invalid CLI argument {arg!r}")
print(get_any_wan())
_rootdev, wan_ip = get_any_wan()
print(wan_ip)

View File

@ -0,0 +1,74 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p miniupnpc
'''
USAGE: sane-ip-port-forward [options] [proto:port]*
options:
-v: verbose (show info messages)
-vv: more verbose (show debug messages)
-h: show this help messages
proto:port:
proto is `udp` or `tcp` (case insensitive)
port is any integer 1-65535 inclusive
'''
import logging
import subprocess
import sys
sys.path.insert(0, ".")
from lib.sane_ssdp import get_any_wan, forward_port
class BadCliArgs(Exception):
def __init__(self, msg: str = None):
helpstr = __doc__.strip()
if msg:
super().__init__(f"{msg}\n\n{helpstr}")
else:
super().__init__(helpstr)
def try_parse_port(s: str):
"""
`udp:53` -> ["udp", 53]
`tcp:65535` -> ["tcp", 65535]
"""
try:
proto, portstr = s.strip().split(":")
proto, port = proto.lower(), int(portstr)
assert proto in ["tcp", "udp"]
assert 0 < port < 65536
return proto, port
except Exception:
pass
def parse_args(argv: "List[str]") -> "List[('udp'|'tcp', port)]":
forwards = []
for arg in sys.argv[1:]:
if arg == "-h":
raise BadCliArgs()
if arg == "-v":
logging.getLogger().setLevel(logging.INFO)
elif arg == "-vv":
logging.getLogger().setLevel(logging.DEBUG)
elif try_parse_port(arg):
forwards.append(try_parse_port(arg))
else:
raise BadCliArgs(f"invalid CLI argument {arg!r}")
return forwards
if __name__ == '__main__':
logging.basicConfig()
try:
forwards = parse_args(sys.argv)
except BadCliArgs as e:
print(e)
sys.exit(1)
root_device, _wan = get_any_wan()
hostname = subprocess.check_output(["hostname"]).decode("utf-8").strip()
for (proto, port) in forwards:
forward_port(root_device, proto, port, f"colin-{hostname}")

View File

@ -53,6 +53,7 @@ in rec {
'';
nativeBuildInputs = [ makeWrapper ];
installPhase = ''
runHook preInstall
mkdir -p $out/bin
mv ${srcPath} $out/bin/${srcPath}
@ -62,6 +63,8 @@ in rec {
# add runtime dependencies to PATH
wrapProgram $out/bin/${srcPath} \
--suffix PATH : ${lib.makeBinPath pkgsEnv }
runHook postInstall
'';
} // (removeAttrs attrs [ "interpreter" "interpreterName" "pkgsEnv" "pkgExprs" "srcPath" ])
);