diff --git a/examples/python/gi/nm-up-many.py b/examples/python/gi/nm-up-many.py index 52f981d77..e5faad194 100755 --- a/examples/python/gi/nm-up-many.py +++ b/examples/python/gi/nm-up-many.py @@ -9,21 +9,34 @@ # probably would run the context only at one point as long as # the application is running (from the main function). -import sys import gi +import os +import sys import time gi.require_version("NM", "1.0") from gi.repository import NM, GLib -start_time = time.monotonic() - - class MyError(Exception): pass +NUM_PARALLEL_STARTING = 10 +NUM_PARALLEL_IN_PROGRESS = 50 + +s = os.getenv("NUM_PARALLEL_STARTING") +if s: + NUM_PARALLEL_STARTING = int(s) + +s = os.getenv("NUM_PARALLEL_IN_PROGRESS") +if s: + NUM_PARALLEL_IN_PROGRESS = int(s) + + +start_time = time.monotonic() + + def log(msg): # use nm_utils_print(), so that the log messages are in synch with # LIBNM_CLIENT_DEBUG=trace messages. @@ -142,212 +155,176 @@ def find_connections(nmc, argv): return result -def nmc_activate_start(nmc, con): +class Activation(object): + ACTIVATION_STATE_START = "start" + ACTIVATION_STATE_STARTING = "starting" + ACTIVATION_STATE_WAITING = "waiting" + ACTIVATION_STATE_DONE = "done" - # Call nmc.activate_connection_async() and return a user data - # with the information about the pending operation. + def __init__(self, con): + self.con = con + self.state = Activation.ACTIVATION_STATE_START + self.result_msg = None + self.result_ac = None + self.ac_result = None + self.wait_id = None - activation = { - "con": con, - "result": None, - "result_msg": None, - "result_ac": None, - "ac_result": None, - } + def __str__(self): + return "%s (%s)" % (self.con.get_id(), self.con.get_uuid()) - log("activation %s (%s) start asynchronously" % (con.get_id(), con.get_uuid())) + def is_done(self, log=log): - def cb(source_object, res, activation): - # The callback does not call other code for signaling the - # completion. Instead, we remember in "activation" that - # the callback was completed. - # - # Other code will repeatedly go through the "activation_list" - # and find those that are completed (nmc_activate_find_completed()). - try: - ac = nmc.activate_connection_finish(res) - except Exception as e: - activation["result"] = False - activation["result_msg"] = str(e) - else: - activation["result"] = True - activation["result_msg"] = "success" - activation["result_ac"] = ac + if self.state == Activation.ACTIVATION_STATE_DONE: + return True - nmc.activate_connection_async(con, None, None, None, cb, activation) + if self.state != Activation.ACTIVATION_STATE_WAITING: + return False - return activation + def _log_result(self, msg, done_with_success=False): + log("connection %s done: %s" % (self, msg)) + self.state = Activation.ACTIVATION_STATE_DONE + self.done_with_success = done_with_success + return True + ac = self.result_ac + if not ac: + return _log_result(self, "failed activation call (%s)" % (self.result_msg,)) -def nmc_activate_find_completed(activation_list): + if ac.get_client() is None: + return _log_result(self, "active connection disappeared") - # Iterate over list of "activation" data, find the first - # one that is completed, remove it from the list and return - # it. - - for idx, activation in enumerate(activation_list): - if activation["result"] is not None: - del activation_list[idx] - return activation - - return None - - -def nmc_activate_complete( - nmc, activation_list, completed_list, num_parallel_invocations -): - - # We schedule activations asynchronously and in parallel. However, we - # still want to rate limit the number of parallel activations. This - # function does that: if there are more than "num_parallel_invocations" activations - # in progress, then wait until the excess number of them completed. - # The completed ones move from "activation_list" over to "completed_list". - - completed = 0 - while True: - - need_to_wait = len(activation_list) > num_parallel_invocations - - # Even if we don't need to wait (that is, the list of pending activations - # is reasonably short), we still tentatively iterate the GMainContext a bit. - if not nmc.get_main_context().iteration(may_block=need_to_wait): - if need_to_wait: - continue - # Ok, nothing ready yet. - break - - # this is not efficient after each iteration(), but it's good enough. - # The activation list is supposed to be short. - activation = nmc_activate_find_completed(activation_list) - - if activation is None: - continue - - con = activation["con"] - log( - "activation %s (%s) start complete: %s%s" - % ( - con.get_id(), - con.get_uuid(), - activation["result_msg"], - ( - "" - if not activation["result"] - else (" (%s)" % (activation["result_ac"].get_path())) - ), + if ac.get_state() > NM.ActiveConnectionState.ACTIVATED: + return _log_result( + self, "connection failed to activate (state %s)" % (ac.get_state()) ) - ) - completed += 1 - completed_list.append(activation) + if ac.get_state() == NM.ActiveConnectionState.ACTIVATED: + return _log_result( + self, "connection successfully activated", done_with_success=True + ) - if completed > 0: - log( - "completed %d activations, %d activations still pending" - % (completed, len(activation_list)) + return False + + def start(self, nmc, cancellable=None, activated_callback=None, log=log): + + # Call nmc.activate_connection_async() and return a user data + # with the information about the pending operation. + + assert self.state == Activation.ACTIVATION_STATE_START + + self.state = Activation.ACTIVATION_STATE_STARTING + + log("activation %s start asynchronously" % (self)) + + def cb_activate_connection(source_object, res): + assert self.state == Activation.ACTIVATION_STATE_STARTING + try: + ac = nmc.activate_connection_finish(res) + except Exception as e: + self.result_msg = str(e) + log( + "activation %s started asynchronously failed: %s" + % (self, self.result_msg) + ) + else: + self.result_msg = "success" + self.result_ac = ac + log( + "activation %s started asynchronously success: %s" + % (self, ac.get_path()) + ) + self.state = Activation.ACTIVATION_STATE_WAITING + if activated_callback is not None: + activated_callback(self) + + nmc.activate_connection_async( + self.con, None, None, cancellable, cb_activate_connection ) + def wait(self, done_callback=None, log=log): -def nmc_activate_all(nmc, cons): + assert self.state == Activation.ACTIVATION_STATE_WAITING + assert self.result_ac + assert self.wait_id is None - # iterate of all connections ("cons") and activate them - # in parallel. nmc_activate_complete() is used to rate limits - # how many parallel invocations we allow. + def cb_wait(ac, state): + if self.is_done(log=log): + self.result_ac.disconnect(self.wait_id) + self.wait_id = None + done_callback(self) - num_parallel_invocations = 100 - - activation_list = [] - completed_list = [] - for c in cons: - activation = nmc_activate_start(nmc, c) - activation_list.append(activation) - nmc_activate_complete( - nmc, activation_list, completed_list, num_parallel_invocations - ) - nmc_activate_complete(nmc, activation_list, completed_list, 0) - assert not activation_list - assert len(completed_list) == len(cons) - - return completed_list + log("waiting for %s to fully activate" % (self)) + self.wait_id = self.result_ac.connect("notify", cb_wait) -def nmc_activate_wait_for_pending(nmc, completed_list): +class Manager(object): + def __init__(self, nmc, cons): - # go through the list of activations and wait that they - # all reach a final state. That is, either that they are failed - # or fully ACTIVATED state. + self.nmc = nmc - log("wait for all active connection to either reach ACTIVATED state or fail...") + self.ac_start = [Activation(c) for c in cons] + self.ac_starting = [] + self.ac_waiting = [] + self.ac_done = [] - def log_result(activation, message): - activation["ac_result"] = message - log( - "connection %s (%s) activation fully completed: %s" - % (ac.get_id(), ac.get_uuid(), message) - ) + def _log(self, msg): - while True: + lists = [self.ac_start, self.ac_starting, self.ac_waiting, self.ac_done] - # again, it's not efficient to check the entire list for completion - # after each g_main_context_iteration(). But "completed_list" should - # be reasonably small. + n = sum(len(l) for l in lists) + n = str(len(str(n))) - activation = None - for idx, activ in enumerate(completed_list): - if activ["ac_result"] is not None: - continue - if activ["result"] is False: - log_result(activ, "failed to start activation") - continue - ac = activ["result_ac"] - if ac.get_client() is None: - log_result(activ, "active connection disappeared") - continue - if ac.get_state() == NM.ActiveConnectionState.ACTIVATED: - log_result(activ, "connection successfully activated") - continue - if ac.get_state() > NM.ActiveConnectionState.ACTIVATED: - log_result( - activ, "connection failed to activate (state %s)" % (ac.get_state()) + prefix = "/".join((("%0" + n + "d") % len(l)) for l in lists) + log("%s: %s" % (prefix, msg)) + + def ac_run(self): + + loop = GLib.MainLoop(self.nmc.get_main_context()) + + while self.ac_start or self.ac_starting or self.ac_waiting: + + rate_limit_parallel_in_progress = ( + len(self.ac_starting) + len(self.ac_waiting) >= NUM_PARALLEL_IN_PROGRESS + ) + + if ( + not rate_limit_parallel_in_progress + and self.ac_start + and len(self.ac_starting) < NUM_PARALLEL_STARTING + ): + activation = self.ac_start.pop(0) + self.ac_starting.append(activation) + + def cb_activated(activation2): + self.ac_starting.remove(activation2) + if activation2.is_done(log=self._log): + self.ac_done.append(activation2) + else: + self.ac_waiting.append(activation2) + + def cb_done(activation3): + self.ac_waiting.remove(activation3) + self.ac_done.append(activation3) + loop.quit() + + activation2.wait(done_callback=cb_done, log=self._log) + loop.quit() + + activation.start( + self.nmc, activated_callback=cb_activated, log=self._log ) continue - activation = activ - break - if activation is None: - log("no more activation to wait for") - break + loop.run() - nmc.get_main_context().iteration(may_block=True) + res_list = [ac.done_with_success for ac in self.ac_done] + log( + "%s out of %s activations are now successfully activated" + % (sum(res_list), len(self.ac_done)) + ) -def nmc_activate_check_good(nmc, completed_list): - - # go through the list of activations and check that all of them are - # in a good state. - - n_good = 0 - n_bad = 0 - - for activ in completed_list: - if activ["result"] is False: - n_bad += 1 - continue - ac = activ["result_ac"] - if ac.get_client() is None: - n_bad += 1 - continue - if ac.get_state() != NM.ActiveConnectionState.ACTIVATED: - n_bad += 1 - continue - n_good += 1 - - log( - "%d out of %d activations are now successfully activated" - % (n_good, n_good + n_bad) - ) - - return n_bad == 0 + return all(res_list) def main(): @@ -355,11 +332,7 @@ def main(): cons = find_connections(nmc, sys.argv[1:]) - completed_list = nmc_activate_all(nmc, cons) - - nmc_activate_wait_for_pending(nmc, completed_list) - - all_good = nmc_activate_check_good(nmc, completed_list) + all_good = Manager(nmc, cons).ac_run() nmc_transfer_ref = [nmc] del nmc