author     Paolo Bonzini <pbonzini@redhat.com>  2020-10-13 17:54:15 +0200
committer  Paolo Bonzini <pbonzini@redhat.com>  2020-11-15 14:12:43 +0100
commit     659a5cbaa31e7a66e3ba3e17f9f31be78cec8b18 (patch)
tree       cd6d101684f18273eeb9e04f66861b258c088955
parent     f532b0a9c3781cd06c99cd63e9c3b31bf3de7413 (diff)
mtest: use asyncio for run loop
Use asyncio futures for the run loop, while still handling I/O in a thread pool via run_in_executor. The handling of the test result is no longer duplicated between run_tests and drain_futures; instead, the test result is always processed and printed by run_test after single_test.run() completes, and (in verbose mode) it cannot interleave with the test output. Therefore the special case for self.options.num_processes == 1 can be removed.
-rw-r--r--  mesonbuild/mtest.py | 65
1 file changed, 39 insertions(+), 26 deletions(-)
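The commit message above describes a general asyncio pattern: keep the blocking work in a thread pool, but let asyncio futures drive scheduling and result handling on the event loop. The sketch below is a minimal, standalone illustration of that pattern, not Meson code; blocking_test, run_one and main are hypothetical stand-ins for SingleTestRunner.run(), the harness's run_test coroutine and its run loop.

# Minimal sketch (not Meson code) of the asyncio + thread-pool pattern
# adopted by this commit. All names here are illustrative stand-ins.
import asyncio
import concurrent.futures as conc
import time


def blocking_test(n: int) -> str:
    """Stand-in for SingleTestRunner.run(): a blocking call."""
    time.sleep(0.1)
    return 'test {}: OK'.format(n)


async def run_one(executor: conc.ThreadPoolExecutor, n: int) -> None:
    # Off-load the blocking call to the thread pool; the result is then
    # processed back on the event loop, so printing cannot interleave.
    res = await asyncio.get_event_loop().run_in_executor(executor, blocking_test, n)
    print(res)


async def main() -> None:
    executor = conc.ThreadPoolExecutor(max_workers=2)
    futures = [asyncio.ensure_future(run_one(executor, n)) for n in range(4)]
    # Wait for all of them; FIRST_EXCEPTION surfaces a failure early.
    done, _pending = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
    for f in done:
        if not f.cancelled():
            f.result()


# The commit itself uses get_event_loop()/run_until_complete() because Meson
# could not yet require Python 3.7; asyncio.run() is the modern equivalent.
asyncio.run(main())

In the commit, the blocking callable is single_test.run() and the result is processed and printed inside run_test(), which is why the old num_processes == 1 special case becomes unnecessary.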
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index f36f1d1..2caa367 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -18,6 +18,7 @@ from ._pathlib import Path
 from collections import namedtuple
 from copy import deepcopy
 import argparse
+import asyncio
 import concurrent.futures as conc
 import datetime
 import enum
@@ -605,6 +606,17 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]:
         objs = check_testdata(pickle.load(f))
     return objs
+# Custom waiting primitives for asyncio
+
+async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
+    """Wait for completion of all the given futures, ignoring cancellation."""
+    while futures:
+        done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
+        # Raise exceptions if needed for all the "done" futures
+        for f in done:
+            if not f.cancelled():
+                f.result()
+
 class SingleTestRunner:
@@ -1135,8 +1147,13 @@ class TestHarness:
             return test.name
     def run_tests(self, tests: T.List[TestSerialisation]) -> None:
-        executor = None
-        futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]]
+        # Replace with asyncio.run once we can require Python 3.7
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(self._run_tests(tests))
+
+    async def _run_tests(self, tests: T.List[TestSerialisation]) -> None:
+        executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
+        futures = [] # type: T.List[asyncio.Future]
         test_count = len(tests)
         name_max_len = max([len(self.get_pretty_suite(test)) for test in tests])
         self.open_log_files()
@@ -1145,29 +1162,36 @@ class TestHarness:
             os.chdir(self.options.wd)
         self.build_data = build.load(os.getcwd())
+        async def run_test(test: SingleTestRunner,
+                           name: str, index: int) -> None:
+            if self.options.repeat > 1 and self.fail_count:
+                return
+            res = await asyncio.get_event_loop().run_in_executor(executor, test.run)
+            self.process_test_result(res)
+            self.print_stats(test_count, name_max_len, tests, name, res, index)
+
+        def test_done(f: asyncio.Future) -> None:
+            if not f.cancelled():
+                f.result()
+            futures.remove(f)
+
         try:
             for _ in range(self.options.repeat):
                 for i, test in enumerate(tests, 1):
                     visible_name = self.get_pretty_suite(test)
                     single_test = self.get_test_runner(test)
-                    if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb:
-                        self.drain_futures(futures)
-                        futures = []
-                        res = single_test.run()
-                        self.process_test_result(res)
-                        self.print_stats(test_count, name_max_len, tests, visible_name, res, i)
+                    if not test.is_parallel or single_test.options.gdb:
+                        await complete_all(futures)
+                        await run_test(single_test, visible_name, i)
                     else:
-                        if not executor:
-                            executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
-                        f = executor.submit(single_test.run)
-                        futures.append((f, test_count, name_max_len, tests, visible_name, i))
-                        if self.options.repeat > 1 and self.fail_count:
-                            break
+                        future = asyncio.ensure_future(run_test(single_test, visible_name, i))
+                        futures.append(future)
+                        future.add_done_callback(test_done)
                 if self.options.repeat > 1 and self.fail_count:
                     break
-            self.drain_futures(futures)
+            await complete_all(futures)
             self.print_collected_logs()
             self.print_summary()
@@ -1176,17 +1200,6 @@ class TestHarness:
         finally:
             os.chdir(startdir)
-    def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None:
-        for x in futures:
-            (result, test_count, name_max_len, tests, name, i) = x
-            if self.options.repeat > 1 and self.fail_count:
-                result.cancel()
-            if self.options.verbose:
-                result.result()
-            self.process_test_result(result.result())
-            self.print_stats(test_count, name_max_len, tests, name, result.result(), i)
-
-
 def list_tests(th: TestHarness) -> bool:
     tests = th.get_tests()
     for t in tests:
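A usage note on the new waiting primitive: the snippet below is a hedged, standalone illustration (not part of the commit) of how complete_all() and the add_done_callback() bookkeeping behave together. complete_all() is quoted from the diff above; ok(), boom(), prune() and main() are made-up stand-ins for scheduled run_test() calls and the test_done callback, showing that the first real failure is re-raised, cancelled futures are ignored, and finished futures are pruned from the shared list.

# Hedged illustration (not part of the commit): exercising the
# complete_all()/add_done_callback() bookkeeping with made-up coroutines.
import asyncio
import typing as T


async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
    """Wait for completion of all the given futures, ignoring cancellation."""
    while futures:
        done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
        # Raise exceptions if needed for all the "done" futures
        for f in done:
            if not f.cancelled():
                f.result()


async def ok(name: str) -> None:
    await asyncio.sleep(0.01)
    print(name, 'passed')


async def boom() -> None:
    await asyncio.sleep(0.005)
    raise RuntimeError('simulated test failure')


async def main() -> None:
    futures = []  # type: T.List[asyncio.Future]

    def prune(f: asyncio.Future) -> None:
        # Like test_done(): drop finished futures from the shared list.
        # (Unlike test_done(), failures are surfaced only by complete_all().)
        futures.remove(f)

    for coro in (ok('t1'), boom(), ok('t2')):
        future = asyncio.ensure_future(coro)
        futures.append(future)
        future.add_done_callback(prune)

    try:
        await complete_all(futures)
    except RuntimeError as e:
        print('first failure propagated:', e)


asyncio.run(main())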