diff --git a/Makefile.am b/Makefile.am index 9c81ef9c9..0f827c676 100644 --- a/Makefile.am +++ b/Makefile.am @@ -5468,7 +5468,7 @@ endif ############################################################################### check-local-tests-client: src/nmcli/nmcli src/tests/client/test-client.py - LIBTOOL="$(LIBTOOL)" "$(srcdir)/src/tests/client/test-client.sh" "$(builddir)" "$(srcdir)" "$(PYTHON)" -- + LIBTOOL="$(LIBTOOL)" "$(srcdir)/src/tests/client/test-client.sh" "$(builddir)" "$(srcdir)" "$(PYTHON)" -- TestNmcli check_local += check-local-tests-client diff --git a/src/tests/client/meson.build b/src/tests/client/meson.build index 6dc0f2a2c..6a6891354 100644 --- a/src/tests/client/meson.build +++ b/src/tests/client/meson.build @@ -8,6 +8,7 @@ test( source_root, python.path(), '--', + 'TestNmcli', ], env: [ 'LIBTOOL=', diff --git a/src/tests/client/test-client.py b/src/tests/client/test-client.py index e5febe964..859a5280b 100755 --- a/src/tests/client/test-client.py +++ b/src/tests/client/test-client.py @@ -835,7 +835,7 @@ class AsyncProcess: MAX_JOBS = 15 -class TestNmcli(unittest.TestCase): +class TestNmClient(unittest.TestCase): def __init__(self, *args, **kwargs): self._calling_num = {} self._skip_test_for_l10n_diff = [] @@ -888,131 +888,6 @@ class TestNmcli(unittest.TestCase): return content_expect, results_expect - def nmcli_construct_argv(self, args, with_valgrind=None): - - if with_valgrind is None: - with_valgrind = conf.get(ENV_NM_TEST_VALGRIND) - - valgrind_log = None - cmd = conf.get(ENV_NM_TEST_CLIENT_NMCLI_PATH) - if with_valgrind: - valgrind_log = tempfile.mkstemp(prefix="nm-test-client-valgrind.") - argv = [ - "valgrind", - "--quiet", - "--error-exitcode=37", - "--leak-check=full", - "--gen-suppressions=all", - ( - "--suppressions=" - + PathConfiguration.top_srcdir() - + "/valgrind.suppressions" - ), - "--num-callers=100", - "--log-file=" + valgrind_log[1], - cmd, - ] - libtool = conf.get(ENV_LIBTOOL) - if libtool: - argv = list(libtool) + ["--mode=execute"] + argv - else: - argv = [cmd] - - argv.extend(args) - return argv, valgrind_log - - def call_nmcli_l( - self, - args, - check_on_disk=_DEFAULT_ARG, - fatal_warnings=_DEFAULT_ARG, - expected_returncode=_DEFAULT_ARG, - expected_stdout=_DEFAULT_ARG, - expected_stderr=_DEFAULT_ARG, - replace_stdout=None, - replace_stderr=None, - replace_cmd=None, - sort_lines_stdout=False, - extra_env=None, - sync_barrier=False, - ): - frame = sys._getframe(1) - for lang in ["C", "pl"]: - self._call_nmcli( - args, - lang, - check_on_disk, - fatal_warnings, - expected_returncode, - expected_stdout, - expected_stderr, - replace_stdout, - replace_stderr, - replace_cmd, - sort_lines_stdout, - extra_env, - sync_barrier, - frame, - ) - - def call_nmcli( - self, - args, - langs=None, - lang=None, - check_on_disk=_DEFAULT_ARG, - fatal_warnings=_DEFAULT_ARG, - expected_returncode=_DEFAULT_ARG, - expected_stdout=_DEFAULT_ARG, - expected_stderr=_DEFAULT_ARG, - replace_stdout=None, - replace_stderr=None, - replace_cmd=None, - sort_lines_stdout=False, - extra_env=None, - sync_barrier=None, - ): - - frame = sys._getframe(1) - - if langs is not None: - assert lang is None - else: - if lang is None: - lang = "C" - langs = [lang] - - if sync_barrier is None: - sync_barrier = len(langs) == 1 - - for lang in langs: - self._call_nmcli( - args, - lang, - check_on_disk, - fatal_warnings, - expected_returncode, - expected_stdout, - expected_stderr, - replace_stdout, - replace_stderr, - replace_cmd, - sort_lines_stdout, - extra_env, - sync_barrier, - frame, - ) - - def call_nmcli_pexpect(self, args): - - env = self._env(extra_env={"NO_COLOR": "1"}) - argv, valgrind_log = self.nmcli_construct_argv(args) - - pexp = pexpect.spawn(argv[0], argv[1:], timeout=10, env=env) - - typ = collections.namedtuple("CallNmcliPexpect", ["pexp", "valgrind_log"]) - return typ(pexp, valgrind_log) - def _env( self, lang="C", calling_num=None, fatal_warnings=_DEFAULT_ARG, extra_env=None ): @@ -1053,184 +928,47 @@ class TestNmcli(unittest.TestCase): env[k] = v return env - def _call_nmcli( - self, - args, - lang, - check_on_disk, - fatal_warnings, - expected_returncode, - expected_stdout, - expected_stderr, - replace_stdout, - replace_stderr, - replace_cmd, - sort_lines_stdout, - extra_env, - sync_barrier, - frame, - ): + def cmd_construct_argv(self, cmd_path, args, with_valgrind=None): - if sync_barrier: - self.async_wait() + if with_valgrind is None: + with_valgrind = conf.get(ENV_NM_TEST_VALGRIND) - calling_fcn = frame.f_code.co_name - calling_num = self._calling_num.get(calling_fcn, 0) + 1 - self._calling_num[calling_fcn] = calling_num - - test_name = "%s-%03d" % (calling_fcn, calling_num) - - # we cannot use frame.f_code.co_filename directly, because it might be different depending - # on where the file lies and which is CWD. We still want to give the location of - # the file, so that the user can easier find the source (when looking at the .expected files) - self.assertTrue( - os.path.abspath(frame.f_code.co_filename).endswith( - "/" + PathConfiguration.canonical_script_filename() - ) - ) - - if conf.get(ENV_NM_TEST_WITH_LINENO): - calling_location = "%s:%d:%s()/%d" % ( - PathConfiguration.canonical_script_filename(), - frame.f_lineno, - frame.f_code.co_name, - calling_num, - ) + valgrind_log = None + cmd = conf.get(cmd_path) + if with_valgrind: + valgrind_log = tempfile.mkstemp(prefix="nm-test-client-valgrind.") + argv = [ + "valgrind", + "--quiet", + "--error-exitcode=37", + "--leak-check=full", + "--gen-suppressions=all", + ( + "--suppressions=" + + PathConfiguration.top_srcdir() + + "/valgrind.suppressions" + ), + "--num-callers=100", + "--log-file=" + valgrind_log[1], + cmd, + ] + libtool = conf.get(ENV_LIBTOOL) + if libtool: + argv = list(libtool) + ["--mode=execute"] + argv else: - calling_location = "%s:%s()/%d" % ( - PathConfiguration.canonical_script_filename(), - frame.f_code.co_name, - calling_num, - ) + argv = [cmd] - if lang is None or lang == "C": - lang = "C" - elif lang == "de": - lang = "de_DE.utf8" - elif lang == "pl": - lang = "pl_PL.UTF-8" - else: - self.fail("invalid language %s" % (lang)) + argv.extend(args) + return argv, valgrind_log - # Running under valgrind is not yet supported for those tests. - args, valgrind_log = self.nmcli_construct_argv(args, with_valgrind=False) + def call_pexpect(self, cmd_path, args, extra_env): + argv, valgrind_log = self.cmd_construct_argv(cmd_path, args) + env = self._env(extra_env=extra_env) - assert valgrind_log is None + pexp = pexpect.spawn(argv[0], argv[1:], timeout=10, env=env) - if replace_stdout is not None: - replace_stdout = list(replace_stdout) - if replace_stderr is not None: - replace_stderr = list(replace_stderr) - if replace_cmd is not None: - replace_cmd = list(replace_cmd) - - if check_on_disk is _DEFAULT_ARG: - check_on_disk = ( - expected_returncode is _DEFAULT_ARG - and ( - expected_stdout is _DEFAULT_ARG - or expected_stdout is _UNSTABLE_OUTPUT - ) - and ( - expected_stderr is _DEFAULT_ARG - or expected_stderr is _UNSTABLE_OUTPUT - ) - ) - if expected_returncode is _DEFAULT_ARG: - expected_returncode = None - if expected_stdout is _DEFAULT_ARG: - expected_stdout = None - if expected_stderr is _DEFAULT_ARG: - expected_stderr = None - - results_idx = len(self._results) - self._results.append(None) - - def complete_cb(async_job, returncode, stdout, stderr): - - if expected_stdout is _UNSTABLE_OUTPUT: - stdout = "".encode("utf-8") - else: - stdout = Util.replace_text(stdout, replace_stdout) - - if expected_stderr is _UNSTABLE_OUTPUT: - stderr = "".encode("utf-8") - else: - stderr = Util.replace_text(stderr, replace_stderr) - - if sort_lines_stdout: - stdout = b"\n".join(sorted(stdout.split(b"\n"))) - - ignore_l10n_diff = lang != "C" and not conf.get( - ENV_NM_TEST_CLIENT_CHECK_L10N - ) - - if expected_stderr is not None and expected_stderr is not _UNSTABLE_OUTPUT: - if expected_stderr != stderr: - if ignore_l10n_diff: - self._skip_test_for_l10n_diff.append(test_name) - else: - self.assertEqual(expected_stderr, stderr) - if expected_stdout is not None and expected_stdout is not _UNSTABLE_OUTPUT: - if expected_stdout != stdout: - if ignore_l10n_diff: - self._skip_test_for_l10n_diff.append(test_name) - else: - self.assertEqual(expected_stdout, stdout) - if expected_returncode is not None: - self.assertEqual(expected_returncode, returncode) - - if fatal_warnings is _DEFAULT_ARG: - if expected_returncode != -5: - self.assertNotEqual(returncode, -5) - elif fatal_warnings: - if expected_returncode is None: - self.assertEqual(returncode, -5) - - if check_on_disk: - cmd = "$NMCLI %s" % (Util.shlex_join(args[1:]),) - cmd = Util.replace_text(cmd, replace_cmd) - - if returncode < 0: - returncode_str = "%d (SIGNAL %s)" % ( - returncode, - Util.signal_no_to_str(-returncode), - ) - else: - returncode_str = "%d" % (returncode) - - content = ( - ("location: %s\n" % (calling_location)).encode("utf8") - + ("cmd: %s\n" % (cmd)).encode("utf8") - + ("lang: %s\n" % (lang)).encode("utf8") - + ("returncode: %s\n" % (returncode_str)).encode("utf8") - ) - if len(stdout) > 0: - content += ( - ("stdout: %d bytes\n>>>\n" % (len(stdout))).encode("utf8") - + stdout - + "\n<<<\n".encode("utf8") - ) - if len(stderr) > 0: - content += ( - ("stderr: %d bytes\n>>>\n" % (len(stderr))).encode("utf8") - + stderr - + "\n<<<\n".encode("utf8") - ) - content = ("size: %s\n" % (len(content))).encode("utf8") + content - - self._results[results_idx] = { - "test_name": test_name, - "ignore_l10n_diff": ignore_l10n_diff, - "content": content, - } - - env = self._env(lang, calling_num, fatal_warnings, extra_env) - async_job = AsyncProcess(args=args, env=env, complete_cb=complete_cb) - - self._async_jobs.append(async_job) - - self.async_start(wait_all=sync_barrier) + typ = collections.namedtuple("CallPexpect", ["pexp", "valgrind_log"]) + return typ(pexp, valgrind_log) def async_start(self, wait_all=False): @@ -1381,13 +1119,281 @@ class TestNmcli(unittest.TestCase): % (",".join(skip_test_for_l10n_diff)) ) - def skip_without_pexpect(func): - def f(self): - if pexpect is None: - raise unittest.SkipTest("pexpect not available") - func(self) + def setUp(self): + if not dbus_session_inited: + self.skipTest( + "Own D-Bus session for testing is not initialized. Do you have dbus-run-session available?" + ) + if NM is None: + self.skipTest("gi.NM is not available. Did you build with introspection?") - return f + +class TestNmcli(TestNmClient): + def call_nmcli_l( + self, + args, + check_on_disk=_DEFAULT_ARG, + fatal_warnings=_DEFAULT_ARG, + expected_returncode=_DEFAULT_ARG, + expected_stdout=_DEFAULT_ARG, + expected_stderr=_DEFAULT_ARG, + replace_stdout=None, + replace_stderr=None, + replace_cmd=None, + sort_lines_stdout=False, + extra_env=None, + sync_barrier=False, + ): + frame = sys._getframe(1) + for lang in ["C", "pl"]: + self._call_nmcli( + args, + lang, + check_on_disk, + fatal_warnings, + expected_returncode, + expected_stdout, + expected_stderr, + replace_stdout, + replace_stderr, + replace_cmd, + sort_lines_stdout, + extra_env, + sync_barrier, + frame, + ) + + def call_nmcli( + self, + args, + langs=None, + lang=None, + check_on_disk=_DEFAULT_ARG, + fatal_warnings=_DEFAULT_ARG, + expected_returncode=_DEFAULT_ARG, + expected_stdout=_DEFAULT_ARG, + expected_stderr=_DEFAULT_ARG, + replace_stdout=None, + replace_stderr=None, + replace_cmd=None, + sort_lines_stdout=False, + extra_env=None, + sync_barrier=None, + ): + + frame = sys._getframe(1) + + if langs is not None: + assert lang is None + else: + if lang is None: + lang = "C" + langs = [lang] + + if sync_barrier is None: + sync_barrier = len(langs) == 1 + + for lang in langs: + self._call_nmcli( + args, + lang, + check_on_disk, + fatal_warnings, + expected_returncode, + expected_stdout, + expected_stderr, + replace_stdout, + replace_stderr, + replace_cmd, + sort_lines_stdout, + extra_env, + sync_barrier, + frame, + ) + + def call_nmcli_pexpect(self, args): + return self.call_pexpect(ENV_NM_TEST_CLIENT_NMCLI_PATH, args, {"NO_COLOR": "1"}) + + def _call_nmcli( + self, + args, + lang, + check_on_disk, + fatal_warnings, + expected_returncode, + expected_stdout, + expected_stderr, + replace_stdout, + replace_stderr, + replace_cmd, + sort_lines_stdout, + extra_env, + sync_barrier, + frame, + ): + + if sync_barrier: + self.async_wait() + + calling_fcn = frame.f_code.co_name + calling_num = self._calling_num.get(calling_fcn, 0) + 1 + self._calling_num[calling_fcn] = calling_num + + test_name = "%s-%03d" % (calling_fcn, calling_num) + + # we cannot use frame.f_code.co_filename directly, because it might be different depending + # on where the file lies and which is CWD. We still want to give the location of + # the file, so that the user can easier find the source (when looking at the .expected files) + self.assertTrue( + os.path.abspath(frame.f_code.co_filename).endswith( + "/" + PathConfiguration.canonical_script_filename() + ) + ) + + if conf.get(ENV_NM_TEST_WITH_LINENO): + calling_location = "%s:%d:%s()/%d" % ( + PathConfiguration.canonical_script_filename(), + frame.f_lineno, + frame.f_code.co_name, + calling_num, + ) + else: + calling_location = "%s:%s()/%d" % ( + PathConfiguration.canonical_script_filename(), + frame.f_code.co_name, + calling_num, + ) + + if lang is None or lang == "C": + lang = "C" + elif lang == "de": + lang = "de_DE.utf8" + elif lang == "pl": + lang = "pl_PL.UTF-8" + else: + self.fail("invalid language %s" % (lang)) + + # Running under valgrind is not yet supported for those tests. + args, valgrind_log = self.cmd_construct_argv( + ENV_NM_TEST_CLIENT_NMCLI_PATH, args, with_valgrind=False + ) + + assert valgrind_log is None + + if replace_stdout is not None: + replace_stdout = list(replace_stdout) + if replace_stderr is not None: + replace_stderr = list(replace_stderr) + if replace_cmd is not None: + replace_cmd = list(replace_cmd) + + if check_on_disk is _DEFAULT_ARG: + check_on_disk = ( + expected_returncode is _DEFAULT_ARG + and ( + expected_stdout is _DEFAULT_ARG + or expected_stdout is _UNSTABLE_OUTPUT + ) + and ( + expected_stderr is _DEFAULT_ARG + or expected_stderr is _UNSTABLE_OUTPUT + ) + ) + if expected_returncode is _DEFAULT_ARG: + expected_returncode = None + if expected_stdout is _DEFAULT_ARG: + expected_stdout = None + if expected_stderr is _DEFAULT_ARG: + expected_stderr = None + + results_idx = len(self._results) + self._results.append(None) + + def complete_cb(async_job, returncode, stdout, stderr): + + if expected_stdout is _UNSTABLE_OUTPUT: + stdout = "".encode("utf-8") + else: + stdout = Util.replace_text(stdout, replace_stdout) + + if expected_stderr is _UNSTABLE_OUTPUT: + stderr = "".encode("utf-8") + else: + stderr = Util.replace_text(stderr, replace_stderr) + + if sort_lines_stdout: + stdout = b"\n".join(sorted(stdout.split(b"\n"))) + + ignore_l10n_diff = lang != "C" and not conf.get( + ENV_NM_TEST_CLIENT_CHECK_L10N + ) + + if expected_stderr is not None and expected_stderr is not _UNSTABLE_OUTPUT: + if expected_stderr != stderr: + if ignore_l10n_diff: + self._skip_test_for_l10n_diff.append(test_name) + else: + self.assertEqual(expected_stderr, stderr) + if expected_stdout is not None and expected_stdout is not _UNSTABLE_OUTPUT: + if expected_stdout != stdout: + if ignore_l10n_diff: + self._skip_test_for_l10n_diff.append(test_name) + else: + self.assertEqual(expected_stdout, stdout) + if expected_returncode is not None: + self.assertEqual(expected_returncode, returncode) + + if fatal_warnings is _DEFAULT_ARG: + if expected_returncode != -5: + self.assertNotEqual(returncode, -5) + elif fatal_warnings: + if expected_returncode is None: + self.assertEqual(returncode, -5) + + if check_on_disk: + cmd = "$NMCLI %s" % (Util.shlex_join(args[1:]),) + cmd = Util.replace_text(cmd, replace_cmd) + + if returncode < 0: + returncode_str = "%d (SIGNAL %s)" % ( + returncode, + Util.signal_no_to_str(-returncode), + ) + else: + returncode_str = "%d" % (returncode) + + content = ( + ("location: %s\n" % (calling_location)).encode("utf8") + + ("cmd: %s\n" % (cmd)).encode("utf8") + + ("lang: %s\n" % (lang)).encode("utf8") + + ("returncode: %s\n" % (returncode_str)).encode("utf8") + ) + if len(stdout) > 0: + content += ( + ("stdout: %d bytes\n>>>\n" % (len(stdout))).encode("utf8") + + stdout + + "\n<<<\n".encode("utf8") + ) + if len(stderr) > 0: + content += ( + ("stderr: %d bytes\n>>>\n" % (len(stderr))).encode("utf8") + + stderr + + "\n<<<\n".encode("utf8") + ) + content = ("size: %s\n" % (len(content))).encode("utf8") + content + + self._results[results_idx] = { + "test_name": test_name, + "ignore_l10n_diff": ignore_l10n_diff, + "content": content, + } + + env = self._env(lang, calling_num, fatal_warnings, extra_env) + async_job = AsyncProcess(args=args, env=env, complete_cb=complete_cb) + + self._async_jobs.append(async_job) + + self.async_start(wait_all=sync_barrier) def nm_test(func): def f(self): @@ -1404,13 +1410,13 @@ class TestNmcli(unittest.TestCase): return f - def setUp(self): - if not dbus_session_inited: - self.skipTest( - "Own D-Bus session for testing is not initialized. Do you have dbus-run-session available?" - ) - if NM is None: - self.skipTest("gi.NM is not available. Did you build with introspection?") + def skip_without_pexpect(func): + def f(self): + if pexpect is None: + raise unittest.SkipTest("pexpect not available") + func(self) + + return f def init_001(self): self.srv.op_AddObj("WiredDevice", iface="eth0")