diff --git a/examples/python/gi/nm-wg-set b/examples/python/gi/nm-wg-set index 1f5279ee6..4f074605c 100755 --- a/examples/python/gi/nm-wg-set +++ b/examples/python/gi/nm-wg-set @@ -61,23 +61,32 @@ import sys import os import gi -gi.require_version('NM', '1.0') + +gi.require_version("NM", "1.0") from gi.repository import NM + class MyError(Exception): pass + def pr(v): import pprint + pprint.pprint(v, indent=4, depth=5, width=60) + ############################################################################### + def connection_is_wireguard(conn): s_con = conn.get_setting(NM.SettingConnection) - return s_con \ - and s_con.get_connection_type() == NM.SETTING_WIREGUARD_SETTING_NAME \ - and conn.get_setting(NM.SettingWireGuard) + return ( + s_con + and s_con.get_connection_type() == NM.SETTING_WIREGUARD_SETTING_NAME + and conn.get_setting(NM.SettingWireGuard) + ) + def connection_to_str(conn): if connection_is_wireguard(conn): @@ -85,52 +94,57 @@ def connection_to_str(conn): if iface: extra = ', interface: "%s"' % (iface) else: - extra = '' + extra = "" else: - extra = ', type: %s' % (conn.get_setting(NM.SettingConnection).get_connection_type()) + extra = ", type: %s" % ( + conn.get_setting(NM.SettingConnection).get_connection_type() + ) return '"%s" (%s%s)' % (conn.get_id(), conn.get_uuid(), extra) + def connections_wg(connections): l = list([c for c in connections if connection_is_wireguard(c)]) - l.sort(key = connection_to_str) + l.sort(key=connection_to_str) return l + def connections_find(connections, con_spec, con_id): connections = list(sorted(connections, key=connection_to_str)) l = [] - if con_spec in [None, 'id']: + if con_spec in [None, "id"]: for c in connections: if con_id == c.get_id(): if c not in l: l.append(c) - if con_spec in [None, 'interface']: + if con_spec in [None, "interface"]: for c in connections: s_con = c.get_setting(NM.SettingConnection) - if s_con \ - and con_id == s_con.get_interface_name(): + if s_con and con_id == s_con.get_interface_name(): if c not in l: l.append(c) - if con_spec in [None, 'uuid']: + if con_spec in [None, "uuid"]: for c in connections: if con_id == c.get_uuid(): if c not in l: l.append(c) - l.sort(key = connection_to_str) + l.sort(key=connection_to_str) return l def print_hint(nm_client): - print('Maybe you want to create a profile first with') - print(' nmcli connection add type wireguard ifname wg0 $MORE_ARGS') + print("Maybe you want to create a profile first with") + print(" nmcli connection add type wireguard ifname wg0 $MORE_ARGS") connections = connections_wg(nm_client.get_connections()) if connections: - print('Or edit one of the following WireGuard profiles:') + print("Or edit one of the following WireGuard profiles:") for c in connections: - print (' - %s' % (connection_to_str(c))) + print(" - %s" % (connection_to_str(c))) + ############################################################################### + def argv_get_one(argv, idx, type_ctor=None, topic=None): if topic is not None: @@ -146,7 +160,7 @@ def argv_get_one(argv, idx, type_ctor=None, topic=None): try: v = argv[idx] except: - raise MyError('missing argument') + raise MyError("missing argument") if type_ctor is not None: try: v = type_ctor(v) @@ -154,16 +168,18 @@ def argv_get_one(argv, idx, type_ctor=None, topic=None): raise MyError('invalid argument "%s" (%s)' % (v, e.message)) return v + ############################################################################### + def arg_parse_secret_flags(arg): try: f = arg.strip() n = { - 'none': NM.SettingSecretFlags.NONE, - 'not-saved': NM.SettingSecretFlags.NOT_SAVED, - 'not-required': NM.SettingSecretFlags.NOT_REQUIRED, - 'agent-owned': NM.SettingSecretFlags.AGENT_OWNED, + "none": NM.SettingSecretFlags.NONE, + "not-saved": NM.SettingSecretFlags.NOT_SAVED, + "not-required": NM.SettingSecretFlags.NOT_REQUIRED, + "agent-owned": NM.SettingSecretFlags.AGENT_OWNED, }.get(f) if n is not None: return n @@ -171,7 +187,8 @@ def arg_parse_secret_flags(arg): except Exception as e: raise MyError('invalid secret flags "%s"' % (arg)) -def _arg_parse_int(arg, vmin, vmax, key, base = 0): + +def _arg_parse_int(arg, vmin, vmax, key, base=0): try: v = int(arg, base) if v >= vmin and vmax <= 0xFFFFFFFF: @@ -180,18 +197,22 @@ def _arg_parse_int(arg, vmin, vmax, key, base = 0): raise MyError('invalid %s "%s"' % (key, arg)) raise MyError("%s out of range" % (key)) + def arg_parse_listen_port(arg): return _arg_parse_int(arg, 0, 0xFFFF, "listen-port") + def arg_parse_fwmark(arg): - return _arg_parse_int(arg, 0, 0xFFFFFFFF, "fwmark", base = 0) + return _arg_parse_int(arg, 0, 0xFFFFFFFF, "fwmark", base=0) + def arg_parse_persistent_keep_alive(arg): return _arg_parse_int(arg, 0, 0xFFFFFFFF, "persistent-keepalive") + def arg_parse_allowed_ips(arg): - l = [s.strip() for s in arg.strip().split(',')] - l = [s for s in l if s != ''] + l = [s.strip() for s in arg.strip().split(",")] + l = [s for s in l if s != ""] l = list(l) # use a peer to parse and validate the allowed-ips. peer = NM.WireGuardPeer() @@ -200,40 +221,47 @@ def arg_parse_allowed_ips(arg): raise MyError('invalid allowed-ip "%s"' % (aip)) return l + ############################################################################### + def secret_flags_to_string(flags): nick = { - NM.SettingSecretFlags.NONE: 'none', - NM.SettingSecretFlags.NOT_SAVED: 'not-saved', - NM.SettingSecretFlags.NOT_REQUIRED: 'not-required', - NM.SettingSecretFlags.AGENT_OWNED: 'agent-owned', - }.get(flags) + NM.SettingSecretFlags.NONE: "none", + NM.SettingSecretFlags.NOT_SAVED: "not-saved", + NM.SettingSecretFlags.NOT_REQUIRED: "not-required", + NM.SettingSecretFlags.AGENT_OWNED: "agent-owned", + }.get(flags) num = str(int(flags)) if nick is None: return num - return '%s (%s)' % (num, nick) + return "%s (%s)" % (num, nick) + def secret_to_string(secret): - if os.environ.get('WG_HIDE_KEYS', '') != 'never': - return '(hidden)' + if os.environ.get("WG_HIDE_KEYS", "") != "never": + return "(hidden)" if not secret: - return '' + return "" return secret + def val_to_str(val): if val == NM.Ternary.DEFAULT: - return 'default' + return "default" if val == NM.Ternary.TRUE: - return 'true' + return "true" if val == NM.Ternary.FALSE: - return 'false' + return "false" return repr(val) + ############################################################################### + def wg_read_private_key(privkey_file): import base64 + try: with open(privkey_file, "r") as f: data = f.read() @@ -244,17 +272,20 @@ def wg_read_private_key(privkey_file): except Exception as e: raise MyError('failed to read private key "%s": %s' % (privkey_file, e.message)) -def wg_peer_is_valid(peer, msg = None): + +def wg_peer_is_valid(peer, msg=None): try: peer.is_valid(True, True) except gi.repository.GLib.Error as e: if msg is None: - raise MyError('%s' % (e.message)) + raise MyError("%s" % (e.message)) else: - raise MyError('%s' % (msg)) + raise MyError("%s" % (msg)) + ############################################################################### + def do_get(nm_client, connection): s_con = conn.get_setting(NM.SettingConnection) s_wg = conn.get_setting(NM.SettingWireGuard) @@ -262,24 +293,55 @@ def do_get(nm_client, connection): # Fetching secrets is not implemented. For now show them all as # . - print('interface: %s' % (s_con.get_interface_name())) - print('uuid: %s' % (conn.get_uuid())) - print('id: %s' % (conn.get_id())) - print('private-key: %s' % (secret_to_string(s_wg.get_private_key()))) - print('private-key-flags: %s' % (secret_flags_to_string(s_wg.get_private_key_flags()))) - print('listen-port: %s' % (s_wg.get_listen_port())) - print('fwmark: 0x%x' % (s_wg.get_fwmark())) - print('peer-routes: %s' % (val_to_str(s_wg.get_peer_routes()))) - print('ip4-auto-default-route: %s' % (val_to_str(s_wg.get_ip4_auto_default_route()))) - print('ip6-auto-default-route: %s' % (val_to_str(s_wg.get_ip6_auto_default_route()))) + print("interface: %s" % (s_con.get_interface_name())) + print("uuid: %s" % (conn.get_uuid())) + print("id: %s" % (conn.get_id())) + print( + "private-key: %s" % (secret_to_string(s_wg.get_private_key())) + ) + print( + "private-key-flags: %s" + % (secret_flags_to_string(s_wg.get_private_key_flags())) + ) + print("listen-port: %s" % (s_wg.get_listen_port())) + print("fwmark: 0x%x" % (s_wg.get_fwmark())) + print("peer-routes: %s" % (val_to_str(s_wg.get_peer_routes()))) + print( + "ip4-auto-default-route: %s" + % (val_to_str(s_wg.get_ip4_auto_default_route())) + ) + print( + "ip6-auto-default-route: %s" + % (val_to_str(s_wg.get_ip6_auto_default_route())) + ) for i in range(s_wg.get_peers_len()): peer = s_wg.get_peer(i) - print('peer[%d].public-key: %s' % (i, peer.get_public_key())) - print('peer[%d].preshared-key: %s' % (i, secret_to_string(peer.get_preshared_key()))) - print('peer[%d].preshared-key-flags: %s' % (i, secret_flags_to_string(peer.get_preshared_key_flags()))) - print('peer[%d].endpoint: %s' % (i, peer.get_endpoint() if peer.get_endpoint() else '')) - print('peer[%d].persistent-keepalive: %s' % (i, peer.get_persistent_keepalive())) - print('peer[%d].allowed-ips: %s' % (i, ','.join([peer.get_allowed_ip(j) for j in range(peer.get_allowed_ips_len())]))) + print("peer[%d].public-key: %s" % (i, peer.get_public_key())) + print( + "peer[%d].preshared-key: %s" + % (i, secret_to_string(peer.get_preshared_key())) + ) + print( + "peer[%d].preshared-key-flags: %s" + % (i, secret_flags_to_string(peer.get_preshared_key_flags())) + ) + print( + "peer[%d].endpoint: %s" + % (i, peer.get_endpoint() if peer.get_endpoint() else "") + ) + print( + "peer[%d].persistent-keepalive: %s" % (i, peer.get_persistent_keepalive()) + ) + print( + "peer[%d].allowed-ips: %s" + % ( + i, + ",".join( + [peer.get_allowed_ip(j) for j in range(peer.get_allowed_ips_len())] + ), + ) + ) + def do_set(nm_client, conn, argv): s_wg = conn.get_setting(NM.SettingWireGuard) @@ -291,9 +353,7 @@ def do_set(nm_client, conn, argv): try: idx = 0 while True: - if peer \ - and ( idx >= len(argv) \ - or argv[idx] == 'peer'): + if peer and (idx >= len(argv) or argv[idx] == "peer"): if peer_remove: pp_peer, pp_idx = s_wg.get_peer_by_public_key(peer.get_public_key()) if pp_peer: @@ -312,29 +372,40 @@ def do_set(nm_client, conn, argv): peer_secret_flags = None if idx >= len(argv): - break; + break - if not peer and argv[idx] == 'private-key': + if not peer and argv[idx] == "private-key": key = argv_get_one(argv, idx + 1, None, idx) - if key == '': + if key == "": s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, None) else: - s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, wg_read_private_key(key)) + s_wg.set_property( + NM.SETTING_WIREGUARD_PRIVATE_KEY, wg_read_private_key(key) + ) idx += 2 continue - if not peer and argv[idx] == 'private-key-flags': - s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY_FLAGS, argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx)) + if not peer and argv[idx] == "private-key-flags": + s_wg.set_property( + NM.SETTING_WIREGUARD_PRIVATE_KEY_FLAGS, + argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx), + ) idx += 2 continue - if not peer and argv[idx] == 'listen-port': - s_wg.set_property(NM.SETTING_WIREGUARD_LISTEN_PORT, argv_get_one(argv, idx + 1, arg_parse_listen_port, idx)) + if not peer and argv[idx] == "listen-port": + s_wg.set_property( + NM.SETTING_WIREGUARD_LISTEN_PORT, + argv_get_one(argv, idx + 1, arg_parse_listen_port, idx), + ) idx += 2 continue - if not peer and argv[idx] == 'fwmark': - s_wg.set_property(NM.SETTING_WIREGUARD_FWMARK, argv_get_one(argv, idx + 1, arg_parse_fwmark, idx)) + if not peer and argv[idx] == "fwmark": + s_wg.set_property( + NM.SETTING_WIREGUARD_FWMARK, + argv_get_one(argv, idx + 1, arg_parse_fwmark, idx), + ) idx += 2 continue - if argv[idx] == 'peer': + if argv[idx] == "peer": public_key = argv_get_one(argv, idx + 1, None, idx) peer, peer_idx = s_wg.get_peer_by_public_key(public_key) if peer: @@ -347,13 +418,13 @@ def do_set(nm_client, conn, argv): peer_remove = False idx += 2 continue - if peer and argv[idx] == 'remove': + if peer and argv[idx] == "remove": peer_remove = True idx += 1 continue - if peer and argv[idx] == 'preshared-key': + if peer and argv[idx] == "preshared-key": psk = argv_get_one(argv, idx + 1, None, idx) - if psk == '': + if psk == "": peer.set_preshared_key(None, True) if peer_secret_flags is not None: peer_secret_flags = NM.SettingSecretFlags.NOT_REQUIRED @@ -363,20 +434,26 @@ def do_set(nm_client, conn, argv): peer_secret_flags = NM.SettingSecretFlags.NONE idx += 2 continue - if peer and argv[idx] == 'preshared-key-flags': - peer_secret_flags = argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx) + if peer and argv[idx] == "preshared-key-flags": + peer_secret_flags = argv_get_one( + argv, idx + 1, arg_parse_secret_flags, idx + ) idx += 2 continue - if peer and argv[idx] == 'endpoint': + if peer and argv[idx] == "endpoint": peer.set_endpoint(argv_get_one(argv, idx + 1, None, idx), True) idx += 2 continue - if peer and argv[idx] == 'persistent-keepalive': - peer.set_persistent_keepalive(argv_get_one(argv, idx + 1, arg_parse_persistent_keep_alive, idx)) + if peer and argv[idx] == "persistent-keepalive": + peer.set_persistent_keepalive( + argv_get_one(argv, idx + 1, arg_parse_persistent_keep_alive, idx) + ) idx += 2 continue - if peer and argv[idx] == 'allowed-ips': - allowed_ips = list(argv_get_one(argv, idx + 1, arg_parse_allowed_ips, idx)) + if peer and argv[idx] == "allowed-ips": + allowed_ips = list( + argv_get_one(argv, idx + 1, arg_parse_allowed_ips, idx) + ) peer.clear_allowed_ips() for aip in allowed_ips: peer.append_allowed_ip(aip, False) @@ -386,21 +463,22 @@ def do_set(nm_client, conn, argv): raise MyError('invalid argument "%s"' % (argv[idx])) except MyError as e: - print('Error: %s' % (e.message)) + print("Error: %s" % (e.message)) sys.exit(1) try: conn.commit_changes(True, None) except Exception as e: - print('failure to commit connection: %s' % (e)) + print("failure to commit connection: %s" % (e)) sys.exit(1) - print('Success') + print("Success") sys.exit(0) + ############################################################################### -if __name__ == '__main__': +if __name__ == "__main__": nm_client = NM.Client.new(None) @@ -409,13 +487,17 @@ if __name__ == '__main__': con_spec = None if len(argv) >= 1: - if argv[0] in [ 'id', 'uuid', 'interface' ]: + if argv[0] in ["id", "uuid", "interface"]: con_spec = argv[0] del argv[0] if len(argv) < 1: - print('Requires an existing NetworkManager connection profile as first argument') - print('Select it based on the connection ID, UUID, or interface-name (optionally qualify the selection with [id|uuid|interface])') + print( + "Requires an existing NetworkManager connection profile as first argument" + ) + print( + "Select it based on the connection ID, UUID, or interface-name (optionally qualify the selection with [id|uuid|interface])" + ) print_hint(nm_client) sys.exit(1) con_id = argv[0] @@ -423,19 +505,29 @@ if __name__ == '__main__': connections = connections_find(nm_client.get_connections(), con_spec, con_id) if len(connections) == 0: - print('No matching connection %s\"%s\" found.' % ((con_spec+' ' if con_spec else ''), con_id)) + print( + 'No matching connection %s"%s" found.' + % ((con_spec + " " if con_spec else ""), con_id) + ) print_hint(nm_client) sys.exit(1) if len(connections) > 1: - print("Connection %s\"%s\" is not unique (%s)" % ((con_spec+' ' if con_spec else ''), con_id, ', '.join(['['+connection_to_str(c)+']' for c in connections]))) + print( + 'Connection %s"%s" is not unique (%s)' + % ( + (con_spec + " " if con_spec else ""), + con_id, + ", ".join(["[" + connection_to_str(c) + "]" for c in connections]), + ) + ) if not con_spec: - print('Maybe qualify the name with [id|uuid|interface]?') + print("Maybe qualify the name with [id|uuid|interface]?") sys.exit(1) conn = connections[0] if not connection_is_wireguard(conn): - print('Connection %s is not a WireGuard profile' % (connection_to_str(conn))) - print('See available profiles with `nmcli connection show`') + print("Connection %s is not a WireGuard profile" % (connection_to_str(conn))) + print("See available profiles with `nmcli connection show`") sys.exit(1) try: @@ -449,4 +541,3 @@ if __name__ == '__main__': do_get(nm_client, conn) else: do_set(nm_client, conn, argv) -