diff options
author | Jussi Pakkanen <jpakkane@gmail.com> | 2020-11-18 23:09:47 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-18 23:09:47 +0200 |
commit | ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed (patch) | |
tree | 7b15e44bb442c39ce140132ea45de3b97a384897 | |
parent | 188695f2026a8a5d23d3e586a0f450f57c82a71b (diff) | |
parent | a2c134c30bc02defa842e771907db16c6a647e1e (diff) | |
download | meson-ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed.zip meson-ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed.tar.gz meson-ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed.tar.bz2 |
Merge pull request #7836 from bonzini/mtest-asyncio
[RFC] mtest: use asyncio instead of concurrent.futures
-rw-r--r-- | mesonbuild/mtest.py | 349 |
1 files changed, 212 insertions, 137 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index e538992..5804303 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -15,10 +15,10 @@ # A tool to run tests in many different ways. from ._pathlib import Path -from collections import namedtuple +from collections import deque, namedtuple from copy import deepcopy import argparse -import concurrent.futures as conc +import asyncio import datetime import enum import io @@ -167,6 +167,7 @@ class TestResult(enum.Enum): OK = 'OK' TIMEOUT = 'TIMEOUT' + INTERRUPT = 'INTERRUPT' SKIP = 'SKIP' FAIL = 'FAIL' EXPECTEDFAIL = 'EXPECTEDFAIL' @@ -326,7 +327,6 @@ class TAPParser: yield self.Error('Too many tests run (expected {}, got {})'.format(plan.count, num_tests)) - class JunitBuilder: """Builder for Junit test results. @@ -376,7 +376,8 @@ class JunitBuilder: 'testsuite', name=suitename, tests=str(len(test.results)), - errors=str(sum(1 for r in test.results if r is TestResult.ERROR)), + errors=str(sum(1 for r in test.results if r in + {TestResult.INTERRUPT, TestResult.ERROR})), failures=str(sum(1 for r in test.results if r in {TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})), skipped=str(sum(1 for r in test.results if r is TestResult.SKIP)), @@ -395,6 +396,9 @@ class JunitBuilder: elif result is TestResult.UNEXPECTEDPASS: fail = et.SubElement(testcase, 'failure') fail.text = 'Test unexpected passed.' + elif result is TestResult.INTERRUPT: + fail = et.SubElement(testcase, 'failure') + fail.text = 'Test was interrupted by user.' elif result is TestResult.TIMEOUT: fail = et.SubElement(testcase, 'failure') fail.text = 'Test did not finish before configured timeout.' 
@@ -606,6 +610,31 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]: objs = check_testdata(pickle.load(f)) return objs +# Custom waiting primitives for asyncio + +async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None: + try: + await asyncio.wait(awaitables, + timeout=timeout, return_when=asyncio.FIRST_COMPLETED) + except asyncio.TimeoutError: + pass + +async def complete(future: asyncio.Future) -> None: + """Wait for completion of the given future, ignoring cancellation.""" + try: + await future + except asyncio.CancelledError: + pass + +async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: + """Wait for completion of all the given futures, ignoring cancellation.""" + while futures: + done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION) + # Raise exceptions if needed for all the "done" futures + for f in done: + if not f.cancelled(): + f.result() + class SingleTestRunner: @@ -636,7 +665,7 @@ class SingleTestRunner: return self.test.exe_runner.get_command() + self.test.fname return self.test.fname - def run(self) -> TestRun: + async def run(self) -> TestRun: cmd = self._get_cmd() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' @@ -645,9 +674,94 @@ class SingleTestRunner: wrap = TestHarness.get_wrapper(self.options) if self.options.gdb: self.test.timeout = None - return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) + return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) + + async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], + stdout: T.IO, stderr: T.IO, + env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]: + async def kill_process(p: asyncio.subprocess.Process) -> T.Optional[str]: + # Python does not provide multiplatform support for + # killing a process and all its children so we need + # to roll our own. 
+ try: + if is_windows(): + subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) + else: + # Send a termination signal to the process group that setsid() + # created - giving it a chance to perform any cleanup. + os.killpg(p.pid, signal.SIGTERM) + + # Make sure the termination signal actually kills the process + # group, otherwise retry with a SIGKILL. + await try_wait_one(p.wait(), timeout=0.5) + if p.returncode is not None: + return None + + os.killpg(p.pid, signal.SIGKILL) + + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: + return None + + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. + p.kill() + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: + return None + return 'Test process could not be killed.' + except ProcessLookupError: + # Sometimes (e.g. with Wine) this happens. There's nothing + # we can do, probably the process already died so just wait + # for the event loop to pick that up. + await p.wait() + return None + + # Let gdb handle ^C instead of us + if self.options.gdb: + previous_sigint_handler = signal.getsignal(signal.SIGINT) + # Make the meson executable ignore SIGINT while gdb is running. + signal.signal(signal.SIGINT, signal.SIG_IGN) - def _run_cmd(self, cmd: T.List[str]) -> TestRun: + def preexec_fn() -> None: + if self.options.gdb: + # Restore the SIGINT handler for the child process to + # ensure it can handle it. + signal.signal(signal.SIGINT, signal.SIG_DFL) + else: + # We don't want setsid() in gdb because gdb needs the + # terminal in order to handle ^C and not show tcsetpgrp() + # errors avoid not being able to use the terminal. 
+ os.setsid() + + p = await asyncio.create_subprocess_exec(*args, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd, + preexec_fn=preexec_fn if not is_windows() else None) + result = None + additional_error = None + try: + await try_wait_one(p.wait(), timeout=timeout) + if p.returncode is None: + if self.options.verbose: + print('{} time out (After {} seconds)'.format(self.test.name, timeout)) + additional_error = await kill_process(p) + result = TestResult.TIMEOUT + except asyncio.CancelledError: + # The main loop must have seen Ctrl-C. + additional_error = await kill_process(p) + result = TestResult.INTERRUPT + finally: + if self.options.gdb: + # Let us accept ^C again + signal.signal(signal.SIGINT, previous_sigint_handler) + + return p.returncode or 0, result, additional_error + + async def _run_cmd(self, cmd: T.List[str]) -> TestRun: starttime = time.time() if self.test.extra_paths: @@ -678,23 +792,6 @@ class SingleTestRunner: if self.test.protocol is TestProtocol.TAP and stderr is stdout: stdout = tempfile.TemporaryFile("wb+") - # Let gdb handle ^C instead of us - if self.options.gdb: - previous_sigint_handler = signal.getsignal(signal.SIGINT) - # Make the meson executable ignore SIGINT while gdb is running. - signal.signal(signal.SIGINT, signal.SIG_IGN) - - def preexec_fn() -> None: - if self.options.gdb: - # Restore the SIGINT handler for the child process to - # ensure it can handle it. - signal.signal(signal.SIGINT, signal.SIG_DFL) - else: - # We don't want setsid() in gdb because gdb needs the - # terminal in order to handle ^C and not show tcsetpgrp() - # errors avoid not being able to use the terminal. 
- os.setsid() - extra_cmd = [] # type: T.List[str] if self.test.protocol is TestProtocol.GTEST: gtestname = self.test.name @@ -702,77 +799,19 @@ class SingleTestRunner: gtestname = os.path.join(self.test.workdir, self.test.name) extra_cmd.append('--gtest_output=xml:{}.xml'.format(gtestname)) - p = subprocess.Popen(cmd + extra_cmd, - stdout=stdout, - stderr=stderr, - env=self.env, - cwd=self.test.workdir, - preexec_fn=preexec_fn if not is_windows() else None) - timed_out = False - kill_test = False if self.test.timeout is None: timeout = None elif self.options.timeout_multiplier is not None: timeout = self.test.timeout * self.options.timeout_multiplier else: timeout = self.test.timeout - try: - p.communicate(timeout=timeout) - except subprocess.TimeoutExpired: - if self.options.verbose: - print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - timed_out = True - except KeyboardInterrupt: - mlog.warning('CTRL-C detected while running {}'.format(self.test.name)) - kill_test = True - finally: - if self.options.gdb: - # Let us accept ^C again - signal.signal(signal.SIGINT, previous_sigint_handler) - - additional_error = None - if kill_test or timed_out: - # Python does not provide multiplatform support for - # killing a process and all its children so we need - # to roll our own. - if is_windows(): - subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) - else: - - def _send_signal_to_process_group(pgid : int, signum : int) -> None: - """ sends a signal to a process group """ - try: - os.killpg(pgid, signum) - except ProcessLookupError: - # Sometimes (e.g. with Wine) this happens. - # There's nothing we can do (maybe the process - # already died) so carry on. - pass - - # Send a termination signal to the process group that setsid() - # created - giving it a chance to perform any cleanup. 
- _send_signal_to_process_group(p.pid, signal.SIGTERM) - - # Make sure the termination signal actually kills the process - # group, otherwise retry with a SIGKILL. - try: - p.communicate(timeout=0.5) - except subprocess.TimeoutExpired: - _send_signal_to_process_group(p.pid, signal.SIGKILL) - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - # An earlier kill attempt has not worked for whatever reason. - # Try to kill it one last time with a direct call. - # If the process has spawned children, they will remain around. - p.kill() - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - additional_error = 'Test process could not be killed.' - except ValueError: - additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?' + returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd, + timeout=timeout, + stdout=stdout, + stderr=stderr, + env=self.env, + cwd=self.test.workdir) endtime = time.time() duration = endtime - starttime if additional_error is None: @@ -789,17 +828,17 @@ class SingleTestRunner: else: stdo = "" stde = additional_error - if timed_out: - return TestRun(self.test, self.test_env, TestResult.TIMEOUT, [], p.returncode, starttime, duration, stdo, stde, cmd) + if result: + return TestRun(self.test, self.test_env, result, [], returncode, starttime, duration, stdo, stde, cmd) else: if self.test.protocol is TestProtocol.EXITCODE: - return TestRun.make_exitcode(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun.make_exitcode(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) elif self.test.protocol is TestProtocol.GTEST: - return TestRun.make_gtest(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun.make_gtest(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) else: if self.options.verbose: print(stdo, end='') - return 
TestRun.make_tap(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) + return TestRun.make_tap(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd) class TestHarness: @@ -841,7 +880,7 @@ class TestHarness: def close_logfiles(self) -> None: for f in ['logfile', 'jsonlogfile']: - lfile = getattr(self, f) + lfile = getattr(self, f) if lfile: lfile.close() setattr(self, f, None) @@ -892,7 +931,7 @@ class TestHarness: self.skip_count += 1 elif result.res is TestResult.OK: self.success_count += 1 - elif result.res is TestResult.FAIL or result.res is TestResult.ERROR: + elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}: self.fail_count += 1 elif result.res is TestResult.EXPECTEDFAIL: self.expectedfail_count += 1 @@ -905,7 +944,7 @@ class TestHarness: tests: T.List[TestSerialisation], name: str, result: TestRun, i: int) -> None: ok_statuses = (TestResult.OK, TestResult.EXPECTEDFAIL) - bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, + bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT, TestResult.UNEXPECTEDPASS, TestResult.ERROR) result_str = '{num:{numlen}}/{testcount} {name:{name_max_len}} {res:{reslen}} {dur:.2f}s'.format( numlen=len(str(test_count)), @@ -951,7 +990,7 @@ class TestHarness: Skipped: {:<4} Timeout: {:<4} ''').format(self.success_count, self.expectedfail_count, self.fail_count, - self.unexpectedpass_count, self.skip_count, self.timeout_count) + self.unexpectedpass_count, self.skip_count, self.timeout_count) print(msg) if self.logfile: self.logfile.write(msg) @@ -1131,8 +1170,14 @@ class TestHarness: return test.name def run_tests(self, tests: T.List[TestSerialisation]) -> None: - executor = None - futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]] + # Replace with asyncio.run once we can require Python 3.7 + loop = asyncio.get_event_loop() + loop.run_until_complete(self._run_tests(tests)) + + async 
def _run_tests(self, tests: T.List[TestSerialisation]) -> None: + semaphore = asyncio.Semaphore(self.options.num_processes) + futures = deque() # type: T.Deque[asyncio.Future] + running_tests = dict() # type: T.Dict[asyncio.Future, str] test_count = len(tests) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) self.open_log_files() @@ -1140,59 +1185,87 @@ class TestHarness: if self.options.wd: os.chdir(self.options.wd) self.build_data = build.load(os.getcwd()) + interrupted = False + + async def run_test(test: SingleTestRunner, + name: str, index: int) -> None: + async with semaphore: + if interrupted or (self.options.repeat > 1 and self.fail_count): + return + res = await test.run() + self.process_test_result(res) + self.print_stats(test_count, name_max_len, tests, name, res, index) + + def test_done(f: asyncio.Future) -> None: + if not f.cancelled(): + f.result() + futures.remove(f) + try: + del running_tests[f] + except KeyError: + pass + + def cancel_one_test(warn: bool) -> None: + future = futures.popleft() + futures.append(future) + if warn: + mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future])) + del running_tests[future] + future.cancel() + + def sigterm_handler() -> None: + nonlocal interrupted + if interrupted: + return + interrupted = True + mlog.warning('Received SIGTERM, exiting') + while running_tests: + cancel_one_test(False) + + def sigint_handler() -> None: + # We always pick the longest-running future that has not been cancelled + # If all the tests have been CTRL-C'ed, just stop + nonlocal interrupted + if interrupted: + return + if running_tests: + cancel_one_test(True) + else: + mlog.warning('CTRL-C detected, exiting') + interrupted = True + if sys.platform != 'win32': + asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigint_handler) + asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler) try: for _ in range(self.options.repeat): for i, test in enumerate(tests, 
1): visible_name = self.get_pretty_suite(test) single_test = self.get_test_runner(test) - if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb: - self.drain_futures(futures) - futures = [] - res = single_test.run() - self.process_test_result(res) - self.print_stats(test_count, name_max_len, tests, visible_name, res, i) - else: - if not executor: - executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) - f = executor.submit(single_test.run) - futures.append((f, test_count, name_max_len, tests, visible_name, i)) - if self.options.repeat > 1 and self.fail_count: - break + if not test.is_parallel or single_test.options.gdb: + await complete_all(futures) + future = asyncio.ensure_future(run_test(single_test, visible_name, i)) + futures.append(future) + running_tests[future] = visible_name + future.add_done_callback(test_done) + if not test.is_parallel or single_test.options.gdb: + await complete(future) if self.options.repeat > 1 and self.fail_count: break - self.drain_futures(futures) + await complete_all(futures) self.print_collected_logs() self.print_summary() if self.logfilename: print('Full log written to {}'.format(self.logfilename)) finally: + if sys.platform != 'win32': + asyncio.get_event_loop().remove_signal_handler(signal.SIGINT) + asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM) os.chdir(startdir) - def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None: - for x in futures: - (result, test_count, name_max_len, tests, name, i) = x - if self.options.repeat > 1 and self.fail_count: - result.cancel() - if self.options.verbose: - result.result() - self.process_test_result(result.result()) - self.print_stats(test_count, name_max_len, tests, name, result.result(), i) - - def run_special(self) -> int: - '''Tests run by the user, usually something like "under gdb 1000 times".''' - if self.is_run: - raise RuntimeError('Can not 
use run_special after a full run.') - tests = self.get_tests() - if not tests: - return 0 - self.run_tests(tests) - return self.total_failure_count() - - def list_tests(th: TestHarness) -> bool: tests = th.get_tests() for t in tests: @@ -1235,6 +1308,10 @@ def run(options: argparse.Namespace) -> int: if options.wrapper: check_bin = options.wrapper[0] + if sys.platform == 'win32': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + if check_bin is not None: exe = ExternalProgram(check_bin, silent=True) if not exe.found(): @@ -1252,9 +1329,7 @@ def run(options: argparse.Namespace) -> int: try: if options.list: return list_tests(th) - if not options.args: - return th.doit() - return th.run_special() + return th.doit() except TestException as e: print('Meson test encountered an error:\n') if os.environ.get('MESON_FORCE_BACKTRACE'): |