diff options
Diffstat (limited to 'mesonbuild/mtest.py')
-rw-r--r-- | mesonbuild/mtest.py | 750 |
1 files changed, 443 insertions, 307 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 9db271e..59d2389 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -15,13 +15,12 @@ # A tool to run tests in many different ways. from pathlib import Path -from collections import deque, namedtuple +from collections import deque from copy import deepcopy import argparse import asyncio import datetime import enum -import io import json import multiprocessing import os @@ -32,10 +31,10 @@ import re import signal import subprocess import sys -import tempfile import textwrap import time import typing as T +import unicodedata import xml.etree.ElementTree as et from . import build @@ -63,6 +62,14 @@ def is_windows() -> bool: def is_cygwin() -> bool: return sys.platform == 'cygwin' +UNIWIDTH_MAPPING = {'F': 2, 'H': 1, 'W': 2, 'Na': 1, 'N': 1, 'A': 1} +def uniwidth(s: str) -> int: + result = 0 + for c in s: + w = unicodedata.east_asian_width(c) + result += UNIWIDTH_MAPPING[w] + return result + def determine_worker_count() -> int: varname = 'MESON_TESTTHREADS' if varname in os.environ: @@ -150,10 +157,10 @@ def returncode_to_status(retcode: int) -> str: signame = signal.Signals(signum).name except ValueError: signame = 'SIGinvalid' - return '(killed by signal {} {})'.format(signum, signame) + return 'killed by signal {} {}'.format(signum, signame) if retcode <= 128: - return '(exit status {})'.format(retcode) + return 'exit status {}'.format(retcode) signum = retcode - 128 try: @@ -195,6 +202,9 @@ class TestResult(enum.Enum): return self in {TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT, TestResult.UNEXPECTEDPASS, TestResult.ERROR} + def was_killed(self) -> bool: + return self in (TestResult.TIMEOUT, TestResult.INTERRUPT) + def get_text(self, colorize: bool) -> str: result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen()) if self.is_bad(): @@ -206,12 +216,32 @@ class TestResult(enum.Enum): return decorator(result_str).get_text(colorize) +TYPE_TAPResult = T.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'] + class TAPParser: - Plan = namedtuple('Plan', ['count', 'late', 'skipped', 'explanation']) - Bailout = namedtuple('Bailout', ['message']) - Test = namedtuple('Test', ['number', 'name', 'result', 'explanation']) - Error = namedtuple('Error', ['message']) - Version = namedtuple('Version', ['version']) + class Plan(T.NamedTuple): + num_tests: int + late: bool + skipped: bool + explanation: T.Optional[str] + + class Bailout(T.NamedTuple): + message: str + + class Test(T.NamedTuple): + number: int + name: str + result: TestResult + explanation: T.Optional[str] + + def __str__(self) -> str: + return '{} {}'.format(self.number, self.name).strip() + + class Error(T.NamedTuple): + message: str + + class Version(T.NamedTuple): + version: int _MAIN = 1 _AFTER_TEST = 2 @@ -225,8 +255,15 @@ class TAPParser: _RE_YAML_START = re.compile(r'(\s+)---.*') _RE_YAML_END = re.compile(r'\s+\.\.\.\s*') - def __init__(self, io: T.Iterator[str]): - self.io = io + found_late_test = False + bailed_out = False + plan: T.Optional[Plan] = None + lineno = 0 + num_tests = 0 + yaml_lineno: T.Optional[int] = None + yaml_indent = '' + state = _MAIN + version = 12 def parse_test(self, ok: bool, num: int, name: str, directive: T.Optional[str], explanation: T.Optional[str]) -> \ T.Generator[T.Union['TAPParser.Test', 'TAPParser.Error'], None, None]: @@ -246,113 +283,113 @@ class TAPParser: yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation) - def parse(self) -> T.Generator[T.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'], None, None]: - found_late_test = False - bailed_out = False - plan = None - lineno = 0 - num_tests = 0 - yaml_lineno = None - yaml_indent = '' - state = self._MAIN - version = 12 - while True: - lineno += 1 - try: - line = next(self.io).rstrip() - except StopIteration: - break + async def parse_async(self, lines: T.AsyncIterator[str]) -> T.AsyncIterator[TYPE_TAPResult]: + async for line in lines: + for event in self.parse_line(line): + yield event + for event in self.parse_line(None): + yield event + + def parse(self, io: T.Iterator[str]) -> T.Iterator[TYPE_TAPResult]: + for line in io: + yield from self.parse_line(line) + yield from self.parse_line(None) + + def parse_line(self, line: T.Optional[str]) -> T.Iterator[TYPE_TAPResult]: + if line is not None: + self.lineno += 1 + line = line.rstrip() # YAML blocks are only accepted after a test - if state == self._AFTER_TEST: - if version >= 13: + if self.state == self._AFTER_TEST: + if self.version >= 13: m = self._RE_YAML_START.match(line) if m: - state = self._YAML - yaml_lineno = lineno - yaml_indent = m.group(1) - continue - state = self._MAIN + self.state = self._YAML + self.yaml_lineno = self.lineno + self.yaml_indent = m.group(1) + return + self.state = self._MAIN - elif state == self._YAML: + elif self.state == self._YAML: if self._RE_YAML_END.match(line): - state = self._MAIN - continue - if line.startswith(yaml_indent): - continue - yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno)) - state = self._MAIN + self.state = self._MAIN + return + if line.startswith(self.yaml_indent): + return + yield self.Error('YAML block not terminated (started on line {})'.format(self.yaml_lineno)) + self.state = self._MAIN - assert state == self._MAIN + assert self.state == self._MAIN if line.startswith('#'): - continue + return m = self._RE_TEST.match(line) if m: - if plan and plan.late and not found_late_test: + if self.plan and self.plan.late and not self.found_late_test: yield self.Error('unexpected test after late plan') - found_late_test = True - num_tests += 1 - num = num_tests if m.group(2) is None else int(m.group(2)) - if num != num_tests: + self.found_late_test = True + self.num_tests += 1 + num = self.num_tests if m.group(2) is None else int(m.group(2)) + if num != self.num_tests: yield self.Error('out of order test numbers') yield from self.parse_test(m.group(1) == 'ok', num, m.group(3), m.group(4), m.group(5)) - state = self._AFTER_TEST - continue + self.state = self._AFTER_TEST + return m = self._RE_PLAN.match(line) if m: - if plan: + if self.plan: yield self.Error('more than one plan found') else: - count = int(m.group(1)) - skipped = (count == 0) + num_tests = int(m.group(1)) + skipped = (num_tests == 0) if m.group(2): if m.group(2).upper().startswith('SKIP'): - if count > 0: + if num_tests > 0: yield self.Error('invalid SKIP directive for plan') skipped = True else: yield self.Error('invalid directive for plan') - plan = self.Plan(count=count, late=(num_tests > 0), - skipped=skipped, explanation=m.group(3)) - yield plan - continue + self.plan = self.Plan(num_tests=num_tests, late=(self.num_tests > 0), + skipped=skipped, explanation=m.group(3)) + yield self.plan + return m = self._RE_BAILOUT.match(line) if m: yield self.Bailout(m.group(1)) - bailed_out = True - continue + self.bailed_out = True + return m = self._RE_VERSION.match(line) if m: # The TAP version is only accepted as the first line - if lineno != 1: + if self.lineno != 1: yield self.Error('version number must be on the first line') - continue - version = int(m.group(1)) - if version < 13: + return + self.version = int(m.group(1)) + if self.version < 13: yield self.Error('version number should be at least 13') else: - yield self.Version(version=version) - continue + yield self.Version(version=self.version) + return if not line: - continue - - yield self.Error('unexpected input at line {}'.format((lineno,))) - - if state == self._YAML: - yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno)) + return - if not bailed_out and plan and num_tests != plan.count: - if num_tests < plan.count: - yield self.Error('Too few tests run (expected {}, got {})'.format(plan.count, num_tests)) - else: - yield self.Error('Too many tests run (expected {}, got {})'.format(plan.count, num_tests)) + yield self.Error('unexpected input at line {}'.format((self.lineno,))) + else: + # end of file + if self.state == self._YAML: + yield self.Error('YAML block not terminated (started on line {})'.format(self.yaml_lineno)) + if not self.bailed_out and self.plan and self.num_tests != self.plan.num_tests: + if self.num_tests < self.plan.num_tests: + yield self.Error('Too few tests run (expected {}, got {})'.format(self.plan.num_tests, self.num_tests)) + else: + yield self.Error('Too many tests run (expected {}, got {})'.format(self.plan.num_tests, self.num_tests)) class TestLogger: def flush(self) -> None: @@ -394,6 +431,7 @@ class ConsoleLogger(TestLogger): self.running_tests = OrderedSet() # type: OrderedSet['TestRun'] self.progress_test = None # type: T.Optional['TestRun'] self.progress_task = None # type: T.Optional[asyncio.Future] + self.max_left_width = 0 # type: int self.stop = False self.update = asyncio.Event() self.should_erase_line = '' @@ -413,7 +451,7 @@ class ConsoleLogger(TestLogger): def request_update(self) -> None: self.update.set() - def emit_progress(self) -> None: + def emit_progress(self, harness: 'TestHarness') -> None: if self.progress_test is None: self.flush() return @@ -424,8 +462,20 @@ class ConsoleLogger(TestLogger): count = '{}-{}/{}'.format(self.started_tests - len(self.running_tests) + 1, self.started_tests, self.test_count) - line = '[{}] {} {}'.format(count, self.SPINNER[self.spinner_index], self.progress_test.name) + left = '[{}] {} '.format(count, self.SPINNER[self.spinner_index]) self.spinner_index = (self.spinner_index + 1) % len(self.SPINNER) + + right = '{spaces} {dur:{durlen}}/{timeout:{durlen}}s'.format( + spaces=' ' * TestResult.maxlen(), + dur=int(time.time() - self.progress_test.starttime), + durlen=harness.duration_max_len, + timeout=int(self.progress_test.timeout)) + detail = self.progress_test.detail + if detail: + right += ' ' + detail + line = harness.format(self.progress_test, colorize=True, + max_left_width=self.max_left_width, + left=left, right=right) self.print_progress(line) @staticmethod @@ -464,13 +514,16 @@ class ConsoleLogger(TestLogger): self.progress_test = self.running_tests.pop(last=False) self.running_tests.add(self.progress_test) - self.emit_progress() + self.emit_progress(harness) self.flush() self.test_count = harness.test_count + # In verbose mode, the progress report gets in the way of the tests' # stdout and stderr. if self.is_tty() and not harness.options.verbose: + # Account for "[aa-bb/cc] OO " in the progress report + self.max_left_width = 3 * len(str(self.test_count)) + 8 self.progress_task = asyncio.ensure_future(report_progress()) def start_test(self, test: 'TestRun') -> None: @@ -481,9 +534,14 @@ class ConsoleLogger(TestLogger): def log(self, harness: 'TestHarness', result: 'TestRun') -> None: self.running_tests.remove(result) + if result.res is TestResult.TIMEOUT and harness.options.verbose: + self.flush() + print('{} time out (After {} seconds)'.format(result.name, result.timeout)) + if not harness.options.quiet or not result.res.is_ok(): self.flush() - print(harness.format(result, mlog.colorize_console()), flush=True) + print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width), + flush=True) self.request_update() async def finish(self, harness: 'TestHarness') -> None: @@ -595,32 +653,33 @@ class JunitBuilder(TestLogger): 'testsuite', name=suitename, tests=str(len(test.results)), - errors=str(sum(1 for r in test.results.values() if r in + errors=str(sum(1 for r in test.results if r.result in {TestResult.INTERRUPT, TestResult.ERROR})), - failures=str(sum(1 for r in test.results.values() if r in + failures=str(sum(1 for r in test.results if r.result in {TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})), - skipped=str(sum(1 for r in test.results.values() if r is TestResult.SKIP)), + skipped=str(sum(1 for r in test.results if r.result is TestResult.SKIP)), + time=str(test.duration), ) - for i, result in test.results.items(): - # Both name and classname are required. Set them both to the - # number of the test in a TAP test, as TAP doesn't give names. - testcase = et.SubElement(suite, 'testcase', name=i, classname=i) - if result is TestResult.SKIP: + for subtest in test.results: + testcase = et.SubElement(suite, 'testcase', name=str(subtest)) + if subtest.result is TestResult.SKIP: et.SubElement(testcase, 'skipped') - elif result is TestResult.ERROR: + elif subtest.result is TestResult.ERROR: et.SubElement(testcase, 'error') - elif result is TestResult.FAIL: + elif subtest.result is TestResult.FAIL: et.SubElement(testcase, 'failure') - elif result is TestResult.UNEXPECTEDPASS: + elif subtest.result is TestResult.UNEXPECTEDPASS: fail = et.SubElement(testcase, 'failure') fail.text = 'Test unexpected passed.' - elif result is TestResult.INTERRUPT: - fail = et.SubElement(testcase, 'failure') + elif subtest.result is TestResult.INTERRUPT: + fail = et.SubElement(testcase, 'error') fail.text = 'Test was interrupted by user.' - elif result is TestResult.TIMEOUT: - fail = et.SubElement(testcase, 'failure') + elif subtest.result is TestResult.TIMEOUT: + fail = et.SubElement(testcase, 'error') fail.text = 'Test did not finish before configured timeout.' + if subtest.explanation: + et.SubElement(testcase, 'system-out').text = subtest.explanation if test.stdo: out = et.SubElement(suite, 'system-out') out.text = test.stdo.rstrip() @@ -631,12 +690,13 @@ class JunitBuilder(TestLogger): if test.project not in self.suites: suite = self.suites[test.project] = et.Element( 'testsuite', name=test.project, tests='1', errors='0', - failures='0', skipped='0') + failures='0', skipped='0', time=str(test.duration)) else: suite = self.suites[test.project] suite.attrib['tests'] = str(int(suite.attrib['tests']) + 1) - testcase = et.SubElement(suite, 'testcase', name=test.name, classname=test.name) + testcase = et.SubElement(suite, 'testcase', name=test.name, + time=str(test.duration)) if test.res is TestResult.SKIP: et.SubElement(testcase, 'skipped') suite.attrib['skipped'] = str(int(suite.attrib['skipped']) + 1) @@ -666,38 +726,17 @@ class JunitBuilder(TestLogger): tree.write(f, encoding='utf-8', xml_declaration=True) -def parse_rust_test(stdout: str) -> T.Dict[str, TestResult]: - """Parse the output of rust tests.""" - res = {} # type; T.Dict[str, TestResult] - - def parse_res(res: str) -> TestResult: - if res == 'ok': - return TestResult.OK - elif res == 'ignored': - return TestResult.SKIP - elif res == 'FAILED': - return TestResult.FAIL - raise MesonException('Unsupported output from rust test: {}'.format(res)) - - for line in stdout.splitlines(): - if line.startswith('test ') and not line.startswith('test result'): - _, name, _, result = line.split(' ') - name = name.replace('::', '.') - res[name] = parse_res(result) - - return res - - class TestRun: TEST_NUM = 0 def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str], - name: str): + name: str, timeout: T.Optional[int]): self.res = TestResult.PENDING self.test = test self._num = None # type: T.Optional[int] self.name = name - self.results: T.Dict[str, TestResult] = {} + self.timeout = timeout + self.results = list() # type: T.List[TAPParser.Test] self.returncode = 0 self.starttime = None # type: T.Optional[float] self.duration = None # type: T.Optional[float] @@ -731,60 +770,65 @@ class TestRun: res = TestResult.SKIP elif returncode == GNU_ERROR_RETURNCODE: res = TestResult.ERROR - elif self.should_fail: - res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS else: res = TestResult.FAIL if bool(returncode) else TestResult.OK - self.complete(res, {}, returncode, stdo, stde, cmd, **kwargs) + self.complete(returncode, res, stdo, stde, cmd, **kwargs) - def complete_tap(self, returncode: int, stdo: str, stde: str, cmd: T.List[str]) -> None: - res = None # type: T.Optional[TestResult] - results = {} # type: T.Dict[str, TestResult] - failed = False + async def parse_tap(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + res = TestResult.OK + error = '' - for n, i in enumerate(TAPParser(io.StringIO(stdo)).parse()): + async for i in TAPParser().parse_async(lines): if isinstance(i, TAPParser.Bailout): - results[str(n)] = TestResult.ERROR - failed = True + res = TestResult.ERROR elif isinstance(i, TAPParser.Test): - results[str(n)] = i.result - if i.result not in {TestResult.OK, TestResult.EXPECTEDFAIL, TestResult.SKIP}: - failed = True + self.results.append(i) + if i.result.is_bad(): + res = TestResult.FAIL elif isinstance(i, TAPParser.Error): - results[str(n)] = TestResult.ERROR - stde += '\nTAP parsing error: ' + i.message - failed = True + error = '\nTAP parsing error: ' + i.message + res = TestResult.ERROR + + if all(t.result is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + res = TestResult.SKIP + return res, error - if returncode != 0: + def complete_tap(self, returncode: int, res: TestResult, + stdo: str, stde: str, cmd: T.List[str]) -> None: + if returncode != 0 and not res.was_killed(): res = TestResult.ERROR stde += '\n(test program exited with status code {})'.format(returncode,) - if res is None: - # Now determine the overall result of the test based on the outcome of the subcases - if all(t is TestResult.SKIP for t in results.values()): - # This includes the case where num_tests is zero - res = TestResult.SKIP - elif self.should_fail: - res = TestResult.EXPECTEDFAIL if failed else TestResult.UNEXPECTEDPASS - else: - res = TestResult.FAIL if failed else TestResult.OK - - self.complete(res, results, returncode, stdo, stde, cmd) - - def complete_rust(self, returncode: int, stdo: str, stde: str, cmd: T.List[str]) -> None: - results = parse_rust_test(stdo) - - failed = TestResult.FAIL in results.values() - # Now determine the overall result of the test based on the outcome of the subcases - if all(t is TestResult.SKIP for t in results.values()): - # This includes the case where num_tests is zero - res = TestResult.SKIP - elif self.should_fail: - res = TestResult.EXPECTEDFAIL if failed else TestResult.UNEXPECTEDPASS - else: - res = TestResult.FAIL if failed else TestResult.OK - - self.complete(res, results, returncode, stdo, stde, cmd) + self.complete(returncode, res, stdo, stde, cmd) + + async def parse_rust(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + def parse_res(n: int, name: str, result: str) -> TAPParser.Test: + if result == 'ok': + return TAPParser.Test(n, name, TestResult.OK, None) + elif result == 'ignored': + return TAPParser.Test(n, name, TestResult.SKIP, None) + elif result == 'FAILED': + return TAPParser.Test(n, name, TestResult.FAIL, None) + return TAPParser.Test(n, name, TestResult.ERROR, + 'Unsupported output from rust test: {}'.format(result)) + + n = 1 + async for line in lines: + if line.startswith('test ') and not line.startswith('test result'): + _, name, _, result = line.rstrip().split(' ') + name = name.replace('::', '.') + self.results.append(parse_res(n, name, result)) + n += 1 + + if all(t.result is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + return TestResult.SKIP, '' + elif any(t.result is TestResult.ERROR for t in self.results): + return TestResult.ERROR, '' + elif any(t.result is TestResult.FAIL for t in self.results): + return TestResult.FAIL, '' + return TestResult.OK, '' @property def num(self) -> int: @@ -793,13 +837,30 @@ class TestRun: self._num = TestRun.TEST_NUM return self._num - def complete(self, res: TestResult, results: T.Dict[str, TestResult], - returncode: int, + @property + def detail(self) -> str: + if self.res is TestResult.PENDING: + return '' + if self.returncode: + return returncode_to_status(self.returncode) + if self.results: + # running or succeeded + passed = sum((x.result.is_ok() for x in self.results)) + ran = sum((x.result is not TestResult.SKIP for x in self.results)) + if passed == ran: + return '{} subtests passed'.format(passed) + else: + return '{}/{} subtests passed'.format(passed, ran) + return '' + + def complete(self, returncode: int, res: TestResult, stdo: T.Optional[str], stde: T.Optional[str], cmd: T.List[str], *, junit: T.Optional[et.ElementTree] = None) -> None: assert isinstance(res, TestResult) + if self.should_fail and res in (TestResult.OK, TestResult.FAIL): + res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL + self.res = res - self.results = results # May be empty self.returncode = returncode self.duration = time.time() - self.starttime self.stdo = stdo @@ -906,6 +967,77 @@ async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: if not f.cancelled(): f.result() +class TestSubprocess: + def __init__(self, p: asyncio.subprocess.Process, postwait_fn: T.Callable[[], None] = None): + self._process = p + self.postwait_fn = postwait_fn # type: T.Callable[[], None] + + @property + def stdout(self) -> T.Optional[asyncio.StreamReader]: + return self._process.stdout + + @property + def stderr(self) -> T.Optional[asyncio.StreamReader]: + return self._process.stderr + + async def _kill(self) -> T.Optional[str]: + # Python does not provide multiplatform support for + # killing a process and all its children so we need + # to roll our own. + p = self._process + try: + if is_windows(): + subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) + else: + # Send a termination signal to the process group that setsid() + # created - giving it a chance to perform any cleanup. + os.killpg(p.pid, signal.SIGTERM) + + # Make sure the termination signal actually kills the process + # group, otherwise retry with a SIGKILL. + await try_wait_one(p.wait(), timeout=0.5) + if p.returncode is not None: + return None + + os.killpg(p.pid, signal.SIGKILL) + + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: + return None + + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. + p.kill() + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: + return None + return 'Test process could not be killed.' + except ProcessLookupError: + # Sometimes (e.g. with Wine) this happens. There's nothing + # we can do, probably the process already died so just wait + # for the event loop to pick that up. + await p.wait() + return None + + async def wait(self, timeout: T.Optional[int]) -> T.Tuple[int, TestResult, T.Optional[str]]: + p = self._process + result = None + additional_error = None + try: + await try_wait_one(p.wait(), timeout=timeout) + if p.returncode is None: + additional_error = await self._kill() + result = TestResult.TIMEOUT + except asyncio.CancelledError: + # The main loop must have seen Ctrl-C. + additional_error = await self._kill() + result = TestResult.INTERRUPT + finally: + if self.postwait_fn: + self.postwait_fn() + + return p.returncode or 0, result, additional_error class SingleTestRunner: @@ -916,7 +1048,15 @@ class SingleTestRunner: self.test_env = test_env self.env = env self.options = options - self.runobj = TestRun(test, test_env, name) + + if self.options.gdb or self.test.timeout is None: + timeout = None + elif self.options.timeout_multiplier is not None: + timeout = self.test.timeout * self.options.timeout_multiplier + else: + timeout = self.test.timeout + + self.runobj = TestRun(test, test_env, name, timeout) def _get_cmd(self) -> T.Optional[T.List[str]]: if self.test.fname[0].endswith('.jar'): @@ -938,61 +1078,28 @@ class SingleTestRunner: return self.test.exe_runner.get_command() + self.test.fname return self.test.fname + @property + def visible_name(self) -> str: + return self.runobj.name + + @property + def timeout(self) -> T.Optional[int]: + return self.runobj.timeout + async def run(self) -> TestRun: cmd = self._get_cmd() self.runobj.start() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' - self.runobj.complete(TestResult.SKIP, {}, GNU_SKIP_RETURNCODE, skip_stdout, None, None) + self.runobj.complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, skip_stdout, None, None) else: wrap = TestHarness.get_wrapper(self.options) - if self.options.gdb: - self.test.timeout = None await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) return self.runobj - async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int], - stdout: T.IO, stderr: T.IO, - env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]: - async def kill_process(p: asyncio.subprocess.Process) -> T.Optional[str]: - # Python does not provide multiplatform support for - # killing a process and all its children so we need - # to roll our own. - try: - if is_windows(): - subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) - else: - # Send a termination signal to the process group that setsid() - # created - giving it a chance to perform any cleanup. - os.killpg(p.pid, signal.SIGTERM) - - # Make sure the termination signal actually kills the process - # group, otherwise retry with a SIGKILL. - await try_wait_one(p.wait(), timeout=0.5) - if p.returncode is not None: - return None - - os.killpg(p.pid, signal.SIGKILL) - - await try_wait_one(p.wait(), timeout=1) - if p.returncode is not None: - return None - - # An earlier kill attempt has not worked for whatever reason. - # Try to kill it one last time with a direct call. - # If the process has spawned children, they will remain around. - p.kill() - await try_wait_one(p.wait(), timeout=1) - if p.returncode is not None: - return None - return 'Test process could not be killed.' - except ProcessLookupError: - # Sometimes (e.g. with Wine) this happens. There's nothing - # we can do, probably the process already died so just wait - # for the event loop to pick that up. - await p.wait() - return None - + async def _run_subprocess(self, args: T.List[str], *, + stdout: int, stderr: int, + env: T.Dict[str, str], cwd: T.Optional[str]) -> TestSubprocess: # Let gdb handle ^C instead of us if self.options.gdb: previous_sigint_handler = signal.getsignal(signal.SIGINT) @@ -1010,31 +1117,18 @@ class SingleTestRunner: # errors avoid not being able to use the terminal. os.setsid() + def postwait_fn() -> None: + if self.options.gdb: + # Let us accept ^C again + signal.signal(signal.SIGINT, previous_sigint_handler) + p = await asyncio.create_subprocess_exec(*args, stdout=stdout, stderr=stderr, env=env, cwd=cwd, preexec_fn=preexec_fn if not is_windows() else None) - result = None - additional_error = None - try: - await try_wait_one(p.wait(), timeout=timeout) - if p.returncode is None: - if self.options.verbose: - print('{} time out (After {} seconds)'.format(self.test.name, timeout)) - additional_error = await kill_process(p) - result = TestResult.TIMEOUT - except asyncio.CancelledError: - # The main loop must have seen Ctrl-C. - additional_error = await kill_process(p) - result = TestResult.INTERRUPT - finally: - if self.options.gdb: - # Let us accept ^C again - signal.signal(signal.SIGINT, previous_sigint_handler) - - return p.returncode or 0, result, additional_error + return TestSubprocess(p, postwait_fn=postwait_fn if not is_windows() else None) async def _run_cmd(self, cmd: T.List[str]) -> None: if self.test.extra_paths: @@ -1059,11 +1153,12 @@ class SingleTestRunner: stdout = None stderr = None - if not self.options.verbose: - stdout = tempfile.TemporaryFile("wb+") - stderr = tempfile.TemporaryFile("wb+") if self.options.split else stdout - if self.test.protocol is TestProtocol.TAP and stderr is stdout: - stdout = tempfile.TemporaryFile("wb+") + if self.test.protocol is TestProtocol.TAP: + stdout = asyncio.subprocess.PIPE + stderr = None if self.options.verbose else asyncio.subprocess.PIPE + elif not self.options.verbose: + stdout = asyncio.subprocess.PIPE + stderr = asyncio.subprocess.PIPE if self.options.split else asyncio.subprocess.STDOUT extra_cmd = [] # type: T.List[str] if self.test.protocol is TestProtocol.GTEST: @@ -1072,46 +1167,67 @@ class SingleTestRunner: gtestname = os.path.join(self.test.workdir, self.test.name) extra_cmd.append('--gtest_output=xml:{}.xml'.format(gtestname)) - if self.test.timeout is None: - timeout = None - elif self.options.timeout_multiplier is not None: - timeout = self.test.timeout * self.options.timeout_multiplier - else: - timeout = self.test.timeout + p = await self._run_subprocess(cmd + extra_cmd, + stdout=stdout, + stderr=stderr, + env=self.env, + cwd=self.test.workdir) + + stdo = stde = '' + stdo_task = stde_task = parse_task = None + + # Extract lines out of the StreamReader and print them + # along the way if requested + async def lines() -> T.AsyncIterator[str]: + stdo_lines = [] + reader = p.stdout + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if self.options.verbose: + print(line, end='') + yield line + + nonlocal stdo + stdo = ''.join(stdo_lines) + + if self.test.protocol is TestProtocol.TAP: + parse_task = self.runobj.parse_tap(lines()) + elif self.test.protocol is TestProtocol.RUST: + parse_task = self.runobj.parse_rust(lines()) + elif stdout is not None: + stdo_task = p.stdout.read(-1) + if stderr is not None and stderr != asyncio.subprocess.STDOUT: + stde_task = p.stderr.read(-1) + + returncode, result, additional_error = await p.wait(self.runobj.timeout) + if result is TestResult.TIMEOUT and self.options.verbose: + print('{} time out (After {} seconds)'.format(self.test.name, self.runobj.timeout)) + + if stdo_task is not None: + stdo = decode(await stdo_task) + if stde_task is not None: + stde = decode(await stde_task) + + if additional_error is not None: + stde += '\n' + additional_error + + if parse_task is not None: + res, error = await parse_task + if error: + stde += '\n' + error + result = result or res + if self.test.protocol is TestProtocol.TAP: + self.runobj.complete_tap(returncode, result, stdo, stde, cmd) + return - returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd, - timeout=timeout, - stdout=stdout, - stderr=stderr, - env=self.env, - cwd=self.test.workdir) - if additional_error is None: - if stdout is None: - stdo = '' - else: - stdout.seek(0) - stdo = decode(stdout.read()) - if stderr is None or stderr is stdout: - stde = '' - else: - stderr.seek(0) - stde = decode(stderr.read()) - else: - stdo = "" - stde = additional_error if result: - self.runobj.complete(result, {}, returncode, stdo, stde, cmd) - else: - if self.test.protocol is TestProtocol.EXITCODE: - self.runobj.complete_exitcode(returncode, stdo, stde, cmd) - elif self.test.protocol is TestProtocol.GTEST: - self.runobj.complete_gtest(returncode, stdo, stde, cmd) - elif self.test.protocol is TestProtocol.RUST: - return self.runobj.complete_rust(returncode, stdo, stde, cmd) - else: - if self.options.verbose: - print(stdo, end='') - self.runobj.complete_tap(returncode, stdo, stde, cmd) + self.runobj.complete(returncode, result, stdo, stde, cmd) + elif self.test.protocol is TestProtocol.EXITCODE: + self.runobj.complete_exitcode(returncode, stdo, stde, cmd) + elif self.test.protocol is TestProtocol.GTEST: + self.runobj.complete_gtest(returncode, stdo, stde, cmd) + class TestHarness: def __init__(self, options: argparse.Namespace): @@ -1173,7 +1289,8 @@ class TestHarness: options.wrapper = current.exe_wrapper return current.env.get_env(os.environ.copy()) - def get_test_runner(self, test: TestSerialisation, name: str) -> SingleTestRunner: + def get_test_runner(self, test: TestSerialisation) -> SingleTestRunner: + name = self.get_pretty_suite(test) options = deepcopy(self.options) if not options.setup: options.setup = self.build_data.test_setup_default_name @@ -1209,18 +1326,33 @@ class TestHarness: for l in self.loggers: l.log(self, result) - def format(self, result: TestRun, colorize: bool) -> str: - result_str = '{num:{numlen}}/{testcount} {name:{name_max_len}} {res} {dur:.2f}s'.format( - numlen=len(str(self.test_count)), - num=result.num, - testcount=self.test_count, - name_max_len=self.name_max_len, - name=result.name, - res=result.res.get_text(colorize), - dur=result.duration) - if result.res is TestResult.FAIL: - result_str += ' ' + returncode_to_status(result.returncode) - return result_str + def format(self, result: TestRun, colorize: bool, + max_left_width: int = 0, + left: T.Optional[str] = None, + right: T.Optional[str] = None) -> str: + numlen = len(str(self.test_count)) + + if left is None: + left = '{num:{numlen}}/{testcount} '.format( + numlen=numlen, + num=result.num, + testcount=self.test_count) + + # A non-default max_left_width lets the logger print more stuff before the + # name, while ensuring that the rightmost columns remain aligned. + max_left_width = max(max_left_width, 2 * numlen + 2) + extra_name_width = max_left_width + self.name_max_len + 1 - uniwidth(result.name) - uniwidth(left) + middle = result.name + (' ' * max(1, extra_name_width)) + + if right is None: + right = '{res} {dur:{durlen}.2f}s'.format( + res=result.res.get_text(colorize), + dur=result.duration, + durlen=self.duration_max_len + 3) + detail = result.detail + if detail: + right += ' ' + detail + return left + middle + right def summary(self) -> str: return textwrap.dedent(''' @@ -1253,8 +1385,18 @@ class TestHarness: sys.exit(125) self.test_count = len(tests) - self.name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) - self.run_tests(tests) + self.name_max_len = max([uniwidth(self.get_pretty_suite(test)) for test in tests]) + startdir = os.getcwd() + try: + if self.options.wd: + os.chdir(self.options.wd) + self.build_data = build.load(os.getcwd()) + runners = [self.get_test_runner(test) for test in tests] + self.duration_max_len = max([len(str(int(runner.timeout or 99))) + for runner in runners]) + self.run_tests(runners) + finally: + os.chdir(startdir) return self.total_failure_count() @staticmethod @@ -1393,23 +1535,19 @@ class TestHarness: else: return test.name - def run_tests(self, tests: T.List[TestSerialisation]) -> None: + def run_tests(self, runners: T.List[SingleTestRunner]) -> None: try: self.open_logfiles() # Replace with asyncio.run once we can require Python 3.7 loop = asyncio.get_event_loop() - loop.run_until_complete(self._run_tests(tests)) + loop.run_until_complete(self._run_tests(runners)) finally: self.close_logfiles() - async def _run_tests(self, tests: T.List[TestSerialisation]) -> None: + async def _run_tests(self, runners: T.List[SingleTestRunner]) -> None: semaphore = asyncio.Semaphore(self.options.num_processes) futures = deque() # type: T.Deque[asyncio.Future] running_tests = dict() # type: T.Dict[asyncio.Future, str] - startdir = os.getcwd() - if self.options.wd: - os.chdir(self.options.wd) - self.build_data = build.load(os.getcwd()) interrupted = False async def run_test(test: SingleTestRunner) -> None: @@ -1470,17 +1608,16 @@ class TestHarness: asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler) try: for _ in range(self.options.repeat): - for test in tests: - visible_name = self.get_pretty_suite(test) - single_test = self.get_test_runner(test, visible_name) + for runner in runners: + test = runner.test - if not test.is_parallel or single_test.options.gdb: + if not test.is_parallel or runner.options.gdb: await complete_all(futures) - future = asyncio.ensure_future(run_test(single_test)) + future = asyncio.ensure_future(run_test(runner)) futures.append(future) - running_tests[future] = visible_name + running_tests[future] = runner.visible_name future.add_done_callback(test_done) - if not test.is_parallel or single_test.options.gdb: + if not test.is_parallel or runner.options.gdb: await complete(future) if self.options.repeat > 1 and self.fail_count: break @@ -1492,7 +1629,6 @@ class TestHarness: asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM) for l in self.loggers: await l.finish(self) - os.chdir(startdir) def list_tests(th: TestHarness) -> bool: tests = th.get_tests() |