diff options
author | Jussi Pakkanen <jpakkane@gmail.com> | 2019-05-16 00:31:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-16 00:31:01 +0300 |
commit | 67a5af99aa9060c9f4b2350a230343b11282cb8f (patch) | |
tree | 992a7f255a89a02db5386e02ff83c76c08a640a1 | |
parent | 957d8e051c0c29beb0106e75ae7a71acc5c62cf5 (diff) | |
parent | c571b0b18507a13b6aac749a45ea85cb93ffede3 (diff) | |
download | meson-67a5af99aa9060c9f4b2350a230343b11282cb8f.zip meson-67a5af99aa9060c9f4b2350a230343b11282cb8f.tar.gz meson-67a5af99aa9060c9f4b2350a230343b11282cb8f.tar.bz2 |
Merge pull request #5395 from dcbaker/mtest-annotations
Mtest annotations and bug fixes
-rw-r--r-- | mesonbuild/backend/backends.py | 9 | ||||
-rw-r--r-- | mesonbuild/build.py | 12 | ||||
-rw-r--r-- | mesonbuild/dependencies/base.py | 9 | ||||
-rw-r--r-- | mesonbuild/environment.py | 2 | ||||
-rw-r--r-- | mesonbuild/interpreter.py | 14 | ||||
-rw-r--r-- | mesonbuild/mesonlib.py | 2 | ||||
-rw-r--r-- | mesonbuild/mtest.py | 265 |
7 files changed, 167 insertions, 146 deletions
diff --git a/mesonbuild/backend/backends.py b/mesonbuild/backend/backends.py index d74157f..d10e1e9 100644 --- a/mesonbuild/backend/backends.py +++ b/mesonbuild/backend/backends.py @@ -27,6 +27,7 @@ from ..compilers import CompilerArgs, VisualStudioLikeCompiler from collections import OrderedDict import shlex from functools import lru_cache +import typing class CleanTrees: @@ -83,8 +84,12 @@ class ExecutableSerialisation: self.capture = capture class TestSerialisation: - def __init__(self, name, project, suite, fname, is_cross_built, exe_wrapper, is_parallel, - cmd_args, env, should_fail, timeout, workdir, extra_paths, protocol): + def __init__(self, name: str, project: str, suite: str, fname: typing.List[str], + is_cross_built: bool, exe_wrapper: typing.Optional[build.Executable], + is_parallel: bool, cmd_args: typing.List[str], + env: build.EnvironmentVariables, should_fail: bool, + timeout: typing.Optional[int], workdir: typing.Optional[str], + extra_paths: typing.List[str], protocol: str): self.name = name self.project_name = project self.suite = suite diff --git a/mesonbuild/build.py b/mesonbuild/build.py index bc17445..093ab8f 100644 --- a/mesonbuild/build.py +++ b/mesonbuild/build.py @@ -19,6 +19,7 @@ import itertools, pathlib import hashlib import pickle from functools import lru_cache +import typing from . import environment from . import dependencies @@ -107,7 +108,7 @@ class Build: all dependencies and so on. """ - def __init__(self, environment): + def __init__(self, environment: environment.Environment): self.project_name = 'name of master project' self.project_version = None self.environment = environment @@ -140,7 +141,7 @@ class Build: self.dep_manifest_name = None self.dep_manifest = {} self.cross_stdlibs = {} - self.test_setups = {} + self.test_setups = {} # type: typing.Dict[str, TestSetup] self.test_setup_default_name = None self.find_overrides = {} self.searched_programs = set() # The list of all programs that have been searched for. @@ -335,7 +336,7 @@ class EnvironmentVariables: return value - def get_env(self, full_env): + def get_env(self, full_env: typing.Dict[str, str]) -> typing.Dict[str, str]: env = full_env.copy() for method, name, values, kwargs in self.envvars: env[name] = method(full_env, name, values, kwargs) @@ -2359,7 +2360,8 @@ class RunScript(dict): self['args'] = args class TestSetup: - def __init__(self, *, exe_wrapper=None, gdb=None, timeout_multiplier=None, env=None): + def __init__(self, exe_wrapper: typing.Optional[typing.List[str]], gdb: bool, + timeout_multiplier: int, env: EnvironmentVariables): self.exe_wrapper = exe_wrapper self.gdb = gdb self.timeout_multiplier = timeout_multiplier @@ -2384,7 +2386,7 @@ def get_sources_string_names(sources): raise AssertionError('Unknown source type: {!r}'.format(s)) return names -def load(build_dir): +def load(build_dir: str) -> Build: filename = os.path.join(build_dir, 'meson-private', 'build.dat') load_fail_msg = 'Build data file {!r} is corrupted. Try with a fresh build tree.'.format(filename) nonexisting_fail_msg = 'No such build data file as "{!r}".'.format(filename) diff --git a/mesonbuild/dependencies/base.py b/mesonbuild/dependencies/base.py index 034f6df..f2397e2 100644 --- a/mesonbuild/dependencies/base.py +++ b/mesonbuild/dependencies/base.py @@ -1982,7 +1982,8 @@ class DubDependency(ExternalDependency): class ExternalProgram: windows_exts = ('exe', 'msc', 'com', 'bat', 'cmd') - def __init__(self, name, command=None, silent=False, search_dir=None): + def __init__(self, name: str, command: typing.Optional[typing.List[str]] = None, + silent: bool = False, search_dir: typing.Optional[str] = None): self.name = name if command is not None: self.command = listify(command) @@ -2006,11 +2007,11 @@ class ExternalProgram: else: mlog.log('Program', mlog.bold(name), 'found:', mlog.red('NO')) - def __repr__(self): + def __repr__(self) -> str: r = '<{} {!r} -> {!r}>' return r.format(self.__class__.__name__, self.name, self.command) - def description(self): + def description(self) -> str: '''Human friendly description of the command''' return ' '.join(self.command) @@ -2169,7 +2170,7 @@ class ExternalProgram: # all executables whether in PATH or with an absolute path return [command] - def found(self): + def found(self) -> bool: return self.command[0] is not None def get_command(self): diff --git a/mesonbuild/environment.py b/mesonbuild/environment.py index f86b613..e638f31 100644 --- a/mesonbuild/environment.py +++ b/mesonbuild/environment.py @@ -115,7 +115,7 @@ def find_coverage_tools(): return gcovr_exe, gcovr_new_rootdir, lcov_exe, genhtml_exe -def detect_ninja(version='1.5', log=False): +def detect_ninja(version: str = '1.5', log: bool = False) -> str: env_ninja = os.environ.get('NINJA', None) for n in [env_ninja] if env_ninja else ['ninja', 'ninja-build', 'samu']: try: diff --git a/mesonbuild/interpreter.py b/mesonbuild/interpreter.py index 0afbb10..4a91b68 100644 --- a/mesonbuild/interpreter.py +++ b/mesonbuild/interpreter.py @@ -39,6 +39,7 @@ from collections import namedtuple from itertools import chain from pathlib import PurePath import functools +import typing import importlib @@ -856,8 +857,10 @@ class RunTargetHolder(InterpreterObject, ObjectHolder): return r.format(self.__class__.__name__, h.get_id(), h.command) class Test(InterpreterObject): - def __init__(self, name, project, suite, exe, depends, is_parallel, - cmd_args, env, should_fail, timeout, workdir, protocol): + def __init__(self, name: str, project: str, suite: typing.List[str], exe: build.Executable, + depends: typing.List[typing.Union[build.CustomTarget, build.BuildTarget]], + is_parallel: bool, cmd_args: typing.List[str], env: build.EnvironmentVariables, + should_fail: bool, timeout: int, workdir: typing.Optional[str], protocol: str): InterpreterObject.__init__(self) self.name = name self.suite = suite @@ -3254,7 +3257,7 @@ This will become a hard error in the future.''' % kwargs['input'], location=self def func_test(self, node, args, kwargs): self.add_test(node, args, kwargs, True) - def unpack_env_kwarg(self, kwargs): + def unpack_env_kwarg(self, kwargs) -> build.EnvironmentVariables: envlist = kwargs.get('env', EnvironmentVariablesHolder()) if isinstance(envlist, EnvironmentVariablesHolder): env = envlist.held_object @@ -3762,10 +3765,7 @@ different subdirectory. 'is_default can be set to true only once' % self.build.test_setup_default_name) self.build.test_setup_default_name = setup_name env = self.unpack_env_kwarg(kwargs) - self.build.test_setups[setup_name] = build.TestSetup(exe_wrapper=exe_wrapper, - gdb=gdb, - timeout_multiplier=timeout_multiplier, - env=env) + self.build.test_setups[setup_name] = build.TestSetup(exe_wrapper, gdb, timeout_multiplier, env) def get_argdict_on_crossness(self, native_dict, cross_dict, kwargs): for_native = kwargs.get('native', not self.environment.is_cross_build()) diff --git a/mesonbuild/mesonlib.py b/mesonbuild/mesonlib.py index f53197b..eb59a1c 100644 --- a/mesonbuild/mesonlib.py +++ b/mesonbuild/mesonlib.py @@ -1302,7 +1302,7 @@ def detect_subprojects(spdir_name, current_dir='', result=None): def get_error_location_string(fname: str, lineno: str) -> str: return '{}:{}:'.format(fname, lineno) -def substring_is_in_list(substr, strlist): +def substring_is_in_list(substr: str, strlist: typing.List[str]) -> bool: for s in strlist: if substr in s: return True diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 8df8f48..b09de16 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -14,26 +14,36 @@ # A tool to run tests in many different ways. -import shlex -import subprocess, sys, os, argparse -import pickle -from mesonbuild import build -from mesonbuild import environment -from mesonbuild.dependencies import ExternalProgram -from mesonbuild.mesonlib import substring_is_in_list, MesonException -from mesonbuild import mlog - from collections import namedtuple -import io -import re -import tempfile -import time, datetime, multiprocessing, json +from copy import deepcopy +import argparse import concurrent.futures as conc +import datetime +import enum +import io +import json +import multiprocessing +import os +import pickle import platform -import signal import random -from copy import deepcopy -import enum +import re +import shlex +import signal +import subprocess +import sys +import tempfile +import time +import typing + +from . import build +from . import environment +from . import mlog +from .dependencies import ExternalProgram +from .mesonlib import substring_is_in_list, MesonException + +if typing.TYPE_CHECKING: + from .backend.backends import TestSerialisation # GNU autotools interprets a return code of 77 from tests it executes to # mean that the test should be skipped. @@ -43,15 +53,15 @@ GNU_SKIP_RETURNCODE = 77 # mean that the test failed even before testing what it is supposed to test. GNU_ERROR_RETURNCODE = 99 -def is_windows(): +def is_windows() -> bool: platname = platform.system().lower() return platname == 'windows' or 'mingw' in platname -def is_cygwin(): +def is_cygwin() -> bool: platname = platform.system().lower() return 'cygwin' in platname -def determine_worker_count(): +def determine_worker_count() -> int: varname = 'MESON_TESTTHREADS' if varname in os.environ: try: @@ -68,7 +78,7 @@ def determine_worker_count(): num_workers = 1 return num_workers -def add_arguments(parser): +def add_arguments(parser: argparse.ArgumentParser) -> None: parser.add_argument('--repeat', default=1, dest='repeat', type=int, help='Number of times to run the tests.') parser.add_argument('--no-rebuild', default=False, action='store_true', @@ -111,7 +121,7 @@ def add_arguments(parser): help='Optional list of tests to run') -def returncode_to_status(retcode): +def returncode_to_status(retcode: int) -> str: # Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related # functions here because the status returned by subprocess is munged. It # returns a negative value if the process was killed by a signal rather than @@ -136,7 +146,7 @@ def returncode_to_status(retcode): signame = 'SIGinvalid' return '(exit status %d or signal %d %s)' % (retcode, signum, signame) -def env_tuple_to_str(env): +def env_tuple_to_str(env: typing.Iterable[typing.Tuple[str, str]]) -> str: return ''.join(["%s='%s' " % (k, v) for k, v in env]) @@ -156,7 +166,7 @@ class TestResult(enum.Enum): ERROR = 'ERROR' -class TAPParser(object): +class TAPParser: Plan = namedtuple('Plan', ['count', 'late', 'skipped', 'explanation']) Bailout = namedtuple('Bailout', ['message']) Test = namedtuple('Test', ['number', 'name', 'result', 'explanation']) @@ -167,18 +177,19 @@ class TAPParser(object): _AFTER_TEST = 2 _YAML = 3 - _RE_BAILOUT = r'Bail out!\s*(.*)' - _RE_DIRECTIVE = r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?' - _RE_PLAN = r'1\.\.([0-9]+)' + _RE_DIRECTIVE - _RE_TEST = r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE - _RE_VERSION = r'TAP version ([0-9]+)' - _RE_YAML_START = r'(\s+)---.*' - _RE_YAML_END = r'\s+\.\.\.\s*' + _RE_BAILOUT = re.compile(r'Bail out!\s*(.*)') + _RE_DIRECTIVE = re.compile(r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?') + _RE_PLAN = re.compile(r'1\.\.([0-9]+)' + _RE_DIRECTIVE.pattern) + _RE_TEST = re.compile(r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE.pattern) + _RE_VERSION = re.compile(r'TAP version ([0-9]+)') + _RE_YAML_START = re.compile(r'(\s+)---.*') + _RE_YAML_END = re.compile(r'\s+\.\.\.\s*') - def __init__(self, io): + def __init__(self, io: typing.Iterator[str]): self.io = io - def parse_test(self, ok, num, name, directive, explanation): + def parse_test(self, ok: bool, num: int, name: str, directive: typing.Optional[str], explanation: typing.Optional[str]) -> \ + typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error'], None, None]: name = name.strip() explanation = explanation.strip() if explanation else None if directive is not None: @@ -195,14 +206,14 @@ class TAPParser(object): yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation) - def parse(self): + def parse(self) -> typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'], None, None]: found_late_test = False bailed_out = False plan = None lineno = 0 num_tests = 0 yaml_lineno = None - yaml_indent = None + yaml_indent = '' state = self._MAIN version = 12 while True: @@ -215,7 +226,7 @@ class TAPParser(object): # YAML blocks are only accepted after a test if state == self._AFTER_TEST: if version >= 13: - m = re.match(self._RE_YAML_START, line) + m = self._RE_YAML_START.match(line) if m: state = self._YAML yaml_lineno = lineno @@ -224,19 +235,19 @@ class TAPParser(object): state = self._MAIN elif state == self._YAML: - if re.match(self._RE_YAML_END, line): + if self._RE_YAML_END.match(line): state = self._MAIN continue if line.startswith(yaml_indent): continue - yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,)) + yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno)) state = self._MAIN assert state == self._MAIN if line.startswith('#'): continue - m = re.match(self._RE_TEST, line) + m = self._RE_TEST.match(line) if m: if plan and plan.late and not found_late_test: yield self.Error('unexpected test after late plan') @@ -250,7 +261,7 @@ class TAPParser(object): state = self._AFTER_TEST continue - m = re.match(self._RE_PLAN, line) + m = self._RE_PLAN.match(line) if m: if plan: yield self.Error('more than one plan found') @@ -269,13 +280,13 @@ class TAPParser(object): yield plan continue - m = re.match(self._RE_BAILOUT, line) + m = self._RE_BAILOUT.match(line) if m: yield self.Bailout(m.group(1)) bailed_out = True continue - m = re.match(self._RE_VERSION, line) + m = self._RE_VERSION.match(line) if m: # The TAP version is only accepted as the first line if lineno != 1: @@ -291,7 +302,7 @@ class TAPParser(object): yield self.Error('unexpected input at line %d' % (lineno,)) if state == self._YAML: - yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,)) + yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno)) if not bailed_out and plan and num_tests != plan.count: if num_tests < plan.count: @@ -301,8 +312,12 @@ class TAPParser(object): class TestRun: - @staticmethod - def make_exitcode(test, returncode, duration, stdo, stde, cmd): + + @classmethod + def make_exitcode(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str], + returncode: int, duration: float, stdo: typing.Optional[str], + stde: typing.Optional[str], + cmd: typing.Optional[typing.List[str]]) -> 'TestRun': if returncode == GNU_SKIP_RETURNCODE: res = TestResult.SKIP elif returncode == GNU_ERROR_RETURNCODE: @@ -311,9 +326,12 @@ class TestRun: res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS else: res = TestResult.FAIL if bool(returncode) else TestResult.OK - return TestRun(test, res, returncode, duration, stdo, stde, cmd) + return cls(test, test_env, res, returncode, duration, stdo, stde, cmd) - def make_tap(test, returncode, duration, stdo, stde, cmd): + @classmethod + def make_tap(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str], + returncode: int, duration: float, stdo: str, stde: str, + cmd: typing.Optional[typing.List[str]]) -> 'TestRun': res = None num_tests = 0 failed = False @@ -346,9 +364,12 @@ class TestRun: else: res = TestResult.FAIL if failed else TestResult.OK - return TestRun(test, res, returncode, duration, stdo, stde, cmd) + return cls(test, test_env, res, returncode, duration, stdo, stde, cmd) - def __init__(self, test, res, returncode, duration, stdo, stde, cmd): + def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str], + res: TestResult, returncode: int, duration: float, + stdo: typing.Optional[str], stde: typing.Optional[str], + cmd: typing.Optional[typing.List[str]]): assert isinstance(res, TestResult) self.res = res self.returncode = returncode @@ -356,10 +377,10 @@ class TestRun: self.stdo = stdo self.stde = stde self.cmd = cmd - self.env = test.env + self.env = test_env self.should_fail = test.should_fail - def get_log(self): + def get_log(self) -> str: res = '--- command ---\n' if self.cmd is None: res += 'NONE\n' @@ -379,7 +400,7 @@ class TestRun: res += '-------\n\n' return res -def decode(stream): +def decode(stream: typing.Union[None, bytes]) -> str: if stream is None: return '' try: @@ -387,51 +408,50 @@ def decode(stream): except UnicodeDecodeError: return stream.decode('iso-8859-1', errors='ignore') -def write_json_log(jsonlogfile, test_name, result): +def write_json_log(jsonlogfile: typing.TextIO, test_name: str, result: TestRun) -> None: jresult = {'name': test_name, 'stdout': result.stdo, 'result': result.res.value, 'duration': result.duration, 'returncode': result.returncode, - 'command': result.cmd} - if isinstance(result.env, dict): - jresult['env'] = result.env - else: - jresult['env'] = result.env.get_env(os.environ) + 'env': result.env, + 'command': result.cmd} # type: typing.Dict[str, typing.Any] if result.stde: jresult['stderr'] = result.stde jsonlogfile.write(json.dumps(jresult) + '\n') -def run_with_mono(fname): +def run_with_mono(fname: str) -> bool: if fname.endswith('.exe') and not (is_windows() or is_cygwin()): return True return False -def load_benchmarks(build_dir): +def load_benchmarks(build_dir: str) -> typing.List['TestSerialisation']: datafile = os.path.join(build_dir, 'meson-private', 'meson_benchmark_setup.dat') if not os.path.isfile(datafile): raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir)) with open(datafile, 'rb') as f: - obj = pickle.load(f) + obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f)) return obj -def load_tests(build_dir): +def load_tests(build_dir: str) -> typing.List['TestSerialisation']: datafile = os.path.join(build_dir, 'meson-private', 'meson_test_setup.dat') if not os.path.isfile(datafile): raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir)) with open(datafile, 'rb') as f: - obj = pickle.load(f) + obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f)) return obj class SingleTestRunner: - def __init__(self, test, env, options): + def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str], + env: typing.Dict[str, str], options: argparse.Namespace): self.test = test + self.test_env = test_env self.env = env self.options = options - def _get_cmd(self): + def _get_cmd(self) -> typing.Optional[typing.List[str]]: if self.test.fname[0].endswith('.jar'): return ['java', '-jar'] + self.test.fname elif not self.test.is_cross_built and run_with_mono(self.test.fname[0]): @@ -451,19 +471,18 @@ class SingleTestRunner: else: return self.test.fname - def run(self): + def run(self) -> TestRun: cmd = self._get_cmd() if cmd is None: skip_stdout = 'Not run because can not execute cross compiled binaries.' - return TestRun(test=self.test, res=TestResult.SKIP, returncode=GNU_SKIP_RETURNCODE, - duration=0.0, stdo=skip_stdout, stde=None, cmd=None) + return TestRun(self.test, self.test_env, TestResult.SKIP, GNU_SKIP_RETURNCODE, 0.0, skip_stdout, None, None) else: wrap = TestHarness.get_wrapper(self.options) if self.options.gdb: self.test.timeout = None return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) - def _run_cmd(self, cmd): + def _run_cmd(self, cmd: typing.List[str]) -> TestRun: starttime = time.time() if len(self.test.extra_paths) > 0: @@ -500,7 +519,7 @@ class SingleTestRunner: # Make the meson executable ignore SIGINT while gdb is running. signal.signal(signal.SIGINT, signal.SIG_IGN) - def preexec_fn(): + def preexec_fn() -> None: if self.options.gdb: # Restore the SIGINT handler for the child process to # ensure it can handle it. @@ -529,7 +548,7 @@ class SingleTestRunner: p.communicate(timeout=timeout) except subprocess.TimeoutExpired: if self.options.verbose: - print('%s time out (After %d seconds)' % (self.test.name, timeout)) + print('{} time out (After {} seconds)'.format(self.test.name, timeout)) timed_out = True except KeyboardInterrupt: mlog.warning('CTRL-C detected while running %s' % (self.test.name)) @@ -566,9 +585,9 @@ class SingleTestRunner: try: p.communicate(timeout=1) except subprocess.TimeoutExpired: - additional_error = b'Test process could not be killed.' + additional_error = 'Test process could not be killed.' except ValueError: - additional_error = b'Could not read output. Maybe the process has redirected its stdout/stderr?' + additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?' endtime = time.time() duration = endtime - starttime if additional_error is None: @@ -586,20 +605,20 @@ class SingleTestRunner: stdo = "" stde = additional_error if timed_out: - return TestRun(self.test, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd) + return TestRun(self.test, self.test_env, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd) else: if self.test.protocol == 'exitcode': - return TestRun.make_exitcode(self.test, p.returncode, duration, stdo, stde, cmd) + return TestRun.make_exitcode(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd) else: if self.options.verbose: print(stdo, end='') - return TestRun.make_tap(self.test, p.returncode, duration, stdo, stde, cmd) + return TestRun.make_tap(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd) class TestHarness: - def __init__(self, options): + def __init__(self, options: argparse.Namespace): self.options = options - self.collected_logs = [] + self.collected_logs = [] # type: typing.List[str] self.fail_count = 0 self.expectedfail_count = 0 self.unexpectedpass_count = 0 @@ -608,23 +627,26 @@ class TestHarness: self.timeout_count = 0 self.is_run = False self.tests = None - self.suites = None - self.logfilename = None - self.logfile = None - self.jsonlogfile = None + self.logfilename = None # type: typing.Optional[str] + self.logfile = None # type: typing.Optional[typing.TextIO] + self.jsonlogfile = None # type: typing.Optional[typing.TextIO] if self.options.benchmark: self.tests = load_benchmarks(options.wd) else: self.tests = load_tests(options.wd) - self.load_suites() + ss = set() + for t in self.tests: + for s in t.suite: + ss.add(s) + self.suites = list(ss) - def __del__(self): + def __del__(self) -> None: if self.logfile: self.logfile.close() if self.jsonlogfile: self.jsonlogfile.close() - def merge_suite_options(self, options, test): + def merge_suite_options(self, options: argparse.Namespace, test: 'TestSerialisation') -> typing.Dict[str, str]: if ':' in options.setup: if options.setup not in self.build_data.test_setups: sys.exit("Unknown test setup '%s'." % options.setup) @@ -648,7 +670,7 @@ class TestHarness: options.wrapper = current.exe_wrapper return current.env.get_env(os.environ.copy()) - def get_test_runner(self, test): + def get_test_runner(self, test: 'TestSerialisation') -> SingleTestRunner: options = deepcopy(self.options) if not options.setup: options.setup = self.build_data.test_setup_default_name @@ -656,12 +678,11 @@ class TestHarness: env = self.merge_suite_options(options, test) else: env = os.environ.copy() - if isinstance(test.env, build.EnvironmentVariables): - test.env = test.env.get_env(env) - env.update(test.env) - return SingleTestRunner(test, env, options) + test_env = test.env.get_env(env) + env.update(test_env) + return SingleTestRunner(test, test_env, env, options) - def process_test_result(self, result): + def process_test_result(self, result: TestRun) -> None: if result.res is TestResult.TIMEOUT: self.timeout_count += 1 elif result.res is TestResult.SKIP: @@ -677,7 +698,8 @@ class TestHarness: else: sys.exit('Unknown test result encountered: {}'.format(result.res)) - def print_stats(self, numlen, tests, name, result, i): + def print_stats(self, numlen: int, tests: typing.List['TestSerialisation'], + name: str, result: TestRun, i: int) -> None: startpad = ' ' * (numlen - len('%d' % (i + 1))) num = '%s%d/%d' % (startpad, i + 1, len(tests)) padding1 = ' ' * (38 - len(name)) @@ -712,7 +734,7 @@ class TestHarness: if self.jsonlogfile: write_json_log(self.jsonlogfile, name, result) - def print_summary(self): + def print_summary(self) -> None: msg = ''' Ok: %4d Expected Fail: %4d @@ -726,7 +748,7 @@ Timeout: %4d if self.logfile: self.logfile.write(msg) - def print_collected_logs(self): + def print_collected_logs(self) -> None: if len(self.collected_logs) > 0: if len(self.collected_logs) > 10: print('\nThe output from 10 first failed tests:\n') @@ -745,10 +767,10 @@ Timeout: %4d line = line.encode('ascii', errors='replace').decode() print(line) - def total_failure_count(self): + def total_failure_count(self) -> int: return self.fail_count + self.unexpectedpass_count + self.timeout_count - def doit(self): + def doit(self) -> int: if self.is_run: raise RuntimeError('Test harness object can only be used once.') self.is_run = True @@ -759,14 +781,16 @@ Timeout: %4d return self.total_failure_count() @staticmethod - def split_suite_string(suite): + def split_suite_string(suite: str) -> typing.Tuple[str, str]: if ':' in suite: - return suite.split(':', 1) + # mypy can't figure out that str.split(n, 1) will return a list of + # length 2, so we have to help it. + return typing.cast(typing.Tuple[str, str], tuple(suite.split(':', 1))) else: return suite, "" @staticmethod - def test_in_suites(test, suites): + def test_in_suites(test: 'TestSerialisation', suites: typing.List[str]) -> bool: for suite in suites: (prj_match, st_match) = TestHarness.split_suite_string(suite) for prjst in test.suite: @@ -797,18 +821,11 @@ Timeout: %4d return True return False - def test_suitable(self, test): + def test_suitable(self, test: 'TestSerialisation') -> bool: return (not self.options.include_suites or TestHarness.test_in_suites(test, self.options.include_suites)) \ and not TestHarness.test_in_suites(test, self.options.exclude_suites) - def load_suites(self): - ss = set() - for t in self.tests: - for s in t.suite: - ss.add(s) - self.suites = list(ss) - - def get_tests(self): + def get_tests(self) -> typing.List['TestSerialisation']: if not self.tests: print('No tests defined.') return [] @@ -828,14 +845,11 @@ Timeout: %4d print('No suitable tests defined.') return [] - for test in tests: - test.rebuilt = False - return tests - def open_log_files(self): + def open_log_files(self) -> None: if not self.options.logbase or self.options.verbose: - return None, None, None, None + return namebase = None logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase) @@ -859,8 +873,8 @@ Timeout: %4d self.logfile.write('Inherited environment: {}\n\n'.format(inherit_env)) @staticmethod - def get_wrapper(options): - wrap = [] + def get_wrapper(options: argparse.Namespace) -> typing.List[str]: + wrap = [] # type: typing.List[str] if options.gdb: wrap = ['gdb', '--quiet', '--nh'] if options.repeat > 1: @@ -869,10 +883,9 @@ Timeout: %4d wrap += ['--args'] if options.wrapper: wrap += options.wrapper - assert(isinstance(wrap, list)) return wrap - def get_pretty_suite(self, test): + def get_pretty_suite(self, test: 'TestSerialisation') -> str: if len(self.suites) > 1 and test.suite: rv = TestHarness.split_suite_string(test.suite[0])[0] s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite) @@ -882,9 +895,9 @@ Timeout: %4d else: return test.name - def run_tests(self, tests): + def run_tests(self, tests: typing.List['TestSerialisation']) -> None: executor = None - futures = [] + futures = [] # type: typing.List[typing.Tuple[conc.Future[TestRun], int, typing.List[TestSerialisation], str, int]] numlen = len('%d' % len(tests)) self.open_log_files() startdir = os.getcwd() @@ -923,9 +936,9 @@ Timeout: %4d finally: os.chdir(startdir) - def drain_futures(self, futures): - for i in futures: - (result, numlen, tests, name, i) = i + def drain_futures(self, futures: typing.List[typing.Tuple['conc.Future[TestRun]', int, typing.List['TestSerialisation'], str, int]]) -> None: + for x in futures: + (result, numlen, tests, name, i) = x if self.options.repeat > 1 and self.fail_count: result.cancel() if self.options.verbose: @@ -933,7 +946,7 @@ Timeout: %4d self.process_test_result(result.result()) self.print_stats(numlen, tests, name, result.result(), i) - def run_special(self): + def run_special(self) -> int: '''Tests run by the user, usually something like "under gdb 1000 times".''' if self.is_run: raise RuntimeError('Can not use run_special after a full run.') @@ -944,13 +957,13 @@ Timeout: %4d return self.total_failure_count() -def list_tests(th): +def list_tests(th: TestHarness) -> bool: tests = th.get_tests() for t in tests: print(th.get_pretty_suite(t)) return not tests -def rebuild_all(wd): +def rebuild_all(wd: str) -> bool: if not os.path.isfile(os.path.join(wd, 'build.ninja')): print('Only ninja backend is supported to rebuild tests before running them.') return True @@ -969,7 +982,7 @@ def rebuild_all(wd): return True -def run(options): +def run(options: argparse.Namespace) -> int: if options.benchmark: options.num_processes = 1 @@ -1014,7 +1027,7 @@ def run(options): print(e) return 1 -def run_with_args(args): +def run_with_args(args: typing.List[str]) -> int: parser = argparse.ArgumentParser(prog='meson test') add_arguments(parser) options = parser.parse_args(args) |