examples: run python black on "examples/python/gi/nm-wg-set"

black by default only considers files that have a ".py" extension.
This commit is contained in:
Thomas Haller
2020-07-02 17:37:12 +02:00
parent 3b896cc642
commit 1acd64b7a2

View File

@@ -61,23 +61,32 @@ import sys
import os import os
import gi import gi
gi.require_version('NM', '1.0')
gi.require_version("NM", "1.0")
from gi.repository import NM from gi.repository import NM
class MyError(Exception): class MyError(Exception):
pass pass
def pr(v): def pr(v):
import pprint import pprint
pprint.pprint(v, indent=4, depth=5, width=60) pprint.pprint(v, indent=4, depth=5, width=60)
############################################################################### ###############################################################################
def connection_is_wireguard(conn): def connection_is_wireguard(conn):
s_con = conn.get_setting(NM.SettingConnection) s_con = conn.get_setting(NM.SettingConnection)
return s_con \ return (
and s_con.get_connection_type() == NM.SETTING_WIREGUARD_SETTING_NAME \ s_con
and conn.get_setting(NM.SettingWireGuard) and s_con.get_connection_type() == NM.SETTING_WIREGUARD_SETTING_NAME
and conn.get_setting(NM.SettingWireGuard)
)
def connection_to_str(conn): def connection_to_str(conn):
if connection_is_wireguard(conn): if connection_is_wireguard(conn):
@@ -85,52 +94,57 @@ def connection_to_str(conn):
if iface: if iface:
extra = ', interface: "%s"' % (iface) extra = ', interface: "%s"' % (iface)
else: else:
extra = '' extra = ""
else: else:
extra = ', type: %s' % (conn.get_setting(NM.SettingConnection).get_connection_type()) extra = ", type: %s" % (
conn.get_setting(NM.SettingConnection).get_connection_type()
)
return '"%s" (%s%s)' % (conn.get_id(), conn.get_uuid(), extra) return '"%s" (%s%s)' % (conn.get_id(), conn.get_uuid(), extra)
def connections_wg(connections): def connections_wg(connections):
l = list([c for c in connections if connection_is_wireguard(c)]) l = list([c for c in connections if connection_is_wireguard(c)])
l.sort(key = connection_to_str) l.sort(key=connection_to_str)
return l return l
def connections_find(connections, con_spec, con_id): def connections_find(connections, con_spec, con_id):
connections = list(sorted(connections, key=connection_to_str)) connections = list(sorted(connections, key=connection_to_str))
l = [] l = []
if con_spec in [None, 'id']: if con_spec in [None, "id"]:
for c in connections: for c in connections:
if con_id == c.get_id(): if con_id == c.get_id():
if c not in l: if c not in l:
l.append(c) l.append(c)
if con_spec in [None, 'interface']: if con_spec in [None, "interface"]:
for c in connections: for c in connections:
s_con = c.get_setting(NM.SettingConnection) s_con = c.get_setting(NM.SettingConnection)
if s_con \ if s_con and con_id == s_con.get_interface_name():
and con_id == s_con.get_interface_name():
if c not in l: if c not in l:
l.append(c) l.append(c)
if con_spec in [None, 'uuid']: if con_spec in [None, "uuid"]:
for c in connections: for c in connections:
if con_id == c.get_uuid(): if con_id == c.get_uuid():
if c not in l: if c not in l:
l.append(c) l.append(c)
l.sort(key = connection_to_str) l.sort(key=connection_to_str)
return l return l
def print_hint(nm_client): def print_hint(nm_client):
print('Maybe you want to create a profile first with') print("Maybe you want to create a profile first with")
print(' nmcli connection add type wireguard ifname wg0 $MORE_ARGS') print(" nmcli connection add type wireguard ifname wg0 $MORE_ARGS")
connections = connections_wg(nm_client.get_connections()) connections = connections_wg(nm_client.get_connections())
if connections: if connections:
print('Or edit one of the following WireGuard profiles:') print("Or edit one of the following WireGuard profiles:")
for c in connections: for c in connections:
print (' - %s' % (connection_to_str(c))) print(" - %s" % (connection_to_str(c)))
############################################################################### ###############################################################################
def argv_get_one(argv, idx, type_ctor=None, topic=None): def argv_get_one(argv, idx, type_ctor=None, topic=None):
if topic is not None: if topic is not None:
@@ -146,7 +160,7 @@ def argv_get_one(argv, idx, type_ctor=None, topic=None):
try: try:
v = argv[idx] v = argv[idx]
except: except:
raise MyError('missing argument') raise MyError("missing argument")
if type_ctor is not None: if type_ctor is not None:
try: try:
v = type_ctor(v) v = type_ctor(v)
@@ -154,16 +168,18 @@ def argv_get_one(argv, idx, type_ctor=None, topic=None):
raise MyError('invalid argument "%s" (%s)' % (v, e.message)) raise MyError('invalid argument "%s" (%s)' % (v, e.message))
return v return v
############################################################################### ###############################################################################
def arg_parse_secret_flags(arg): def arg_parse_secret_flags(arg):
try: try:
f = arg.strip() f = arg.strip()
n = { n = {
'none': NM.SettingSecretFlags.NONE, "none": NM.SettingSecretFlags.NONE,
'not-saved': NM.SettingSecretFlags.NOT_SAVED, "not-saved": NM.SettingSecretFlags.NOT_SAVED,
'not-required': NM.SettingSecretFlags.NOT_REQUIRED, "not-required": NM.SettingSecretFlags.NOT_REQUIRED,
'agent-owned': NM.SettingSecretFlags.AGENT_OWNED, "agent-owned": NM.SettingSecretFlags.AGENT_OWNED,
}.get(f) }.get(f)
if n is not None: if n is not None:
return n return n
@@ -171,7 +187,8 @@ def arg_parse_secret_flags(arg):
except Exception as e: except Exception as e:
raise MyError('invalid secret flags "%s"' % (arg)) raise MyError('invalid secret flags "%s"' % (arg))
def _arg_parse_int(arg, vmin, vmax, key, base = 0):
def _arg_parse_int(arg, vmin, vmax, key, base=0):
try: try:
v = int(arg, base) v = int(arg, base)
if v >= vmin and vmax <= 0xFFFFFFFF: if v >= vmin and vmax <= 0xFFFFFFFF:
@@ -180,18 +197,22 @@ def _arg_parse_int(arg, vmin, vmax, key, base = 0):
raise MyError('invalid %s "%s"' % (key, arg)) raise MyError('invalid %s "%s"' % (key, arg))
raise MyError("%s out of range" % (key)) raise MyError("%s out of range" % (key))
def arg_parse_listen_port(arg): def arg_parse_listen_port(arg):
return _arg_parse_int(arg, 0, 0xFFFF, "listen-port") return _arg_parse_int(arg, 0, 0xFFFF, "listen-port")
def arg_parse_fwmark(arg): def arg_parse_fwmark(arg):
return _arg_parse_int(arg, 0, 0xFFFFFFFF, "fwmark", base = 0) return _arg_parse_int(arg, 0, 0xFFFFFFFF, "fwmark", base=0)
def arg_parse_persistent_keep_alive(arg): def arg_parse_persistent_keep_alive(arg):
return _arg_parse_int(arg, 0, 0xFFFFFFFF, "persistent-keepalive") return _arg_parse_int(arg, 0, 0xFFFFFFFF, "persistent-keepalive")
def arg_parse_allowed_ips(arg): def arg_parse_allowed_ips(arg):
l = [s.strip() for s in arg.strip().split(',')] l = [s.strip() for s in arg.strip().split(",")]
l = [s for s in l if s != ''] l = [s for s in l if s != ""]
l = list(l) l = list(l)
# use a peer to parse and validate the allowed-ips. # use a peer to parse and validate the allowed-ips.
peer = NM.WireGuardPeer() peer = NM.WireGuardPeer()
@@ -200,40 +221,47 @@ def arg_parse_allowed_ips(arg):
raise MyError('invalid allowed-ip "%s"' % (aip)) raise MyError('invalid allowed-ip "%s"' % (aip))
return l return l
############################################################################### ###############################################################################
def secret_flags_to_string(flags): def secret_flags_to_string(flags):
nick = { nick = {
NM.SettingSecretFlags.NONE: 'none', NM.SettingSecretFlags.NONE: "none",
NM.SettingSecretFlags.NOT_SAVED: 'not-saved', NM.SettingSecretFlags.NOT_SAVED: "not-saved",
NM.SettingSecretFlags.NOT_REQUIRED: 'not-required', NM.SettingSecretFlags.NOT_REQUIRED: "not-required",
NM.SettingSecretFlags.AGENT_OWNED: 'agent-owned', NM.SettingSecretFlags.AGENT_OWNED: "agent-owned",
}.get(flags) }.get(flags)
num = str(int(flags)) num = str(int(flags))
if nick is None: if nick is None:
return num return num
return '%s (%s)' % (num, nick) return "%s (%s)" % (num, nick)
def secret_to_string(secret): def secret_to_string(secret):
if os.environ.get('WG_HIDE_KEYS', '') != 'never': if os.environ.get("WG_HIDE_KEYS", "") != "never":
return '(hidden)' return "(hidden)"
if not secret: if not secret:
return '' return ""
return secret return secret
def val_to_str(val): def val_to_str(val):
if val == NM.Ternary.DEFAULT: if val == NM.Ternary.DEFAULT:
return 'default' return "default"
if val == NM.Ternary.TRUE: if val == NM.Ternary.TRUE:
return 'true' return "true"
if val == NM.Ternary.FALSE: if val == NM.Ternary.FALSE:
return 'false' return "false"
return repr(val) return repr(val)
############################################################################### ###############################################################################
def wg_read_private_key(privkey_file): def wg_read_private_key(privkey_file):
import base64 import base64
try: try:
with open(privkey_file, "r") as f: with open(privkey_file, "r") as f:
data = f.read() data = f.read()
@@ -244,17 +272,20 @@ def wg_read_private_key(privkey_file):
except Exception as e: except Exception as e:
raise MyError('failed to read private key "%s": %s' % (privkey_file, e.message)) raise MyError('failed to read private key "%s": %s' % (privkey_file, e.message))
def wg_peer_is_valid(peer, msg = None):
def wg_peer_is_valid(peer, msg=None):
try: try:
peer.is_valid(True, True) peer.is_valid(True, True)
except gi.repository.GLib.Error as e: except gi.repository.GLib.Error as e:
if msg is None: if msg is None:
raise MyError('%s' % (e.message)) raise MyError("%s" % (e.message))
else: else:
raise MyError('%s' % (msg)) raise MyError("%s" % (msg))
############################################################################### ###############################################################################
def do_get(nm_client, connection): def do_get(nm_client, connection):
s_con = conn.get_setting(NM.SettingConnection) s_con = conn.get_setting(NM.SettingConnection)
s_wg = conn.get_setting(NM.SettingWireGuard) s_wg = conn.get_setting(NM.SettingWireGuard)
@@ -262,24 +293,55 @@ def do_get(nm_client, connection):
# Fetching secrets is not implemented. For now show them all as # Fetching secrets is not implemented. For now show them all as
# <hidden>. # <hidden>.
print('interface: %s' % (s_con.get_interface_name())) print("interface: %s" % (s_con.get_interface_name()))
print('uuid: %s' % (conn.get_uuid())) print("uuid: %s" % (conn.get_uuid()))
print('id: %s' % (conn.get_id())) print("id: %s" % (conn.get_id()))
print('private-key: %s' % (secret_to_string(s_wg.get_private_key()))) print(
print('private-key-flags: %s' % (secret_flags_to_string(s_wg.get_private_key_flags()))) "private-key: %s" % (secret_to_string(s_wg.get_private_key()))
print('listen-port: %s' % (s_wg.get_listen_port())) )
print('fwmark: 0x%x' % (s_wg.get_fwmark())) print(
print('peer-routes: %s' % (val_to_str(s_wg.get_peer_routes()))) "private-key-flags: %s"
print('ip4-auto-default-route: %s' % (val_to_str(s_wg.get_ip4_auto_default_route()))) % (secret_flags_to_string(s_wg.get_private_key_flags()))
print('ip6-auto-default-route: %s' % (val_to_str(s_wg.get_ip6_auto_default_route()))) )
print("listen-port: %s" % (s_wg.get_listen_port()))
print("fwmark: 0x%x" % (s_wg.get_fwmark()))
print("peer-routes: %s" % (val_to_str(s_wg.get_peer_routes())))
print(
"ip4-auto-default-route: %s"
% (val_to_str(s_wg.get_ip4_auto_default_route()))
)
print(
"ip6-auto-default-route: %s"
% (val_to_str(s_wg.get_ip6_auto_default_route()))
)
for i in range(s_wg.get_peers_len()): for i in range(s_wg.get_peers_len()):
peer = s_wg.get_peer(i) peer = s_wg.get_peer(i)
print('peer[%d].public-key: %s' % (i, peer.get_public_key())) print("peer[%d].public-key: %s" % (i, peer.get_public_key()))
print('peer[%d].preshared-key: %s' % (i, secret_to_string(peer.get_preshared_key()))) print(
print('peer[%d].preshared-key-flags: %s' % (i, secret_flags_to_string(peer.get_preshared_key_flags()))) "peer[%d].preshared-key: %s"
print('peer[%d].endpoint: %s' % (i, peer.get_endpoint() if peer.get_endpoint() else '')) % (i, secret_to_string(peer.get_preshared_key()))
print('peer[%d].persistent-keepalive: %s' % (i, peer.get_persistent_keepalive())) )
print('peer[%d].allowed-ips: %s' % (i, ','.join([peer.get_allowed_ip(j) for j in range(peer.get_allowed_ips_len())]))) print(
"peer[%d].preshared-key-flags: %s"
% (i, secret_flags_to_string(peer.get_preshared_key_flags()))
)
print(
"peer[%d].endpoint: %s"
% (i, peer.get_endpoint() if peer.get_endpoint() else "")
)
print(
"peer[%d].persistent-keepalive: %s" % (i, peer.get_persistent_keepalive())
)
print(
"peer[%d].allowed-ips: %s"
% (
i,
",".join(
[peer.get_allowed_ip(j) for j in range(peer.get_allowed_ips_len())]
),
)
)
def do_set(nm_client, conn, argv): def do_set(nm_client, conn, argv):
s_wg = conn.get_setting(NM.SettingWireGuard) s_wg = conn.get_setting(NM.SettingWireGuard)
@@ -291,9 +353,7 @@ def do_set(nm_client, conn, argv):
try: try:
idx = 0 idx = 0
while True: while True:
if peer \ if peer and (idx >= len(argv) or argv[idx] == "peer"):
and ( idx >= len(argv) \
or argv[idx] == 'peer'):
if peer_remove: if peer_remove:
pp_peer, pp_idx = s_wg.get_peer_by_public_key(peer.get_public_key()) pp_peer, pp_idx = s_wg.get_peer_by_public_key(peer.get_public_key())
if pp_peer: if pp_peer:
@@ -312,29 +372,40 @@ def do_set(nm_client, conn, argv):
peer_secret_flags = None peer_secret_flags = None
if idx >= len(argv): if idx >= len(argv):
break; break
if not peer and argv[idx] == 'private-key': if not peer and argv[idx] == "private-key":
key = argv_get_one(argv, idx + 1, None, idx) key = argv_get_one(argv, idx + 1, None, idx)
if key == '': if key == "":
s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, None) s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, None)
else: else:
s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, wg_read_private_key(key)) s_wg.set_property(
NM.SETTING_WIREGUARD_PRIVATE_KEY, wg_read_private_key(key)
)
idx += 2 idx += 2
continue continue
if not peer and argv[idx] == 'private-key-flags': if not peer and argv[idx] == "private-key-flags":
s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY_FLAGS, argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx)) s_wg.set_property(
NM.SETTING_WIREGUARD_PRIVATE_KEY_FLAGS,
argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx),
)
idx += 2 idx += 2
continue continue
if not peer and argv[idx] == 'listen-port': if not peer and argv[idx] == "listen-port":
s_wg.set_property(NM.SETTING_WIREGUARD_LISTEN_PORT, argv_get_one(argv, idx + 1, arg_parse_listen_port, idx)) s_wg.set_property(
NM.SETTING_WIREGUARD_LISTEN_PORT,
argv_get_one(argv, idx + 1, arg_parse_listen_port, idx),
)
idx += 2 idx += 2
continue continue
if not peer and argv[idx] == 'fwmark': if not peer and argv[idx] == "fwmark":
s_wg.set_property(NM.SETTING_WIREGUARD_FWMARK, argv_get_one(argv, idx + 1, arg_parse_fwmark, idx)) s_wg.set_property(
NM.SETTING_WIREGUARD_FWMARK,
argv_get_one(argv, idx + 1, arg_parse_fwmark, idx),
)
idx += 2 idx += 2
continue continue
if argv[idx] == 'peer': if argv[idx] == "peer":
public_key = argv_get_one(argv, idx + 1, None, idx) public_key = argv_get_one(argv, idx + 1, None, idx)
peer, peer_idx = s_wg.get_peer_by_public_key(public_key) peer, peer_idx = s_wg.get_peer_by_public_key(public_key)
if peer: if peer:
@@ -347,13 +418,13 @@ def do_set(nm_client, conn, argv):
peer_remove = False peer_remove = False
idx += 2 idx += 2
continue continue
if peer and argv[idx] == 'remove': if peer and argv[idx] == "remove":
peer_remove = True peer_remove = True
idx += 1 idx += 1
continue continue
if peer and argv[idx] == 'preshared-key': if peer and argv[idx] == "preshared-key":
psk = argv_get_one(argv, idx + 1, None, idx) psk = argv_get_one(argv, idx + 1, None, idx)
if psk == '': if psk == "":
peer.set_preshared_key(None, True) peer.set_preshared_key(None, True)
if peer_secret_flags is not None: if peer_secret_flags is not None:
peer_secret_flags = NM.SettingSecretFlags.NOT_REQUIRED peer_secret_flags = NM.SettingSecretFlags.NOT_REQUIRED
@@ -363,20 +434,26 @@ def do_set(nm_client, conn, argv):
peer_secret_flags = NM.SettingSecretFlags.NONE peer_secret_flags = NM.SettingSecretFlags.NONE
idx += 2 idx += 2
continue continue
if peer and argv[idx] == 'preshared-key-flags': if peer and argv[idx] == "preshared-key-flags":
peer_secret_flags = argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx) peer_secret_flags = argv_get_one(
argv, idx + 1, arg_parse_secret_flags, idx
)
idx += 2 idx += 2
continue continue
if peer and argv[idx] == 'endpoint': if peer and argv[idx] == "endpoint":
peer.set_endpoint(argv_get_one(argv, idx + 1, None, idx), True) peer.set_endpoint(argv_get_one(argv, idx + 1, None, idx), True)
idx += 2 idx += 2
continue continue
if peer and argv[idx] == 'persistent-keepalive': if peer and argv[idx] == "persistent-keepalive":
peer.set_persistent_keepalive(argv_get_one(argv, idx + 1, arg_parse_persistent_keep_alive, idx)) peer.set_persistent_keepalive(
argv_get_one(argv, idx + 1, arg_parse_persistent_keep_alive, idx)
)
idx += 2 idx += 2
continue continue
if peer and argv[idx] == 'allowed-ips': if peer and argv[idx] == "allowed-ips":
allowed_ips = list(argv_get_one(argv, idx + 1, arg_parse_allowed_ips, idx)) allowed_ips = list(
argv_get_one(argv, idx + 1, arg_parse_allowed_ips, idx)
)
peer.clear_allowed_ips() peer.clear_allowed_ips()
for aip in allowed_ips: for aip in allowed_ips:
peer.append_allowed_ip(aip, False) peer.append_allowed_ip(aip, False)
@@ -386,21 +463,22 @@ def do_set(nm_client, conn, argv):
raise MyError('invalid argument "%s"' % (argv[idx])) raise MyError('invalid argument "%s"' % (argv[idx]))
except MyError as e: except MyError as e:
print('Error: %s' % (e.message)) print("Error: %s" % (e.message))
sys.exit(1) sys.exit(1)
try: try:
conn.commit_changes(True, None) conn.commit_changes(True, None)
except Exception as e: except Exception as e:
print('failure to commit connection: %s' % (e)) print("failure to commit connection: %s" % (e))
sys.exit(1) sys.exit(1)
print('Success') print("Success")
sys.exit(0) sys.exit(0)
############################################################################### ###############################################################################
if __name__ == '__main__': if __name__ == "__main__":
nm_client = NM.Client.new(None) nm_client = NM.Client.new(None)
@@ -409,13 +487,17 @@ if __name__ == '__main__':
con_spec = None con_spec = None
if len(argv) >= 1: if len(argv) >= 1:
if argv[0] in [ 'id', 'uuid', 'interface' ]: if argv[0] in ["id", "uuid", "interface"]:
con_spec = argv[0] con_spec = argv[0]
del argv[0] del argv[0]
if len(argv) < 1: if len(argv) < 1:
print('Requires an existing NetworkManager connection profile as first argument') print(
print('Select it based on the connection ID, UUID, or interface-name (optionally qualify the selection with [id|uuid|interface])') "Requires an existing NetworkManager connection profile as first argument"
)
print(
"Select it based on the connection ID, UUID, or interface-name (optionally qualify the selection with [id|uuid|interface])"
)
print_hint(nm_client) print_hint(nm_client)
sys.exit(1) sys.exit(1)
con_id = argv[0] con_id = argv[0]
@@ -423,19 +505,29 @@ if __name__ == '__main__':
connections = connections_find(nm_client.get_connections(), con_spec, con_id) connections = connections_find(nm_client.get_connections(), con_spec, con_id)
if len(connections) == 0: if len(connections) == 0:
print('No matching connection %s\"%s\" found.' % ((con_spec+' ' if con_spec else ''), con_id)) print(
'No matching connection %s"%s" found.'
% ((con_spec + " " if con_spec else ""), con_id)
)
print_hint(nm_client) print_hint(nm_client)
sys.exit(1) sys.exit(1)
if len(connections) > 1: if len(connections) > 1:
print("Connection %s\"%s\" is not unique (%s)" % ((con_spec+' ' if con_spec else ''), con_id, ', '.join(['['+connection_to_str(c)+']' for c in connections]))) print(
'Connection %s"%s" is not unique (%s)'
% (
(con_spec + " " if con_spec else ""),
con_id,
", ".join(["[" + connection_to_str(c) + "]" for c in connections]),
)
)
if not con_spec: if not con_spec:
print('Maybe qualify the name with [id|uuid|interface]?') print("Maybe qualify the name with [id|uuid|interface]?")
sys.exit(1) sys.exit(1)
conn = connections[0] conn = connections[0]
if not connection_is_wireguard(conn): if not connection_is_wireguard(conn):
print('Connection %s is not a WireGuard profile' % (connection_to_str(conn))) print("Connection %s is not a WireGuard profile" % (connection_to_str(conn)))
print('See available profiles with `nmcli connection show`') print("See available profiles with `nmcli connection show`")
sys.exit(1) sys.exit(1)
try: try:
@@ -449,4 +541,3 @@ if __name__ == '__main__':
do_get(nm_client, conn) do_get(nm_client, conn)
else: else:
do_set(nm_client, conn, argv) do_set(nm_client, conn, argv)