aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJussi Pakkanen <jpakkane@gmail.com>2019-05-16 00:31:01 +0300
committerGitHub <noreply@github.com>2019-05-16 00:31:01 +0300
commit67a5af99aa9060c9f4b2350a230343b11282cb8f (patch)
tree992a7f255a89a02db5386e02ff83c76c08a640a1
parent957d8e051c0c29beb0106e75ae7a71acc5c62cf5 (diff)
parentc571b0b18507a13b6aac749a45ea85cb93ffede3 (diff)
downloadmeson-67a5af99aa9060c9f4b2350a230343b11282cb8f.zip
meson-67a5af99aa9060c9f4b2350a230343b11282cb8f.tar.gz
meson-67a5af99aa9060c9f4b2350a230343b11282cb8f.tar.bz2
Merge pull request #5395 from dcbaker/mtest-annotations
Mtest annotations and bug fixes
-rw-r--r--mesonbuild/backend/backends.py9
-rw-r--r--mesonbuild/build.py12
-rw-r--r--mesonbuild/dependencies/base.py9
-rw-r--r--mesonbuild/environment.py2
-rw-r--r--mesonbuild/interpreter.py14
-rw-r--r--mesonbuild/mesonlib.py2
-rw-r--r--mesonbuild/mtest.py265
7 files changed, 167 insertions, 146 deletions
diff --git a/mesonbuild/backend/backends.py b/mesonbuild/backend/backends.py
index d74157f..d10e1e9 100644
--- a/mesonbuild/backend/backends.py
+++ b/mesonbuild/backend/backends.py
@@ -27,6 +27,7 @@ from ..compilers import CompilerArgs, VisualStudioLikeCompiler
from collections import OrderedDict
import shlex
from functools import lru_cache
+import typing
class CleanTrees:
@@ -83,8 +84,12 @@ class ExecutableSerialisation:
self.capture = capture
class TestSerialisation:
- def __init__(self, name, project, suite, fname, is_cross_built, exe_wrapper, is_parallel,
- cmd_args, env, should_fail, timeout, workdir, extra_paths, protocol):
+ def __init__(self, name: str, project: str, suite: str, fname: typing.List[str],
+ is_cross_built: bool, exe_wrapper: typing.Optional[build.Executable],
+ is_parallel: bool, cmd_args: typing.List[str],
+ env: build.EnvironmentVariables, should_fail: bool,
+ timeout: typing.Optional[int], workdir: typing.Optional[str],
+ extra_paths: typing.List[str], protocol: str):
self.name = name
self.project_name = project
self.suite = suite
diff --git a/mesonbuild/build.py b/mesonbuild/build.py
index bc17445..093ab8f 100644
--- a/mesonbuild/build.py
+++ b/mesonbuild/build.py
@@ -19,6 +19,7 @@ import itertools, pathlib
import hashlib
import pickle
from functools import lru_cache
+import typing
from . import environment
from . import dependencies
@@ -107,7 +108,7 @@ class Build:
all dependencies and so on.
"""
- def __init__(self, environment):
+ def __init__(self, environment: environment.Environment):
self.project_name = 'name of master project'
self.project_version = None
self.environment = environment
@@ -140,7 +141,7 @@ class Build:
self.dep_manifest_name = None
self.dep_manifest = {}
self.cross_stdlibs = {}
- self.test_setups = {}
+ self.test_setups = {} # type: typing.Dict[str, TestSetup]
self.test_setup_default_name = None
self.find_overrides = {}
self.searched_programs = set() # The list of all programs that have been searched for.
@@ -335,7 +336,7 @@ class EnvironmentVariables:
return value
- def get_env(self, full_env):
+ def get_env(self, full_env: typing.Dict[str, str]) -> typing.Dict[str, str]:
env = full_env.copy()
for method, name, values, kwargs in self.envvars:
env[name] = method(full_env, name, values, kwargs)
@@ -2359,7 +2360,8 @@ class RunScript(dict):
self['args'] = args
class TestSetup:
- def __init__(self, *, exe_wrapper=None, gdb=None, timeout_multiplier=None, env=None):
+ def __init__(self, exe_wrapper: typing.Optional[typing.List[str]], gdb: bool,
+ timeout_multiplier: int, env: EnvironmentVariables):
self.exe_wrapper = exe_wrapper
self.gdb = gdb
self.timeout_multiplier = timeout_multiplier
@@ -2384,7 +2386,7 @@ def get_sources_string_names(sources):
raise AssertionError('Unknown source type: {!r}'.format(s))
return names
-def load(build_dir):
+def load(build_dir: str) -> Build:
filename = os.path.join(build_dir, 'meson-private', 'build.dat')
load_fail_msg = 'Build data file {!r} is corrupted. Try with a fresh build tree.'.format(filename)
nonexisting_fail_msg = 'No such build data file as "{!r}".'.format(filename)
diff --git a/mesonbuild/dependencies/base.py b/mesonbuild/dependencies/base.py
index 034f6df..f2397e2 100644
--- a/mesonbuild/dependencies/base.py
+++ b/mesonbuild/dependencies/base.py
@@ -1982,7 +1982,8 @@ class DubDependency(ExternalDependency):
class ExternalProgram:
windows_exts = ('exe', 'msc', 'com', 'bat', 'cmd')
- def __init__(self, name, command=None, silent=False, search_dir=None):
+ def __init__(self, name: str, command: typing.Optional[typing.List[str]] = None,
+ silent: bool = False, search_dir: typing.Optional[str] = None):
self.name = name
if command is not None:
self.command = listify(command)
@@ -2006,11 +2007,11 @@ class ExternalProgram:
else:
mlog.log('Program', mlog.bold(name), 'found:', mlog.red('NO'))
- def __repr__(self):
+ def __repr__(self) -> str:
r = '<{} {!r} -> {!r}>'
return r.format(self.__class__.__name__, self.name, self.command)
- def description(self):
+ def description(self) -> str:
'''Human friendly description of the command'''
return ' '.join(self.command)
@@ -2169,7 +2170,7 @@ class ExternalProgram:
# all executables whether in PATH or with an absolute path
return [command]
- def found(self):
+ def found(self) -> bool:
return self.command[0] is not None
def get_command(self):
diff --git a/mesonbuild/environment.py b/mesonbuild/environment.py
index f86b613..e638f31 100644
--- a/mesonbuild/environment.py
+++ b/mesonbuild/environment.py
@@ -115,7 +115,7 @@ def find_coverage_tools():
return gcovr_exe, gcovr_new_rootdir, lcov_exe, genhtml_exe
-def detect_ninja(version='1.5', log=False):
+def detect_ninja(version: str = '1.5', log: bool = False) -> str:
env_ninja = os.environ.get('NINJA', None)
for n in [env_ninja] if env_ninja else ['ninja', 'ninja-build', 'samu']:
try:
diff --git a/mesonbuild/interpreter.py b/mesonbuild/interpreter.py
index 0afbb10..4a91b68 100644
--- a/mesonbuild/interpreter.py
+++ b/mesonbuild/interpreter.py
@@ -39,6 +39,7 @@ from collections import namedtuple
from itertools import chain
from pathlib import PurePath
import functools
+import typing
import importlib
@@ -856,8 +857,10 @@ class RunTargetHolder(InterpreterObject, ObjectHolder):
return r.format(self.__class__.__name__, h.get_id(), h.command)
class Test(InterpreterObject):
- def __init__(self, name, project, suite, exe, depends, is_parallel,
- cmd_args, env, should_fail, timeout, workdir, protocol):
+ def __init__(self, name: str, project: str, suite: typing.List[str], exe: build.Executable,
+ depends: typing.List[typing.Union[build.CustomTarget, build.BuildTarget]],
+ is_parallel: bool, cmd_args: typing.List[str], env: build.EnvironmentVariables,
+ should_fail: bool, timeout: int, workdir: typing.Optional[str], protocol: str):
InterpreterObject.__init__(self)
self.name = name
self.suite = suite
@@ -3254,7 +3257,7 @@ This will become a hard error in the future.''' % kwargs['input'], location=self
def func_test(self, node, args, kwargs):
self.add_test(node, args, kwargs, True)
- def unpack_env_kwarg(self, kwargs):
+ def unpack_env_kwarg(self, kwargs) -> build.EnvironmentVariables:
envlist = kwargs.get('env', EnvironmentVariablesHolder())
if isinstance(envlist, EnvironmentVariablesHolder):
env = envlist.held_object
@@ -3762,10 +3765,7 @@ different subdirectory.
'is_default can be set to true only once' % self.build.test_setup_default_name)
self.build.test_setup_default_name = setup_name
env = self.unpack_env_kwarg(kwargs)
- self.build.test_setups[setup_name] = build.TestSetup(exe_wrapper=exe_wrapper,
- gdb=gdb,
- timeout_multiplier=timeout_multiplier,
- env=env)
+ self.build.test_setups[setup_name] = build.TestSetup(exe_wrapper, gdb, timeout_multiplier, env)
def get_argdict_on_crossness(self, native_dict, cross_dict, kwargs):
for_native = kwargs.get('native', not self.environment.is_cross_build())
diff --git a/mesonbuild/mesonlib.py b/mesonbuild/mesonlib.py
index f53197b..eb59a1c 100644
--- a/mesonbuild/mesonlib.py
+++ b/mesonbuild/mesonlib.py
@@ -1302,7 +1302,7 @@ def detect_subprojects(spdir_name, current_dir='', result=None):
def get_error_location_string(fname: str, lineno: str) -> str:
return '{}:{}:'.format(fname, lineno)
-def substring_is_in_list(substr, strlist):
+def substring_is_in_list(substr: str, strlist: typing.List[str]) -> bool:
for s in strlist:
if substr in s:
return True
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index 8df8f48..b09de16 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -14,26 +14,36 @@
# A tool to run tests in many different ways.
-import shlex
-import subprocess, sys, os, argparse
-import pickle
-from mesonbuild import build
-from mesonbuild import environment
-from mesonbuild.dependencies import ExternalProgram
-from mesonbuild.mesonlib import substring_is_in_list, MesonException
-from mesonbuild import mlog
-
from collections import namedtuple
-import io
-import re
-import tempfile
-import time, datetime, multiprocessing, json
+from copy import deepcopy
+import argparse
import concurrent.futures as conc
+import datetime
+import enum
+import io
+import json
+import multiprocessing
+import os
+import pickle
import platform
-import signal
import random
-from copy import deepcopy
-import enum
+import re
+import shlex
+import signal
+import subprocess
+import sys
+import tempfile
+import time
+import typing
+
+from . import build
+from . import environment
+from . import mlog
+from .dependencies import ExternalProgram
+from .mesonlib import substring_is_in_list, MesonException
+
+if typing.TYPE_CHECKING:
+ from .backend.backends import TestSerialisation
# GNU autotools interprets a return code of 77 from tests it executes to
# mean that the test should be skipped.
@@ -43,15 +53,15 @@ GNU_SKIP_RETURNCODE = 77
# mean that the test failed even before testing what it is supposed to test.
GNU_ERROR_RETURNCODE = 99
-def is_windows():
+def is_windows() -> bool:
platname = platform.system().lower()
return platname == 'windows' or 'mingw' in platname
-def is_cygwin():
+def is_cygwin() -> bool:
platname = platform.system().lower()
return 'cygwin' in platname
-def determine_worker_count():
+def determine_worker_count() -> int:
varname = 'MESON_TESTTHREADS'
if varname in os.environ:
try:
@@ -68,7 +78,7 @@ def determine_worker_count():
num_workers = 1
return num_workers
-def add_arguments(parser):
+def add_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument('--repeat', default=1, dest='repeat', type=int,
help='Number of times to run the tests.')
parser.add_argument('--no-rebuild', default=False, action='store_true',
@@ -111,7 +121,7 @@ def add_arguments(parser):
help='Optional list of tests to run')
-def returncode_to_status(retcode):
+def returncode_to_status(retcode: int) -> str:
# Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related
# functions here because the status returned by subprocess is munged. It
# returns a negative value if the process was killed by a signal rather than
@@ -136,7 +146,7 @@ def returncode_to_status(retcode):
signame = 'SIGinvalid'
return '(exit status %d or signal %d %s)' % (retcode, signum, signame)
-def env_tuple_to_str(env):
+def env_tuple_to_str(env: typing.Iterable[typing.Tuple[str, str]]) -> str:
return ''.join(["%s='%s' " % (k, v) for k, v in env])
@@ -156,7 +166,7 @@ class TestResult(enum.Enum):
ERROR = 'ERROR'
-class TAPParser(object):
+class TAPParser:
Plan = namedtuple('Plan', ['count', 'late', 'skipped', 'explanation'])
Bailout = namedtuple('Bailout', ['message'])
Test = namedtuple('Test', ['number', 'name', 'result', 'explanation'])
@@ -167,18 +177,19 @@ class TAPParser(object):
_AFTER_TEST = 2
_YAML = 3
- _RE_BAILOUT = r'Bail out!\s*(.*)'
- _RE_DIRECTIVE = r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?'
- _RE_PLAN = r'1\.\.([0-9]+)' + _RE_DIRECTIVE
- _RE_TEST = r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE
- _RE_VERSION = r'TAP version ([0-9]+)'
- _RE_YAML_START = r'(\s+)---.*'
- _RE_YAML_END = r'\s+\.\.\.\s*'
+ _RE_BAILOUT = re.compile(r'Bail out!\s*(.*)')
+ _RE_DIRECTIVE = re.compile(r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?')
+ _RE_PLAN = re.compile(r'1\.\.([0-9]+)' + _RE_DIRECTIVE.pattern)
+ _RE_TEST = re.compile(r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE.pattern)
+ _RE_VERSION = re.compile(r'TAP version ([0-9]+)')
+ _RE_YAML_START = re.compile(r'(\s+)---.*')
+ _RE_YAML_END = re.compile(r'\s+\.\.\.\s*')
- def __init__(self, io):
+ def __init__(self, io: typing.Iterator[str]):
self.io = io
- def parse_test(self, ok, num, name, directive, explanation):
+ def parse_test(self, ok: bool, num: int, name: str, directive: typing.Optional[str], explanation: typing.Optional[str]) -> \
+ typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error'], None, None]:
name = name.strip()
explanation = explanation.strip() if explanation else None
if directive is not None:
@@ -195,14 +206,14 @@ class TAPParser(object):
yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation)
- def parse(self):
+ def parse(self) -> typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'], None, None]:
found_late_test = False
bailed_out = False
plan = None
lineno = 0
num_tests = 0
yaml_lineno = None
- yaml_indent = None
+ yaml_indent = ''
state = self._MAIN
version = 12
while True:
@@ -215,7 +226,7 @@ class TAPParser(object):
# YAML blocks are only accepted after a test
if state == self._AFTER_TEST:
if version >= 13:
- m = re.match(self._RE_YAML_START, line)
+ m = self._RE_YAML_START.match(line)
if m:
state = self._YAML
yaml_lineno = lineno
@@ -224,19 +235,19 @@ class TAPParser(object):
state = self._MAIN
elif state == self._YAML:
- if re.match(self._RE_YAML_END, line):
+ if self._RE_YAML_END.match(line):
state = self._MAIN
continue
if line.startswith(yaml_indent):
continue
- yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,))
+ yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno))
state = self._MAIN
assert state == self._MAIN
if line.startswith('#'):
continue
- m = re.match(self._RE_TEST, line)
+ m = self._RE_TEST.match(line)
if m:
if plan and plan.late and not found_late_test:
yield self.Error('unexpected test after late plan')
@@ -250,7 +261,7 @@ class TAPParser(object):
state = self._AFTER_TEST
continue
- m = re.match(self._RE_PLAN, line)
+ m = self._RE_PLAN.match(line)
if m:
if plan:
yield self.Error('more than one plan found')
@@ -269,13 +280,13 @@ class TAPParser(object):
yield plan
continue
- m = re.match(self._RE_BAILOUT, line)
+ m = self._RE_BAILOUT.match(line)
if m:
yield self.Bailout(m.group(1))
bailed_out = True
continue
- m = re.match(self._RE_VERSION, line)
+ m = self._RE_VERSION.match(line)
if m:
# The TAP version is only accepted as the first line
if lineno != 1:
@@ -291,7 +302,7 @@ class TAPParser(object):
yield self.Error('unexpected input at line %d' % (lineno,))
if state == self._YAML:
- yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,))
+ yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno))
if not bailed_out and plan and num_tests != plan.count:
if num_tests < plan.count:
@@ -301,8 +312,12 @@ class TAPParser(object):
class TestRun:
- @staticmethod
- def make_exitcode(test, returncode, duration, stdo, stde, cmd):
+
+ @classmethod
+ def make_exitcode(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ returncode: int, duration: float, stdo: typing.Optional[str],
+ stde: typing.Optional[str],
+ cmd: typing.Optional[typing.List[str]]) -> 'TestRun':
if returncode == GNU_SKIP_RETURNCODE:
res = TestResult.SKIP
elif returncode == GNU_ERROR_RETURNCODE:
@@ -311,9 +326,12 @@ class TestRun:
res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
- return TestRun(test, res, returncode, duration, stdo, stde, cmd)
+ return cls(test, test_env, res, returncode, duration, stdo, stde, cmd)
- def make_tap(test, returncode, duration, stdo, stde, cmd):
+ @classmethod
+ def make_tap(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ returncode: int, duration: float, stdo: str, stde: str,
+ cmd: typing.Optional[typing.List[str]]) -> 'TestRun':
res = None
num_tests = 0
failed = False
@@ -346,9 +364,12 @@ class TestRun:
else:
res = TestResult.FAIL if failed else TestResult.OK
- return TestRun(test, res, returncode, duration, stdo, stde, cmd)
+ return cls(test, test_env, res, returncode, duration, stdo, stde, cmd)
- def __init__(self, test, res, returncode, duration, stdo, stde, cmd):
+ def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ res: TestResult, returncode: int, duration: float,
+ stdo: typing.Optional[str], stde: typing.Optional[str],
+ cmd: typing.Optional[typing.List[str]]):
assert isinstance(res, TestResult)
self.res = res
self.returncode = returncode
@@ -356,10 +377,10 @@ class TestRun:
self.stdo = stdo
self.stde = stde
self.cmd = cmd
- self.env = test.env
+ self.env = test_env
self.should_fail = test.should_fail
- def get_log(self):
+ def get_log(self) -> str:
res = '--- command ---\n'
if self.cmd is None:
res += 'NONE\n'
@@ -379,7 +400,7 @@ class TestRun:
res += '-------\n\n'
return res
-def decode(stream):
+def decode(stream: typing.Union[None, bytes]) -> str:
if stream is None:
return ''
try:
@@ -387,51 +408,50 @@ def decode(stream):
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
-def write_json_log(jsonlogfile, test_name, result):
+def write_json_log(jsonlogfile: typing.TextIO, test_name: str, result: TestRun) -> None:
jresult = {'name': test_name,
'stdout': result.stdo,
'result': result.res.value,
'duration': result.duration,
'returncode': result.returncode,
- 'command': result.cmd}
- if isinstance(result.env, dict):
- jresult['env'] = result.env
- else:
- jresult['env'] = result.env.get_env(os.environ)
+ 'env': result.env,
+ 'command': result.cmd} # type: typing.Dict[str, typing.Any]
if result.stde:
jresult['stderr'] = result.stde
jsonlogfile.write(json.dumps(jresult) + '\n')
-def run_with_mono(fname):
+def run_with_mono(fname: str) -> bool:
if fname.endswith('.exe') and not (is_windows() or is_cygwin()):
return True
return False
-def load_benchmarks(build_dir):
+def load_benchmarks(build_dir: str) -> typing.List['TestSerialisation']:
datafile = os.path.join(build_dir, 'meson-private', 'meson_benchmark_setup.dat')
if not os.path.isfile(datafile):
raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir))
with open(datafile, 'rb') as f:
- obj = pickle.load(f)
+ obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f))
return obj
-def load_tests(build_dir):
+def load_tests(build_dir: str) -> typing.List['TestSerialisation']:
datafile = os.path.join(build_dir, 'meson-private', 'meson_test_setup.dat')
if not os.path.isfile(datafile):
raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir))
with open(datafile, 'rb') as f:
- obj = pickle.load(f)
+ obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f))
return obj
class SingleTestRunner:
- def __init__(self, test, env, options):
+ def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str],
+ env: typing.Dict[str, str], options: argparse.Namespace):
self.test = test
+ self.test_env = test_env
self.env = env
self.options = options
- def _get_cmd(self):
+ def _get_cmd(self) -> typing.Optional[typing.List[str]]:
if self.test.fname[0].endswith('.jar'):
return ['java', '-jar'] + self.test.fname
elif not self.test.is_cross_built and run_with_mono(self.test.fname[0]):
@@ -451,19 +471,18 @@ class SingleTestRunner:
else:
return self.test.fname
- def run(self):
+ def run(self) -> TestRun:
cmd = self._get_cmd()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
- return TestRun(test=self.test, res=TestResult.SKIP, returncode=GNU_SKIP_RETURNCODE,
- duration=0.0, stdo=skip_stdout, stde=None, cmd=None)
+ return TestRun(self.test, self.test_env, TestResult.SKIP, GNU_SKIP_RETURNCODE, 0.0, skip_stdout, None, None)
else:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
self.test.timeout = None
return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
- def _run_cmd(self, cmd):
+ def _run_cmd(self, cmd: typing.List[str]) -> TestRun:
starttime = time.time()
if len(self.test.extra_paths) > 0:
@@ -500,7 +519,7 @@ class SingleTestRunner:
# Make the meson executable ignore SIGINT while gdb is running.
signal.signal(signal.SIGINT, signal.SIG_IGN)
- def preexec_fn():
+ def preexec_fn() -> None:
if self.options.gdb:
# Restore the SIGINT handler for the child process to
# ensure it can handle it.
@@ -529,7 +548,7 @@ class SingleTestRunner:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
if self.options.verbose:
- print('%s time out (After %d seconds)' % (self.test.name, timeout))
+ print('{} time out (After {} seconds)'.format(self.test.name, timeout))
timed_out = True
except KeyboardInterrupt:
mlog.warning('CTRL-C detected while running %s' % (self.test.name))
@@ -566,9 +585,9 @@ class SingleTestRunner:
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
- additional_error = b'Test process could not be killed.'
+ additional_error = 'Test process could not be killed.'
except ValueError:
- additional_error = b'Could not read output. Maybe the process has redirected its stdout/stderr?'
+ additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?'
endtime = time.time()
duration = endtime - starttime
if additional_error is None:
@@ -586,20 +605,20 @@ class SingleTestRunner:
stdo = ""
stde = additional_error
if timed_out:
- return TestRun(self.test, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd)
+ return TestRun(self.test, self.test_env, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd)
else:
if self.test.protocol == 'exitcode':
- return TestRun.make_exitcode(self.test, p.returncode, duration, stdo, stde, cmd)
+ return TestRun.make_exitcode(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd)
else:
if self.options.verbose:
print(stdo, end='')
- return TestRun.make_tap(self.test, p.returncode, duration, stdo, stde, cmd)
+ return TestRun.make_tap(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd)
class TestHarness:
- def __init__(self, options):
+ def __init__(self, options: argparse.Namespace):
self.options = options
- self.collected_logs = []
+ self.collected_logs = [] # type: typing.List[str]
self.fail_count = 0
self.expectedfail_count = 0
self.unexpectedpass_count = 0
@@ -608,23 +627,26 @@ class TestHarness:
self.timeout_count = 0
self.is_run = False
self.tests = None
- self.suites = None
- self.logfilename = None
- self.logfile = None
- self.jsonlogfile = None
+ self.logfilename = None # type: typing.Optional[str]
+ self.logfile = None # type: typing.Optional[typing.TextIO]
+ self.jsonlogfile = None # type: typing.Optional[typing.TextIO]
if self.options.benchmark:
self.tests = load_benchmarks(options.wd)
else:
self.tests = load_tests(options.wd)
- self.load_suites()
+ ss = set()
+ for t in self.tests:
+ for s in t.suite:
+ ss.add(s)
+ self.suites = list(ss)
- def __del__(self):
+ def __del__(self) -> None:
if self.logfile:
self.logfile.close()
if self.jsonlogfile:
self.jsonlogfile.close()
- def merge_suite_options(self, options, test):
+ def merge_suite_options(self, options: argparse.Namespace, test: 'TestSerialisation') -> typing.Dict[str, str]:
if ':' in options.setup:
if options.setup not in self.build_data.test_setups:
sys.exit("Unknown test setup '%s'." % options.setup)
@@ -648,7 +670,7 @@ class TestHarness:
options.wrapper = current.exe_wrapper
return current.env.get_env(os.environ.copy())
- def get_test_runner(self, test):
+ def get_test_runner(self, test: 'TestSerialisation') -> SingleTestRunner:
options = deepcopy(self.options)
if not options.setup:
options.setup = self.build_data.test_setup_default_name
@@ -656,12 +678,11 @@ class TestHarness:
env = self.merge_suite_options(options, test)
else:
env = os.environ.copy()
- if isinstance(test.env, build.EnvironmentVariables):
- test.env = test.env.get_env(env)
- env.update(test.env)
- return SingleTestRunner(test, env, options)
+ test_env = test.env.get_env(env)
+ env.update(test_env)
+ return SingleTestRunner(test, test_env, env, options)
- def process_test_result(self, result):
+ def process_test_result(self, result: TestRun) -> None:
if result.res is TestResult.TIMEOUT:
self.timeout_count += 1
elif result.res is TestResult.SKIP:
@@ -677,7 +698,8 @@ class TestHarness:
else:
sys.exit('Unknown test result encountered: {}'.format(result.res))
- def print_stats(self, numlen, tests, name, result, i):
+ def print_stats(self, numlen: int, tests: typing.List['TestSerialisation'],
+ name: str, result: TestRun, i: int) -> None:
startpad = ' ' * (numlen - len('%d' % (i + 1)))
num = '%s%d/%d' % (startpad, i + 1, len(tests))
padding1 = ' ' * (38 - len(name))
@@ -712,7 +734,7 @@ class TestHarness:
if self.jsonlogfile:
write_json_log(self.jsonlogfile, name, result)
- def print_summary(self):
+ def print_summary(self) -> None:
msg = '''
Ok: %4d
Expected Fail: %4d
@@ -726,7 +748,7 @@ Timeout: %4d
if self.logfile:
self.logfile.write(msg)
- def print_collected_logs(self):
+ def print_collected_logs(self) -> None:
if len(self.collected_logs) > 0:
if len(self.collected_logs) > 10:
print('\nThe output from 10 first failed tests:\n')
@@ -745,10 +767,10 @@ Timeout: %4d
line = line.encode('ascii', errors='replace').decode()
print(line)
- def total_failure_count(self):
+ def total_failure_count(self) -> int:
return self.fail_count + self.unexpectedpass_count + self.timeout_count
- def doit(self):
+ def doit(self) -> int:
if self.is_run:
raise RuntimeError('Test harness object can only be used once.')
self.is_run = True
@@ -759,14 +781,16 @@ Timeout: %4d
return self.total_failure_count()
@staticmethod
- def split_suite_string(suite):
+ def split_suite_string(suite: str) -> typing.Tuple[str, str]:
if ':' in suite:
- return suite.split(':', 1)
+ # mypy can't figure out that str.split(n, 1) will return a list of
+ # length 2, so we have to help it.
+ return typing.cast(typing.Tuple[str, str], tuple(suite.split(':', 1)))
else:
return suite, ""
@staticmethod
- def test_in_suites(test, suites):
+ def test_in_suites(test: 'TestSerialisation', suites: typing.List[str]) -> bool:
for suite in suites:
(prj_match, st_match) = TestHarness.split_suite_string(suite)
for prjst in test.suite:
@@ -797,18 +821,11 @@ Timeout: %4d
return True
return False
- def test_suitable(self, test):
+ def test_suitable(self, test: 'TestSerialisation') -> bool:
return (not self.options.include_suites or TestHarness.test_in_suites(test, self.options.include_suites)) \
and not TestHarness.test_in_suites(test, self.options.exclude_suites)
- def load_suites(self):
- ss = set()
- for t in self.tests:
- for s in t.suite:
- ss.add(s)
- self.suites = list(ss)
-
- def get_tests(self):
+ def get_tests(self) -> typing.List['TestSerialisation']:
if not self.tests:
print('No tests defined.')
return []
@@ -828,14 +845,11 @@ Timeout: %4d
print('No suitable tests defined.')
return []
- for test in tests:
- test.rebuilt = False
-
return tests
- def open_log_files(self):
+ def open_log_files(self) -> None:
if not self.options.logbase or self.options.verbose:
- return None, None, None, None
+ return
namebase = None
logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase)
@@ -859,8 +873,8 @@ Timeout: %4d
self.logfile.write('Inherited environment: {}\n\n'.format(inherit_env))
@staticmethod
- def get_wrapper(options):
- wrap = []
+ def get_wrapper(options: argparse.Namespace) -> typing.List[str]:
+ wrap = [] # type: typing.List[str]
if options.gdb:
wrap = ['gdb', '--quiet', '--nh']
if options.repeat > 1:
@@ -869,10 +883,9 @@ Timeout: %4d
wrap += ['--args']
if options.wrapper:
wrap += options.wrapper
- assert(isinstance(wrap, list))
return wrap
- def get_pretty_suite(self, test):
+ def get_pretty_suite(self, test: 'TestSerialisation') -> str:
if len(self.suites) > 1 and test.suite:
rv = TestHarness.split_suite_string(test.suite[0])[0]
s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite)
@@ -882,9 +895,9 @@ Timeout: %4d
else:
return test.name
- def run_tests(self, tests):
+ def run_tests(self, tests: typing.List['TestSerialisation']) -> None:
executor = None
- futures = []
+ futures = [] # type: typing.List[typing.Tuple[conc.Future[TestRun], int, typing.List[TestSerialisation], str, int]]
numlen = len('%d' % len(tests))
self.open_log_files()
startdir = os.getcwd()
@@ -923,9 +936,9 @@ Timeout: %4d
finally:
os.chdir(startdir)
- def drain_futures(self, futures):
- for i in futures:
- (result, numlen, tests, name, i) = i
+ def drain_futures(self, futures: typing.List[typing.Tuple['conc.Future[TestRun]', int, typing.List['TestSerialisation'], str, int]]) -> None:
+ for x in futures:
+ (result, numlen, tests, name, i) = x
if self.options.repeat > 1 and self.fail_count:
result.cancel()
if self.options.verbose:
@@ -933,7 +946,7 @@ Timeout: %4d
self.process_test_result(result.result())
self.print_stats(numlen, tests, name, result.result(), i)
- def run_special(self):
+ def run_special(self) -> int:
'''Tests run by the user, usually something like "under gdb 1000 times".'''
if self.is_run:
raise RuntimeError('Can not use run_special after a full run.')
@@ -944,13 +957,13 @@ Timeout: %4d
return self.total_failure_count()
-def list_tests(th):
+def list_tests(th: TestHarness) -> bool:
tests = th.get_tests()
for t in tests:
print(th.get_pretty_suite(t))
return not tests
-def rebuild_all(wd):
+def rebuild_all(wd: str) -> bool:
if not os.path.isfile(os.path.join(wd, 'build.ninja')):
print('Only ninja backend is supported to rebuild tests before running them.')
return True
@@ -969,7 +982,7 @@ def rebuild_all(wd):
return True
-def run(options):
+def run(options: argparse.Namespace) -> int:
if options.benchmark:
options.num_processes = 1
@@ -1014,7 +1027,7 @@ def run(options):
print(e)
return 1
-def run_with_args(args):
+def run_with_args(args: typing.List[str]) -> int:
parser = argparse.ArgumentParser(prog='meson test')
add_arguments(parser)
options = parser.parse_args(args)