tests/client: split out nmcli specific bits into a separate cass
The mock service is more widely useful -- in particular for testing nm-cloud-setup in a following commit. Split the commonly useful parts into TestNmClient class.
This commit is contained in:
@@ -835,7 +835,7 @@ class AsyncProcess:
|
||||
MAX_JOBS = 15
|
||||
|
||||
|
||||
class TestNmcli(unittest.TestCase):
|
||||
class TestNmClient(unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._calling_num = {}
|
||||
self._skip_test_for_l10n_diff = []
|
||||
@@ -888,131 +888,6 @@ class TestNmcli(unittest.TestCase):
|
||||
|
||||
return content_expect, results_expect
|
||||
|
||||
def nmcli_construct_argv(self, args, with_valgrind=None):
|
||||
|
||||
if with_valgrind is None:
|
||||
with_valgrind = conf.get(ENV_NM_TEST_VALGRIND)
|
||||
|
||||
valgrind_log = None
|
||||
cmd = conf.get(ENV_NM_TEST_CLIENT_NMCLI_PATH)
|
||||
if with_valgrind:
|
||||
valgrind_log = tempfile.mkstemp(prefix="nm-test-client-valgrind.")
|
||||
argv = [
|
||||
"valgrind",
|
||||
"--quiet",
|
||||
"--error-exitcode=37",
|
||||
"--leak-check=full",
|
||||
"--gen-suppressions=all",
|
||||
(
|
||||
"--suppressions="
|
||||
+ PathConfiguration.top_srcdir()
|
||||
+ "/valgrind.suppressions"
|
||||
),
|
||||
"--num-callers=100",
|
||||
"--log-file=" + valgrind_log[1],
|
||||
cmd,
|
||||
]
|
||||
libtool = conf.get(ENV_LIBTOOL)
|
||||
if libtool:
|
||||
argv = list(libtool) + ["--mode=execute"] + argv
|
||||
else:
|
||||
argv = [cmd]
|
||||
|
||||
argv.extend(args)
|
||||
return argv, valgrind_log
|
||||
|
||||
def call_nmcli_l(
|
||||
self,
|
||||
args,
|
||||
check_on_disk=_DEFAULT_ARG,
|
||||
fatal_warnings=_DEFAULT_ARG,
|
||||
expected_returncode=_DEFAULT_ARG,
|
||||
expected_stdout=_DEFAULT_ARG,
|
||||
expected_stderr=_DEFAULT_ARG,
|
||||
replace_stdout=None,
|
||||
replace_stderr=None,
|
||||
replace_cmd=None,
|
||||
sort_lines_stdout=False,
|
||||
extra_env=None,
|
||||
sync_barrier=False,
|
||||
):
|
||||
frame = sys._getframe(1)
|
||||
for lang in ["C", "pl"]:
|
||||
self._call_nmcli(
|
||||
args,
|
||||
lang,
|
||||
check_on_disk,
|
||||
fatal_warnings,
|
||||
expected_returncode,
|
||||
expected_stdout,
|
||||
expected_stderr,
|
||||
replace_stdout,
|
||||
replace_stderr,
|
||||
replace_cmd,
|
||||
sort_lines_stdout,
|
||||
extra_env,
|
||||
sync_barrier,
|
||||
frame,
|
||||
)
|
||||
|
||||
def call_nmcli(
|
||||
self,
|
||||
args,
|
||||
langs=None,
|
||||
lang=None,
|
||||
check_on_disk=_DEFAULT_ARG,
|
||||
fatal_warnings=_DEFAULT_ARG,
|
||||
expected_returncode=_DEFAULT_ARG,
|
||||
expected_stdout=_DEFAULT_ARG,
|
||||
expected_stderr=_DEFAULT_ARG,
|
||||
replace_stdout=None,
|
||||
replace_stderr=None,
|
||||
replace_cmd=None,
|
||||
sort_lines_stdout=False,
|
||||
extra_env=None,
|
||||
sync_barrier=None,
|
||||
):
|
||||
|
||||
frame = sys._getframe(1)
|
||||
|
||||
if langs is not None:
|
||||
assert lang is None
|
||||
else:
|
||||
if lang is None:
|
||||
lang = "C"
|
||||
langs = [lang]
|
||||
|
||||
if sync_barrier is None:
|
||||
sync_barrier = len(langs) == 1
|
||||
|
||||
for lang in langs:
|
||||
self._call_nmcli(
|
||||
args,
|
||||
lang,
|
||||
check_on_disk,
|
||||
fatal_warnings,
|
||||
expected_returncode,
|
||||
expected_stdout,
|
||||
expected_stderr,
|
||||
replace_stdout,
|
||||
replace_stderr,
|
||||
replace_cmd,
|
||||
sort_lines_stdout,
|
||||
extra_env,
|
||||
sync_barrier,
|
||||
frame,
|
||||
)
|
||||
|
||||
def call_nmcli_pexpect(self, args):
|
||||
|
||||
env = self._env(extra_env={"NO_COLOR": "1"})
|
||||
argv, valgrind_log = self.nmcli_construct_argv(args)
|
||||
|
||||
pexp = pexpect.spawn(argv[0], argv[1:], timeout=10, env=env)
|
||||
|
||||
typ = collections.namedtuple("CallNmcliPexpect", ["pexp", "valgrind_log"])
|
||||
return typ(pexp, valgrind_log)
|
||||
|
||||
def _env(
|
||||
self, lang="C", calling_num=None, fatal_warnings=_DEFAULT_ARG, extra_env=None
|
||||
):
|
||||
@@ -1053,184 +928,47 @@ class TestNmcli(unittest.TestCase):
|
||||
env[k] = v
|
||||
return env
|
||||
|
||||
def _call_nmcli(
|
||||
self,
|
||||
args,
|
||||
lang,
|
||||
check_on_disk,
|
||||
fatal_warnings,
|
||||
expected_returncode,
|
||||
expected_stdout,
|
||||
expected_stderr,
|
||||
replace_stdout,
|
||||
replace_stderr,
|
||||
replace_cmd,
|
||||
sort_lines_stdout,
|
||||
extra_env,
|
||||
sync_barrier,
|
||||
frame,
|
||||
):
|
||||
def cmd_construct_argv(self, cmd_path, args, with_valgrind=None):
|
||||
|
||||
if sync_barrier:
|
||||
self.async_wait()
|
||||
if with_valgrind is None:
|
||||
with_valgrind = conf.get(ENV_NM_TEST_VALGRIND)
|
||||
|
||||
calling_fcn = frame.f_code.co_name
|
||||
calling_num = self._calling_num.get(calling_fcn, 0) + 1
|
||||
self._calling_num[calling_fcn] = calling_num
|
||||
|
||||
test_name = "%s-%03d" % (calling_fcn, calling_num)
|
||||
|
||||
# we cannot use frame.f_code.co_filename directly, because it might be different depending
|
||||
# on where the file lies and which is CWD. We still want to give the location of
|
||||
# the file, so that the user can easier find the source (when looking at the .expected files)
|
||||
self.assertTrue(
|
||||
os.path.abspath(frame.f_code.co_filename).endswith(
|
||||
"/" + PathConfiguration.canonical_script_filename()
|
||||
)
|
||||
)
|
||||
|
||||
if conf.get(ENV_NM_TEST_WITH_LINENO):
|
||||
calling_location = "%s:%d:%s()/%d" % (
|
||||
PathConfiguration.canonical_script_filename(),
|
||||
frame.f_lineno,
|
||||
frame.f_code.co_name,
|
||||
calling_num,
|
||||
)
|
||||
valgrind_log = None
|
||||
cmd = conf.get(cmd_path)
|
||||
if with_valgrind:
|
||||
valgrind_log = tempfile.mkstemp(prefix="nm-test-client-valgrind.")
|
||||
argv = [
|
||||
"valgrind",
|
||||
"--quiet",
|
||||
"--error-exitcode=37",
|
||||
"--leak-check=full",
|
||||
"--gen-suppressions=all",
|
||||
(
|
||||
"--suppressions="
|
||||
+ PathConfiguration.top_srcdir()
|
||||
+ "/valgrind.suppressions"
|
||||
),
|
||||
"--num-callers=100",
|
||||
"--log-file=" + valgrind_log[1],
|
||||
cmd,
|
||||
]
|
||||
libtool = conf.get(ENV_LIBTOOL)
|
||||
if libtool:
|
||||
argv = list(libtool) + ["--mode=execute"] + argv
|
||||
else:
|
||||
calling_location = "%s:%s()/%d" % (
|
||||
PathConfiguration.canonical_script_filename(),
|
||||
frame.f_code.co_name,
|
||||
calling_num,
|
||||
)
|
||||
argv = [cmd]
|
||||
|
||||
if lang is None or lang == "C":
|
||||
lang = "C"
|
||||
elif lang == "de":
|
||||
lang = "de_DE.utf8"
|
||||
elif lang == "pl":
|
||||
lang = "pl_PL.UTF-8"
|
||||
else:
|
||||
self.fail("invalid language %s" % (lang))
|
||||
argv.extend(args)
|
||||
return argv, valgrind_log
|
||||
|
||||
# Running under valgrind is not yet supported for those tests.
|
||||
args, valgrind_log = self.nmcli_construct_argv(args, with_valgrind=False)
|
||||
def call_pexpect(self, cmd_path, args, extra_env):
|
||||
argv, valgrind_log = self.cmd_construct_argv(cmd_path, args)
|
||||
env = self._env(extra_env=extra_env)
|
||||
|
||||
assert valgrind_log is None
|
||||
pexp = pexpect.spawn(argv[0], argv[1:], timeout=10, env=env)
|
||||
|
||||
if replace_stdout is not None:
|
||||
replace_stdout = list(replace_stdout)
|
||||
if replace_stderr is not None:
|
||||
replace_stderr = list(replace_stderr)
|
||||
if replace_cmd is not None:
|
||||
replace_cmd = list(replace_cmd)
|
||||
|
||||
if check_on_disk is _DEFAULT_ARG:
|
||||
check_on_disk = (
|
||||
expected_returncode is _DEFAULT_ARG
|
||||
and (
|
||||
expected_stdout is _DEFAULT_ARG
|
||||
or expected_stdout is _UNSTABLE_OUTPUT
|
||||
)
|
||||
and (
|
||||
expected_stderr is _DEFAULT_ARG
|
||||
or expected_stderr is _UNSTABLE_OUTPUT
|
||||
)
|
||||
)
|
||||
if expected_returncode is _DEFAULT_ARG:
|
||||
expected_returncode = None
|
||||
if expected_stdout is _DEFAULT_ARG:
|
||||
expected_stdout = None
|
||||
if expected_stderr is _DEFAULT_ARG:
|
||||
expected_stderr = None
|
||||
|
||||
results_idx = len(self._results)
|
||||
self._results.append(None)
|
||||
|
||||
def complete_cb(async_job, returncode, stdout, stderr):
|
||||
|
||||
if expected_stdout is _UNSTABLE_OUTPUT:
|
||||
stdout = "<UNSTABLE OUTPUT>".encode("utf-8")
|
||||
else:
|
||||
stdout = Util.replace_text(stdout, replace_stdout)
|
||||
|
||||
if expected_stderr is _UNSTABLE_OUTPUT:
|
||||
stderr = "<UNSTABLE OUTPUT>".encode("utf-8")
|
||||
else:
|
||||
stderr = Util.replace_text(stderr, replace_stderr)
|
||||
|
||||
if sort_lines_stdout:
|
||||
stdout = b"\n".join(sorted(stdout.split(b"\n")))
|
||||
|
||||
ignore_l10n_diff = lang != "C" and not conf.get(
|
||||
ENV_NM_TEST_CLIENT_CHECK_L10N
|
||||
)
|
||||
|
||||
if expected_stderr is not None and expected_stderr is not _UNSTABLE_OUTPUT:
|
||||
if expected_stderr != stderr:
|
||||
if ignore_l10n_diff:
|
||||
self._skip_test_for_l10n_diff.append(test_name)
|
||||
else:
|
||||
self.assertEqual(expected_stderr, stderr)
|
||||
if expected_stdout is not None and expected_stdout is not _UNSTABLE_OUTPUT:
|
||||
if expected_stdout != stdout:
|
||||
if ignore_l10n_diff:
|
||||
self._skip_test_for_l10n_diff.append(test_name)
|
||||
else:
|
||||
self.assertEqual(expected_stdout, stdout)
|
||||
if expected_returncode is not None:
|
||||
self.assertEqual(expected_returncode, returncode)
|
||||
|
||||
if fatal_warnings is _DEFAULT_ARG:
|
||||
if expected_returncode != -5:
|
||||
self.assertNotEqual(returncode, -5)
|
||||
elif fatal_warnings:
|
||||
if expected_returncode is None:
|
||||
self.assertEqual(returncode, -5)
|
||||
|
||||
if check_on_disk:
|
||||
cmd = "$NMCLI %s" % (Util.shlex_join(args[1:]),)
|
||||
cmd = Util.replace_text(cmd, replace_cmd)
|
||||
|
||||
if returncode < 0:
|
||||
returncode_str = "%d (SIGNAL %s)" % (
|
||||
returncode,
|
||||
Util.signal_no_to_str(-returncode),
|
||||
)
|
||||
else:
|
||||
returncode_str = "%d" % (returncode)
|
||||
|
||||
content = (
|
||||
("location: %s\n" % (calling_location)).encode("utf8")
|
||||
+ ("cmd: %s\n" % (cmd)).encode("utf8")
|
||||
+ ("lang: %s\n" % (lang)).encode("utf8")
|
||||
+ ("returncode: %s\n" % (returncode_str)).encode("utf8")
|
||||
)
|
||||
if len(stdout) > 0:
|
||||
content += (
|
||||
("stdout: %d bytes\n>>>\n" % (len(stdout))).encode("utf8")
|
||||
+ stdout
|
||||
+ "\n<<<\n".encode("utf8")
|
||||
)
|
||||
if len(stderr) > 0:
|
||||
content += (
|
||||
("stderr: %d bytes\n>>>\n" % (len(stderr))).encode("utf8")
|
||||
+ stderr
|
||||
+ "\n<<<\n".encode("utf8")
|
||||
)
|
||||
content = ("size: %s\n" % (len(content))).encode("utf8") + content
|
||||
|
||||
self._results[results_idx] = {
|
||||
"test_name": test_name,
|
||||
"ignore_l10n_diff": ignore_l10n_diff,
|
||||
"content": content,
|
||||
}
|
||||
|
||||
env = self._env(lang, calling_num, fatal_warnings, extra_env)
|
||||
async_job = AsyncProcess(args=args, env=env, complete_cb=complete_cb)
|
||||
|
||||
self._async_jobs.append(async_job)
|
||||
|
||||
self.async_start(wait_all=sync_barrier)
|
||||
typ = collections.namedtuple("CallPexpect", ["pexp", "valgrind_log"])
|
||||
return typ(pexp, valgrind_log)
|
||||
|
||||
def async_start(self, wait_all=False):
|
||||
|
||||
@@ -1381,13 +1119,281 @@ class TestNmcli(unittest.TestCase):
|
||||
% (",".join(skip_test_for_l10n_diff))
|
||||
)
|
||||
|
||||
def skip_without_pexpect(func):
|
||||
def f(self):
|
||||
if pexpect is None:
|
||||
raise unittest.SkipTest("pexpect not available")
|
||||
func(self)
|
||||
def setUp(self):
|
||||
if not dbus_session_inited:
|
||||
self.skipTest(
|
||||
"Own D-Bus session for testing is not initialized. Do you have dbus-run-session available?"
|
||||
)
|
||||
if NM is None:
|
||||
self.skipTest("gi.NM is not available. Did you build with introspection?")
|
||||
|
||||
return f
|
||||
|
||||
class TestNmcli(TestNmClient):
|
||||
def call_nmcli_l(
|
||||
self,
|
||||
args,
|
||||
check_on_disk=_DEFAULT_ARG,
|
||||
fatal_warnings=_DEFAULT_ARG,
|
||||
expected_returncode=_DEFAULT_ARG,
|
||||
expected_stdout=_DEFAULT_ARG,
|
||||
expected_stderr=_DEFAULT_ARG,
|
||||
replace_stdout=None,
|
||||
replace_stderr=None,
|
||||
replace_cmd=None,
|
||||
sort_lines_stdout=False,
|
||||
extra_env=None,
|
||||
sync_barrier=False,
|
||||
):
|
||||
frame = sys._getframe(1)
|
||||
for lang in ["C", "pl"]:
|
||||
self._call_nmcli(
|
||||
args,
|
||||
lang,
|
||||
check_on_disk,
|
||||
fatal_warnings,
|
||||
expected_returncode,
|
||||
expected_stdout,
|
||||
expected_stderr,
|
||||
replace_stdout,
|
||||
replace_stderr,
|
||||
replace_cmd,
|
||||
sort_lines_stdout,
|
||||
extra_env,
|
||||
sync_barrier,
|
||||
frame,
|
||||
)
|
||||
|
||||
def call_nmcli(
|
||||
self,
|
||||
args,
|
||||
langs=None,
|
||||
lang=None,
|
||||
check_on_disk=_DEFAULT_ARG,
|
||||
fatal_warnings=_DEFAULT_ARG,
|
||||
expected_returncode=_DEFAULT_ARG,
|
||||
expected_stdout=_DEFAULT_ARG,
|
||||
expected_stderr=_DEFAULT_ARG,
|
||||
replace_stdout=None,
|
||||
replace_stderr=None,
|
||||
replace_cmd=None,
|
||||
sort_lines_stdout=False,
|
||||
extra_env=None,
|
||||
sync_barrier=None,
|
||||
):
|
||||
|
||||
frame = sys._getframe(1)
|
||||
|
||||
if langs is not None:
|
||||
assert lang is None
|
||||
else:
|
||||
if lang is None:
|
||||
lang = "C"
|
||||
langs = [lang]
|
||||
|
||||
if sync_barrier is None:
|
||||
sync_barrier = len(langs) == 1
|
||||
|
||||
for lang in langs:
|
||||
self._call_nmcli(
|
||||
args,
|
||||
lang,
|
||||
check_on_disk,
|
||||
fatal_warnings,
|
||||
expected_returncode,
|
||||
expected_stdout,
|
||||
expected_stderr,
|
||||
replace_stdout,
|
||||
replace_stderr,
|
||||
replace_cmd,
|
||||
sort_lines_stdout,
|
||||
extra_env,
|
||||
sync_barrier,
|
||||
frame,
|
||||
)
|
||||
|
||||
def call_nmcli_pexpect(self, args):
|
||||
return self.call_pexpect(ENV_NM_TEST_CLIENT_NMCLI_PATH, args, {"NO_COLOR": "1"})
|
||||
|
||||
def _call_nmcli(
|
||||
self,
|
||||
args,
|
||||
lang,
|
||||
check_on_disk,
|
||||
fatal_warnings,
|
||||
expected_returncode,
|
||||
expected_stdout,
|
||||
expected_stderr,
|
||||
replace_stdout,
|
||||
replace_stderr,
|
||||
replace_cmd,
|
||||
sort_lines_stdout,
|
||||
extra_env,
|
||||
sync_barrier,
|
||||
frame,
|
||||
):
|
||||
|
||||
if sync_barrier:
|
||||
self.async_wait()
|
||||
|
||||
calling_fcn = frame.f_code.co_name
|
||||
calling_num = self._calling_num.get(calling_fcn, 0) + 1
|
||||
self._calling_num[calling_fcn] = calling_num
|
||||
|
||||
test_name = "%s-%03d" % (calling_fcn, calling_num)
|
||||
|
||||
# we cannot use frame.f_code.co_filename directly, because it might be different depending
|
||||
# on where the file lies and which is CWD. We still want to give the location of
|
||||
# the file, so that the user can easier find the source (when looking at the .expected files)
|
||||
self.assertTrue(
|
||||
os.path.abspath(frame.f_code.co_filename).endswith(
|
||||
"/" + PathConfiguration.canonical_script_filename()
|
||||
)
|
||||
)
|
||||
|
||||
if conf.get(ENV_NM_TEST_WITH_LINENO):
|
||||
calling_location = "%s:%d:%s()/%d" % (
|
||||
PathConfiguration.canonical_script_filename(),
|
||||
frame.f_lineno,
|
||||
frame.f_code.co_name,
|
||||
calling_num,
|
||||
)
|
||||
else:
|
||||
calling_location = "%s:%s()/%d" % (
|
||||
PathConfiguration.canonical_script_filename(),
|
||||
frame.f_code.co_name,
|
||||
calling_num,
|
||||
)
|
||||
|
||||
if lang is None or lang == "C":
|
||||
lang = "C"
|
||||
elif lang == "de":
|
||||
lang = "de_DE.utf8"
|
||||
elif lang == "pl":
|
||||
lang = "pl_PL.UTF-8"
|
||||
else:
|
||||
self.fail("invalid language %s" % (lang))
|
||||
|
||||
# Running under valgrind is not yet supported for those tests.
|
||||
args, valgrind_log = self.cmd_construct_argv(
|
||||
ENV_NM_TEST_CLIENT_NMCLI_PATH, args, with_valgrind=False
|
||||
)
|
||||
|
||||
assert valgrind_log is None
|
||||
|
||||
if replace_stdout is not None:
|
||||
replace_stdout = list(replace_stdout)
|
||||
if replace_stderr is not None:
|
||||
replace_stderr = list(replace_stderr)
|
||||
if replace_cmd is not None:
|
||||
replace_cmd = list(replace_cmd)
|
||||
|
||||
if check_on_disk is _DEFAULT_ARG:
|
||||
check_on_disk = (
|
||||
expected_returncode is _DEFAULT_ARG
|
||||
and (
|
||||
expected_stdout is _DEFAULT_ARG
|
||||
or expected_stdout is _UNSTABLE_OUTPUT
|
||||
)
|
||||
and (
|
||||
expected_stderr is _DEFAULT_ARG
|
||||
or expected_stderr is _UNSTABLE_OUTPUT
|
||||
)
|
||||
)
|
||||
if expected_returncode is _DEFAULT_ARG:
|
||||
expected_returncode = None
|
||||
if expected_stdout is _DEFAULT_ARG:
|
||||
expected_stdout = None
|
||||
if expected_stderr is _DEFAULT_ARG:
|
||||
expected_stderr = None
|
||||
|
||||
results_idx = len(self._results)
|
||||
self._results.append(None)
|
||||
|
||||
def complete_cb(async_job, returncode, stdout, stderr):
|
||||
|
||||
if expected_stdout is _UNSTABLE_OUTPUT:
|
||||
stdout = "<UNSTABLE OUTPUT>".encode("utf-8")
|
||||
else:
|
||||
stdout = Util.replace_text(stdout, replace_stdout)
|
||||
|
||||
if expected_stderr is _UNSTABLE_OUTPUT:
|
||||
stderr = "<UNSTABLE OUTPUT>".encode("utf-8")
|
||||
else:
|
||||
stderr = Util.replace_text(stderr, replace_stderr)
|
||||
|
||||
if sort_lines_stdout:
|
||||
stdout = b"\n".join(sorted(stdout.split(b"\n")))
|
||||
|
||||
ignore_l10n_diff = lang != "C" and not conf.get(
|
||||
ENV_NM_TEST_CLIENT_CHECK_L10N
|
||||
)
|
||||
|
||||
if expected_stderr is not None and expected_stderr is not _UNSTABLE_OUTPUT:
|
||||
if expected_stderr != stderr:
|
||||
if ignore_l10n_diff:
|
||||
self._skip_test_for_l10n_diff.append(test_name)
|
||||
else:
|
||||
self.assertEqual(expected_stderr, stderr)
|
||||
if expected_stdout is not None and expected_stdout is not _UNSTABLE_OUTPUT:
|
||||
if expected_stdout != stdout:
|
||||
if ignore_l10n_diff:
|
||||
self._skip_test_for_l10n_diff.append(test_name)
|
||||
else:
|
||||
self.assertEqual(expected_stdout, stdout)
|
||||
if expected_returncode is not None:
|
||||
self.assertEqual(expected_returncode, returncode)
|
||||
|
||||
if fatal_warnings is _DEFAULT_ARG:
|
||||
if expected_returncode != -5:
|
||||
self.assertNotEqual(returncode, -5)
|
||||
elif fatal_warnings:
|
||||
if expected_returncode is None:
|
||||
self.assertEqual(returncode, -5)
|
||||
|
||||
if check_on_disk:
|
||||
cmd = "$NMCLI %s" % (Util.shlex_join(args[1:]),)
|
||||
cmd = Util.replace_text(cmd, replace_cmd)
|
||||
|
||||
if returncode < 0:
|
||||
returncode_str = "%d (SIGNAL %s)" % (
|
||||
returncode,
|
||||
Util.signal_no_to_str(-returncode),
|
||||
)
|
||||
else:
|
||||
returncode_str = "%d" % (returncode)
|
||||
|
||||
content = (
|
||||
("location: %s\n" % (calling_location)).encode("utf8")
|
||||
+ ("cmd: %s\n" % (cmd)).encode("utf8")
|
||||
+ ("lang: %s\n" % (lang)).encode("utf8")
|
||||
+ ("returncode: %s\n" % (returncode_str)).encode("utf8")
|
||||
)
|
||||
if len(stdout) > 0:
|
||||
content += (
|
||||
("stdout: %d bytes\n>>>\n" % (len(stdout))).encode("utf8")
|
||||
+ stdout
|
||||
+ "\n<<<\n".encode("utf8")
|
||||
)
|
||||
if len(stderr) > 0:
|
||||
content += (
|
||||
("stderr: %d bytes\n>>>\n" % (len(stderr))).encode("utf8")
|
||||
+ stderr
|
||||
+ "\n<<<\n".encode("utf8")
|
||||
)
|
||||
content = ("size: %s\n" % (len(content))).encode("utf8") + content
|
||||
|
||||
self._results[results_idx] = {
|
||||
"test_name": test_name,
|
||||
"ignore_l10n_diff": ignore_l10n_diff,
|
||||
"content": content,
|
||||
}
|
||||
|
||||
env = self._env(lang, calling_num, fatal_warnings, extra_env)
|
||||
async_job = AsyncProcess(args=args, env=env, complete_cb=complete_cb)
|
||||
|
||||
self._async_jobs.append(async_job)
|
||||
|
||||
self.async_start(wait_all=sync_barrier)
|
||||
|
||||
def nm_test(func):
|
||||
def f(self):
|
||||
@@ -1404,13 +1410,13 @@ class TestNmcli(unittest.TestCase):
|
||||
|
||||
return f
|
||||
|
||||
def setUp(self):
|
||||
if not dbus_session_inited:
|
||||
self.skipTest(
|
||||
"Own D-Bus session for testing is not initialized. Do you have dbus-run-session available?"
|
||||
)
|
||||
if NM is None:
|
||||
self.skipTest("gi.NM is not available. Did you build with introspection?")
|
||||
def skip_without_pexpect(func):
|
||||
def f(self):
|
||||
if pexpect is None:
|
||||
raise unittest.SkipTest("pexpect not available")
|
||||
func(self)
|
||||
|
||||
return f
|
||||
|
||||
def init_001(self):
|
||||
self.srv.op_AddObj("WiredDevice", iface="eth0")
|
||||
|
Reference in New Issue
Block a user