sane-scripts: make the UPnP/ssdp code more resilient to errors
This commit is contained in:
parent
66156829d9
commit
37995e23c2
Binary file not shown.
|
@ -66,16 +66,20 @@ def get_wan_from_location(location: str):
|
||||||
""" location = URI from the Location header, e.g. http://10.78.79.1:2189/rootDesc.xml """
|
""" location = URI from the Location header, e.g. http://10.78.79.1:2189/rootDesc.xml """
|
||||||
|
|
||||||
# get connection [s]tatus
|
# get connection [s]tatus
|
||||||
res = subprocess.run(["upnpc", "-u", location, "-s"], capture_output=True)
|
cmd = ["upnpc", "-u", location, "-s"]
|
||||||
res.check_returncode()
|
res = subprocess.run(cmd, capture_output=True)
|
||||||
|
if res.returncode != 0:
|
||||||
|
logger.info(f"get_wan_from_location failed: {cmd!r}\n{res.stderr}")
|
||||||
|
return None
|
||||||
|
|
||||||
status = res.stdout.decode("utf-8")
|
status = res.stdout.decode("utf-8")
|
||||||
logger.info(f"got status: {status}")
|
logger.debug(f"got status: {status}")
|
||||||
|
|
||||||
for line in [l.strip() for l in status.split("\n")]:
|
for line in [l.strip() for l in status.split("\n")]:
|
||||||
sentinel = "ExternalIPAddress ="
|
sentinel = "ExternalIPAddress ="
|
||||||
if line.startswith(sentinel):
|
if line.startswith(sentinel):
|
||||||
ip = line[len(sentinel):].strip()
|
ip = line[len(sentinel):].strip()
|
||||||
|
logger.info(f"got ExternalIPAddress = {ip} from {location}")
|
||||||
return ip
|
return ip
|
||||||
|
|
||||||
def get_any_wan():
|
def get_any_wan():
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
#!/usr/bin/env nix-shell
|
#!/usr/bin/env nix-shell
|
||||||
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p miniupnpc
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])" -p inetutils -p miniupnpc
|
||||||
|
|
||||||
'''
|
'''
|
||||||
USAGE: sane-ip-port-forward [options] [proto:port]*
|
USAGE: sane-ip-port-forward [options] [proto:port]*
|
||||||
|
@ -8,12 +8,15 @@ options:
|
||||||
-v: verbose (show info messages)
|
-v: verbose (show info messages)
|
||||||
-vv: more verbose (show debug messages)
|
-vv: more verbose (show debug messages)
|
||||||
-h: show this help messages
|
-h: show this help messages
|
||||||
|
-d <int>: lease for the given duration in seconds (default: {DEFAULT_LEASE_SEC})
|
||||||
|
|
||||||
proto:port:
|
proto:port:
|
||||||
proto is `udp` or `tcp` (case insensitive)
|
proto is `udp` or `tcp` (case insensitive)
|
||||||
port is any integer 1-65535 inclusive
|
port is any integer 1-65535 inclusive
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
DEFAULT_LEASE_SEC = 86400
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
@ -24,7 +27,7 @@ from lib.sane_ssdp import get_any_wan, forward_port
|
||||||
|
|
||||||
class BadCliArgs(Exception):
|
class BadCliArgs(Exception):
|
||||||
def __init__(self, msg: str = None):
|
def __init__(self, msg: str = None):
|
||||||
helpstr = __doc__.strip()
|
helpstr = __doc__.format(DEFAULT_LEASE_SEC=DEFAULT_LEASE_SEC).strip()
|
||||||
if msg:
|
if msg:
|
||||||
super().__init__(f"{msg}\n\n{helpstr}")
|
super().__init__(f"{msg}\n\n{helpstr}")
|
||||||
else:
|
else:
|
||||||
|
@ -45,25 +48,40 @@ def try_parse_port(s: str):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def parse_args(argv: "List[str]") -> "List[('udp'|'tcp', port)]":
|
def parse_args(argv: "List[str]") -> "List[('udp'|'tcp', port)]":
|
||||||
|
"""
|
||||||
|
returns (list of forwards, lease duration)
|
||||||
|
where:
|
||||||
|
- list of forwards is [('udp'|'tcp', port: int)]
|
||||||
|
- lease duration is seconds: int
|
||||||
|
"""
|
||||||
forwards = []
|
forwards = []
|
||||||
for arg in sys.argv[1:]:
|
duration = DEFAULT_LEASE_SEC
|
||||||
|
unparsed = sys.argv[1:][::-1]
|
||||||
|
while unparsed:
|
||||||
|
arg = unparsed.pop()
|
||||||
if arg == "-h":
|
if arg == "-h":
|
||||||
raise BadCliArgs()
|
raise BadCliArgs()
|
||||||
if arg == "-v":
|
if arg == "-v":
|
||||||
logging.getLogger().setLevel(logging.INFO)
|
logging.getLogger().setLevel(logging.INFO)
|
||||||
elif arg == "-vv":
|
elif arg == "-vv":
|
||||||
logging.getLogger().setLevel(logging.DEBUG)
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
elif arg == "-d" and unparsed:
|
||||||
|
d = unparsed.pop()
|
||||||
|
try:
|
||||||
|
duration = int(d)
|
||||||
|
except Exception:
|
||||||
|
raise BadCliArgs(f"invalid CLI argument: -d {d!r}")
|
||||||
elif try_parse_port(arg):
|
elif try_parse_port(arg):
|
||||||
forwards.append(try_parse_port(arg))
|
forwards.append(try_parse_port(arg))
|
||||||
else:
|
else:
|
||||||
raise BadCliArgs(f"invalid CLI argument {arg!r}")
|
raise BadCliArgs(f"invalid CLI argument: {arg!r}")
|
||||||
return forwards
|
return forwards, duration
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
logging.basicConfig()
|
logging.basicConfig()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
forwards = parse_args(sys.argv)
|
forwards, duration = parse_args(sys.argv)
|
||||||
except BadCliArgs as e:
|
except BadCliArgs as e:
|
||||||
print(e)
|
print(e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
@ -71,4 +89,4 @@ if __name__ == '__main__':
|
||||||
root_device, _wan = get_any_wan()
|
root_device, _wan = get_any_wan()
|
||||||
hostname = subprocess.check_output(["hostname"]).decode("utf-8").strip()
|
hostname = subprocess.check_output(["hostname"]).decode("utf-8").strip()
|
||||||
for (proto, port) in forwards:
|
for (proto, port) in forwards:
|
||||||
forward_port(root_device, proto, port, f"colin-{hostname}")
|
forward_port(root_device, proto, port, f"colin-{hostname}", duration=duration)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user