aboutsummaryrefslogtreecommitdiff
path: root/mesonbuild/mtest.py
diff options
context:
space:
mode:
Diffstat (limited to 'mesonbuild/mtest.py')
-rw-r--r--mesonbuild/mtest.py201
1 files changed, 104 insertions, 97 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index ade2aea..b09de16 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -34,6 +34,7 @@ import subprocess
import sys
import tempfile
import time
+import typing
from . import build
from . import environment
@@ -41,6 +42,9 @@ from . import mlog
from .dependencies import ExternalProgram
from .mesonlib import substring_is_in_list, MesonException
+if typing.TYPE_CHECKING:
+ from .backend.backends import TestSerialisation
+
# GNU autotools interprets a return code of 77 from tests it executes to
# mean that the test should be skipped.
GNU_SKIP_RETURNCODE = 77
@@ -49,15 +53,15 @@ GNU_SKIP_RETURNCODE = 77
# mean that the test failed even before testing what it is supposed to test.
GNU_ERROR_RETURNCODE = 99
-def is_windows():
+def is_windows() -> bool:
platname = platform.system().lower()
return platname == 'windows' or 'mingw' in platname
-def is_cygwin():
+def is_cygwin() -> bool:
platname = platform.system().lower()
return 'cygwin' in platname
-def determine_worker_count():
+def determine_worker_count() -> int:
varname = 'MESON_TESTTHREADS'
if varname in os.environ:
try:
@@ -74,7 +78,7 @@ def determine_worker_count():
num_workers = 1
return num_workers
-def add_arguments(parser):
+def add_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument('--repeat', default=1, dest='repeat', type=int,
help='Number of times to run the tests.')
parser.add_argument('--no-rebuild', default=False, action='store_true',
@@ -117,7 +121,7 @@ def add_arguments(parser):
help='Optional list of tests to run')
-def returncode_to_status(retcode):
+def returncode_to_status(retcode: int) -> str:
# Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related
# functions here because the status returned by subprocess is munged. It
# returns a negative value if the process was killed by a signal rather than
@@ -142,7 +146,7 @@ def returncode_to_status(retcode):
signame = 'SIGinvalid'
return '(exit status %d or signal %d %s)' % (retcode, signum, signame)
-def env_tuple_to_str(env):
+def env_tuple_to_str(env: typing.Iterable[typing.Tuple[str, str]]) -> str:
return ''.join(["%s='%s' " % (k, v) for k, v in env])
@@ -162,7 +166,7 @@ class TestResult(enum.Enum):
ERROR = 'ERROR'
-class TAPParser(object):
+class TAPParser:
Plan = namedtuple('Plan', ['count', 'late', 'skipped', 'explanation'])
Bailout = namedtuple('Bailout', ['message'])
Test = namedtuple('Test', ['number', 'name', 'result', 'explanation'])
@@ -181,10 +185,11 @@ class TAPParser(object):
_RE_YAML_START = re.compile(r'(\s+)---.*')
_RE_YAML_END = re.compile(r'\s+\.\.\.\s*')
- def __init__(self, io):
+ def __init__(self, io: typing.Iterator[str]):
self.io = io
- def parse_test(self, ok, num, name, directive, explanation):
+ def parse_test(self, ok: bool, num: int, name: str, directive: typing.Optional[str], explanation: typing.Optional[str]) -> \
+ typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error'], None, None]:
name = name.strip()
explanation = explanation.strip() if explanation else None
if directive is not None:
@@ -201,14 +206,14 @@ class TAPParser(object):
yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation)
- def parse(self):
+ def parse(self) -> typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'], None, None]:
found_late_test = False
bailed_out = False
plan = None
lineno = 0
num_tests = 0
yaml_lineno = None
- yaml_indent = None
+ yaml_indent = ''
state = self._MAIN
version = 12
while True:
@@ -235,7 +240,7 @@ class TAPParser(object):
continue
if line.startswith(yaml_indent):
continue
- yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,))
+ yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno))
state = self._MAIN
assert state == self._MAIN
@@ -297,7 +302,7 @@ class TAPParser(object):
yield self.Error('unexpected input at line %d' % (lineno,))
if state == self._YAML:
- yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,))
+ yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno))
if not bailed_out and plan and num_tests != plan.count:
if num_tests < plan.count:
@@ -307,8 +312,12 @@ class TAPParser(object):
class TestRun:
- @staticmethod
- def make_exitcode(test, returncode, duration, stdo, stde, cmd):
+
+ @classmethod
+ def make_exitcode(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ returncode: int, duration: float, stdo: typing.Optional[str],
+ stde: typing.Optional[str],
+ cmd: typing.Optional[typing.List[str]]) -> 'TestRun':
if returncode == GNU_SKIP_RETURNCODE:
res = TestResult.SKIP
elif returncode == GNU_ERROR_RETURNCODE:
@@ -317,9 +326,12 @@ class TestRun:
res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
- return TestRun(test, res, returncode, duration, stdo, stde, cmd)
+ return cls(test, test_env, res, returncode, duration, stdo, stde, cmd)
- def make_tap(test, returncode, duration, stdo, stde, cmd):
+ @classmethod
+ def make_tap(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ returncode: int, duration: float, stdo: str, stde: str,
+ cmd: typing.Optional[typing.List[str]]) -> 'TestRun':
res = None
num_tests = 0
failed = False
@@ -352,9 +364,12 @@ class TestRun:
else:
res = TestResult.FAIL if failed else TestResult.OK
- return TestRun(test, res, returncode, duration, stdo, stde, cmd)
+ return cls(test, test_env, res, returncode, duration, stdo, stde, cmd)
- def __init__(self, test, res, returncode, duration, stdo, stde, cmd):
+ def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ res: TestResult, returncode: int, duration: float,
+ stdo: typing.Optional[str], stde: typing.Optional[str],
+ cmd: typing.Optional[typing.List[str]]):
assert isinstance(res, TestResult)
self.res = res
self.returncode = returncode
@@ -362,10 +377,10 @@ class TestRun:
self.stdo = stdo
self.stde = stde
self.cmd = cmd
- self.env = test.env
+ self.env = test_env
self.should_fail = test.should_fail
- def get_log(self):
+ def get_log(self) -> str:
res = '--- command ---\n'
if self.cmd is None:
res += 'NONE\n'
@@ -385,7 +400,7 @@ class TestRun:
res += '-------\n\n'
return res
-def decode(stream):
+def decode(stream: typing.Union[None, bytes]) -> str:
if stream is None:
return ''
try:
@@ -393,51 +408,50 @@ def decode(stream):
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
-def write_json_log(jsonlogfile, test_name, result):
+def write_json_log(jsonlogfile: typing.TextIO, test_name: str, result: TestRun) -> None:
jresult = {'name': test_name,
'stdout': result.stdo,
'result': result.res.value,
'duration': result.duration,
'returncode': result.returncode,
- 'command': result.cmd}
- if isinstance(result.env, dict):
- jresult['env'] = result.env
- else:
- jresult['env'] = result.env.get_env(os.environ)
+ 'env': result.env,
+ 'command': result.cmd} # type: typing.Dict[str, typing.Any]
if result.stde:
jresult['stderr'] = result.stde
jsonlogfile.write(json.dumps(jresult) + '\n')
-def run_with_mono(fname):
+def run_with_mono(fname: str) -> bool:
if fname.endswith('.exe') and not (is_windows() or is_cygwin()):
return True
return False
-def load_benchmarks(build_dir):
+def load_benchmarks(build_dir: str) -> typing.List['TestSerialisation']:
datafile = os.path.join(build_dir, 'meson-private', 'meson_benchmark_setup.dat')
if not os.path.isfile(datafile):
raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir))
with open(datafile, 'rb') as f:
- obj = pickle.load(f)
+ obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f))
return obj
-def load_tests(build_dir):
+def load_tests(build_dir: str) -> typing.List['TestSerialisation']:
datafile = os.path.join(build_dir, 'meson-private', 'meson_test_setup.dat')
if not os.path.isfile(datafile):
raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir))
with open(datafile, 'rb') as f:
- obj = pickle.load(f)
+ obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f))
return obj
class SingleTestRunner:
- def __init__(self, test, env, options):
+ def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ env: typing.Dict[str, str], options: argparse.Namespace):
self.test = test
+ self.test_env = test_env
self.env = env
self.options = options
- def _get_cmd(self):
+ def _get_cmd(self) -> typing.Optional[typing.List[str]]:
if self.test.fname[0].endswith('.jar'):
return ['java', '-jar'] + self.test.fname
elif not self.test.is_cross_built and run_with_mono(self.test.fname[0]):
@@ -457,19 +471,18 @@ class SingleTestRunner:
else:
return self.test.fname
- def run(self):
+ def run(self) -> TestRun:
cmd = self._get_cmd()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
- return TestRun(test=self.test, res=TestResult.SKIP, returncode=GNU_SKIP_RETURNCODE,
- duration=0.0, stdo=skip_stdout, stde=None, cmd=None)
+ return TestRun(self.test, self.test_env, TestResult.SKIP, GNU_SKIP_RETURNCODE, 0.0, skip_stdout, None, None)
else:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
self.test.timeout = None
return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
- def _run_cmd(self, cmd):
+ def _run_cmd(self, cmd: typing.List[str]) -> TestRun:
starttime = time.time()
if len(self.test.extra_paths) > 0:
@@ -506,7 +519,7 @@ class SingleTestRunner:
# Make the meson executable ignore SIGINT while gdb is running.
signal.signal(signal.SIGINT, signal.SIG_IGN)
- def preexec_fn():
+ def preexec_fn() -> None:
if self.options.gdb:
# Restore the SIGINT handler for the child process to
# ensure it can handle it.
@@ -535,7 +548,7 @@ class SingleTestRunner:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
if self.options.verbose:
- print('%s time out (After %d seconds)' % (self.test.name, timeout))
+ print('{} time out (After {} seconds)'.format(self.test.name, timeout))
timed_out = True
except KeyboardInterrupt:
mlog.warning('CTRL-C detected while running %s' % (self.test.name))
@@ -572,9 +585,9 @@ class SingleTestRunner:
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
- additional_error = b'Test process could not be killed.'
+ additional_error = 'Test process could not be killed.'
except ValueError:
- additional_error = b'Could not read output. Maybe the process has redirected its stdout/stderr?'
+ additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?'
endtime = time.time()
duration = endtime - starttime
if additional_error is None:
@@ -592,20 +605,20 @@ class SingleTestRunner:
stdo = ""
stde = additional_error
if timed_out:
- return TestRun(self.test, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd)
+ return TestRun(self.test, self.test_env, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd)
else:
if self.test.protocol == 'exitcode':
- return TestRun.make_exitcode(self.test, p.returncode, duration, stdo, stde, cmd)
+ return TestRun.make_exitcode(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd)
else:
if self.options.verbose:
print(stdo, end='')
- return TestRun.make_tap(self.test, p.returncode, duration, stdo, stde, cmd)
+ return TestRun.make_tap(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd)
class TestHarness:
- def __init__(self, options):
+ def __init__(self, options: argparse.Namespace):
self.options = options
- self.collected_logs = []
+ self.collected_logs = [] # type: typing.List[str]
self.fail_count = 0
self.expectedfail_count = 0
self.unexpectedpass_count = 0
@@ -614,23 +627,26 @@ class TestHarness:
self.timeout_count = 0
self.is_run = False
self.tests = None
- self.suites = None
- self.logfilename = None
- self.logfile = None
- self.jsonlogfile = None
+ self.logfilename = None # type: typing.Optional[str]
+ self.logfile = None # type: typing.Optional[typing.TextIO]
+ self.jsonlogfile = None # type: typing.Optional[typing.TextIO]
if self.options.benchmark:
self.tests = load_benchmarks(options.wd)
else:
self.tests = load_tests(options.wd)
- self.load_suites()
+ ss = set()
+ for t in self.tests:
+ for s in t.suite:
+ ss.add(s)
+ self.suites = list(ss)
- def __del__(self):
+ def __del__(self) -> None:
if self.logfile:
self.logfile.close()
if self.jsonlogfile:
self.jsonlogfile.close()
- def merge_suite_options(self, options, test):
+ def merge_suite_options(self, options: argparse.Namespace, test: 'TestSerialisation') -> typing.Dict[str, str]:
if ':' in options.setup:
if options.setup not in self.build_data.test_setups:
sys.exit("Unknown test setup '%s'." % options.setup)
@@ -654,7 +670,7 @@ class TestHarness:
options.wrapper = current.exe_wrapper
return current.env.get_env(os.environ.copy())
- def get_test_runner(self, test):
+ def get_test_runner(self, test: 'TestSerialisation') -> SingleTestRunner:
options = deepcopy(self.options)
if not options.setup:
options.setup = self.build_data.test_setup_default_name
@@ -662,12 +678,11 @@ class TestHarness:
env = self.merge_suite_options(options, test)
else:
env = os.environ.copy()
- if isinstance(test.env, build.EnvironmentVariables):
- test.env = test.env.get_env(env)
- env.update(test.env)
- return SingleTestRunner(test, env, options)
+ test_env = test.env.get_env(env)
+ env.update(test_env)
+ return SingleTestRunner(test, test_env, env, options)
- def process_test_result(self, result):
+ def process_test_result(self, result: TestRun) -> None:
if result.res is TestResult.TIMEOUT:
self.timeout_count += 1
elif result.res is TestResult.SKIP:
@@ -683,7 +698,8 @@ class TestHarness:
else:
sys.exit('Unknown test result encountered: {}'.format(result.res))
- def print_stats(self, numlen, tests, name, result, i):
+ def print_stats(self, numlen: int, tests: typing.List['TestSerialisation'],
+ name: str, result: TestRun, i: int) -> None:
startpad = ' ' * (numlen - len('%d' % (i + 1)))
num = '%s%d/%d' % (startpad, i + 1, len(tests))
padding1 = ' ' * (38 - len(name))
@@ -718,7 +734,7 @@ class TestHarness:
if self.jsonlogfile:
write_json_log(self.jsonlogfile, name, result)
- def print_summary(self):
+ def print_summary(self) -> None:
msg = '''
Ok: %4d
Expected Fail: %4d
@@ -732,7 +748,7 @@ Timeout: %4d
if self.logfile:
self.logfile.write(msg)
- def print_collected_logs(self):
+ def print_collected_logs(self) -> None:
if len(self.collected_logs) > 0:
if len(self.collected_logs) > 10:
print('\nThe output from 10 first failed tests:\n')
@@ -751,10 +767,10 @@ Timeout: %4d
line = line.encode('ascii', errors='replace').decode()
print(line)
- def total_failure_count(self):
+ def total_failure_count(self) -> int:
return self.fail_count + self.unexpectedpass_count + self.timeout_count
- def doit(self):
+ def doit(self) -> int:
if self.is_run:
raise RuntimeError('Test harness object can only be used once.')
self.is_run = True
@@ -765,14 +781,16 @@ Timeout: %4d
return self.total_failure_count()
@staticmethod
- def split_suite_string(suite):
+ def split_suite_string(suite: str) -> typing.Tuple[str, str]:
if ':' in suite:
- return suite.split(':', 1)
+ # mypy can't figure out that str.split(n, 1) will return a list of
+ # length 2, so we have to help it.
+ return typing.cast(typing.Tuple[str, str], tuple(suite.split(':', 1)))
else:
return suite, ""
@staticmethod
- def test_in_suites(test, suites):
+ def test_in_suites(test: 'TestSerialisation', suites: typing.List[str]) -> bool:
for suite in suites:
(prj_match, st_match) = TestHarness.split_suite_string(suite)
for prjst in test.suite:
@@ -803,18 +821,11 @@ Timeout: %4d
return True
return False
- def test_suitable(self, test):
+ def test_suitable(self, test: 'TestSerialisation') -> bool:
return (not self.options.include_suites or TestHarness.test_in_suites(test, self.options.include_suites)) \
and not TestHarness.test_in_suites(test, self.options.exclude_suites)
- def load_suites(self):
- ss = set()
- for t in self.tests:
- for s in t.suite:
- ss.add(s)
- self.suites = list(ss)
-
- def get_tests(self):
+ def get_tests(self) -> typing.List['TestSerialisation']:
if not self.tests:
print('No tests defined.')
return []
@@ -834,14 +845,11 @@ Timeout: %4d
print('No suitable tests defined.')
return []
- for test in tests:
- test.rebuilt = False
-
return tests
- def open_log_files(self):
+ def open_log_files(self) -> None:
if not self.options.logbase or self.options.verbose:
- return None, None, None, None
+ return
namebase = None
logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase)
@@ -865,8 +873,8 @@ Timeout: %4d
self.logfile.write('Inherited environment: {}\n\n'.format(inherit_env))
@staticmethod
- def get_wrapper(options):
- wrap = []
+ def get_wrapper(options: argparse.Namespace) -> typing.List[str]:
+ wrap = [] # type: typing.List[str]
if options.gdb:
wrap = ['gdb', '--quiet', '--nh']
if options.repeat > 1:
@@ -875,10 +883,9 @@ Timeout: %4d
wrap += ['--args']
if options.wrapper:
wrap += options.wrapper
- assert(isinstance(wrap, list))
return wrap
- def get_pretty_suite(self, test):
+ def get_pretty_suite(self, test: 'TestSerialisation') -> str:
if len(self.suites) > 1 and test.suite:
rv = TestHarness.split_suite_string(test.suite[0])[0]
s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite)
@@ -888,9 +895,9 @@ Timeout: %4d
else:
return test.name
- def run_tests(self, tests):
+ def run_tests(self, tests: typing.List['TestSerialisation']) -> None:
executor = None
- futures = []
+ futures = [] # type: typing.List[typing.Tuple[conc.Future[TestRun], int, typing.List[TestSerialisation], str, int]]
numlen = len('%d' % len(tests))
self.open_log_files()
startdir = os.getcwd()
@@ -929,9 +936,9 @@ Timeout: %4d
finally:
os.chdir(startdir)
- def drain_futures(self, futures):
- for i in futures:
- (result, numlen, tests, name, i) = i
+ def drain_futures(self, futures: typing.List[typing.Tuple['conc.Future[TestRun]', int, typing.List['TestSerialisation'], str, int]]) -> None:
+ for x in futures:
+ (result, numlen, tests, name, i) = x
if self.options.repeat > 1 and self.fail_count:
result.cancel()
if self.options.verbose:
@@ -939,7 +946,7 @@ Timeout: %4d
self.process_test_result(result.result())
self.print_stats(numlen, tests, name, result.result(), i)
- def run_special(self):
+ def run_special(self) -> int:
'''Tests run by the user, usually something like "under gdb 1000 times".'''
if self.is_run:
raise RuntimeError('Can not use run_special after a full run.')
@@ -950,13 +957,13 @@ Timeout: %4d
return self.total_failure_count()
-def list_tests(th):
+def list_tests(th: TestHarness) -> bool:
tests = th.get_tests()
for t in tests:
print(th.get_pretty_suite(t))
return not tests
-def rebuild_all(wd):
+def rebuild_all(wd: str) -> bool:
if not os.path.isfile(os.path.join(wd, 'build.ninja')):
print('Only ninja backend is supported to rebuild tests before running them.')
return True
@@ -975,7 +982,7 @@ def rebuild_all(wd):
return True
-def run(options):
+def run(options: argparse.Namespace) -> int:
if options.benchmark:
options.num_processes = 1
@@ -1020,7 +1027,7 @@ def run(options):
print(e)
return 1
-def run_with_args(args):
+def run_with_args(args: typing.List[str]) -> int:
parser = argparse.ArgumentParser(prog='meson test')
add_arguments(parser)
options = parser.parse_args(args)