
The python example is (also) used to test the feature. That is because currently nmcli does not yet have support for ovs.external-ids and this API is only accessible via D-Bus (or a tool like this example).
664 lines
18 KiB
Python
Executable File
664 lines
18 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# SPDX-License-Identifier: GPL-2.0+
|
|
#
|
|
# Copyright (C) 2017, 2020 Red Hat, Inc.
|
|
#
|
|
|
|
#
|
|
# set and show OVS external-ids for a connection:
|
|
#
|
|
|
|
import sys
|
|
import os
|
|
import re
|
|
import pprint
|
|
import subprocess
|
|
|
|
import gi
|
|
|
|
gi.require_version("NM", "1.0")
|
|
from gi.repository import GLib, NM
|
|
|
|
###############################################################################
|
|
|
|
MODE_GET = "get"
|
|
MODE_SET = "set"
|
|
MODE_APPLY = "apply"
|
|
|
|
|
|
def memoize0(f):
|
|
result = []
|
|
|
|
def helper():
|
|
if len(result) == 0:
|
|
result.append(f())
|
|
return result[0]
|
|
|
|
return helper
|
|
|
|
|
|
def memoize(f):
|
|
memo = {}
|
|
|
|
def helper(x):
|
|
if x not in memo:
|
|
memo[x] = f(x)
|
|
return memo[x]
|
|
|
|
return helper
|
|
|
|
|
|
def pr(v):
|
|
pprint.pprint(v, indent=4, depth=5, width=60)
|
|
|
|
|
|
@memoize0
|
|
def is_libnm_debug():
|
|
return os.getenv("LIBNM_CLIENT_DEBUG") is not None
|
|
|
|
|
|
@memoize0
|
|
def can_sudo():
|
|
try:
|
|
return (
|
|
subprocess.run(
|
|
["sudo", "-n", "true"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
).returncode
|
|
== 0
|
|
)
|
|
except:
|
|
return False
|
|
|
|
|
|
def _print(msg=""):
|
|
if is_libnm_debug():
|
|
# we want to use the same logging mechanism as libnm's debug
|
|
# logging with "LIBNM_CLIENT_DEBUG=trace,stdout".
|
|
NM.utils_print(0, msg + "\n")
|
|
return
|
|
print(msg)
|
|
|
|
|
|
def mainloop_run(timeout_msec=0, mainloop=None):
|
|
if mainloop is None:
|
|
mainloop = GLib.MainLoop()
|
|
|
|
timeout_id = None
|
|
timeout_reached = []
|
|
|
|
if timeout_msec > 0:
|
|
|
|
def _timeout_cb(unused):
|
|
# it can happen that the caller already quit the mainloop
|
|
# otherwise. In that case, we don't want to signal a timeout.
|
|
if mainloop.is_running():
|
|
timeout_reached.append(1)
|
|
mainloop.quit()
|
|
return True
|
|
|
|
timeout_id = GLib.timeout_add(timeout_msec, _timeout_cb, None)
|
|
|
|
mainloop.run()
|
|
if timeout_id:
|
|
GLib.source_remove(timeout_id)
|
|
return not timeout_reached
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def connection_update2(remote_connection, connection):
|
|
|
|
mainloop = GLib.MainLoop()
|
|
result_error = []
|
|
|
|
def cb(c, result):
|
|
try:
|
|
c.update2_finish(result)
|
|
except Exception as e:
|
|
result_error.append(e)
|
|
mainloop.quit()
|
|
|
|
remote_connection.update2(
|
|
connection.to_dbus(NM.ConnectionSerializationFlags.ALL),
|
|
NM.SettingsUpdate2Flags.NO_REAPPLY,
|
|
None,
|
|
None,
|
|
cb,
|
|
)
|
|
|
|
mainloop_run(mainloop=mainloop)
|
|
|
|
if result_error:
|
|
raise result_error[0]
|
|
|
|
|
|
def device_get_applied_connection(device):
|
|
mainloop = GLib.MainLoop()
|
|
rr = []
|
|
|
|
def cb(c, result):
|
|
try:
|
|
con, version_id = c.get_applied_connection_finish(result)
|
|
except Exception as e:
|
|
rr.append(e)
|
|
else:
|
|
rr.append(con)
|
|
rr.append(version_id)
|
|
mainloop.quit()
|
|
|
|
device.get_applied_connection_async(0, None, cb)
|
|
|
|
mainloop_run(mainloop=mainloop)
|
|
|
|
if len(rr) == 1:
|
|
raise rr[0]
|
|
return rr[0], rr[1]
|
|
|
|
|
|
def device_reapply(device, connection, version_id):
|
|
|
|
mainloop = GLib.MainLoop()
|
|
result_error = []
|
|
|
|
def cb(d, result):
|
|
try:
|
|
d.reapply_finish(result)
|
|
except Exception as e:
|
|
result_error.append(e)
|
|
mainloop.quit()
|
|
|
|
device.reapply_async(connection, version_id, 0, None, cb)
|
|
|
|
mainloop_run(mainloop=mainloop)
|
|
|
|
if len(result_error) == 1:
|
|
raise result_error[0]
|
|
|
|
|
|
def ovs_print_external_ids(prefix):
|
|
if not can_sudo():
|
|
_print(prefix + ": not running as root and cannot call ovs-vsctl")
|
|
return
|
|
|
|
cmds = [["ovs-vsctl", "show"]]
|
|
for typ in ["Bridge", "Port", "Interface"]:
|
|
cmds += [["ovs-vsctl", "--columns=name,external-ids", "list", typ]]
|
|
|
|
out = ""
|
|
for cmd in cmds:
|
|
p = subprocess.run(cmd, stdout=subprocess.PIPE, check=True,)
|
|
out += p.stdout.decode("utf-8") + "\n"
|
|
out = "\n".join([prefix + s for s in out.split("\n")])
|
|
_print(out)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def usage():
|
|
_print("%s g[et] PROFILE [ GETTER ]" % (sys.argv[0]))
|
|
_print("%s s[et] PROFILE SETTER [--test]" % (sys.argv[0]))
|
|
_print("%s a[pply] DEVICE SETTER [--test]" % (sys.argv[0]))
|
|
_print(
|
|
" PROFILE := [id | uuid | type] STRING | [ ~id | ~type ] REGEX_STRING | STRING"
|
|
)
|
|
_print(" DEVICE := [iface] STRING")
|
|
_print(" GETTER := ( KEY | ~REGEX_KEY ) [... GETTER]")
|
|
_print(" SETTER := ( + | - | -KEY | [+]KEY VALUE ) [... SETTER]")
|
|
|
|
|
|
def die(msg, show_usage=False):
|
|
_print("FAILED: %s" % (msg))
|
|
if show_usage:
|
|
usage()
|
|
sys.exit(1)
|
|
|
|
|
|
def die_usage(msg):
|
|
die(msg, show_usage=True)
|
|
|
|
|
|
def parse_args(argv):
|
|
had_dash_dash = False
|
|
args = {
|
|
"mode": MODE_GET,
|
|
"select_arg": None,
|
|
"ids_arg": [],
|
|
"do_test": False,
|
|
}
|
|
i = 1
|
|
while i < len(argv):
|
|
a = argv[i]
|
|
|
|
if i == 1:
|
|
if a in ["s", "set"]:
|
|
args["mode"] = MODE_SET
|
|
elif a in ["g", "get"]:
|
|
args["mode"] = MODE_GET
|
|
elif a in ["a", "apply"]:
|
|
args["mode"] = MODE_APPLY
|
|
else:
|
|
die_usage("unexpected mode argument '%s'" % (a))
|
|
i += 1
|
|
continue
|
|
|
|
if a == "--test":
|
|
args["do_test"] = True
|
|
i += 1
|
|
continue
|
|
|
|
if args["select_arg"] is None:
|
|
if args["mode"] == MODE_APPLY:
|
|
possible_selects = ["iface"]
|
|
else:
|
|
possible_selects = ["id", "~id", "uuid", "type", "~type"]
|
|
|
|
if a in possible_selects:
|
|
if i + 1 >= len(argv):
|
|
die_usage("'%s' requires an argument'" % (a))
|
|
args["select_arg"] = (a, argv[i + 1])
|
|
i += 2
|
|
continue
|
|
|
|
if a == "*":
|
|
a = None
|
|
args["select_arg"] = ("*", a)
|
|
i += 1
|
|
continue
|
|
|
|
if args["mode"] == MODE_GET:
|
|
args["ids_arg"].append(a)
|
|
i += 1
|
|
continue
|
|
|
|
if not a:
|
|
die_usage("argument should specify a external-id but is empty string")
|
|
|
|
if a[0] == "-":
|
|
v = (a, None)
|
|
i += 1
|
|
elif a == "+":
|
|
v = (a, None)
|
|
i += 1
|
|
else:
|
|
if a[0] != "+":
|
|
a = "+" + a
|
|
if i + 1 >= len(argv):
|
|
die_usage("'%s' requires an argument'" % (a))
|
|
v = (a, argv[i + 1])
|
|
i += 2
|
|
|
|
args["ids_arg"].append(v)
|
|
|
|
if args["mode"] == MODE_SET:
|
|
if not args["ids_arg"]:
|
|
die_usage("Requires one or more external-ids to set or delete")
|
|
|
|
return args
|
|
|
|
|
|
def device_to_str(device, show_type=False):
|
|
if show_type:
|
|
return "%s (%s)" % (device.get_iface(), device.get_type_desc())
|
|
return "%s" % (device.get_iface(),)
|
|
|
|
|
|
def connection_to_str(connection, show_type=False):
|
|
if show_type:
|
|
return "%s (%s, %s)" % (
|
|
connection.get_id(),
|
|
connection.get_uuid(),
|
|
connection.get_connection_type(),
|
|
)
|
|
return "%s (%s)" % (connection.get_id(), connection.get_uuid())
|
|
|
|
|
|
def devices_filter(devices, select_arg):
|
|
devices = list(sorted(devices, key=device_to_str))
|
|
if not select_arg:
|
|
return devices
|
|
# we preserve the order of the selected devices. And
|
|
# if devices are selected multiple times, we return
|
|
# them multiple times.
|
|
l = []
|
|
f = select_arg
|
|
for d in devices:
|
|
if f[0] == "iface":
|
|
if f[1] == d.get_iface():
|
|
l.append(d)
|
|
else:
|
|
assert f[0] == "*"
|
|
if f[1] is None:
|
|
l.append(d)
|
|
else:
|
|
if f[1] in [d.get_iface()]:
|
|
l.append(d)
|
|
return l
|
|
|
|
|
|
def connections_filter(connections, select_arg):
|
|
connections = list(sorted(connections, key=connection_to_str))
|
|
if not select_arg:
|
|
return connections
|
|
# we preserve the order of the selected connections. And
|
|
# if connections are selected multiple times, we return
|
|
# them multiple times.
|
|
l = []
|
|
f = select_arg
|
|
for c in connections:
|
|
if f[0] == "id":
|
|
if f[1] == c.get_id():
|
|
l.append(c)
|
|
elif f[0] == "~id":
|
|
if re.match(f[1], c.get_id()):
|
|
l.append(c)
|
|
elif f[0] == "uuid":
|
|
if f[1] == c.get_uuid():
|
|
l.append(c)
|
|
elif f[0] == "type":
|
|
if f[1] == c.get_connection_type():
|
|
l.append(c)
|
|
elif f[0] == "~type":
|
|
if re.match(f[1], c.get_connection_type()):
|
|
l.append(c)
|
|
else:
|
|
assert f[0] == "*"
|
|
if f[1] is None:
|
|
l.append(c)
|
|
else:
|
|
if f[1] in [c.get_uuid(), c.get_id()]:
|
|
l.append(c)
|
|
return l
|
|
|
|
|
|
def ids_select(ids, mode, ids_arg):
|
|
ids = list(ids)
|
|
if not ids_arg:
|
|
return (ids, [])
|
|
|
|
keys = set()
|
|
requested = []
|
|
for d in ids_arg:
|
|
if mode == MODE_GET:
|
|
if d[0] == "~":
|
|
r = re.compile(d[1:])
|
|
keys.update([k for k in ids if r.match(k)])
|
|
else:
|
|
keys.update([k for k in ids if k == d])
|
|
if d not in requested:
|
|
requested.append(d)
|
|
else:
|
|
assert mode in [MODE_SET, MODE_APPLY]
|
|
d2 = d[0]
|
|
assert d2[0] in ["-", "+"]
|
|
d3 = d2[1:]
|
|
if d3 in ids:
|
|
keys.add(d3)
|
|
return (list([k for k in ids if k in keys]), requested)
|
|
|
|
|
|
def connection_print(connection, mode, ids_arg, dbus_path, prefix=""):
|
|
sett = connection.get_setting(NM.SettingOvsExternalIDs)
|
|
|
|
if sett is not None:
|
|
all_ids = list(sett.get_data_keys())
|
|
keys, requested = ids_select(all_ids, mode, ids_arg)
|
|
num_str = "%s" % (len(all_ids))
|
|
else:
|
|
keys = []
|
|
requested = []
|
|
num_str = "none"
|
|
|
|
_print(
|
|
"%s%s [%s]" % (prefix, connection_to_str(connection, show_type=True), num_str)
|
|
)
|
|
if dbus_path:
|
|
_print("%s %s" % (prefix, dbus_path))
|
|
if sett is not None:
|
|
dd = sett.get_property(NM.SETTING_OVS_EXTERNAL_IDS_DATA)
|
|
else:
|
|
dd = {}
|
|
for k in keys:
|
|
v = sett.get_data(k)
|
|
assert v is not None
|
|
assert v == dd.get(k, None)
|
|
_print('%s "%s" = "%s"' % (prefix, k, v))
|
|
for k in requested:
|
|
_print('%s "%s" = <unset>' % (prefix, k))
|
|
|
|
|
|
def sett_update(connection, ids_arg):
|
|
|
|
sett = connection.get_setting(NM.SettingOvsExternalIDs)
|
|
|
|
for d in ids_arg:
|
|
op = d[0][0]
|
|
key = d[0][1:]
|
|
val = d[1]
|
|
|
|
oldval = None
|
|
if sett is not None:
|
|
oldval = sett.get_data(key)
|
|
|
|
if op == "-":
|
|
assert val is None
|
|
if key == "":
|
|
if sett is None:
|
|
_print(" DEL: setting (ovs-external-ids group was not present)")
|
|
else:
|
|
connection.remove_setting(NM.SettingOvsExternalIDs)
|
|
sett = None
|
|
_print(" DEL: setting")
|
|
continue
|
|
|
|
if sett is None:
|
|
_print(' DEL: "%s" (ovs-external-ids group was not present)' % (key))
|
|
continue
|
|
if oldval is None:
|
|
_print(' DEL: "%s" (id was unset)' % (key))
|
|
continue
|
|
_print(' DEL: "%s" (id was set to"%s")' % (key, oldval))
|
|
sett.set_data(key, None)
|
|
continue
|
|
|
|
if key == "":
|
|
assert val is None
|
|
if sett is None:
|
|
sett = NM.SettingOvsExternalIDs.new()
|
|
connection.add_setting(sett)
|
|
_print(" SET: setting (external-ids group was added)")
|
|
continue
|
|
|
|
_print(" SET: setting (external-ids group was present)")
|
|
continue
|
|
|
|
assert val is not None
|
|
|
|
if sett is None:
|
|
sett = NM.SettingOvsExternalIDs.new()
|
|
connection.add_setting(sett)
|
|
_print(
|
|
' SET: "%s" = "%s" (external-ids group was not present)' % (key, val)
|
|
)
|
|
elif oldval is None:
|
|
_print(' SET: "%s" = "%s" (new)' % (key, val))
|
|
elif oldval != val:
|
|
_print(' SET: "%s" = "%s" (was "%s")' % (key, val, oldval))
|
|
else:
|
|
_print(' SET: "%s" = "%s" (unchanged)' % (key, val))
|
|
sett.set_data(key, val)
|
|
|
|
|
|
def do_get(connections, ids_arg):
|
|
first_line = True
|
|
for c in connections:
|
|
if first_line:
|
|
first_line = False
|
|
else:
|
|
_print()
|
|
connection_print(c, MODE_GET, ids_arg, dbus_path=c.get_path())
|
|
|
|
|
|
def do_set(nmc, connection, ids_arg, do_test):
|
|
|
|
remote_connection = connection
|
|
connection = NM.SimpleConnection.new_clone(remote_connection)
|
|
|
|
connection_print(
|
|
connection, MODE_SET, [], remote_connection.get_path(), prefix="BEFORE: "
|
|
)
|
|
_print()
|
|
|
|
sett_update(connection, ids_arg)
|
|
|
|
if do_test:
|
|
_print()
|
|
_print("Only show. Run without --test to set")
|
|
return
|
|
|
|
try:
|
|
connection_update2(remote_connection, connection)
|
|
except Exception as e:
|
|
_print()
|
|
_print("FAILURE to commit connection: %s" % (e))
|
|
return
|
|
|
|
# NMClient received the completion of Update2() call. It also received
|
|
# a property changed signal that the profile changed, and it is about
|
|
# to fetch the new value. However, that value is not yet here.
|
|
#
|
|
# libnm should provide a better API for this. For example, not signal
|
|
# completion of update2() until the profile was refetched. Or, indicate
|
|
# that the settings are dirty, so we would know how long to wait.
|
|
#
|
|
# Add an ugly workaround here and wait a bit.
|
|
_print()
|
|
_print("WORKAROUND: wait for connection to change")
|
|
mainloop_run(timeout_msec=500)
|
|
|
|
if remote_connection is not nmc.get_object_by_path(remote_connection.get_path()):
|
|
_print()
|
|
_print(
|
|
"Connection %s no longer exists after commit"
|
|
% (remote_connection.get_path())
|
|
)
|
|
return
|
|
|
|
_print()
|
|
connection_print(
|
|
remote_connection, MODE_SET, [], remote_connection.get_path(), prefix="AFTER: "
|
|
)
|
|
|
|
_print()
|
|
if remote_connection.compare(connection, NM.SettingCompareFlags.EXACT):
|
|
_print("resulting connection is as expected")
|
|
else:
|
|
_print("WARNING: resulting connection is not as expected")
|
|
|
|
|
|
def do_apply(nmc, device, ids_arg, do_test):
|
|
|
|
try:
|
|
connection_orig, version_id = device_get_applied_connection(device)
|
|
except Exception as e:
|
|
_print(
|
|
'failure to get applied connection for %s: %s"' % (device_to_str(device), e)
|
|
)
|
|
die("The device does not seem active? Nothing to reapply")
|
|
|
|
_print(
|
|
"REAPPLY device %s (%s) with connection %s (version-id = %s)"
|
|
% (
|
|
device_to_str(device),
|
|
NM.Object.get_path(device),
|
|
connection_to_str(connection_orig),
|
|
version_id,
|
|
)
|
|
)
|
|
_print()
|
|
|
|
ovs_print_external_ids("BEFORE-OVS-VSCTL: ")
|
|
_print()
|
|
|
|
connection = NM.SimpleConnection.new_clone(connection_orig)
|
|
|
|
connection_print(connection, MODE_APPLY, [], device.get_path(), prefix="BEFORE: ")
|
|
_print()
|
|
|
|
sett_update(connection, ids_arg)
|
|
|
|
if do_test:
|
|
_print()
|
|
_print("Only show. Run without --test to set")
|
|
return
|
|
|
|
_print()
|
|
_print("reapply...")
|
|
|
|
try:
|
|
device_reapply(device, connection, version_id)
|
|
except Exception as e:
|
|
_print()
|
|
_print("FAILURE to commit connection: %s" % (e))
|
|
return
|
|
|
|
try:
|
|
connection_after, version_id = device_get_applied_connection(device)
|
|
except Exception as e:
|
|
_print(
|
|
'failure to get applied connection after reapply for device %s: %s"'
|
|
% (device_to_str(device), e)
|
|
)
|
|
die("FAILURE to get applied connection after reapply")
|
|
|
|
_print()
|
|
connection_print(connection, MODE_APPLY, [], device.get_path(), prefix="AFTER: ")
|
|
_print()
|
|
|
|
ovs_print_external_ids("AFTER-OVS-VSCTL: ")
|
|
|
|
|
|
###############################################################################
|
|
|
|
if __name__ == "__main__":
|
|
|
|
args = parse_args(sys.argv)
|
|
|
|
nmc = NM.Client.new(None)
|
|
|
|
if args["mode"] == MODE_APPLY:
|
|
|
|
devices = devices_filter(nmc.get_devices(), args["select_arg"])
|
|
|
|
if len(devices) != 1:
|
|
_print(
|
|
"To apply the external-ids of a device, exactly one connection must be selected. Instead, %s devices matched ([%s])"
|
|
% (len(devices), ", ".join([device_to_str(c) for c in devices]),)
|
|
)
|
|
die_usage("Select unique device to apply")
|
|
do_apply(nmc, devices[0], args["ids_arg"], do_test=args["do_test"])
|
|
|
|
else:
|
|
|
|
connections = connections_filter(nmc.get_connections(), args["select_arg"])
|
|
|
|
if args["mode"] == MODE_SET:
|
|
if len(connections) != 1:
|
|
_print(
|
|
"To set the external-ids of a connection, exactly one connection must be selected via id|uuid. Instead, %s connection matched ([%s])"
|
|
% (
|
|
len(connections),
|
|
", ".join([connection_to_str(c) for c in connections]),
|
|
)
|
|
)
|
|
die_usage("Select unique connection to set")
|
|
do_set(nmc, connections[0], args["ids_arg"], do_test=args["do_test"])
|
|
else:
|
|
if len(connections) < 1:
|
|
_print("No connection selected for printing the external ids")
|
|
die_usage("Select connection to get")
|
|
do_get(connections, args["ids_arg"])
|