From acf5d78f342ae6ce535a3dcfb196b2143603d097 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Oct 2020 16:06:13 +0200 Subject: mtest: remove usage of executors Rewrite the SingleTestRunner to use asyncio to manage subprocesses, while still using subprocess.Popen to run them. Concurrency is managed with an asyncio Semaphore; for simplicity (since this is a temporary state) we create a new thread for each test that is run instead of having a pool. This already provides the main advantage of asyncio, which is better control on cancellation; with the current code, KeyboardInterrupt was never handled by the thread executor so the code that tried to handle it in SingleTestRunner only worked for non-parallel tests. And because executor futures cannot be cancelled, there was no way for the user to kill a test that got stuck. Instead, without executors ^C exits "meson test" immediately. The next patch will improve things even further, allowing a single test to be interrupted with ^C. --- mesonbuild/mtest.py | 126 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 76 insertions(+), 50 deletions(-) (limited to 'mesonbuild') diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 92be83c..844fe4d 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -19,7 +19,6 @@ from collections import namedtuple from copy import deepcopy import argparse import asyncio -import concurrent.futures as conc import datetime import enum import io @@ -35,6 +34,7 @@ import subprocess import sys import tempfile import textwrap +import threading import time import typing as T import xml.etree.ElementTree as et @@ -613,6 +613,13 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]: # Custom waiting primitives for asyncio +async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None: + try: + await asyncio.wait(awaitables, + timeout=timeout, return_when=asyncio.FIRST_COMPLETED) + except asyncio.TimeoutError: + pass + async def complete(future: asyncio.Future) -> None: """Wait for completion of the given future, ignoring cancellation.""" try: @@ -659,7 +666,7 @@ class SingleTestRunner: return self.test.exe_runner.get_command() + self.test.fname return self.test.fname - def run(self) -> TestRun: + async def run(self) -> TestRun: cmd = self._get_cmd() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' @@ -668,12 +675,12 @@ class SingleTestRunner: wrap = TestHarness.get_wrapper(self.options) if self.options.gdb: self.test.timeout = None - return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) + return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) - def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], - stdout: T.IO, stderr: T.IO, - env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], TestResult, T.Optional[str]]: - def kill_process(p: subprocess.Popen) -> T.Optional[str]: + async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], + stdout: T.IO, stderr: T.IO, + env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]: + async def kill_process(p: subprocess.Popen, f: asyncio.Future) -> T.Optional[str]: # Python does not provide multiplatform support for # killing a process and all its children so we need # to roll our own. @@ -687,27 +694,39 @@ class SingleTestRunner: # Make sure the termination signal actually kills the process # group, otherwise retry with a SIGKILL. - try: - p.communicate(timeout=0.5) - except subprocess.TimeoutExpired: - os.killpg(p.pid, signal.SIGKILL) - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - # An earlier kill attempt has not worked for whatever reason. - # Try to kill it one last time with a direct call. - # If the process has spawned children, they will remain around. - p.kill() - try: - p.communicate(timeout=1) - except subprocess.TimeoutExpired: - return 'Test process could not be killed.' + await try_wait_one(f, timeout=0.5) + if f.done(): + return None + + os.killpg(p.pid, signal.SIGKILL) + + await try_wait_one(f, timeout=1) + if f.done(): + return None + + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. + p.kill() + await try_wait_one(f, timeout=1) + if f.done(): + return None + return 'Test process could not be killed.' except ProcessLookupError: - # Sometimes (e.g. with Wine) this happens. - # There's nothing we can do (probably the process - # already died) so carry on. - pass - return None + # Sometimes (e.g. with Wine) this happens. There's nothing + # we can do, probably the process already died so just wait + # for the event loop to pick that up. + await f + return None + + def wait(p: subprocess.Popen, loop: asyncio.AbstractEventLoop, f: asyncio.Future) -> None: + try: + p.wait() + loop.call_soon_threadsafe(f.set_result, p.returncode) + except BaseException as e: # lgtm [py/catch-base-exception] + # The exception will be raised again in the main thread, + # so catching BaseException is okay. + loop.call_soon_threadsafe(f.set_exception, e) # Let gdb handle ^C instead of us if self.options.gdb: @@ -734,25 +753,31 @@ class SingleTestRunner: preexec_fn=preexec_fn if not is_windows() else None) result = None additional_error = None + loop = asyncio.get_event_loop() + future = asyncio.get_event_loop().create_future() + threading.Thread(target=wait, args=(p, loop, future), daemon=True).start() try: - p.communicate(timeout=timeout) - except subprocess.TimeoutExpired: - if self.options.verbose: - print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - additional_error = kill_process(p) - result = TestResult.TIMEOUT - except KeyboardInterrupt: - mlog.warning('CTRL-C detected while running {}'.format(self.test.name)) - additional_error = kill_process(p) + await try_wait_one(future, timeout=timeout) + if not future.done(): + if self.options.verbose: + print('{} time out (After {} seconds)'.format(self.test.name, timeout)) + additional_error = await kill_process(p, future) + result = TestResult.TIMEOUT + except asyncio.CancelledError: + # The main loop must have seen Ctrl-C. + additional_error = await kill_process(p, future) result = TestResult.INTERRUPT finally: if self.options.gdb: # Let us accept ^C again signal.signal(signal.SIGINT, previous_sigint_handler) - return p.returncode, result, additional_error + if future.done(): + return future.result(), result, None + else: + return 0, result, additional_error - def _run_cmd(self, cmd: T.List[str]) -> TestRun: + async def _run_cmd(self, cmd: T.List[str]) -> TestRun: starttime = time.time() if self.test.extra_paths: @@ -797,12 +822,12 @@ class SingleTestRunner: else: timeout = self.test.timeout - returncode, result, additional_error = self._run_subprocess(cmd + extra_cmd, - timeout=timeout, - stdout=stdout, - stderr=stderr, - env=self.env, - cwd=self.test.workdir) + returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd, + timeout=timeout, + stdout=stdout, + stderr=stderr, + env=self.env, + cwd=self.test.workdir) endtime = time.time() duration = endtime - starttime if additional_error is None: @@ -1171,7 +1196,7 @@ class TestHarness: loop.run_until_complete(self._run_tests(tests)) async def _run_tests(self, tests: T.List[TestSerialisation]) -> None: - executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) + semaphore = asyncio.Semaphore(self.options.num_processes) futures = [] # type: T.List[asyncio.Future] test_count = len(tests) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) @@ -1184,11 +1209,12 @@ class TestHarness: async def run_test(test: SingleTestRunner, name: str, index: int) -> None: - if interrupted or (self.options.repeat > 1 and self.fail_count): - return - res = await asyncio.get_event_loop().run_in_executor(executor, test.run) - self.process_test_result(res) - self.print_stats(test_count, name_max_len, tests, name, res, index) + async with semaphore: + if interrupted or (self.options.repeat > 1 and self.fail_count): + return + res = await test.run() + self.process_test_result(res) + self.print_stats(test_count, name_max_len, tests, name, res, index) def test_done(f: asyncio.Future) -> None: if not f.cancelled(): -- cgit v1.1