about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Jussi Pakkanen <jpakkane@gmail.com> 2020-11-18 23:09:47 +0200
committer: GitHub <noreply@github.com> 2020-11-18 23:09:47 +0200
commit: ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed (patch)
tree: 7b15e44bb442c39ce140132ea45de3b97a384897
parent: 188695f2026a8a5d23d3e586a0f450f57c82a71b (diff)
parent: a2c134c30bc02defa842e771907db16c6a647e1e (diff)
download: meson-ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed.zip
download: meson-ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed.tar.gz
download: meson-ef6f85f8bae8750a5ded5af5b7de745cfbbbdeed.tar.bz2
Merge pull request #7836 from bonzini/mtest-asyncio
[RFC] mtest: use asyncio instead of concurrent.futures
-rw-r--r-- mesonbuild/mtest.py 349
1 files changed, 212 insertions, 137 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index e538992..5804303 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -15,10 +15,10 @@
# A tool to run tests in many different ways.
from ._pathlib import Path
-from collections import namedtuple
+from collections import deque, namedtuple
from copy import deepcopy
import argparse
-import concurrent.futures as conc
+import asyncio
import datetime
import enum
import io
@@ -167,6 +167,7 @@ class TestResult(enum.Enum):
OK = 'OK'
TIMEOUT = 'TIMEOUT'
+ INTERRUPT = 'INTERRUPT'
SKIP = 'SKIP'
FAIL = 'FAIL'
EXPECTEDFAIL = 'EXPECTEDFAIL'
@@ -326,7 +327,6 @@ class TAPParser:
yield self.Error('Too many tests run (expected {}, got {})'.format(plan.count, num_tests))
-
class JunitBuilder:
"""Builder for Junit test results.
@@ -376,7 +376,8 @@ class JunitBuilder:
'testsuite',
name=suitename,
tests=str(len(test.results)),
- errors=str(sum(1 for r in test.results if r is TestResult.ERROR)),
+ errors=str(sum(1 for r in test.results if r in
+ {TestResult.INTERRUPT, TestResult.ERROR})),
failures=str(sum(1 for r in test.results if r in
{TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})),
skipped=str(sum(1 for r in test.results if r is TestResult.SKIP)),
@@ -395,6 +396,9 @@ class JunitBuilder:
elif result is TestResult.UNEXPECTEDPASS:
fail = et.SubElement(testcase, 'failure')
fail.text = 'Test unexpected passed.'
+ elif result is TestResult.INTERRUPT:
+ fail = et.SubElement(testcase, 'failure')
+ fail.text = 'Test was interrupted by user.'
elif result is TestResult.TIMEOUT:
fail = et.SubElement(testcase, 'failure')
fail.text = 'Test did not finish before configured timeout.'
@@ -606,6 +610,31 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]:
objs = check_testdata(pickle.load(f))
return objs
+# Custom waiting primitives for asyncio
+
+async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None:
+ try:
+ await asyncio.wait(awaitables,
+ timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
+ except asyncio.TimeoutError:
+ pass
+
+async def complete(future: asyncio.Future) -> None:
+ """Wait for completion of the given future, ignoring cancellation."""
+ try:
+ await future
+ except asyncio.CancelledError:
+ pass
+
+async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
+ """Wait for completion of all the given futures, ignoring cancellation."""
+ while futures:
+ done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
+ # Raise exceptions if needed for all the "done" futures
+ for f in done:
+ if not f.cancelled():
+ f.result()
+
class SingleTestRunner:
@@ -636,7 +665,7 @@ class SingleTestRunner:
return self.test.exe_runner.get_command() + self.test.fname
return self.test.fname
- def run(self) -> TestRun:
+ async def run(self) -> TestRun:
cmd = self._get_cmd()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
@@ -645,9 +674,94 @@ class SingleTestRunner:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
self.test.timeout = None
- return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
+ return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
+
+ async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int],
+ stdout: T.IO, stderr: T.IO,
+ env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]:
+ async def kill_process(p: asyncio.subprocess.Process) -> T.Optional[str]:
+ # Python does not provide multiplatform support for
+ # killing a process and all its children so we need
+ # to roll our own.
+ try:
+ if is_windows():
+ subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)])
+ else:
+ # Send a termination signal to the process group that setsid()
+ # created - giving it a chance to perform any cleanup.
+ os.killpg(p.pid, signal.SIGTERM)
+
+ # Make sure the termination signal actually kills the process
+ # group, otherwise retry with a SIGKILL.
+ await try_wait_one(p.wait(), timeout=0.5)
+ if p.returncode is not None:
+ return None
+
+ os.killpg(p.pid, signal.SIGKILL)
+
+ await try_wait_one(p.wait(), timeout=1)
+ if p.returncode is not None:
+ return None
+
+ # An earlier kill attempt has not worked for whatever reason.
+ # Try to kill it one last time with a direct call.
+ # If the process has spawned children, they will remain around.
+ p.kill()
+ await try_wait_one(p.wait(), timeout=1)
+ if p.returncode is not None:
+ return None
+ return 'Test process could not be killed.'
+ except ProcessLookupError:
+ # Sometimes (e.g. with Wine) this happens. There's nothing
+ # we can do, probably the process already died so just wait
+ # for the event loop to pick that up.
+ await p.wait()
+ return None
+
+ # Let gdb handle ^C instead of us
+ if self.options.gdb:
+ previous_sigint_handler = signal.getsignal(signal.SIGINT)
+ # Make the meson executable ignore SIGINT while gdb is running.
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
- def _run_cmd(self, cmd: T.List[str]) -> TestRun:
+ def preexec_fn() -> None:
+ if self.options.gdb:
+ # Restore the SIGINT handler for the child process to
+ # ensure it can handle it.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ else:
+ # We don't want setsid() in gdb because gdb needs the
+ # terminal in order to handle ^C and not show tcsetpgrp()
+ # errors avoid not being able to use the terminal.
+ os.setsid()
+
+ p = await asyncio.create_subprocess_exec(*args,
+ stdout=stdout,
+ stderr=stderr,
+ env=env,
+ cwd=cwd,
+ preexec_fn=preexec_fn if not is_windows() else None)
+ result = None
+ additional_error = None
+ try:
+ await try_wait_one(p.wait(), timeout=timeout)
+ if p.returncode is None:
+ if self.options.verbose:
+ print('{} time out (After {} seconds)'.format(self.test.name, timeout))
+ additional_error = await kill_process(p)
+ result = TestResult.TIMEOUT
+ except asyncio.CancelledError:
+ # The main loop must have seen Ctrl-C.
+ additional_error = await kill_process(p)
+ result = TestResult.INTERRUPT
+ finally:
+ if self.options.gdb:
+ # Let us accept ^C again
+ signal.signal(signal.SIGINT, previous_sigint_handler)
+
+ return p.returncode or 0, result, additional_error
+
+ async def _run_cmd(self, cmd: T.List[str]) -> TestRun:
starttime = time.time()
if self.test.extra_paths:
@@ -678,23 +792,6 @@ class SingleTestRunner:
if self.test.protocol is TestProtocol.TAP and stderr is stdout:
stdout = tempfile.TemporaryFile("wb+")
- # Let gdb handle ^C instead of us
- if self.options.gdb:
- previous_sigint_handler = signal.getsignal(signal.SIGINT)
- # Make the meson executable ignore SIGINT while gdb is running.
- signal.signal(signal.SIGINT, signal.SIG_IGN)
-
- def preexec_fn() -> None:
- if self.options.gdb:
- # Restore the SIGINT handler for the child process to
- # ensure it can handle it.
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- else:
- # We don't want setsid() in gdb because gdb needs the
- # terminal in order to handle ^C and not show tcsetpgrp()
- # errors avoid not being able to use the terminal.
- os.setsid()
-
extra_cmd = [] # type: T.List[str]
if self.test.protocol is TestProtocol.GTEST:
gtestname = self.test.name
@@ -702,77 +799,19 @@ class SingleTestRunner:
gtestname = os.path.join(self.test.workdir, self.test.name)
extra_cmd.append('--gtest_output=xml:{}.xml'.format(gtestname))
- p = subprocess.Popen(cmd + extra_cmd,
- stdout=stdout,
- stderr=stderr,
- env=self.env,
- cwd=self.test.workdir,
- preexec_fn=preexec_fn if not is_windows() else None)
- timed_out = False
- kill_test = False
if self.test.timeout is None:
timeout = None
elif self.options.timeout_multiplier is not None:
timeout = self.test.timeout * self.options.timeout_multiplier
else:
timeout = self.test.timeout
- try:
- p.communicate(timeout=timeout)
- except subprocess.TimeoutExpired:
- if self.options.verbose:
- print('{} time out (After {} seconds)'.format(self.test.name, timeout))
- timed_out = True
- except KeyboardInterrupt:
- mlog.warning('CTRL-C detected while running {}'.format(self.test.name))
- kill_test = True
- finally:
- if self.options.gdb:
- # Let us accept ^C again
- signal.signal(signal.SIGINT, previous_sigint_handler)
-
- additional_error = None
- if kill_test or timed_out:
- # Python does not provide multiplatform support for
- # killing a process and all its children so we need
- # to roll our own.
- if is_windows():
- subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)])
- else:
-
- def _send_signal_to_process_group(pgid : int, signum : int) -> None:
- """ sends a signal to a process group """
- try:
- os.killpg(pgid, signum)
- except ProcessLookupError:
- # Sometimes (e.g. with Wine) this happens.
- # There's nothing we can do (maybe the process
- # already died) so carry on.
- pass
-
- # Send a termination signal to the process group that setsid()
- # created - giving it a chance to perform any cleanup.
- _send_signal_to_process_group(p.pid, signal.SIGTERM)
-
- # Make sure the termination signal actually kills the process
- # group, otherwise retry with a SIGKILL.
- try:
- p.communicate(timeout=0.5)
- except subprocess.TimeoutExpired:
- _send_signal_to_process_group(p.pid, signal.SIGKILL)
- try:
- p.communicate(timeout=1)
- except subprocess.TimeoutExpired:
- # An earlier kill attempt has not worked for whatever reason.
- # Try to kill it one last time with a direct call.
- # If the process has spawned children, they will remain around.
- p.kill()
- try:
- p.communicate(timeout=1)
- except subprocess.TimeoutExpired:
- additional_error = 'Test process could not be killed.'
- except ValueError:
- additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?'
+ returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd,
+ timeout=timeout,
+ stdout=stdout,
+ stderr=stderr,
+ env=self.env,
+ cwd=self.test.workdir)
endtime = time.time()
duration = endtime - starttime
if additional_error is None:
@@ -789,17 +828,17 @@ class SingleTestRunner:
else:
stdo = ""
stde = additional_error
- if timed_out:
- return TestRun(self.test, self.test_env, TestResult.TIMEOUT, [], p.returncode, starttime, duration, stdo, stde, cmd)
+ if result:
+ return TestRun(self.test, self.test_env, result, [], returncode, starttime, duration, stdo, stde, cmd)
else:
if self.test.protocol is TestProtocol.EXITCODE:
- return TestRun.make_exitcode(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd)
+ return TestRun.make_exitcode(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd)
elif self.test.protocol is TestProtocol.GTEST:
- return TestRun.make_gtest(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd)
+ return TestRun.make_gtest(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd)
else:
if self.options.verbose:
print(stdo, end='')
- return TestRun.make_tap(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd)
+ return TestRun.make_tap(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd)
class TestHarness:
@@ -841,7 +880,7 @@ class TestHarness:
def close_logfiles(self) -> None:
for f in ['logfile', 'jsonlogfile']:
- lfile = getattr(self, f)
+ lfile = getattr(self, f)
if lfile:
lfile.close()
setattr(self, f, None)
@@ -892,7 +931,7 @@ class TestHarness:
self.skip_count += 1
elif result.res is TestResult.OK:
self.success_count += 1
- elif result.res is TestResult.FAIL or result.res is TestResult.ERROR:
+ elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}:
self.fail_count += 1
elif result.res is TestResult.EXPECTEDFAIL:
self.expectedfail_count += 1
@@ -905,7 +944,7 @@ class TestHarness:
tests: T.List[TestSerialisation],
name: str, result: TestRun, i: int) -> None:
ok_statuses = (TestResult.OK, TestResult.EXPECTEDFAIL)
- bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT,
+ bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT,
TestResult.UNEXPECTEDPASS, TestResult.ERROR)
result_str = '{num:{numlen}}/{testcount} {name:{name_max_len}} {res:{reslen}} {dur:.2f}s'.format(
numlen=len(str(test_count)),
@@ -951,7 +990,7 @@ class TestHarness:
Skipped: {:<4}
Timeout: {:<4}
''').format(self.success_count, self.expectedfail_count, self.fail_count,
- self.unexpectedpass_count, self.skip_count, self.timeout_count)
+ self.unexpectedpass_count, self.skip_count, self.timeout_count)
print(msg)
if self.logfile:
self.logfile.write(msg)
@@ -1131,8 +1170,14 @@ class TestHarness:
return test.name
def run_tests(self, tests: T.List[TestSerialisation]) -> None:
- executor = None
- futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]]
+ # Replace with asyncio.run once we can require Python 3.7
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(self._run_tests(tests))
+
+ async def _run_tests(self, tests: T.List[TestSerialisation]) -> None:
+ semaphore = asyncio.Semaphore(self.options.num_processes)
+ futures = deque() # type: T.Deque[asyncio.Future]
+ running_tests = dict() # type: T.Dict[asyncio.Future, str]
test_count = len(tests)
name_max_len = max([len(self.get_pretty_suite(test)) for test in tests])
self.open_log_files()
@@ -1140,59 +1185,87 @@ class TestHarness:
if self.options.wd:
os.chdir(self.options.wd)
self.build_data = build.load(os.getcwd())
+ interrupted = False
+
+ async def run_test(test: SingleTestRunner,
+ name: str, index: int) -> None:
+ async with semaphore:
+ if interrupted or (self.options.repeat > 1 and self.fail_count):
+ return
+ res = await test.run()
+ self.process_test_result(res)
+ self.print_stats(test_count, name_max_len, tests, name, res, index)
+
+ def test_done(f: asyncio.Future) -> None:
+ if not f.cancelled():
+ f.result()
+ futures.remove(f)
+ try:
+ del running_tests[f]
+ except KeyError:
+ pass
+
+ def cancel_one_test(warn: bool) -> None:
+ future = futures.popleft()
+ futures.append(future)
+ if warn:
+ mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future]))
+ del running_tests[future]
+ future.cancel()
+
+ def sigterm_handler() -> None:
+ nonlocal interrupted
+ if interrupted:
+ return
+ interrupted = True
+ mlog.warning('Received SIGTERM, exiting')
+ while running_tests:
+ cancel_one_test(False)
+
+ def sigint_handler() -> None:
+ # We always pick the longest-running future that has not been cancelled
+ # If all the tests have been CTRL-C'ed, just stop
+ nonlocal interrupted
+ if interrupted:
+ return
+ if running_tests:
+ cancel_one_test(True)
+ else:
+ mlog.warning('CTRL-C detected, exiting')
+ interrupted = True
+ if sys.platform != 'win32':
+ asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigint_handler)
+ asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler)
try:
for _ in range(self.options.repeat):
for i, test in enumerate(tests, 1):
visible_name = self.get_pretty_suite(test)
single_test = self.get_test_runner(test)
- if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb:
- self.drain_futures(futures)
- futures = []
- res = single_test.run()
- self.process_test_result(res)
- self.print_stats(test_count, name_max_len, tests, visible_name, res, i)
- else:
- if not executor:
- executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
- f = executor.submit(single_test.run)
- futures.append((f, test_count, name_max_len, tests, visible_name, i))
- if self.options.repeat > 1 and self.fail_count:
- break
+ if not test.is_parallel or single_test.options.gdb:
+ await complete_all(futures)
+ future = asyncio.ensure_future(run_test(single_test, visible_name, i))
+ futures.append(future)
+ running_tests[future] = visible_name
+ future.add_done_callback(test_done)
+ if not test.is_parallel or single_test.options.gdb:
+ await complete(future)
if self.options.repeat > 1 and self.fail_count:
break
- self.drain_futures(futures)
+ await complete_all(futures)
self.print_collected_logs()
self.print_summary()
if self.logfilename:
print('Full log written to {}'.format(self.logfilename))
finally:
+ if sys.platform != 'win32':
+ asyncio.get_event_loop().remove_signal_handler(signal.SIGINT)
+ asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM)
os.chdir(startdir)
- def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None:
- for x in futures:
- (result, test_count, name_max_len, tests, name, i) = x
- if self.options.repeat > 1 and self.fail_count:
- result.cancel()
- if self.options.verbose:
- result.result()
- self.process_test_result(result.result())
- self.print_stats(test_count, name_max_len, tests, name, result.result(), i)
-
- def run_special(self) -> int:
- '''Tests run by the user, usually something like "under gdb 1000 times".'''
- if self.is_run:
- raise RuntimeError('Can not use run_special after a full run.')
- tests = self.get_tests()
- if not tests:
- return 0
- self.run_tests(tests)
- return self.total_failure_count()
-
-
def list_tests(th: TestHarness) -> bool:
tests = th.get_tests()
for t in tests:
@@ -1235,6 +1308,10 @@ def run(options: argparse.Namespace) -> int:
if options.wrapper:
check_bin = options.wrapper[0]
+ if sys.platform == 'win32':
+ loop = asyncio.ProactorEventLoop()
+ asyncio.set_event_loop(loop)
+
if check_bin is not None:
exe = ExternalProgram(check_bin, silent=True)
if not exe.found():
@@ -1252,9 +1329,7 @@ def run(options: argparse.Namespace) -> int:
try:
if options.list:
return list_tests(th)
- if not options.args:
- return th.doit()
- return th.run_special()
+ return th.doit()
except TestException as e:
print('Meson test encountered an error:\n')
if os.environ.get('MESON_FORCE_BACKTRACE'):