
The base image for the "check-tree" test got bumped to Fedora 39. This brings a new python-black version (23.7.0 vs. 22.8.0) and requires reformatting. Maybe we should stick to 22.8.0, via `pip install`. But it seems better to just follow the latest black version (the one from current Fedora). So do the reformatting instead. https://black.readthedocs.io/en/stable/change_log.html#id38
336 lines
9.9 KiB
Python
Executable File
336 lines
9.9 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
|
|
# A example script to activate many profiles in parallel.
|
|
#
|
|
# It uses entirely asynchronous API. At various points the
|
|
# script explicitly iterates the main context, which is unlike
|
|
# a more complex application that uses the GMainContext, which
|
|
# probably would run the context only at one point as long as
|
|
# the application is running (from the main function).
|
|
|
|
import gi
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
gi.require_version("NM", "1.0")
|
|
from gi.repository import NM, GLib
|
|
|
|
|
|
class MyError(Exception):
|
|
pass
|
|
|
|
|
|
NUM_PARALLEL_STARTING = 10
|
|
NUM_PARALLEL_IN_PROGRESS = 50
|
|
|
|
s = os.getenv("NUM_PARALLEL_STARTING")
|
|
if s:
|
|
NUM_PARALLEL_STARTING = int(s)
|
|
|
|
s = os.getenv("NUM_PARALLEL_IN_PROGRESS")
|
|
if s:
|
|
NUM_PARALLEL_IN_PROGRESS = int(s)
|
|
|
|
|
|
start_time = time.monotonic()
|
|
|
|
|
|
def log(msg):
|
|
# use nm_utils_print(), so that the log messages are in synch with
|
|
# LIBNM_CLIENT_DEBUG=trace messages.
|
|
NM.utils_print(0, "[%015.10f] %s\n" % (time.monotonic() - start_time, msg))
|
|
|
|
|
|
def nmc_new(io_priority=GLib.PRIORITY_DEFAULT, cancellable=None):
|
|
# create a NMClient instance using the async initialization
|
|
# (but the function itself iterates the main context until
|
|
# the initialization completes).
|
|
|
|
result = []
|
|
|
|
def cb(source_object, res):
|
|
try:
|
|
source_object.init_finish(res)
|
|
except Exception as e:
|
|
result.append(e)
|
|
else:
|
|
result.append(None)
|
|
|
|
nmc = NM.Client()
|
|
nmc.init_async(io_priority, cancellable, cb)
|
|
while not result:
|
|
nmc.get_main_context().iteration(may_block=True)
|
|
|
|
if result[0]:
|
|
raise result[0]
|
|
|
|
log("initialized NMClient cache")
|
|
|
|
return nmc
|
|
|
|
|
|
def nmc_destroy(nmc_transfer_ref):
|
|
# Just for fun, show how to completely cleanup a NMClient instance.
|
|
# An NMClient instance registers D-Bus signals and unrefing the instance
|
|
# will cancel/unsubscribe those signals, but there might still be some
|
|
# pending operations scheduled on the main context. That means, after
|
|
# unrefing the NMClient instance, we may need to iterate the GMainContext
|
|
# a bit longer, go get rid of all resources (otherwise, the GMainContext
|
|
# itself cannot be destroyed and leaks).
|
|
#
|
|
# We can use nm_client_get_context_busy_watcher() for that, by subscribing
|
|
# a weak reference and iterating the context as long as the object is
|
|
# alive.
|
|
|
|
nmc = nmc_transfer_ref[0]
|
|
del nmc_transfer_ref[0]
|
|
|
|
alive = [1]
|
|
|
|
def weak_ref_cb(alive):
|
|
del alive[0]
|
|
|
|
nmc.get_context_busy_watcher().weak_ref(weak_ref_cb, alive)
|
|
main_context = nmc.get_main_context()
|
|
|
|
del nmc
|
|
|
|
while alive:
|
|
main_context.iteration(may_block=True)
|
|
|
|
log("NMClient instance cleaned up")
|
|
|
|
|
|
def find_connections(nmc, argv):
|
|
# parse the inpurt argv and select the connection profiles to activate.
|
|
# The arguments are either "connection.id" or "connection.uuid", possibly
|
|
# qualified by "id" or "uuid".
|
|
|
|
result = []
|
|
|
|
while True:
|
|
if not argv:
|
|
break
|
|
arg_type = argv.pop(0)
|
|
if arg_type in ["id", "uuid"]:
|
|
if not argv:
|
|
raise MyError('missing specifier after "%s"' % (arg_type))
|
|
arg_param = argv.pop(0)
|
|
else:
|
|
arg_param = arg_type
|
|
arg_type = "*"
|
|
|
|
cc = []
|
|
for c in nmc.get_connections():
|
|
if arg_type in ["id", "*"] and arg_param == c.get_id():
|
|
cc.append(c)
|
|
if arg_type in ["uuid", "*"] and arg_param == c.get_uuid():
|
|
cc.append(c)
|
|
|
|
if not cc:
|
|
raise MyError(
|
|
'Could not find a matching connection "%s" "%s"' % (arg_type, arg_param)
|
|
)
|
|
if len(cc) > 1:
|
|
raise MyError(
|
|
'Could not find a unique matching connection "%s" "%s", instead %d profiles found'
|
|
% (arg_type, arg_param, len(cc))
|
|
)
|
|
|
|
if cc[0] not in result:
|
|
# we allow duplicates, but combine them.
|
|
result.extend(cc)
|
|
|
|
for c in result:
|
|
log(
|
|
"requested connection: %s (%s) (%s)"
|
|
% (c.get_id(), c.get_uuid(), c.get_path())
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
class Activation(object):
|
|
ACTIVATION_STATE_START = "start"
|
|
ACTIVATION_STATE_STARTING = "starting"
|
|
ACTIVATION_STATE_WAITING = "waiting"
|
|
ACTIVATION_STATE_DONE = "done"
|
|
|
|
def __init__(self, con):
|
|
self.con = con
|
|
self.state = Activation.ACTIVATION_STATE_START
|
|
self.result_msg = None
|
|
self.result_ac = None
|
|
self.ac_result = None
|
|
self.wait_id = None
|
|
|
|
def __str__(self):
|
|
return "%s (%s)" % (self.con.get_id(), self.con.get_uuid())
|
|
|
|
def is_done(self, log=log):
|
|
if self.state == Activation.ACTIVATION_STATE_DONE:
|
|
return True
|
|
|
|
if self.state != Activation.ACTIVATION_STATE_WAITING:
|
|
return False
|
|
|
|
def _log_result(self, msg, done_with_success=False):
|
|
log("connection %s done: %s" % (self, msg))
|
|
self.state = Activation.ACTIVATION_STATE_DONE
|
|
self.done_with_success = done_with_success
|
|
return True
|
|
|
|
ac = self.result_ac
|
|
if not ac:
|
|
return _log_result(self, "failed activation call (%s)" % (self.result_msg,))
|
|
|
|
if ac.get_client() is None:
|
|
return _log_result(self, "active connection disappeared")
|
|
|
|
if ac.get_state() > NM.ActiveConnectionState.ACTIVATED:
|
|
return _log_result(
|
|
self, "connection failed to activate (state %s)" % (ac.get_state())
|
|
)
|
|
|
|
if ac.get_state() == NM.ActiveConnectionState.ACTIVATED:
|
|
return _log_result(
|
|
self, "connection successfully activated", done_with_success=True
|
|
)
|
|
|
|
return False
|
|
|
|
def start(self, nmc, cancellable=None, activated_callback=None, log=log):
|
|
# Call nmc.activate_connection_async() and return a user data
|
|
# with the information about the pending operation.
|
|
|
|
assert self.state == Activation.ACTIVATION_STATE_START
|
|
|
|
self.state = Activation.ACTIVATION_STATE_STARTING
|
|
|
|
log("activation %s start asynchronously" % (self))
|
|
|
|
def cb_activate_connection(source_object, res):
|
|
assert self.state == Activation.ACTIVATION_STATE_STARTING
|
|
try:
|
|
ac = nmc.activate_connection_finish(res)
|
|
except Exception as e:
|
|
self.result_msg = str(e)
|
|
log(
|
|
"activation %s started asynchronously failed: %s"
|
|
% (self, self.result_msg)
|
|
)
|
|
else:
|
|
self.result_msg = "success"
|
|
self.result_ac = ac
|
|
log(
|
|
"activation %s started asynchronously success: %s"
|
|
% (self, ac.get_path())
|
|
)
|
|
self.state = Activation.ACTIVATION_STATE_WAITING
|
|
if activated_callback is not None:
|
|
activated_callback(self)
|
|
|
|
nmc.activate_connection_async(
|
|
self.con, None, None, cancellable, cb_activate_connection
|
|
)
|
|
|
|
def wait(self, done_callback=None, log=log):
|
|
assert self.state == Activation.ACTIVATION_STATE_WAITING
|
|
assert self.result_ac
|
|
assert self.wait_id is None
|
|
|
|
def cb_wait(ac, state):
|
|
if self.is_done(log=log):
|
|
self.result_ac.disconnect(self.wait_id)
|
|
self.wait_id = None
|
|
done_callback(self)
|
|
|
|
log("waiting for %s to fully activate" % (self))
|
|
self.wait_id = self.result_ac.connect("notify", cb_wait)
|
|
|
|
|
|
class Manager(object):
|
|
def __init__(self, nmc, cons):
|
|
self.nmc = nmc
|
|
|
|
self.ac_start = [Activation(c) for c in cons]
|
|
self.ac_starting = []
|
|
self.ac_waiting = []
|
|
self.ac_done = []
|
|
|
|
def _log(self, msg):
|
|
lists = [self.ac_start, self.ac_starting, self.ac_waiting, self.ac_done]
|
|
|
|
n = sum(len(l) for l in lists)
|
|
n = str(len(str(n)))
|
|
|
|
prefix = "/".join((("%0" + n + "d") % len(l)) for l in lists)
|
|
log("%s: %s" % (prefix, msg))
|
|
|
|
def ac_run(self):
|
|
loop = GLib.MainLoop(self.nmc.get_main_context())
|
|
|
|
while self.ac_start or self.ac_starting or self.ac_waiting:
|
|
rate_limit_parallel_in_progress = (
|
|
len(self.ac_starting) + len(self.ac_waiting) >= NUM_PARALLEL_IN_PROGRESS
|
|
)
|
|
|
|
if (
|
|
not rate_limit_parallel_in_progress
|
|
and self.ac_start
|
|
and len(self.ac_starting) < NUM_PARALLEL_STARTING
|
|
):
|
|
activation = self.ac_start.pop(0)
|
|
self.ac_starting.append(activation)
|
|
|
|
def cb_activated(activation2):
|
|
self.ac_starting.remove(activation2)
|
|
if activation2.is_done(log=self._log):
|
|
self.ac_done.append(activation2)
|
|
else:
|
|
self.ac_waiting.append(activation2)
|
|
|
|
def cb_done(activation3):
|
|
self.ac_waiting.remove(activation3)
|
|
self.ac_done.append(activation3)
|
|
loop.quit()
|
|
|
|
activation2.wait(done_callback=cb_done, log=self._log)
|
|
loop.quit()
|
|
|
|
activation.start(
|
|
self.nmc, activated_callback=cb_activated, log=self._log
|
|
)
|
|
continue
|
|
|
|
loop.run()
|
|
|
|
res_list = [ac.done_with_success for ac in self.ac_done]
|
|
|
|
log(
|
|
"%s out of %s activations are now successfully activated"
|
|
% (sum(res_list), len(self.ac_done))
|
|
)
|
|
|
|
return all(res_list)
|
|
|
|
|
|
def main():
|
|
nmc = nmc_new()
|
|
|
|
cons = find_connections(nmc, sys.argv[1:])
|
|
|
|
all_good = Manager(nmc, cons).ac_run()
|
|
|
|
nmc_transfer_ref = [nmc]
|
|
del nmc
|
|
nmc_destroy(nmc_transfer_ref)
|
|
|
|
sys.exit(0 if all_good else 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|