From 723c4c9fefade3a806e8ece73126c8c0e21477b2 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 19 Nov 2020 09:59:06 +0100 Subject: mtest: extract TAP parsing out of TestRun.make_tap For now this is just a refactoring that simplifies the next patch. However, it will also come in handy when we will make the parsing asynchronous, because it will make it possible to access subtest results while the test runs. Signed-off-by: Paolo Bonzini --- mesonbuild/mtest.py | 97 +++++++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 22ab8e5..fe91040 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -195,6 +195,9 @@ class TestResult(enum.Enum): return self in {TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT, TestResult.UNEXPECTEDPASS, TestResult.ERROR} + def was_killed(self) -> bool: + return self in (TestResult.TIMEOUT, TestResult.INTERRUPT) + def get_text(self, colorize: bool) -> str: result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen()) if self.is_bad(): @@ -752,48 +755,46 @@ class TestRun: res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS else: res = TestResult.FAIL if bool(returncode) else TestResult.OK - self.complete(res, {}, returncode, stdo, stde, cmd, **kwargs) + self.complete(res, returncode, stdo, stde, cmd, **kwargs) - def complete_tap(self, returncode: int, stdo: str, stde: str, cmd: T.List[str]) -> None: + def parse_tap(self, lines: T.Iterator[str]) -> T.Tuple[TestResult, str]: res = None # type: T.Optional[TestResult] - results = {} # type: T.Dict[str, TestResult] - failed = False + error = '' - for n, i in enumerate(TAPParser().parse(io.StringIO(stdo))): + for n, i in enumerate(TAPParser().parse(lines)): if isinstance(i, TAPParser.Bailout): - results[str(n)] = TestResult.ERROR - failed = True + self.results[str(n)] = i.result + res = TestResult.ERROR elif isinstance(i, TAPParser.Test): - results[str(n)] = i.result + self.results[str(n)] = i.result if i.result not in {TestResult.OK, TestResult.EXPECTEDFAIL, TestResult.SKIP}: - failed = True + res = TestResult.FAIL elif isinstance(i, TAPParser.Error): - results[str(n)] = TestResult.ERROR - stde += '\nTAP parsing error: ' + i.message - failed = True + self.results[str(n)] = TestResult.ERROR + error += '\nTAP parsing error: ' + i.message + res = TestResult.ERROR + + if all(t is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + res = TestResult.SKIP + return res or TestResult.OK, error - if returncode != 0: + def complete_tap(self, returncode: int, res: TestResult, + stdo: str, stde: str, cmd: T.List[str]) -> None: + if self.should_fail and res in (TestResult.OK, TestResult.FAIL): + res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL + if returncode != 0 and not res.was_killed(): res = TestResult.ERROR stde += '\n(test program exited with status code {})'.format(returncode,) - if res is None: - # Now determine the overall result of the test based on the outcome of the subcases - if all(t is TestResult.SKIP for t in results.values()): - # This includes the case where num_tests is zero - res = TestResult.SKIP - elif self.should_fail: - res = TestResult.EXPECTEDFAIL if failed else TestResult.UNEXPECTEDPASS - else: - res = TestResult.FAIL if failed else TestResult.OK - - self.complete(res, results, returncode, stdo, stde, cmd) + self.complete(res, returncode, stdo, stde, cmd) def complete_rust(self, returncode: int, stdo: str, stde: str, cmd: T.List[str]) -> None: - results = parse_rust_test(stdo) + self.results = parse_rust_test(stdo) - failed = TestResult.FAIL in results.values() + failed = TestResult.FAIL in self.results.values() # Now determine the overall result of the test based on the outcome of the subcases - if all(t is TestResult.SKIP for t in results.values()): + if all(t is TestResult.SKIP for t in self.results.values()): # This includes the case where num_tests is zero res = TestResult.SKIP elif self.should_fail: @@ -801,7 +802,7 @@ class TestRun: else: res = TestResult.FAIL if failed else TestResult.OK - self.complete(res, results, returncode, stdo, stde, cmd) + self.complete(res, returncode, stdo, stde, cmd) @property def num(self) -> int: @@ -810,13 +811,11 @@ class TestRun: self._num = TestRun.TEST_NUM return self._num - def complete(self, res: TestResult, results: T.Dict[str, TestResult], - returncode: int, + def complete(self, res: TestResult, returncode: int, stdo: T.Optional[str], stde: T.Optional[str], cmd: T.List[str], *, junit: T.Optional[et.ElementTree] = None) -> None: assert isinstance(res, TestResult) self.res = res - self.results = results # May be empty self.returncode = returncode self.duration = time.time() - self.starttime self.stdo = stdo @@ -960,7 +959,7 @@ class SingleTestRunner: self.runobj.start() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' - self.runobj.complete(TestResult.SKIP, {}, GNU_SKIP_RETURNCODE, skip_stdout, None, None) + self.runobj.complete(TestResult.SKIP, GNU_SKIP_RETURNCODE, skip_stdout, None, None) else: wrap = TestHarness.get_wrapper(self.options) if self.options.gdb: @@ -1116,19 +1115,29 @@ class SingleTestRunner: else: stdo = "" stde = additional_error - if result: - self.runobj.complete(result, {}, returncode, stdo, stde, cmd) - else: - if self.test.protocol is TestProtocol.EXITCODE: - self.runobj.complete_exitcode(returncode, stdo, stde, cmd) - elif self.test.protocol is TestProtocol.GTEST: - self.runobj.complete_gtest(returncode, stdo, stde, cmd) - elif self.test.protocol is TestProtocol.RUST: - return self.runobj.complete_rust(returncode, stdo, stde, cmd) - else: + + # Print lines along the way if requested + def lines() -> T.Iterator[str]: + for line in io.StringIO(stdo): if self.options.verbose: - print(stdo, end='') - self.runobj.complete_tap(returncode, stdo, stde, cmd) + print(line, end='') + yield line + + if self.test.protocol is TestProtocol.TAP: + res, error = self.runobj.parse_tap(lines()) + if error: + stde += '\n' + error + self.runobj.complete_tap(returncode, result or res, stdo, stde, cmd) + + elif result: + self.runobj.complete(result, returncode, stdo, stde, cmd) + elif self.test.protocol is TestProtocol.EXITCODE: + self.runobj.complete_exitcode(returncode, stdo, stde, cmd) + elif self.test.protocol is TestProtocol.RUST: + return self.runobj.complete_rust(returncode, stdo, stde, cmd) + elif self.test.protocol is TestProtocol.GTEST: + self.runobj.complete_gtest(returncode, stdo, stde, cmd) + class TestHarness: def __init__(self, options: argparse.Namespace): -- cgit v1.1