clients/tests: don't wait for first job before scheduling parallel jobs

Previously, the test would kick off 15 processes in parallel, but
the first job in the queue would block more processes from being
started.

That is, async_start() would only start 15 processes, and since none of
them were reaped before async_wait() was called, no more than 15 jobs
were running during the start phase. That is not a real issue, because
the start phase is non-blocking and queues all the jobs quickly; it's
not really expected that many processes complete during that short time
anyway. Still, this was a bit ugly.

The bigger problem is that async_wait() would always block until the
first job in the queue completed before starting more processes. That
means that if the first job takes unusually long, it blocks other
processes from being reaped and new processes from being started.

Instead, don't block on a single job; poll all running jobs in turn for
a short amount of time. Whichever process exits first gets completed,
and more jobs get started.
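
In miniature, the idea looks like this (a simplified sketch, not the
actual test code; "commands" is a list of argv lists, and the MAX_JOBS
limit and the short sleep mirror the values used below):

    import subprocess
    import time

    MAX_JOBS = 15

    def run_all(commands):
        # start up to MAX_JOBS processes; then poll all running processes
        # in turn, reap whichever exits first, and start the next queued job.
        running = [subprocess.Popen(c) for c in commands[:MAX_JOBS]]
        queued = list(commands[MAX_JOBS:])
        while running:
            for p in list(running):
                if p.poll() is not None:
                    running.remove(p)
                    if queued:
                        running.append(subprocess.Popen(queued.pop(0)))
            time.sleep(0.03)

    run_all([['true']] * 30)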

In fact, in the current setup it's hard to notice any difference,
because all nmcli invocations take about the same time and are
relatively fast. That this approach parallelizes better becomes
visible when the runtime of the jobs varies more strongly (and some
invocations take notably longer). As we later want to run nmcli under
valgrind, this will probably make a difference.

An alternative would be to not poll()/wait() for the child processes,
but to somehow get notified. For example, we could use a GMainContext
and watch the child processes. But that's probably more complicated
to do, so let's keep the naive approach with polling.
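
For reference, such a notification-based variant might look roughly
like the following (an illustrative sketch only, not part of the
commit; it assumes PyGObject and that nothing else reaps the children):

    import subprocess
    from gi.repository import GLib

    def run_all_with_watches(commands):
        loop = GLib.MainLoop()
        pending = {'count': len(commands)}

        def on_child_exit(pid, status):
            # invoked from the main loop when a child terminates
            pending['count'] -= 1
            if pending['count'] == 0:
                loop.quit()

        for cmd in commands:
            p = subprocess.Popen(cmd)
            GLib.child_watch_add(GLib.PRIORITY_DEFAULT, p.pid, on_child_exit)
        loop.run()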
Author: Thomas Haller
Date:   2019-10-12 11:02:21 +02:00
Parent: facfc94744
Commit: bb4b749595


@@ -93,6 +93,7 @@ import shlex
 import re
 import dbus
 import time
+import random
 import dbus.service
 import dbus.mainloop.glib
@@ -171,20 +172,50 @@ class Util:
         return "'" + s.replace("'", "'\"'\"'") + "'"
 
     @staticmethod
-    def popen_wait(p, timeout = None):
-        # wait() has a timeout argument only since 3.3
+    def popen_wait(p, timeout = 0):
         if Util.python_has_version(3, 3):
-            return p.wait(timeout)
-        if timeout is None:
-            return p.wait()
+            if timeout == 0:
+                return p.poll()
+            try:
+                return p.wait(timeout)
+            except subprocess.TimeoutExpired:
+                return None
         start = NM.utils_get_timestamp_msec()
         while True:
             if p.poll() is not None:
                 return p.returncode
-            if start + (timeout * 1000) < NM.utils_get_timestamp_msec():
-                raise Exception("timeout expired")
+            if timeout == 0 or start + (timeout * 1000) < NM.utils_get_timestamp_msec():
+                return None
             time.sleep(0.05)
 
+    @staticmethod
+    def random_job(jobs):
+        jobs = list(jobs)
+        l = len(jobs)
+        t = l * (l + 1) / 2
+        while True:
+            # we return a random job from the list, but the indexes at the front
+            # of the list are more likely. The idea is that those jobs were
+            # started first and are expected to complete first. As we poll, we
+            # want to check more frequently on the elements at the beginning of
+            # the list...
+            #
+            # Let's assign probabilities with an arithmetic series.
+            # That is, if there are 16 jobs, then the first gets weighted
+            # with 16, the second with 15, then 14, and so on, until the
+            # last has weight 1. That means, the first element is 16 times
+            # more probable than the last.
+            # The element at index idx (starting at 0) is picked with probability
+            #   1 / (l*(l+1)/2) * (l - idx)
+            r = random.random() * t
+            idx = 0
+            rx = 0
+            while True:
+                rx += (l - idx)
+                if rx >= r or idx == l - 1:
+                    yield jobs[idx]
+                    break
+                idx += 1
+
     @staticmethod
     def iter_single(itr, min_num = 1, max_num = 1):
         itr = list(itr)
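
To see the arithmetic-series weighting of random_job() in action, the
generator can be sampled standalone; this snippet re-implements it so
it runs on its own:

    import collections
    import random

    def random_job(jobs):
        # same weighting as Util.random_job() above: index 0 has weight l,
        # index 1 has weight l-1, ..., the last index has weight 1.
        jobs = list(jobs)
        l = len(jobs)
        t = l * (l + 1) / 2
        while True:
            r = random.random() * t
            idx = 0
            rx = 0
            while True:
                rx += (l - idx)
                if rx >= r or idx == l - 1:
                    yield jobs[idx]
                    break
                idx += 1

    gen = random_job(range(4))
    print(collections.Counter(next(gen) for _ in range(100000)))
    # with 4 jobs the weights are 4:3:2:1, so the counts come out near
    # 40000, 30000, 20000 and 10000 for indexes 0..3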
@@ -336,7 +367,7 @@ class NMStubServer:
             if (NM.utils_get_timestamp_msec() - start) >= 4000:
                 p.stdin.close()
                 p.kill()
-                Util.popen_wait(p, 1000)
+                Util.popen_wait(p, 1)
                 raise Exception("after starting stub service the D-Bus name was not claimed in time")
 
         self._nmobj = nmobj
@@ -344,14 +375,17 @@
         self._p = p
 
     def shutdown(self):
+        conn = self._conn
+        p = self._p
         self._nmobj = None
         self._nmiface = None
         self._conn = None
-        self._p.stdin.close()
-        self._p.kill()
-        Util.popen_wait(self._p, 1000)
         self._p = None
-        if self._conn_get_main_object(self._conn) is not None:
+        p.stdin.close()
+        p.kill()
+        if Util.popen_wait(p, 1) is None:
+            raise Exception("Stub service did not exit in time")
+        if self._conn_get_main_object(conn) is not None:
             raise Exception("Stub service is not still here although it should shut down")
 
 class _MethodProxy:
@@ -409,51 +443,64 @@ class AsyncProcess():
     def __init__(self,
                  args,
                  env,
-                 complete_cb):
-        self._args = args
+                 complete_cb,
+                 max_waittime_msec = 2000):
+        self._args = list(args)
         self._env = env
         self._complete_cb = complete_cb
+        self._max_waittime_msec = max_waittime_msec
 
     def start(self):
         if not hasattr(self, '_p'):
+            self._p_start_timestamp = NM.utils_get_timestamp_msec()
             self._p = subprocess.Popen(self._args,
                                        stdout = subprocess.PIPE,
                                        stderr = subprocess.PIPE,
                                        env = self._env)
 
-    def wait(self):
+    def _timeout_remaining_time(self):
+        # note that we call this during poll() and wait_and_complete().
+        # We don't know the exact time when the process terminated,
+        # so this is only approximately correct if we call poll/wait
+        # frequently.
+        # Worst case, we will think that the process did not time out,
+        # when in fact it was running longer than max-waittime.
+        return self._max_waittime_msec - (NM.utils_get_timestamp_msec() - self._p_start_timestamp)
+
+    def poll(self, timeout = 0):
         self.start()
-        error = False
-        try:
-            Util.popen_wait(self._p, 2000)
-        except Exception as e:
-            error = True
-            raise e
-        finally:
-            (returncode, stdout, stderr) = (self._p.returncode,
-                                            self._p.stdout.read(),
-                                            self._p.stderr.read())
-            self._p.stdout.close()
-            self._p.stderr.close()
-            self._p = None
-        if error:
-            print(stdout)
-            print(stderr)
-        try:
-            self._complete_cb(self, returncode, stdout, stderr)
-        except Exception as e:
-            raise e
+        return_code = Util.popen_wait(self._p, timeout)
+        if return_code is None \
+           and self._timeout_remaining_time() <= 0:
+            raise Exception("process is still running after timeout: %s" % (' '.join(self._args)))
+        return return_code
+
+    def wait_and_complete(self):
+        self.start()
+        p = self._p
+        self._p = None
+        return_code = Util.popen_wait(p, max(0, self._timeout_remaining_time()) / 1000)
+        (stdout, stderr) = (p.stdout.read(), p.stderr.read())
+        p.stdout.close()
+        p.stderr.close()
+        if return_code is None:
+            print(stdout)
+            print(stderr)
+            raise Exception("process did not complete in time: %s" % (' '.join(self._args)))
+        self._complete_cb(self, return_code, stdout, stderr)
 
 ###############################################################################
 
 class NmTestBase(unittest.TestCase):
     pass
 
+MAX_JOBS = 15
+
 class TestNmcli(NmTestBase):
 
     @staticmethod
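
Hypothetical usage of the reworked AsyncProcess API above (not part of
the commit; it assumes "import os" plus the test file's other imports,
in particular NM from gi.repository, which AsyncProcess uses
internally):

    def on_complete(async_job, returncode, stdout, stderr):
        print('exited with %d' % (returncode))

    job = AsyncProcess(args = ['/bin/true'],
                       env = os.environ,
                       complete_cb = on_complete)
    job.start()
    # poll() returns None while the job is still running and raises once
    # max_waittime_msec is exceeded; wait_and_complete() reaps the process
    # and invokes complete_cb.
    while job.poll(timeout = 0.03) is None:
        pass
    job.wait_and_complete()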
@@ -638,6 +685,9 @@ class TestNmcli(NmTestBase):
         if expected_stderr is _DEFAULT_ARG:
             expected_stderr = None
 
+        results_idx = len(self._results)
+        self._results.append(None)
+
         def complete_cb(async_job,
                         returncode,
                         stdout,
@@ -699,11 +749,11 @@
                 content = ('size: %s\n' % (len(content))).encode('utf8') + \
                           content
 
-            self._results.append({
+            self._results[results_idx] = {
                 'test_name'        : test_name,
                 'ignore_l10n_diff' : ignore_l10n_diff,
                 'content'          : content,
-            })
+            }
 
         async_job = AsyncProcess(args = args,
                                  env = env,
@@ -711,20 +761,46 @@
         self._async_jobs.append(async_job)
 
-        if sync_barrier:
-            self.async_wait()
-        else:
-            self.async_start()
+        self.async_start(wait_all = sync_barrier)
 
-    def async_start(self):
-        # limit number parallel running jobs
-        for async_job in self._async_jobs[0:15]:
-            async_job.start()
+    def async_start(self, wait_all = False):
+        while True:
+            while True:
+                for async_job in list(self._async_jobs[0:MAX_JOBS]):
+                    async_job.start()
+                # start up to MAX_JOBS jobs, but poll() and complete those
+                # that already exited. Retry, until there are no more
+                # jobs to start, or until MAX_JOBS are running.
+                jobs_running = []
+                for async_job in list(self._async_jobs[0:MAX_JOBS]):
+                    if async_job.poll() is not None:
+                        self._async_jobs.remove(async_job)
+                        async_job.wait_and_complete()
+                        continue
+                    jobs_running.append(async_job)
+                if len(jobs_running) >= len(self._async_jobs):
+                    break
+                if len(jobs_running) >= MAX_JOBS:
+                    break
+            if not jobs_running:
+                return
+            if not wait_all:
+                return
+            # in a loop, indefinitely poll the running jobs until we find one
+            # that completes. Note that poll() itself will raise an exception
+            # if a job times out.
+            for async_job in Util.random_job(jobs_running):
+                if async_job.poll(timeout = 0.03) is not None:
+                    self._async_jobs.remove(async_job)
+                    async_job.wait_and_complete()
+                    break
 
     def async_wait(self):
-        while self._async_jobs:
-            self.async_start()
-            self._async_jobs.pop(0).wait()
+        return self.async_start(wait_all = True)
 
     def _nm_test_pre(self):
         self._calling_num = {}