aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2020-11-19 09:59:06 +0100
committerPaolo Bonzini <pbonzini@redhat.com>2021-01-07 19:20:12 +0100
commit723c4c9fefade3a806e8ece73126c8c0e21477b2 (patch)
treeef0ee20e71e84912ab2bf2e0f4ff966e36ac085a
parente50861e62f6cf695ba087ab161b3938be7c2b131 (diff)
downloadmeson-723c4c9fefade3a806e8ece73126c8c0e21477b2.zip
meson-723c4c9fefade3a806e8ece73126c8c0e21477b2.tar.gz
meson-723c4c9fefade3a806e8ece73126c8c0e21477b2.tar.bz2
mtest: extract TAP parsing out of TestRun.make_tap
For now this is just a refactoring that simplifies the next patch. However, it will also come in handy when we will make the parsing asynchronous, because it will make it possible to access subtest results while the test runs. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
-rw-r--r--mesonbuild/mtest.py97
1 files changed, 53 insertions, 44 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index 22ab8e5..fe91040 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -195,6 +195,9 @@ class TestResult(enum.Enum):
return self in {TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT,
TestResult.UNEXPECTEDPASS, TestResult.ERROR}
+ def was_killed(self) -> bool:
+ return self in (TestResult.TIMEOUT, TestResult.INTERRUPT)
+
def get_text(self, colorize: bool) -> str:
result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen())
if self.is_bad():
@@ -752,48 +755,46 @@ class TestRun:
res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
- self.complete(res, {}, returncode, stdo, stde, cmd, **kwargs)
+ self.complete(res, returncode, stdo, stde, cmd, **kwargs)
- def complete_tap(self, returncode: int, stdo: str, stde: str, cmd: T.List[str]) -> None:
+ def parse_tap(self, lines: T.Iterator[str]) -> T.Tuple[TestResult, str]:
res = None # type: T.Optional[TestResult]
- results = {} # type: T.Dict[str, TestResult]
- failed = False
+ error = ''
- for n, i in enumerate(TAPParser().parse(io.StringIO(stdo))):
+ for n, i in enumerate(TAPParser().parse(lines)):
if isinstance(i, TAPParser.Bailout):
- results[str(n)] = TestResult.ERROR
- failed = True
+ self.results[str(n)] = i.result
+ res = TestResult.ERROR
elif isinstance(i, TAPParser.Test):
- results[str(n)] = i.result
+ self.results[str(n)] = i.result
if i.result not in {TestResult.OK, TestResult.EXPECTEDFAIL, TestResult.SKIP}:
- failed = True
+ res = TestResult.FAIL
elif isinstance(i, TAPParser.Error):
- results[str(n)] = TestResult.ERROR
- stde += '\nTAP parsing error: ' + i.message
- failed = True
+ self.results[str(n)] = TestResult.ERROR
+ error += '\nTAP parsing error: ' + i.message
+ res = TestResult.ERROR
+
+ if all(t is TestResult.SKIP for t in self.results):
+ # This includes the case where self.results is empty
+ res = TestResult.SKIP
+ return res or TestResult.OK, error
- if returncode != 0:
+ def complete_tap(self, returncode: int, res: TestResult,
+ stdo: str, stde: str, cmd: T.List[str]) -> None:
+ if self.should_fail and res in (TestResult.OK, TestResult.FAIL):
+ res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL
+ if returncode != 0 and not res.was_killed():
res = TestResult.ERROR
stde += '\n(test program exited with status code {})'.format(returncode,)
- if res is None:
- # Now determine the overall result of the test based on the outcome of the subcases
- if all(t is TestResult.SKIP for t in results.values()):
- # This includes the case where num_tests is zero
- res = TestResult.SKIP
- elif self.should_fail:
- res = TestResult.EXPECTEDFAIL if failed else TestResult.UNEXPECTEDPASS
- else:
- res = TestResult.FAIL if failed else TestResult.OK
-
- self.complete(res, results, returncode, stdo, stde, cmd)
+ self.complete(res, returncode, stdo, stde, cmd)
def complete_rust(self, returncode: int, stdo: str, stde: str, cmd: T.List[str]) -> None:
- results = parse_rust_test(stdo)
+ self.results = parse_rust_test(stdo)
- failed = TestResult.FAIL in results.values()
+ failed = TestResult.FAIL in self.results.values()
# Now determine the overall result of the test based on the outcome of the subcases
- if all(t is TestResult.SKIP for t in results.values()):
+ if all(t is TestResult.SKIP for t in self.results.values()):
# This includes the case where num_tests is zero
res = TestResult.SKIP
elif self.should_fail:
@@ -801,7 +802,7 @@ class TestRun:
else:
res = TestResult.FAIL if failed else TestResult.OK
- self.complete(res, results, returncode, stdo, stde, cmd)
+ self.complete(res, returncode, stdo, stde, cmd)
@property
def num(self) -> int:
@@ -810,13 +811,11 @@ class TestRun:
self._num = TestRun.TEST_NUM
return self._num
- def complete(self, res: TestResult, results: T.Dict[str, TestResult],
- returncode: int,
+ def complete(self, res: TestResult, returncode: int,
stdo: T.Optional[str], stde: T.Optional[str],
cmd: T.List[str], *, junit: T.Optional[et.ElementTree] = None) -> None:
assert isinstance(res, TestResult)
self.res = res
- self.results = results # May be empty
self.returncode = returncode
self.duration = time.time() - self.starttime
self.stdo = stdo
@@ -960,7 +959,7 @@ class SingleTestRunner:
self.runobj.start()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
- self.runobj.complete(TestResult.SKIP, {}, GNU_SKIP_RETURNCODE, skip_stdout, None, None)
+ self.runobj.complete(TestResult.SKIP, GNU_SKIP_RETURNCODE, skip_stdout, None, None)
else:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
@@ -1116,19 +1115,29 @@ class SingleTestRunner:
else:
stdo = ""
stde = additional_error
- if result:
- self.runobj.complete(result, {}, returncode, stdo, stde, cmd)
- else:
- if self.test.protocol is TestProtocol.EXITCODE:
- self.runobj.complete_exitcode(returncode, stdo, stde, cmd)
- elif self.test.protocol is TestProtocol.GTEST:
- self.runobj.complete_gtest(returncode, stdo, stde, cmd)
- elif self.test.protocol is TestProtocol.RUST:
- return self.runobj.complete_rust(returncode, stdo, stde, cmd)
- else:
+
+ # Print lines along the way if requested
+ def lines() -> T.Iterator[str]:
+ for line in io.StringIO(stdo):
if self.options.verbose:
- print(stdo, end='')
- self.runobj.complete_tap(returncode, stdo, stde, cmd)
+ print(line, end='')
+ yield line
+
+ if self.test.protocol is TestProtocol.TAP:
+ res, error = self.runobj.parse_tap(lines())
+ if error:
+ stde += '\n' + error
+ self.runobj.complete_tap(returncode, result or res, stdo, stde, cmd)
+
+ elif result:
+ self.runobj.complete(result, returncode, stdo, stde, cmd)
+ elif self.test.protocol is TestProtocol.EXITCODE:
+ self.runobj.complete_exitcode(returncode, stdo, stde, cmd)
+ elif self.test.protocol is TestProtocol.RUST:
+ return self.runobj.complete_rust(returncode, stdo, stde, cmd)
+ elif self.test.protocol is TestProtocol.GTEST:
+ self.runobj.complete_gtest(returncode, stdo, stde, cmd)
+
class TestHarness:
def __init__(self, options: argparse.Namespace):