aboutsummaryrefslogtreecommitdiff
path: root/mesonbuild
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2020-10-12 16:06:13 +0200
committerPaolo Bonzini <pbonzini@redhat.com>2020-11-15 14:12:43 +0100
commitacf5d78f342ae6ce535a3dcfb196b2143603d097 (patch)
treed4c8d9abed1a0a6ba50cb93330e9f0b7df71307b /mesonbuild
parent98d3863fa4d63f39aee510a2713c0586b65d40e8 (diff)
downloadmeson-acf5d78f342ae6ce535a3dcfb196b2143603d097.zip
meson-acf5d78f342ae6ce535a3dcfb196b2143603d097.tar.gz
meson-acf5d78f342ae6ce535a3dcfb196b2143603d097.tar.bz2
mtest: remove usage of executors
Rewrite the SingleTestRunner to use asyncio to manage subprocesses, while still using subprocess.Popen to run them. Concurrency is managed with an asyncio Semaphore; for simplicity (since this is a temporary state) we create a new thread for each test that is run instead of having a pool. This already provides the main advantage of asyncio, which is better control on cancellation; with the current code, KeyboardInterrupt was never handled by the thread executor so the code that tried to handle it in SingleTestRunner only worked for non-parallel tests. And because executor futures cannot be cancelled, there was no way for the user to kill a test that got stuck. Instead, without executors ^C exits "meson test" immediately. The next patch will improve things even further, allowing a single test to be interrupted with ^C.
Diffstat (limited to 'mesonbuild')
-rw-r--r--mesonbuild/mtest.py126
1 files changed, 76 insertions, 50 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index 92be83c..844fe4d 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -19,7 +19,6 @@ from collections import namedtuple
from copy import deepcopy
import argparse
import asyncio
-import concurrent.futures as conc
import datetime
import enum
import io
@@ -35,6 +34,7 @@ import subprocess
import sys
import tempfile
import textwrap
+import threading
import time
import typing as T
import xml.etree.ElementTree as et
@@ -613,6 +613,13 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]:
# Custom waiting primitives for asyncio
+async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None:
+ try:
+ await asyncio.wait(awaitables,
+ timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
+ except asyncio.TimeoutError:
+ pass
+
async def complete(future: asyncio.Future) -> None:
"""Wait for completion of the given future, ignoring cancellation."""
try:
@@ -659,7 +666,7 @@ class SingleTestRunner:
return self.test.exe_runner.get_command() + self.test.fname
return self.test.fname
- def run(self) -> TestRun:
+ async def run(self) -> TestRun:
cmd = self._get_cmd()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
@@ -668,12 +675,12 @@ class SingleTestRunner:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
self.test.timeout = None
- return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
+ return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
- def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int],
- stdout: T.IO, stderr: T.IO,
- env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], TestResult, T.Optional[str]]:
- def kill_process(p: subprocess.Popen) -> T.Optional[str]:
+ async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int],
+ stdout: T.IO, stderr: T.IO,
+ env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]:
+ async def kill_process(p: subprocess.Popen, f: asyncio.Future) -> T.Optional[str]:
# Python does not provide multiplatform support for
# killing a process and all its children so we need
# to roll our own.
@@ -687,27 +694,39 @@ class SingleTestRunner:
# Make sure the termination signal actually kills the process
# group, otherwise retry with a SIGKILL.
- try:
- p.communicate(timeout=0.5)
- except subprocess.TimeoutExpired:
- os.killpg(p.pid, signal.SIGKILL)
- try:
- p.communicate(timeout=1)
- except subprocess.TimeoutExpired:
- # An earlier kill attempt has not worked for whatever reason.
- # Try to kill it one last time with a direct call.
- # If the process has spawned children, they will remain around.
- p.kill()
- try:
- p.communicate(timeout=1)
- except subprocess.TimeoutExpired:
- return 'Test process could not be killed.'
+ await try_wait_one(f, timeout=0.5)
+ if f.done():
+ return None
+
+ os.killpg(p.pid, signal.SIGKILL)
+
+ await try_wait_one(f, timeout=1)
+ if f.done():
+ return None
+
+ # An earlier kill attempt has not worked for whatever reason.
+ # Try to kill it one last time with a direct call.
+ # If the process has spawned children, they will remain around.
+ p.kill()
+ await try_wait_one(f, timeout=1)
+ if f.done():
+ return None
+ return 'Test process could not be killed.'
except ProcessLookupError:
- # Sometimes (e.g. with Wine) this happens.
- # There's nothing we can do (probably the process
- # already died) so carry on.
- pass
- return None
+ # Sometimes (e.g. with Wine) this happens. There's nothing
+ # we can do, probably the process already died so just wait
+ # for the event loop to pick that up.
+ await f
+ return None
+
+ def wait(p: subprocess.Popen, loop: asyncio.AbstractEventLoop, f: asyncio.Future) -> None:
+ try:
+ p.wait()
+ loop.call_soon_threadsafe(f.set_result, p.returncode)
+ except BaseException as e: # lgtm [py/catch-base-exception]
+ # The exception will be raised again in the main thread,
+ # so catching BaseException is okay.
+ loop.call_soon_threadsafe(f.set_exception, e)
# Let gdb handle ^C instead of us
if self.options.gdb:
@@ -734,25 +753,31 @@ class SingleTestRunner:
preexec_fn=preexec_fn if not is_windows() else None)
result = None
additional_error = None
+ loop = asyncio.get_event_loop()
+ future = asyncio.get_event_loop().create_future()
+ threading.Thread(target=wait, args=(p, loop, future), daemon=True).start()
try:
- p.communicate(timeout=timeout)
- except subprocess.TimeoutExpired:
- if self.options.verbose:
- print('{} time out (After {} seconds)'.format(self.test.name, timeout))
- additional_error = kill_process(p)
- result = TestResult.TIMEOUT
- except KeyboardInterrupt:
- mlog.warning('CTRL-C detected while running {}'.format(self.test.name))
- additional_error = kill_process(p)
+ await try_wait_one(future, timeout=timeout)
+ if not future.done():
+ if self.options.verbose:
+ print('{} time out (After {} seconds)'.format(self.test.name, timeout))
+ additional_error = await kill_process(p, future)
+ result = TestResult.TIMEOUT
+ except asyncio.CancelledError:
+ # The main loop must have seen Ctrl-C.
+ additional_error = await kill_process(p, future)
result = TestResult.INTERRUPT
finally:
if self.options.gdb:
# Let us accept ^C again
signal.signal(signal.SIGINT, previous_sigint_handler)
- return p.returncode, result, additional_error
+ if future.done():
+ return future.result(), result, None
+ else:
+ return 0, result, additional_error
- def _run_cmd(self, cmd: T.List[str]) -> TestRun:
+ async def _run_cmd(self, cmd: T.List[str]) -> TestRun:
starttime = time.time()
if self.test.extra_paths:
@@ -797,12 +822,12 @@ class SingleTestRunner:
else:
timeout = self.test.timeout
- returncode, result, additional_error = self._run_subprocess(cmd + extra_cmd,
- timeout=timeout,
- stdout=stdout,
- stderr=stderr,
- env=self.env,
- cwd=self.test.workdir)
+ returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd,
+ timeout=timeout,
+ stdout=stdout,
+ stderr=stderr,
+ env=self.env,
+ cwd=self.test.workdir)
endtime = time.time()
duration = endtime - starttime
if additional_error is None:
@@ -1171,7 +1196,7 @@ class TestHarness:
loop.run_until_complete(self._run_tests(tests))
async def _run_tests(self, tests: T.List[TestSerialisation]) -> None:
- executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
+ semaphore = asyncio.Semaphore(self.options.num_processes)
futures = [] # type: T.List[asyncio.Future]
test_count = len(tests)
name_max_len = max([len(self.get_pretty_suite(test)) for test in tests])
@@ -1184,11 +1209,12 @@ class TestHarness:
async def run_test(test: SingleTestRunner,
name: str, index: int) -> None:
- if interrupted or (self.options.repeat > 1 and self.fail_count):
- return
- res = await asyncio.get_event_loop().run_in_executor(executor, test.run)
- self.process_test_result(res)
- self.print_stats(test_count, name_max_len, tests, name, res, index)
+ async with semaphore:
+ if interrupted or (self.options.repeat > 1 and self.fail_count):
+ return
+ res = await test.run()
+ self.process_test_result(res)
+ self.print_stats(test_count, name_max_len, tests, name, res, index)
def test_done(f: asyncio.Future) -> None:
if not f.cancelled():