From 98fe195613a2a9612177fba3751fcb6da14e08a5 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 09:46:56 +0200 Subject: mtest: fix flake8 issues --- mesonbuild/mtest.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 35cb10a..a688e86 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -326,7 +326,6 @@ class TAPParser: yield self.Error('Too many tests run (expected {}, got {})'.format(plan.count, num_tests)) - class JunitBuilder: """Builder for Junit test results. @@ -740,7 +739,7 @@ class SingleTestRunner: subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) else: - def _send_signal_to_process_group(pgid : int, signum : int) -> None: + def _send_signal_to_process_group(pgid: int, signum: int) -> None: """ sends a signal to a process group """ try: os.killpg(pgid, signum) @@ -841,7 +840,7 @@ class TestHarness: def close_logfiles(self) -> None: for f in ['logfile', 'jsonlogfile']: - lfile = getattr(self, f) + lfile = getattr(self, f) if lfile: lfile.close() setattr(self, f, None) @@ -956,7 +955,7 @@ class TestHarness: Skipped: {:<4} Timeout: {:<4} ''').format(self.success_count, self.expectedfail_count, self.fail_count, - self.unexpectedpass_count, self.skip_count, self.timeout_count) + self.unexpectedpass_count, self.skip_count, self.timeout_count) print(msg) if self.logfile: self.logfile.write(msg) -- cgit v1.1 From f532b0a9c3781cd06c99cd63e9c3b31bf3de7413 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 10:47:40 +0200 Subject: mtest: remove run_special run_special and doit are the same except that run_special forgot to set self.is_run. There is no need for the duplication. 
Signed-off-by: Paolo Bonzini --- mesonbuild/mtest.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index a688e86..f36f1d1 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -1186,16 +1186,6 @@ class TestHarness: self.process_test_result(result.result()) self.print_stats(test_count, name_max_len, tests, name, result.result(), i) - def run_special(self) -> int: - '''Tests run by the user, usually something like "under gdb 1000 times".''' - if self.is_run: - raise RuntimeError('Can not use run_special after a full run.') - tests = self.get_tests() - if not tests: - return 0 - self.run_tests(tests) - return self.total_failure_count() - def list_tests(th: TestHarness) -> bool: tests = th.get_tests() @@ -1256,9 +1246,7 @@ def run(options: argparse.Namespace) -> int: try: if options.list: return list_tests(th) - if not options.args: - return th.doit() - return th.run_special() + return th.doit() except TestException as e: print('Meson test encountered an error:\n') if os.environ.get('MESON_FORCE_BACKTRACE'): -- cgit v1.1 From 659a5cbaa31e7a66e3ba3e17f9f31be78cec8b18 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Tue, 13 Oct 2020 17:54:15 +0200 Subject: mtest: use asyncio for run loop Use asyncio futures for the run loop, while still handling I/O in a thread pool using run_in_executor. The handling of the test result is not duplicated anymore between run_tests and drain_futures. Instead, the test result is always processed and printed by run_test after single_test.run() completes and (in verbose mode) it cannot interleave with the test output. Therefore the special case for self.options.num_processes == 1 can be removed. 
--- mesonbuild/mtest.py | 65 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 26 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index f36f1d1..2caa367 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -18,6 +18,7 @@ from ._pathlib import Path from collections import namedtuple from copy import deepcopy import argparse +import asyncio import concurrent.futures as conc import datetime import enum @@ -605,6 +606,17 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]: objs = check_testdata(pickle.load(f)) return objs +# Custom waiting primitives for asyncio + +async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: + """Wait for completion of all the given futures, ignoring cancellation.""" + while futures: + done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION) + # Raise exceptions if needed for all the "done" futures + for f in done: + if not f.cancelled(): + f.result() + class SingleTestRunner: @@ -1135,8 +1147,13 @@ class TestHarness: return test.name def run_tests(self, tests: T.List[TestSerialisation]) -> None: - executor = None - futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]] + # Replace with asyncio.run once we can require Python 3.7 + loop = asyncio.get_event_loop() + loop.run_until_complete(self._run_tests(tests)) + + async def _run_tests(self, tests: T.List[TestSerialisation]) -> None: + executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) + futures = [] # type: T.List[asyncio.Future] test_count = len(tests) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) self.open_log_files() @@ -1145,29 +1162,36 @@ class TestHarness: os.chdir(self.options.wd) self.build_data = build.load(os.getcwd()) + async def run_test(test: SingleTestRunner, + name: str, index: int) -> None: + if self.options.repeat > 1 and 
self.fail_count: + return + res = await asyncio.get_event_loop().run_in_executor(executor, test.run) + self.process_test_result(res) + self.print_stats(test_count, name_max_len, tests, name, res, index) + + def test_done(f: asyncio.Future) -> None: + if not f.cancelled(): + f.result() + futures.remove(f) + try: for _ in range(self.options.repeat): for i, test in enumerate(tests, 1): visible_name = self.get_pretty_suite(test) single_test = self.get_test_runner(test) - if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb: - self.drain_futures(futures) - futures = [] - res = single_test.run() - self.process_test_result(res) - self.print_stats(test_count, name_max_len, tests, visible_name, res, i) + if not test.is_parallel or single_test.options.gdb: + await complete_all(futures) + await run_test(single_test, visible_name, i) else: - if not executor: - executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) - f = executor.submit(single_test.run) - futures.append((f, test_count, name_max_len, tests, visible_name, i)) - if self.options.repeat > 1 and self.fail_count: - break + future = asyncio.ensure_future(run_test(single_test, visible_name, i)) + futures.append(future) + future.add_done_callback(test_done) if self.options.repeat > 1 and self.fail_count: break - self.drain_futures(futures) + await complete_all(futures) self.print_collected_logs() self.print_summary() @@ -1176,17 +1200,6 @@ class TestHarness: finally: os.chdir(startdir) - def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None: - for x in futures: - (result, test_count, name_max_len, tests, name, i) = x - if self.options.repeat > 1 and self.fail_count: - result.cancel() - if self.options.verbose: - result.result() - self.process_test_result(result.result()) - self.print_stats(test_count, name_max_len, tests, name, result.result(), i) - - def list_tests(th: TestHarness) -> bool: 
tests = th.get_tests() for t in tests: -- cgit v1.1 From 8cf90e63707261c09564a6f641d3439e9040f871 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Tue, 13 Oct 2020 18:12:40 +0200 Subject: mtest: add back SIGINT handling --- mesonbuild/mtest.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 2caa367..59ddec1 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -608,6 +608,13 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]: # Custom waiting primitives for asyncio +async def complete(future: asyncio.Future) -> None: + """Wait for completion of the given future, ignoring cancellation.""" + try: + await future + except asyncio.CancelledError: + pass + async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: """Wait for completion of all the given futures, ignoring cancellation.""" while futures: @@ -1161,10 +1168,11 @@ class TestHarness: if self.options.wd: os.chdir(self.options.wd) self.build_data = build.load(os.getcwd()) + interrupted = False async def run_test(test: SingleTestRunner, name: str, index: int) -> None: - if self.options.repeat > 1 and self.fail_count: + if interrupted or (self.options.repeat > 1 and self.fail_count): return res = await asyncio.get_event_loop().run_in_executor(executor, test.run) self.process_test_result(res) @@ -1175,6 +1183,17 @@ class TestHarness: f.result() futures.remove(f) + def cancel_all_futures() -> None: + nonlocal interrupted + if interrupted: + return + interrupted = True + mlog.warning('CTRL-C detected, interrupting') + for f in futures: + f.cancel() + + if sys.platform != 'win32': + asyncio.get_event_loop().add_signal_handler(signal.SIGINT, cancel_all_futures) try: for _ in range(self.options.repeat): for i, test in enumerate(tests, 1): @@ -1183,11 +1202,11 @@ class TestHarness: if not test.is_parallel or single_test.options.gdb: await complete_all(futures) - 
await run_test(single_test, visible_name, i) - else: - future = asyncio.ensure_future(run_test(single_test, visible_name, i)) - futures.append(future) - future.add_done_callback(test_done) + future = asyncio.ensure_future(run_test(single_test, visible_name, i)) + futures.append(future) + future.add_done_callback(test_done) + if not test.is_parallel or single_test.options.gdb: + await complete(future) if self.options.repeat > 1 and self.fail_count: break @@ -1198,6 +1217,8 @@ class TestHarness: if self.logfilename: print('Full log written to {}'.format(self.logfilename)) finally: + if sys.platform != 'win32': + asyncio.get_event_loop().remove_signal_handler(signal.SIGINT) os.chdir(startdir) def list_tests(th: TestHarness) -> bool: -- cgit v1.1 From 57b918f2813c2f5ea007e0e6333f192b5361daec Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 13:01:53 +0200 Subject: mtest: refactor _run_cmd A large part of _run_cmd is devoted to setting up and killing the test subprocess. Move that to a separate function to make the test runner logic easier to understand. --- mesonbuild/mtest.py | 186 ++++++++++++++++++++++++++++------------------------ 1 file changed, 101 insertions(+), 85 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 59ddec1..8396a42 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -665,6 +665,97 @@ class SingleTestRunner: self.test.timeout = None return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) + def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], + stdout: T.IO, stderr: T.IO, + env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], bool, T.Optional[str]]: + def kill_process(p: subprocess.Popen) -> T.Optional[str]: + # Python does not provide multiplatform support for + # killing a process and all its children so we need + # to roll our own. 
+ if is_windows(): + subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) + else: + + def _send_signal_to_process_group(pgid: int, signum: int) -> None: + """ sends a signal to a process group """ + try: + os.killpg(pgid, signum) + except ProcessLookupError: + # Sometimes (e.g. with Wine) this happens. + # There's nothing we can do (maybe the process + # already died) so carry on. + pass + + # Send a termination signal to the process group that setsid() + # created - giving it a chance to perform any cleanup. + _send_signal_to_process_group(p.pid, signal.SIGTERM) + + # Make sure the termination signal actually kills the process + # group, otherwise retry with a SIGKILL. + try: + p.communicate(timeout=0.5) + except subprocess.TimeoutExpired: + _send_signal_to_process_group(p.pid, signal.SIGKILL) + try: + p.communicate(timeout=1) + except subprocess.TimeoutExpired: + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. + p.kill() + try: + p.communicate(timeout=1) + except subprocess.TimeoutExpired: + additional_error = 'Test process could not be killed.' + except ValueError: + additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?' + return additional_error + + # Let gdb handle ^C instead of us + if self.options.gdb: + previous_sigint_handler = signal.getsignal(signal.SIGINT) + # Make the meson executable ignore SIGINT while gdb is running. + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def preexec_fn() -> None: + if self.options.gdb: + # Restore the SIGINT handler for the child process to + # ensure it can handle it. + signal.signal(signal.SIGINT, signal.SIG_DFL) + else: + # We don't want setsid() in gdb because gdb needs the + # terminal in order to handle ^C and not show tcsetpgrp() + # errors avoid not being able to use the terminal. 
+ os.setsid() + + p = subprocess.Popen(args, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd, + preexec_fn=preexec_fn if not is_windows() else None) + timed_out = False + kill_test = False + try: + p.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + if self.options.verbose: + print('{} time out (After {} seconds)'.format(self.test.name, timeout)) + timed_out = True + except KeyboardInterrupt: + mlog.warning('CTRL-C detected while running {}'.format(self.test.name)) + kill_test = True + finally: + if self.options.gdb: + # Let us accept ^C again + signal.signal(signal.SIGINT, previous_sigint_handler) + + if kill_test or timed_out: + additional_error = kill_process(p) + return p.returncode, timed_out, additional_error + else: + return p.returncode, False, None + def _run_cmd(self, cmd: T.List[str]) -> TestRun: starttime = time.time() @@ -696,23 +787,6 @@ class SingleTestRunner: if self.test.protocol is TestProtocol.TAP and stderr is stdout: stdout = tempfile.TemporaryFile("wb+") - # Let gdb handle ^C instead of us - if self.options.gdb: - previous_sigint_handler = signal.getsignal(signal.SIGINT) - # Make the meson executable ignore SIGINT while gdb is running. - signal.signal(signal.SIGINT, signal.SIG_IGN) - - def preexec_fn() -> None: - if self.options.gdb: - # Restore the SIGINT handler for the child process to - # ensure it can handle it. - signal.signal(signal.SIGINT, signal.SIG_DFL) - else: - # We don't want setsid() in gdb because gdb needs the - # terminal in order to handle ^C and not show tcsetpgrp() - # errors avoid not being able to use the terminal. 
- os.setsid() - extra_cmd = [] # type: T.List[str] if self.test.protocol is TestProtocol.GTEST: gtestname = self.test.name @@ -720,77 +794,19 @@ class SingleTestRunner: gtestname = os.path.join(self.test.workdir, self.test.name) extra_cmd.append('--gtest_output=xml:{}.xml'.format(gtestname)) - p = subprocess.Popen(cmd + extra_cmd, - stdout=stdout, - stderr=stderr, - env=self.env, - cwd=self.test.workdir, - preexec_fn=preexec_fn if not is_windows() else None) - timed_out = False - kill_test = False if self.test.timeout is None: timeout = None elif self.options.timeout_multiplier is not None: timeout = self.test.timeout * self.options.timeout_multiplier else: timeout = self.test.timeout - try: - p.communicate(timeout=timeout) - except subprocess.TimeoutExpired: - if self.options.verbose: - print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - timed_out = True - except KeyboardInterrupt: - mlog.warning('CTRL-C detected while running {}'.format(self.test.name)) - kill_test = True - finally: - if self.options.gdb: - # Let us accept ^C again - signal.signal(signal.SIGINT, previous_sigint_handler) - - additional_error = None - - if kill_test or timed_out: - # Python does not provide multiplatform support for - # killing a process and all its children so we need - # to roll our own. - if is_windows(): - subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) - else: - - def _send_signal_to_process_group(pgid: int, signum: int) -> None: - """ sends a signal to a process group """ - try: - os.killpg(pgid, signum) - except ProcessLookupError: - # Sometimes (e.g. with Wine) this happens. - # There's nothing we can do (maybe the process - # already died) so carry on. - pass - - # Send a termination signal to the process group that setsid() - # created - giving it a chance to perform any cleanup. 
- _send_signal_to_process_group(p.pid, signal.SIGTERM) - # Make sure the termination signal actually kills the process - # group, otherwise retry with a SIGKILL. - try: - p.communicate(timeout=0.5) - except subprocess.TimeoutExpired: - _send_signal_to_process_group(p.pid, signal.SIGKILL) - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - # An earlier kill attempt has not worked for whatever reason. - # Try to kill it one last time with a direct call. - # If the process has spawned children, they will remain around. - p.kill() - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - additional_error = 'Test process could not be killed.' - except ValueError: - additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?' + returncode, timed_out, additional_error = self._run_subprocess(cmd + extra_cmd, + timeout=timeout, + stdout=stdout, + stderr=stderr, + env=self.env, + cwd=self.test.workdir) endtime = time.time() duration = endtime - starttime if additional_error is None: @@ -808,16 +824,16 @@ class SingleTestRunner: stdo = "" stde = additional_error if timed_out: - return TestRun(self.test, self.test_env, TestResult.TIMEOUT, [], p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun(self.test, self.test_env, TestResult.TIMEOUT, [], returncode, starttime, duration, stdo, stde, cmd) else: if self.test.protocol is TestProtocol.EXITCODE: - return TestRun.make_exitcode(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun.make_exitcode(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) elif self.test.protocol is TestProtocol.GTEST: - return TestRun.make_gtest(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun.make_gtest(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) else: if self.options.verbose: print(stdo, end='') - return 
TestRun.make_tap(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun.make_tap(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) class TestHarness: -- cgit v1.1 From bd526ec2dc60c5923d4b696a2edfae41ee945cce Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 14:01:30 +0200 Subject: mtest: always ignore ProcessLookupError ProcessLookupError can also happen from p.kill(). There is also nothing we can do in that case, so move the "try" for that exception to the entire kill_process function. The ValueError case seems like dead code, so get rid of it. --- mesonbuild/mtest.py | 59 +++++++++++++++++++++++------------------------------ 1 file changed, 26 insertions(+), 33 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 8396a42..e164388 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -672,44 +672,37 @@ class SingleTestRunner: # Python does not provide multiplatform support for # killing a process and all its children so we need # to roll our own. - if is_windows(): - subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) - else: + try: + if is_windows(): + subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) + else: + # Send a termination signal to the process group that setsid() + # created - giving it a chance to perform any cleanup. + os.killpg(p.pid, signal.SIGTERM) - def _send_signal_to_process_group(pgid: int, signum: int) -> None: - """ sends a signal to a process group """ + # Make sure the termination signal actually kills the process + # group, otherwise retry with a SIGKILL. try: - os.killpg(pgid, signum) - except ProcessLookupError: - # Sometimes (e.g. with Wine) this happens. - # There's nothing we can do (maybe the process - # already died) so carry on. - pass - - # Send a termination signal to the process group that setsid() - # created - giving it a chance to perform any cleanup. 
- _send_signal_to_process_group(p.pid, signal.SIGTERM) - - # Make sure the termination signal actually kills the process - # group, otherwise retry with a SIGKILL. - try: - p.communicate(timeout=0.5) - except subprocess.TimeoutExpired: - _send_signal_to_process_group(p.pid, signal.SIGKILL) - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - # An earlier kill attempt has not worked for whatever reason. - # Try to kill it one last time with a direct call. - # If the process has spawned children, they will remain around. - p.kill() + p.communicate(timeout=0.5) + except subprocess.TimeoutExpired: + os.killpg(p.pid, signal.SIGKILL) try: p.communicate(timeout=1) except subprocess.TimeoutExpired: - additional_error = 'Test process could not be killed.' - except ValueError: - additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?' - return additional_error + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. + p.kill() + try: + p.communicate(timeout=1) + except subprocess.TimeoutExpired: + return 'Test process could not be killed.' + except ProcessLookupError: + # Sometimes (e.g. with Wine) this happens. + # There's nothing we can do (probably the process + # already died) so carry on. + pass + return None # Let gdb handle ^C instead of us if self.options.gdb: -- cgit v1.1 From 98d3863fa4d63f39aee510a2713c0586b65d40e8 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 13:51:56 +0200 Subject: mtest: add INTERRUPT to TestResult Distinguish a failure due to user interrupt from a presumable ERROR result due to the SIGTERM. The test should fail after CTRL+C even if the test traps SIGTERM and exits with a return code of 0. 
--- mesonbuild/mtest.py | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index e164388..92be83c 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -168,6 +168,7 @@ class TestResult(enum.Enum): OK = 'OK' TIMEOUT = 'TIMEOUT' + INTERRUPT = 'INTERRUPT' SKIP = 'SKIP' FAIL = 'FAIL' EXPECTEDFAIL = 'EXPECTEDFAIL' @@ -376,7 +377,8 @@ class JunitBuilder: 'testsuite', name=suitename, tests=str(len(test.results)), - errors=str(sum(1 for r in test.results if r is TestResult.ERROR)), + errors=str(sum(1 for r in test.results if r in + {TestResult.INTERRUPT, TestResult.ERROR})), failures=str(sum(1 for r in test.results if r in {TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})), skipped=str(sum(1 for r in test.results if r is TestResult.SKIP)), @@ -395,6 +397,9 @@ class JunitBuilder: elif result is TestResult.UNEXPECTEDPASS: fail = et.SubElement(testcase, 'failure') fail.text = 'Test unexpected passed.' + elif result is TestResult.INTERRUPT: + fail = et.SubElement(testcase, 'failure') + fail.text = 'Test was interrupted by user.' elif result is TestResult.TIMEOUT: fail = et.SubElement(testcase, 'failure') fail.text = 'Test did not finish before configured timeout.' 
@@ -667,7 +672,7 @@ class SingleTestRunner: def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], stdout: T.IO, stderr: T.IO, - env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], bool, T.Optional[str]]: + env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], TestResult, T.Optional[str]]: def kill_process(p: subprocess.Popen) -> T.Optional[str]: # Python does not provide multiplatform support for # killing a process and all its children so we need @@ -727,27 +732,25 @@ class SingleTestRunner: env=env, cwd=cwd, preexec_fn=preexec_fn if not is_windows() else None) - timed_out = False - kill_test = False + result = None + additional_error = None try: p.communicate(timeout=timeout) except subprocess.TimeoutExpired: if self.options.verbose: print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - timed_out = True + additional_error = kill_process(p) + result = TestResult.TIMEOUT except KeyboardInterrupt: mlog.warning('CTRL-C detected while running {}'.format(self.test.name)) - kill_test = True + additional_error = kill_process(p) + result = TestResult.INTERRUPT finally: if self.options.gdb: # Let us accept ^C again signal.signal(signal.SIGINT, previous_sigint_handler) - if kill_test or timed_out: - additional_error = kill_process(p) - return p.returncode, timed_out, additional_error - else: - return p.returncode, False, None + return p.returncode, result, additional_error def _run_cmd(self, cmd: T.List[str]) -> TestRun: starttime = time.time() @@ -794,12 +797,12 @@ class SingleTestRunner: else: timeout = self.test.timeout - returncode, timed_out, additional_error = self._run_subprocess(cmd + extra_cmd, - timeout=timeout, - stdout=stdout, - stderr=stderr, - env=self.env, - cwd=self.test.workdir) + returncode, result, additional_error = self._run_subprocess(cmd + extra_cmd, + timeout=timeout, + stdout=stdout, + stderr=stderr, + env=self.env, + cwd=self.test.workdir) endtime = time.time() duration 
= endtime - starttime if additional_error is None: @@ -816,8 +819,8 @@ class SingleTestRunner: else: stdo = "" stde = additional_error - if timed_out: - return TestRun(self.test, self.test_env, TestResult.TIMEOUT, [], returncode, starttime, duration, stdo, stde, cmd) + if result: + return TestRun(self.test, self.test_env, result, [], returncode, starttime, duration, stdo, stde, cmd) else: if self.test.protocol is TestProtocol.EXITCODE: return TestRun.make_exitcode(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) @@ -919,7 +922,7 @@ class TestHarness: self.skip_count += 1 elif result.res is TestResult.OK: self.success_count += 1 - elif result.res is TestResult.FAIL or result.res is TestResult.ERROR: + elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}: self.fail_count += 1 elif result.res is TestResult.EXPECTEDFAIL: self.expectedfail_count += 1 @@ -932,7 +935,7 @@ class TestHarness: tests: T.List[TestSerialisation], name: str, result: TestRun, i: int) -> None: ok_statuses = (TestResult.OK, TestResult.EXPECTEDFAIL) - bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, + bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT, TestResult.UNEXPECTEDPASS, TestResult.ERROR) result_str = '{num:{numlen}}/{testcount} {name:{name_max_len}} {res:{reslen}} {dur:.2f}s'.format( numlen=len(str(test_count)), -- cgit v1.1 From acf5d78f342ae6ce535a3dcfb196b2143603d097 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 16:06:13 +0200 Subject: mtest: remove usage of executors Rewrite the SingleTestRunner to use asyncio to manage subprocesses, while still using subprocess.Popen to run them. Concurrency is managed with an asyncio Semaphore; for simplicity (since this is a temporary state) we create a new thread for each test that is run instead of having a pool. 
This already provides the main advantage of asyncio, which is better control on cancellation; with the current code, KeyboardInterrupt was never handled by the thread executor so the code that tried to handle it in SingleTestRunner only worked for non-parallel tests. And because executor futures cannot be cancelled, there was no way for the user to kill a test that got stuck. Instead, without executors ^C exits "meson test" immediately. The next patch will improve things even further, allowing a single test to be interrupted with ^C. --- mesonbuild/mtest.py | 126 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 76 insertions(+), 50 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 92be83c..844fe4d 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -19,7 +19,6 @@ from collections import namedtuple from copy import deepcopy import argparse import asyncio -import concurrent.futures as conc import datetime import enum import io @@ -35,6 +34,7 @@ import subprocess import sys import tempfile import textwrap +import threading import time import typing as T import xml.etree.ElementTree as et @@ -613,6 +613,13 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]: # Custom waiting primitives for asyncio +async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None: + try: + await asyncio.wait(awaitables, + timeout=timeout, return_when=asyncio.FIRST_COMPLETED) + except asyncio.TimeoutError: + pass + async def complete(future: asyncio.Future) -> None: """Wait for completion of the given future, ignoring cancellation.""" try: @@ -659,7 +666,7 @@ class SingleTestRunner: return self.test.exe_runner.get_command() + self.test.fname return self.test.fname - def run(self) -> TestRun: + async def run(self) -> TestRun: cmd = self._get_cmd() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' 
@@ -668,12 +675,12 @@ class SingleTestRunner: wrap = TestHarness.get_wrapper(self.options) if self.options.gdb: self.test.timeout = None - return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) + return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) - def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], - stdout: T.IO, stderr: T.IO, - env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], TestResult, T.Optional[str]]: - def kill_process(p: subprocess.Popen) -> T.Optional[str]: + async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], + stdout: T.IO, stderr: T.IO, + env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]: + async def kill_process(p: subprocess.Popen, f: asyncio.Future) -> T.Optional[str]: # Python does not provide multiplatform support for # killing a process and all its children so we need # to roll our own. @@ -687,27 +694,39 @@ class SingleTestRunner: # Make sure the termination signal actually kills the process # group, otherwise retry with a SIGKILL. - try: - p.communicate(timeout=0.5) - except subprocess.TimeoutExpired: - os.killpg(p.pid, signal.SIGKILL) - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - # An earlier kill attempt has not worked for whatever reason. - # Try to kill it one last time with a direct call. - # If the process has spawned children, they will remain around. - p.kill() - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - return 'Test process could not be killed.' + await try_wait_one(f, timeout=0.5) + if f.done(): + return None + + os.killpg(p.pid, signal.SIGKILL) + + await try_wait_one(f, timeout=1) + if f.done(): + return None + + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. 
+ p.kill() + await try_wait_one(f, timeout=1) + if f.done(): + return None + return 'Test process could not be killed.' except ProcessLookupError: - # Sometimes (e.g. with Wine) this happens. - # There's nothing we can do (probably the process - # already died) so carry on. - pass - return None + # Sometimes (e.g. with Wine) this happens. There's nothing + # we can do, probably the process already died so just wait + # for the event loop to pick that up. + await f + return None + + def wait(p: subprocess.Popen, loop: asyncio.AbstractEventLoop, f: asyncio.Future) -> None: + try: + p.wait() + loop.call_soon_threadsafe(f.set_result, p.returncode) + except BaseException as e: # lgtm [py/catch-base-exception] + # The exception will be raised again in the main thread, + # so catching BaseException is okay. + loop.call_soon_threadsafe(f.set_exception, e) # Let gdb handle ^C instead of us if self.options.gdb: @@ -734,25 +753,31 @@ class SingleTestRunner: preexec_fn=preexec_fn if not is_windows() else None) result = None additional_error = None + loop = asyncio.get_event_loop() + future = asyncio.get_event_loop().create_future() + threading.Thread(target=wait, args=(p, loop, future), daemon=True).start() try: - p.communicate(timeout=timeout) - except subprocess.TimeoutExpired: - if self.options.verbose: - print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - additional_error = kill_process(p) - result = TestResult.TIMEOUT - except KeyboardInterrupt: - mlog.warning('CTRL-C detected while running {}'.format(self.test.name)) - additional_error = kill_process(p) + await try_wait_one(future, timeout=timeout) + if not future.done(): + if self.options.verbose: + print('{} time out (After {} seconds)'.format(self.test.name, timeout)) + additional_error = await kill_process(p, future) + result = TestResult.TIMEOUT + except asyncio.CancelledError: + # The main loop must have seen Ctrl-C. 
+ additional_error = await kill_process(p, future) result = TestResult.INTERRUPT finally: if self.options.gdb: # Let us accept ^C again signal.signal(signal.SIGINT, previous_sigint_handler) - return p.returncode, result, additional_error + if future.done(): + return future.result(), result, None + else: + return 0, result, additional_error - def _run_cmd(self, cmd: T.List[str]) -> TestRun: + async def _run_cmd(self, cmd: T.List[str]) -> TestRun: starttime = time.time() if self.test.extra_paths: @@ -797,12 +822,12 @@ class SingleTestRunner: else: timeout = self.test.timeout - returncode, result, additional_error = self._run_subprocess(cmd + extra_cmd, - timeout=timeout, - stdout=stdout, - stderr=stderr, - env=self.env, - cwd=self.test.workdir) + returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd, + timeout=timeout, + stdout=stdout, + stderr=stderr, + env=self.env, + cwd=self.test.workdir) endtime = time.time() duration = endtime - starttime if additional_error is None: @@ -1171,7 +1196,7 @@ class TestHarness: loop.run_until_complete(self._run_tests(tests)) async def _run_tests(self, tests: T.List[TestSerialisation]) -> None: - executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) + semaphore = asyncio.Semaphore(self.options.num_processes) futures = [] # type: T.List[asyncio.Future] test_count = len(tests) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) @@ -1184,11 +1209,12 @@ class TestHarness: async def run_test(test: SingleTestRunner, name: str, index: int) -> None: - if interrupted or (self.options.repeat > 1 and self.fail_count): - return - res = await asyncio.get_event_loop().run_in_executor(executor, test.run) - self.process_test_result(res) - self.print_stats(test_count, name_max_len, tests, name, res, index) + async with semaphore: + if interrupted or (self.options.repeat > 1 and self.fail_count): + return + res = await test.run() + self.process_test_result(res) + 
self.print_stats(test_count, name_max_len, tests, name, res, index) def test_done(f: asyncio.Future) -> None: if not f.cancelled(): -- cgit v1.1 From cdbb0255a76c2e622f561d72c04bfcb1fc67513a Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 14:37:02 +0200 Subject: mtest: improve handling of SIGINT and SIGTERM Handle SIGINT and SIGTERM by respectively cancelling the longest running and all the running tests. --- mesonbuild/mtest.py | 44 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 844fe4d..91c872f 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -15,7 +15,7 @@ # A tool to run tests in many different ways. from ._pathlib import Path -from collections import namedtuple +from collections import deque, namedtuple from copy import deepcopy import argparse import asyncio @@ -1197,7 +1197,8 @@ class TestHarness: async def _run_tests(self, tests: T.List[TestSerialisation]) -> None: semaphore = asyncio.Semaphore(self.options.num_processes) - futures = [] # type: T.List[asyncio.Future] + futures = deque() # type: T.Deque[asyncio.Future] + running_tests = dict() # type: T.Dict[asyncio.Future, str] test_count = len(tests) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) self.open_log_files() @@ -1220,18 +1221,43 @@ class TestHarness: if not f.cancelled(): f.result() futures.remove(f) - - def cancel_all_futures() -> None: + try: + del running_tests[f] + except KeyError: + pass + + def cancel_one_test(warn: bool) -> None: + future = futures.popleft() + futures.append(future) + if warn: + mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future])) + del running_tests[future] + future.cancel() + + def sigterm_handler() -> None: nonlocal interrupted if interrupted: return interrupted = True - mlog.warning('CTRL-C detected, interrupting') - for f in futures: - f.cancel() + 
mlog.warning('Received SIGTERM, exiting') + while running_tests: + cancel_one_test(False) + + def sigint_handler() -> None: + # We always pick the longest-running future that has not been cancelled + # If all the tests have been CTRL-C'ed, just stop + nonlocal interrupted + if interrupted: + return + if running_tests: + cancel_one_test(True) + else: + mlog.warning('CTRL-C detected, exiting') + interrupted = True if sys.platform != 'win32': - asyncio.get_event_loop().add_signal_handler(signal.SIGINT, cancel_all_futures) + asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigint_handler) + asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler) try: for _ in range(self.options.repeat): for i, test in enumerate(tests, 1): @@ -1242,6 +1268,7 @@ class TestHarness: await complete_all(futures) future = asyncio.ensure_future(run_test(single_test, visible_name, i)) futures.append(future) + running_tests[future] = visible_name future.add_done_callback(test_done) if not test.is_parallel or single_test.options.gdb: await complete(future) @@ -1257,6 +1284,7 @@ class TestHarness: finally: if sys.platform != 'win32': asyncio.get_event_loop().remove_signal_handler(signal.SIGINT) + asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM) os.chdir(startdir) def list_tests(th: TestHarness) -> bool: -- cgit v1.1 From 09253c1c70a2c8946be5be0a0f86fd21461ef598 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 15 Oct 2020 12:32:46 +0200 Subject: mtest: use ProactorEventLoop This is needed to use asyncio with pipes and processes. 
--- mesonbuild/mtest.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 91c872f..6e4ac6e 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -1329,6 +1329,10 @@ def run(options: argparse.Namespace) -> int: if options.wrapper: check_bin = options.wrapper[0] + if sys.platform == 'win32': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + if check_bin is not None: exe = ExternalProgram(check_bin, silent=True) if not exe.found(): -- cgit v1.1 From a2c134c30bc02defa842e771907db16c6a647e1e Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 16:06:13 +0200 Subject: mtest: switch to asyncio subprocesses No functional change except that the extra thread goes away. --- mesonbuild/mtest.py | 54 +++++++++++++++++++---------------------------------- 1 file changed, 19 insertions(+), 35 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 6e4ac6e..92d02b3 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -34,7 +34,6 @@ import subprocess import sys import tempfile import textwrap -import threading import time import typing as T import xml.etree.ElementTree as et @@ -680,7 +679,7 @@ class SingleTestRunner: async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], stdout: T.IO, stderr: T.IO, env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]: - async def kill_process(p: subprocess.Popen, f: asyncio.Future) -> T.Optional[str]: + async def kill_process(p: asyncio.subprocess.Process) -> T.Optional[str]: # Python does not provide multiplatform support for # killing a process and all its children so we need # to roll our own. @@ -694,40 +693,31 @@ class SingleTestRunner: # Make sure the termination signal actually kills the process # group, otherwise retry with a SIGKILL. 
- await try_wait_one(f, timeout=0.5) - if f.done(): + await try_wait_one(p.wait(), timeout=0.5) + if p.returncode is not None: return None os.killpg(p.pid, signal.SIGKILL) - await try_wait_one(f, timeout=1) - if f.done(): + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: return None # An earlier kill attempt has not worked for whatever reason. # Try to kill it one last time with a direct call. # If the process has spawned children, they will remain around. p.kill() - await try_wait_one(f, timeout=1) - if f.done(): + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: return None return 'Test process could not be killed.' except ProcessLookupError: # Sometimes (e.g. with Wine) this happens. There's nothing # we can do, probably the process already died so just wait # for the event loop to pick that up. - await f + await p.wait() return None - def wait(p: subprocess.Popen, loop: asyncio.AbstractEventLoop, f: asyncio.Future) -> None: - try: - p.wait() - loop.call_soon_threadsafe(f.set_result, p.returncode) - except BaseException as e: # lgtm [py/catch-base-exception] - # The exception will be raised again in the main thread, - # so catching BaseException is okay. - loop.call_soon_threadsafe(f.set_exception, e) - # Let gdb handle ^C instead of us if self.options.gdb: previous_sigint_handler = signal.getsignal(signal.SIGINT) @@ -745,37 +735,31 @@ class SingleTestRunner: # errors avoid not being able to use the terminal. 
os.setsid() - p = subprocess.Popen(args, - stdout=stdout, - stderr=stderr, - env=env, - cwd=cwd, - preexec_fn=preexec_fn if not is_windows() else None) + p = await asyncio.create_subprocess_exec(*args, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd, + preexec_fn=preexec_fn if not is_windows() else None) result = None additional_error = None - loop = asyncio.get_event_loop() - future = asyncio.get_event_loop().create_future() - threading.Thread(target=wait, args=(p, loop, future), daemon=True).start() try: - await try_wait_one(future, timeout=timeout) - if not future.done(): + await try_wait_one(p.wait(), timeout=timeout) + if p.returncode is None: if self.options.verbose: print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - additional_error = await kill_process(p, future) + additional_error = await kill_process(p) result = TestResult.TIMEOUT except asyncio.CancelledError: # The main loop must have seen Ctrl-C. - additional_error = await kill_process(p, future) + additional_error = await kill_process(p) result = TestResult.INTERRUPT finally: if self.options.gdb: # Let us accept ^C again signal.signal(signal.SIGINT, previous_sigint_handler) - if future.done(): - return future.result(), result, None - else: - return 0, result, additional_error + return p.returncode or 0, result, additional_error async def _run_cmd(self, cmd: T.List[str]) -> TestRun: starttime = time.time() -- cgit v1.1