diff options
author | Jussi Pakkanen <jpakkane@gmail.com> | 2021-01-20 19:02:25 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-20 19:02:25 +0000 |
commit | 4d5f6876f9d4b61cefea0036a15576a8fce220a3 (patch) | |
tree | 8ed82164365abf6898248f03ec6fea16a24e14c2 | |
parent | 5dd1aac5c9da384328495929eb8b30e89f4eb1ec (diff) | |
parent | 2e982a38643dfc869cb91b49d3f44ee271a7062d (diff) | |
download | meson-4d5f6876f9d4b61cefea0036a15576a8fce220a3.zip meson-4d5f6876f9d4b61cefea0036a15576a8fce220a3.tar.gz meson-4d5f6876f9d4b61cefea0036a15576a8fce220a3.tar.bz2 |
Merge pull request #8225 from bonzini/mtest-asyncio-cleanups
mtest: cleanups and bugfixes
-rw-r--r-- | mesonbuild/mtest.py | 383 |
1 files changed, 232 insertions, 151 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 7e2d221..d410dba 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -147,6 +147,13 @@ def print_safe(s: str) -> None: s = s.encode('ascii', errors='backslashreplace').decode('ascii') print(s) +def join_lines(a: str, b: str) -> str: + if not a: + return b + if not b: + return a + return a + '\n' + b + def returncode_to_status(retcode: int) -> str: # Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related # functions here because the status returned by subprocess is munged. It @@ -181,6 +188,19 @@ class TestException(MesonException): @enum.unique +class ConsoleUser(enum.Enum): + + # the logger can use the console + LOGGER = 0 + + # the console is used by gdb + GDB = 1 + + # the console is used to write stdout/stderr + STDOUT = 2 + + +@enum.unique class TestResult(enum.Enum): PENDING = 'PENDING' @@ -522,9 +542,7 @@ class ConsoleLogger(TestLogger): self.test_count = harness.test_count - # In verbose mode, the progress report gets in the way of the tests' - # stdout and stderr. - if self.is_tty() and not harness.options.verbose: + if self.is_tty() and not harness.need_console: # Account for "[aa-bb/cc] OO " in the progress report self.max_left_width = 3 * len(str(self.test_count)) + 8 self.progress_task = asyncio.ensure_future(report_progress()) @@ -731,6 +749,10 @@ class JunitBuilder(TestLogger): class TestRun: TEST_NUM = 0 + PROTOCOL_TO_CLASS: T.Dict[TestProtocol, T.Type['TestRun']] = {} + + def __new__(cls, test: TestSerialisation, *args: T.Any, **kwargs: T.Any) -> T.Any: + return super().__new__(TestRun.PROTOCOL_TO_CLASS[test.protocol]) def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str], name: str, timeout: T.Optional[int]): @@ -746,92 +768,15 @@ class TestRun: self.stdo = None # type: T.Optional[str] self.stde = None # type: T.Optional[str] self.cmd = None # type: T.Optional[T.List[str]] - self.env = dict() # type: T.Dict[str, str] + self.env = test_env # type: T.Dict[str, str] self.should_fail = test.should_fail self.project = test.project_name self.junit = None # type: T.Optional[et.ElementTree] - def start(self) -> None: + def start(self, cmd: T.List[str]) -> None: self.res = TestResult.RUNNING self.starttime = time.time() - - def complete_gtest(self, returncode: int, - stdo: T.Optional[str], stde: T.Optional[str], - cmd: T.List[str]) -> None: - filename = '{}.xml'.format(self.test.name) - if self.test.workdir: - filename = os.path.join(self.test.workdir, filename) - tree = et.parse(filename) - - self.complete_exitcode(returncode, stdo, stde, cmd, junit=tree) - - def complete_exitcode(self, returncode: int, - stdo: T.Optional[str], stde: T.Optional[str], - cmd: T.List[str], - **kwargs: T.Any) -> None: - if returncode == GNU_SKIP_RETURNCODE: - res = TestResult.SKIP - elif returncode == GNU_ERROR_RETURNCODE: - res = TestResult.ERROR - else: - res = TestResult.FAIL if bool(returncode) else TestResult.OK - self.complete(returncode, res, stdo, stde, cmd, **kwargs) - - async def parse_tap(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: - res = TestResult.OK - error = '' - - async for i in TAPParser().parse_async(lines): - if isinstance(i, TAPParser.Bailout): - res = TestResult.ERROR - elif isinstance(i, TAPParser.Test): - self.results.append(i) - if i.result.is_bad(): - res = TestResult.FAIL - elif isinstance(i, TAPParser.Error): - error = '\nTAP parsing error: ' + i.message - res = TestResult.ERROR - - if all(t.result is TestResult.SKIP for t in self.results): - # This includes the case where self.results is empty - res = TestResult.SKIP - return res, error - - def complete_tap(self, returncode: int, res: TestResult, - stdo: str, stde: str, cmd: T.List[str]) -> None: - if returncode != 0 and not res.was_killed(): - res = TestResult.ERROR - stde += '\n(test program exited with status code {})'.format(returncode,) - - self.complete(returncode, res, stdo, stde, cmd) - - async def parse_rust(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: - def parse_res(n: int, name: str, result: str) -> TAPParser.Test: - if result == 'ok': - return TAPParser.Test(n, name, TestResult.OK, None) - elif result == 'ignored': - return TAPParser.Test(n, name, TestResult.SKIP, None) - elif result == 'FAILED': - return TAPParser.Test(n, name, TestResult.FAIL, None) - return TAPParser.Test(n, name, TestResult.ERROR, - 'Unsupported output from rust test: {}'.format(result)) - - n = 1 - async for line in lines: - if line.startswith('test ') and not line.startswith('test result'): - _, name, _, result = line.rstrip().split(' ') - name = name.replace('::', '.') - self.results.append(parse_res(n, name, result)) - n += 1 - - if all(t.result is TestResult.SKIP for t in self.results): - # This includes the case where self.results is empty - return TestResult.SKIP, '' - elif any(t.result is TestResult.ERROR for t in self.results): - return TestResult.ERROR, '' - elif any(t.result is TestResult.FAIL for t in self.results): - return TestResult.FAIL, '' - return TestResult.OK, '' + self.cmd = cmd @property def num(self) -> int: @@ -856,9 +801,8 @@ class TestRun: return '{}/{} subtests passed'.format(passed, ran) return '' - def complete(self, returncode: int, res: TestResult, - stdo: T.Optional[str], stde: T.Optional[str], - cmd: T.List[str], *, junit: T.Optional[et.ElementTree] = None) -> None: + def _complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: assert isinstance(res, TestResult) if self.should_fail and res in (TestResult.OK, TestResult.FAIL): res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL @@ -868,8 +812,14 @@ class TestRun: self.duration = time.time() - self.starttime self.stdo = stdo self.stde = stde - self.cmd = cmd - self.junit = junit + + def complete_skip(self, message: str) -> None: + self.starttime = time.time() + self._complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None) + + def complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + self._complete(returncode, res, stdo, stde) def get_log(self) -> str: res = '--- command ---\n' @@ -906,6 +856,118 @@ class TestRun: log += '\n'.join(lines[-100:]) return log + @property + def needs_parsing(self) -> bool: + return False + + async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + async for l in lines: + pass + return TestResult.OK, '' + + +class TestRunExitCode(TestRun): + + def complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + if res: + pass + elif returncode == GNU_SKIP_RETURNCODE: + res = TestResult.SKIP + elif returncode == GNU_ERROR_RETURNCODE: + res = TestResult.ERROR + else: + res = TestResult.FAIL if bool(returncode) else TestResult.OK + super().complete(returncode, res, stdo, stde) + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode + + +class TestRunGTest(TestRunExitCode): + def complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + filename = '{}.xml'.format(self.test.name) + if self.test.workdir: + filename = os.path.join(self.test.workdir, filename) + + self.junit = et.parse(filename) + super().complete(returncode, res, stdo, stde) + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest + + +class TestRunTAP(TestRun): + @property + def needs_parsing(self) -> bool: + return True + + def complete(self, returncode: int, res: TestResult, + stdo: str, stde: str) -> None: + if returncode != 0 and not res.was_killed(): + res = TestResult.ERROR + stde += '\n(test program exited with status code {})'.format(returncode,) + + super().complete(returncode, res, stdo, stde) + + async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + res = TestResult.OK + error = '' + + async for i in TAPParser().parse_async(lines): + if isinstance(i, TAPParser.Bailout): + res = TestResult.ERROR + elif isinstance(i, TAPParser.Test): + self.results.append(i) + if i.result.is_bad(): + res = TestResult.FAIL + elif isinstance(i, TAPParser.Error): + error = '\nTAP parsing error: ' + i.message + res = TestResult.ERROR + + if all(t.result is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + res = TestResult.SKIP + return res, error + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP + + +class TestRunRust(TestRun): + @property + def needs_parsing(self) -> bool: + return True + + async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + def parse_res(n: int, name: str, result: str) -> TAPParser.Test: + if result == 'ok': + return TAPParser.Test(n, name, TestResult.OK, None) + elif result == 'ignored': + return TAPParser.Test(n, name, TestResult.SKIP, None) + elif result == 'FAILED': + return TAPParser.Test(n, name, TestResult.FAIL, None) + return TAPParser.Test(n, name, TestResult.ERROR, + 'Unsupported output from rust test: {}'.format(result)) + + n = 1 + async for line in lines: + if line.startswith('test ') and not line.startswith('test result'): + _, name, _, result = line.rstrip().split(' ') + name = name.replace('::', '.') + self.results.append(parse_res(n, name, result)) + n += 1 + + if all(t.result is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + return TestResult.SKIP, '' + elif any(t.result is TestResult.ERROR for t in self.results): + return TestResult.ERROR, '' + elif any(t.result is TestResult.FAIL for t in self.results): + return TestResult.FAIL, '' + return TestResult.OK, '' + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust + + def decode(stream: T.Union[None, bytes]) -> str: if stream is None: return '' @@ -914,6 +976,35 @@ def decode(stream: T.Union[None, bytes]) -> str: except UnicodeDecodeError: return stream.decode('iso-8859-1', errors='ignore') +async def read_decode(reader: asyncio.StreamReader, console_mode: ConsoleUser) -> str: + if console_mode is not ConsoleUser.STDOUT: + return decode(await reader.read(-1)) + + stdo_lines = [] + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + print(line, end='', flush=True) + return ''.join(stdo_lines) + +# Extract lines out of the StreamReader. Print them +# along the way if requested, and at the end collect +# them all into a future. +async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]', + console_mode: ConsoleUser) -> T.AsyncIterator[str]: + stdo_lines = [] + try: + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if console_mode is ConsoleUser.STDOUT: + print(line, end='', flush=True) + yield line + except Exception as e: + f.set_exception(e) + finally: + f.set_result(''.join(stdo_lines)) + def run_with_mono(fname: str) -> bool: return fname.endswith('.exe') and not (is_windows() or is_cygwin()) @@ -971,17 +1062,32 @@ async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: f.result() class TestSubprocess: - def __init__(self, p: asyncio.subprocess.Process, postwait_fn: T.Callable[[], None] = None): + def __init__(self, p: asyncio.subprocess.Process, + stdout: T.Optional[int], stderr: T.Optional[int], + postwait_fn: T.Callable[[], None] = None): self._process = p + self.stdout = stdout + self.stderr = stderr + self.stdo_task = None # type: T.Optional[T.Awaitable[str]] + self.stde_task = None # type: T.Optional[T.Awaitable[str]] self.postwait_fn = postwait_fn # type: T.Callable[[], None] - @property - def stdout(self) -> T.Optional[asyncio.StreamReader]: - return self._process.stdout + def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]: + self.stdo_task = asyncio.get_event_loop().create_future() + return read_decode_lines(self._process.stdout, self.stdo_task, console_mode) - @property - def stderr(self) -> T.Optional[asyncio.StreamReader]: - return self._process.stderr + def communicate(self, console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]], + T.Optional[T.Awaitable[str]]]: + # asyncio.ensure_future ensures that printing can + # run in the background, even before it is awaited + if self.stdo_task is None and self.stdout is not None: + decode_task = read_decode(self._process.stdout, console_mode) + self.stdo_task = asyncio.ensure_future(decode_task) + if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT: + decode_task = read_decode(self._process.stderr, console_mode) + self.stde_task = asyncio.ensure_future(decode_task) + + return self.stdo_task, self.stde_task async def _kill(self) -> T.Optional[str]: # Python does not provide multiplatform support for @@ -1061,6 +1167,13 @@ class SingleTestRunner: self.runobj = TestRun(test, test_env, name, timeout) + if self.options.gdb: + self.console_mode = ConsoleUser.GDB + elif self.options.verbose and not self.runobj.needs_parsing: + self.console_mode = ConsoleUser.STDOUT + else: + self.console_mode = ConsoleUser.LOGGER + def _get_cmd(self) -> T.Optional[T.List[str]]: if self.test.fname[0].endswith('.jar'): return ['java', '-jar'] + self.test.fname @@ -1091,10 +1204,9 @@ class SingleTestRunner: async def run(self) -> TestRun: cmd = self._get_cmd() - self.runobj.start() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' - self.runobj.complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, skip_stdout, None, None) + self.runobj.complete_skip(skip_stdout) else: wrap = TestHarness.get_wrapper(self.options) await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) @@ -1131,7 +1243,8 @@ class SingleTestRunner: env=env, cwd=cwd, preexec_fn=preexec_fn if not is_windows() else None) - return TestSubprocess(p, postwait_fn=postwait_fn if not is_windows() else None) + return TestSubprocess(p, stdout=stdout, stderr=stderr, + postwait_fn=postwait_fn if not is_windows() else None) async def _run_cmd(self, cmd: T.List[str]) -> None: if self.test.extra_paths: @@ -1154,14 +1267,15 @@ class SingleTestRunner: if ('MALLOC_PERTURB_' not in self.env or not self.env['MALLOC_PERTURB_']) and not self.options.benchmark: self.env['MALLOC_PERTURB_'] = str(random.randint(1, 255)) - stdout = None - stderr = None - if self.test.protocol is TestProtocol.TAP: - stdout = asyncio.subprocess.PIPE - stderr = None if self.options.verbose else asyncio.subprocess.PIPE - elif not self.options.verbose: + self.runobj.start(cmd) + if self.console_mode is ConsoleUser.GDB: + stdout = None + stderr = None + else: stdout = asyncio.subprocess.PIPE - stderr = asyncio.subprocess.PIPE if self.options.split else asyncio.subprocess.STDOUT + stderr = asyncio.subprocess.STDOUT \ + if not self.options.split and not self.runobj.needs_parsing \ + else asyncio.subprocess.PIPE extra_cmd = [] # type: T.List[str] if self.test.protocol is TestProtocol.GTEST: @@ -1176,60 +1290,23 @@ class SingleTestRunner: env=self.env, cwd=self.test.workdir) - stdo = stde = '' - stdo_task = stde_task = parse_task = None - - # Extract lines out of the StreamReader and print them - # along the way if requested - async def lines() -> T.AsyncIterator[str]: - stdo_lines = [] - reader = p.stdout - while not reader.at_eof(): - line = decode(await reader.readline()) - stdo_lines.append(line) - if self.options.verbose: - print(line, end='') - yield line - - nonlocal stdo - stdo = ''.join(stdo_lines) - - if self.test.protocol is TestProtocol.TAP: - parse_task = self.runobj.parse_tap(lines()) - elif self.test.protocol is TestProtocol.RUST: - parse_task = self.runobj.parse_rust(lines()) - elif stdout is not None: - stdo_task = p.stdout.read(-1) - if stderr is not None and stderr != asyncio.subprocess.STDOUT: - stde_task = p.stderr.read(-1) + parse_task = None + if self.runobj.needs_parsing: + parse_task = self.runobj.parse(p.stdout_lines(self.console_mode)) + stdo_task, stde_task = p.communicate(self.console_mode) returncode, result, additional_error = await p.wait(self.runobj.timeout) - if result is TestResult.TIMEOUT and self.options.verbose: - print('{} time out (After {} seconds)'.format(self.test.name, self.runobj.timeout)) - - if stdo_task is not None: - stdo = decode(await stdo_task) - if stde_task is not None: - stde = decode(await stde_task) - - if additional_error is not None: - stde += '\n' + additional_error if parse_task is not None: res, error = await parse_task if error: - stde += '\n' + error + additional_error = join_lines(additional_error, error) result = result or res - if self.test.protocol is TestProtocol.TAP: - self.runobj.complete_tap(returncode, result, stdo, stde, cmd) - return - if result: - self.runobj.complete(returncode, result, stdo, stde, cmd) - elif self.test.protocol is TestProtocol.EXITCODE: - self.runobj.complete_exitcode(returncode, stdo, stde, cmd) - elif self.test.protocol is TestProtocol.GTEST: - self.runobj.complete_gtest(returncode, stdo, stde, cmd) + stdo = await stdo_task if stdo_task else '' + stde = await stde_task if stde_task else '' + stde = join_lines(stde, additional_error) + self.runobj.complete(returncode, result, stdo, stde) class TestHarness: @@ -1247,6 +1324,7 @@ class TestHarness: self.is_run = False self.loggers = [] # type: T.List[TestLogger] self.loggers.append(ConsoleLogger()) + self.need_console = False if self.options.benchmark: self.tests = load_benchmarks(options.wd) @@ -1397,6 +1475,9 @@ class TestHarness: runners = [self.get_test_runner(test) for test in tests] self.duration_max_len = max([len(str(int(runner.timeout or 99))) for runner in runners]) + # Disable the progress report if it gets in the way + self.need_console = any((runner.console_mode is not ConsoleUser.LOGGER + for runner in runners)) self.run_tests(runners) finally: os.chdir(startdir) |