diff options
-rw-r--r-- | mesonbuild/mtest.py | 201 |
1 files changed, 104 insertions, 97 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index ade2aea..b09de16 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -34,6 +34,7 @@ import subprocess import sys import tempfile import time +import typing from . import build from . import environment @@ -41,6 +42,9 @@ from . import mlog from .dependencies import ExternalProgram from .mesonlib import substring_is_in_list, MesonException +if typing.TYPE_CHECKING: + from .backend.backends import TestSerialisation + # GNU autotools interprets a return code of 77 from tests it executes to # mean that the test should be skipped. GNU_SKIP_RETURNCODE = 77 @@ -49,15 +53,15 @@ GNU_SKIP_RETURNCODE = 77 # mean that the test failed even before testing what it is supposed to test. GNU_ERROR_RETURNCODE = 99 -def is_windows(): +def is_windows() -> bool: platname = platform.system().lower() return platname == 'windows' or 'mingw' in platname -def is_cygwin(): +def is_cygwin() -> bool: platname = platform.system().lower() return 'cygwin' in platname -def determine_worker_count(): +def determine_worker_count() -> int: varname = 'MESON_TESTTHREADS' if varname in os.environ: try: @@ -74,7 +78,7 @@ def determine_worker_count(): num_workers = 1 return num_workers -def add_arguments(parser): +def add_arguments(parser: argparse.ArgumentParser) -> None: parser.add_argument('--repeat', default=1, dest='repeat', type=int, help='Number of times to run the tests.') parser.add_argument('--no-rebuild', default=False, action='store_true', @@ -117,7 +121,7 @@ def add_arguments(parser): help='Optional list of tests to run') -def returncode_to_status(retcode): +def returncode_to_status(retcode: int) -> str: # Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related # functions here because the status returned by subprocess is munged. It # returns a negative value if the process was killed by a signal rather than @@ -142,7 +146,7 @@ def returncode_to_status(retcode): signame = 'SIGinvalid' return '(exit status %d or signal %d %s)' % (retcode, signum, signame) -def env_tuple_to_str(env): +def env_tuple_to_str(env: typing.Iterable[typing.Tuple[str, str]]) -> str: return ''.join(["%s='%s' " % (k, v) for k, v in env]) @@ -162,7 +166,7 @@ class TestResult(enum.Enum): ERROR = 'ERROR' -class TAPParser(object): +class TAPParser: Plan = namedtuple('Plan', ['count', 'late', 'skipped', 'explanation']) Bailout = namedtuple('Bailout', ['message']) Test = namedtuple('Test', ['number', 'name', 'result', 'explanation']) @@ -181,10 +185,11 @@ class TAPParser(object): _RE_YAML_START = re.compile(r'(\s+)---.*') _RE_YAML_END = re.compile(r'\s+\.\.\.\s*') - def __init__(self, io): + def __init__(self, io: typing.Iterator[str]): self.io = io - def parse_test(self, ok, num, name, directive, explanation): + def parse_test(self, ok: bool, num: int, name: str, directive: typing.Optional[str], explanation: typing.Optional[str]) -> \ + typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error'], None, None]: name = name.strip() explanation = explanation.strip() if explanation else None if directive is not None: @@ -201,14 +206,14 @@ class TAPParser(object): yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation) - def parse(self): + def parse(self) -> typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'], None, None]: found_late_test = False bailed_out = False plan = None lineno = 0 num_tests = 0 yaml_lineno = None - yaml_indent = None + yaml_indent = '' state = self._MAIN version = 12 while True: @@ -235,7 +240,7 @@ class TAPParser(object): continue if line.startswith(yaml_indent): continue - yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,)) + yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno)) state = self._MAIN assert state == self._MAIN @@ -297,7 +302,7 @@ class TAPParser(object): yield self.Error('unexpected input at line %d' % (lineno,)) if state == self._YAML: - yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,)) + yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno)) if not bailed_out and plan and num_tests != plan.count: if num_tests < plan.count: @@ -307,8 +312,12 @@ class TAPParser(object): class TestRun: - @staticmethod - def make_exitcode(test, returncode, duration, stdo, stde, cmd): + + @classmethod + def make_exitcode(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str], + returncode: int, duration: float, stdo: typing.Optional[str], + stde: typing.Optional[str], + cmd: typing.Optional[typing.List[str]]) -> 'TestRun': if returncode == GNU_SKIP_RETURNCODE: res = TestResult.SKIP elif returncode == GNU_ERROR_RETURNCODE: @@ -317,9 +326,12 @@ class TestRun: res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS else: res = TestResult.FAIL if bool(returncode) else TestResult.OK - return TestRun(test, res, returncode, duration, stdo, stde, cmd) + return cls(test, test_env, res, returncode, duration, stdo, stde, cmd) - def make_tap(test, returncode, duration, stdo, stde, cmd): + @classmethod + def make_tap(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str], + returncode: int, duration: float, stdo: str, stde: str, + cmd: typing.Optional[typing.List[str]]) -> 'TestRun': res = None num_tests = 0 failed = False @@ -352,9 +364,12 @@ class TestRun: else: res = TestResult.FAIL if failed else TestResult.OK - return TestRun(test, res, returncode, duration, stdo, stde, cmd) + return cls(test, test_env, res, returncode, duration, stdo, stde, cmd) - def __init__(self, test, res, returncode, duration, stdo, stde, cmd): + def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str], + res: TestResult, returncode: int, duration: float, + stdo: typing.Optional[str], stde: typing.Optional[str], + cmd: typing.Optional[typing.List[str]]): assert isinstance(res, TestResult) self.res = res self.returncode = returncode @@ -362,10 +377,10 @@ class TestRun: self.stdo = stdo self.stde = stde self.cmd = cmd - self.env = test.env + self.env = test_env self.should_fail = test.should_fail - def get_log(self): + def get_log(self) -> str: res = '--- command ---\n' if self.cmd is None: res += 'NONE\n' @@ -385,7 +400,7 @@ class TestRun: res += '-------\n\n' return res -def decode(stream): +def decode(stream: typing.Union[None, bytes]) -> str: if stream is None: return '' try: @@ -393,51 +408,50 @@ def decode(stream): except UnicodeDecodeError: return stream.decode('iso-8859-1', errors='ignore') -def write_json_log(jsonlogfile, test_name, result): +def write_json_log(jsonlogfile: typing.TextIO, test_name: str, result: TestRun) -> None: jresult = {'name': test_name, 'stdout': result.stdo, 'result': result.res.value, 'duration': result.duration, 'returncode': result.returncode, - 'command': result.cmd} - if isinstance(result.env, dict): - jresult['env'] = result.env - else: - jresult['env'] = result.env.get_env(os.environ) + 'env': result.env, + 'command': result.cmd} # type: typing.Dict[str, typing.Any] if result.stde: jresult['stderr'] = result.stde jsonlogfile.write(json.dumps(jresult) + '\n') -def run_with_mono(fname): +def run_with_mono(fname: str) -> bool: if fname.endswith('.exe') and not (is_windows() or is_cygwin()): return True return False -def load_benchmarks(build_dir): +def load_benchmarks(build_dir: str) -> typing.List['TestSerialisation']: datafile = os.path.join(build_dir, 'meson-private', 'meson_benchmark_setup.dat') if not os.path.isfile(datafile): raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir)) with open(datafile, 'rb') as f: - obj = pickle.load(f) + obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f)) return obj -def load_tests(build_dir): +def load_tests(build_dir: str) -> typing.List['TestSerialisation']: datafile = os.path.join(build_dir, 'meson-private', 'meson_test_setup.dat') if not os.path.isfile(datafile): raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir)) with open(datafile, 'rb') as f: - obj = pickle.load(f) + obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f)) return obj class SingleTestRunner: - def __init__(self, test, env, options): + def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str], + env: typing.Dict[str, str], options: argparse.Namespace): self.test = test + self.test_env = test_env self.env = env self.options = options - def _get_cmd(self): + def _get_cmd(self) -> typing.Optional[typing.List[str]]: if self.test.fname[0].endswith('.jar'): return ['java', '-jar'] + self.test.fname elif not self.test.is_cross_built and run_with_mono(self.test.fname[0]): @@ -457,19 +471,18 @@ class SingleTestRunner: else: return self.test.fname - def run(self): + def run(self) -> TestRun: cmd = self._get_cmd() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' - return TestRun(test=self.test, res=TestResult.SKIP, returncode=GNU_SKIP_RETURNCODE, - duration=0.0, stdo=skip_stdout, stde=None, cmd=None) + return TestRun(self.test, self.test_env, TestResult.SKIP, GNU_SKIP_RETURNCODE, 0.0, skip_stdout, None, None) else: wrap = TestHarness.get_wrapper(self.options) if self.options.gdb: self.test.timeout = None return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) - def _run_cmd(self, cmd): + def _run_cmd(self, cmd: typing.List[str]) -> TestRun: starttime = time.time() if len(self.test.extra_paths) > 0: @@ -506,7 +519,7 @@ class SingleTestRunner: # Make the meson executable ignore SIGINT while gdb is running. signal.signal(signal.SIGINT, signal.SIG_IGN) - def preexec_fn(): + def preexec_fn() -> None: if self.options.gdb: # Restore the SIGINT handler for the child process to # ensure it can handle it. @@ -535,7 +548,7 @@ class SingleTestRunner: p.communicate(timeout=timeout) except subprocess.TimeoutExpired: if self.options.verbose: - print('%s time out (After %d seconds)' % (self.test.name, timeout)) + print('{} time out (After {} seconds)'.format(self.test.name, timeout)) timed_out = True except KeyboardInterrupt: mlog.warning('CTRL-C detected while running %s' % (self.test.name)) @@ -572,9 +585,9 @@ class SingleTestRunner: try: p.communicate(timeout=1) except subprocess.TimeoutExpired: - additional_error = b'Test process could not be killed.' + additional_error = 'Test process could not be killed.' except ValueError: - additional_error = b'Could not read output. Maybe the process has redirected its stdout/stderr?' + additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?' endtime = time.time() duration = endtime - starttime if additional_error is None: @@ -592,20 +605,20 @@ class SingleTestRunner: stdo = "" stde = additional_error if timed_out: - return TestRun(self.test, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd) + return TestRun(self.test, self.test_env, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd) else: if self.test.protocol == 'exitcode': - return TestRun.make_exitcode(self.test, p.returncode, duration, stdo, stde, cmd) + return TestRun.make_exitcode(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd) else: if self.options.verbose: print(stdo, end='') - return TestRun.make_tap(self.test, p.returncode, duration, stdo, stde, cmd) + return TestRun.make_tap(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd) class TestHarness: - def __init__(self, options): + def __init__(self, options: argparse.Namespace): self.options = options - self.collected_logs = [] + self.collected_logs = [] # type: typing.List[str] self.fail_count = 0 self.expectedfail_count = 0 self.unexpectedpass_count = 0 @@ -614,23 +627,26 @@ class TestHarness: self.timeout_count = 0 self.is_run = False self.tests = None - self.suites = None - self.logfilename = None - self.logfile = None - self.jsonlogfile = None + self.logfilename = None # type: typing.Optional[str] + self.logfile = None # type: typing.Optional[typing.TextIO] + self.jsonlogfile = None # type: typing.Optional[typing.TextIO] if self.options.benchmark: self.tests = load_benchmarks(options.wd) else: self.tests = load_tests(options.wd) - self.load_suites() + ss = set() + for t in self.tests: + for s in t.suite: + ss.add(s) + self.suites = list(ss) - def __del__(self): + def __del__(self) -> None: if self.logfile: self.logfile.close() if self.jsonlogfile: self.jsonlogfile.close() - def merge_suite_options(self, options, test): + def merge_suite_options(self, options: argparse.Namespace, test: 'TestSerialisation') -> typing.Dict[str, str]: if ':' in options.setup: if options.setup not in self.build_data.test_setups: sys.exit("Unknown test setup '%s'." % options.setup) @@ -654,7 +670,7 @@ class TestHarness: options.wrapper = current.exe_wrapper return current.env.get_env(os.environ.copy()) - def get_test_runner(self, test): + def get_test_runner(self, test: 'TestSerialisation') -> SingleTestRunner: options = deepcopy(self.options) if not options.setup: options.setup = self.build_data.test_setup_default_name @@ -662,12 +678,11 @@ class TestHarness: env = self.merge_suite_options(options, test) else: env = os.environ.copy() - if isinstance(test.env, build.EnvironmentVariables): - test.env = test.env.get_env(env) - env.update(test.env) - return SingleTestRunner(test, env, options) + test_env = test.env.get_env(env) + env.update(test_env) + return SingleTestRunner(test, test_env, env, options) - def process_test_result(self, result): + def process_test_result(self, result: TestRun) -> None: if result.res is TestResult.TIMEOUT: self.timeout_count += 1 elif result.res is TestResult.SKIP: @@ -683,7 +698,8 @@ class TestHarness: else: sys.exit('Unknown test result encountered: {}'.format(result.res)) - def print_stats(self, numlen, tests, name, result, i): + def print_stats(self, numlen: int, tests: typing.List['TestSerialisation'], + name: str, result: TestRun, i: int) -> None: startpad = ' ' * (numlen - len('%d' % (i + 1))) num = '%s%d/%d' % (startpad, i + 1, len(tests)) padding1 = ' ' * (38 - len(name)) @@ -718,7 +734,7 @@ class TestHarness: if self.jsonlogfile: write_json_log(self.jsonlogfile, name, result) - def print_summary(self): + def print_summary(self) -> None: msg = ''' Ok: %4d Expected Fail: %4d @@ -732,7 +748,7 @@ Timeout: %4d if self.logfile: self.logfile.write(msg) - def print_collected_logs(self): + def print_collected_logs(self) -> None: if len(self.collected_logs) > 0: if len(self.collected_logs) > 10: print('\nThe output from 10 first failed tests:\n') @@ -751,10 +767,10 @@ Timeout: %4d line = line.encode('ascii', errors='replace').decode() print(line) - def total_failure_count(self): + def total_failure_count(self) -> int: return self.fail_count + self.unexpectedpass_count + self.timeout_count - def doit(self): + def doit(self) -> int: if self.is_run: raise RuntimeError('Test harness object can only be used once.') self.is_run = True @@ -765,14 +781,16 @@ Timeout: %4d return self.total_failure_count() @staticmethod - def split_suite_string(suite): + def split_suite_string(suite: str) -> typing.Tuple[str, str]: if ':' in suite: - return suite.split(':', 1) + # mypy can't figure out that str.split(n, 1) will return a list of + # length 2, so we have to help it. + return typing.cast(typing.Tuple[str, str], tuple(suite.split(':', 1))) else: return suite, "" @staticmethod - def test_in_suites(test, suites): + def test_in_suites(test: 'TestSerialisation', suites: typing.List[str]) -> bool: for suite in suites: (prj_match, st_match) = TestHarness.split_suite_string(suite) for prjst in test.suite: @@ -803,18 +821,11 @@ Timeout: %4d return True return False - def test_suitable(self, test): + def test_suitable(self, test: 'TestSerialisation') -> bool: return (not self.options.include_suites or TestHarness.test_in_suites(test, self.options.include_suites)) \ and not TestHarness.test_in_suites(test, self.options.exclude_suites) - def load_suites(self): - ss = set() - for t in self.tests: - for s in t.suite: - ss.add(s) - self.suites = list(ss) - - def get_tests(self): + def get_tests(self) -> typing.List['TestSerialisation']: if not self.tests: print('No tests defined.') return [] @@ -834,14 +845,11 @@ Timeout: %4d print('No suitable tests defined.') return [] - for test in tests: - test.rebuilt = False - return tests - def open_log_files(self): + def open_log_files(self) -> None: if not self.options.logbase or self.options.verbose: - return None, None, None, None + return namebase = None logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase) @@ -865,8 +873,8 @@ Timeout: %4d self.logfile.write('Inherited environment: {}\n\n'.format(inherit_env)) @staticmethod - def get_wrapper(options): - wrap = [] + def get_wrapper(options: argparse.Namespace) -> typing.List[str]: + wrap = [] # type: typing.List[str] if options.gdb: wrap = ['gdb', '--quiet', '--nh'] if options.repeat > 1: @@ -875,10 +883,9 @@ Timeout: %4d wrap += ['--args'] if options.wrapper: wrap += options.wrapper - assert(isinstance(wrap, list)) return wrap - def get_pretty_suite(self, test): + def get_pretty_suite(self, test: 'TestSerialisation') -> str: if len(self.suites) > 1 and test.suite: rv = TestHarness.split_suite_string(test.suite[0])[0] s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite) @@ -888,9 +895,9 @@ Timeout: %4d else: return test.name - def run_tests(self, tests): + def run_tests(self, tests: typing.List['TestSerialisation']) -> None: executor = None - futures = [] + futures = [] # type: typing.List[typing.Tuple[conc.Future[TestRun], int, typing.List[TestSerialisation], str, int]] numlen = len('%d' % len(tests)) self.open_log_files() startdir = os.getcwd() @@ -929,9 +936,9 @@ Timeout: %4d finally: os.chdir(startdir) - def drain_futures(self, futures): - for i in futures: - (result, numlen, tests, name, i) = i + def drain_futures(self, futures: typing.List[typing.Tuple['conc.Future[TestRun]', int, typing.List['TestSerialisation'], str, int]]) -> None: + for x in futures: + (result, numlen, tests, name, i) = x if self.options.repeat > 1 and self.fail_count: result.cancel() if self.options.verbose: @@ -939,7 +946,7 @@ Timeout: %4d self.process_test_result(result.result()) self.print_stats(numlen, tests, name, result.result(), i) - def run_special(self): + def run_special(self) -> int: '''Tests run by the user, usually something like "under gdb 1000 times".''' if self.is_run: raise RuntimeError('Can not use run_special after a full run.') @@ -950,13 +957,13 @@ Timeout: %4d return self.total_failure_count() -def list_tests(th): +def list_tests(th: TestHarness) -> bool: tests = th.get_tests() for t in tests: print(th.get_pretty_suite(t)) return not tests -def rebuild_all(wd): +def rebuild_all(wd: str) -> bool: if not os.path.isfile(os.path.join(wd, 'build.ninja')): print('Only ninja backend is supported to rebuild tests before running them.') return True @@ -975,7 +982,7 @@ def rebuild_all(wd): return True -def run(options): +def run(options: argparse.Namespace) -> int: if options.benchmark: options.num_processes = 1 @@ -1020,7 +1027,7 @@ def run(options): print(e) return 1 -def run_with_args(args): +def run_with_args(args: typing.List[str]) -> int: parser = argparse.ArgumentParser(prog='meson test') add_arguments(parser) options = parser.parse_args(args) |