diff options
author | Paolo Bonzini <pbonzini@redhat.com> | 2021-01-14 10:03:16 +0100 |
---|---|---|
committer | Paolo Bonzini <pbonzini@redhat.com> | 2021-01-15 15:58:53 +0100 |
commit | e7c85555751399506e3f967e044f3d327abd3a14 (patch) | |
tree | d8bc9cb14424cf10591ff9a8b0e823f10523f28d | |
parent | 401464c61a7dce3819539398a59ff3fcce88d0ca (diff) | |
download | meson-e7c85555751399506e3f967e044f3d327abd3a14.zip meson-e7c85555751399506e3f967e044f3d327abd3a14.tar.gz meson-e7c85555751399506e3f967e044f3d327abd3a14.tar.bz2 |
mtest: move I/O handling to TestSubprocess
Move the logic to start the read/decode
tasks to TestSubprocess and keep SingleTestRunner simple.
The lines() inner function is tweaked to produce stdout as a future.
This removes the nonlocal access (which is not possible anymore
when the code is moved out of _run_cmd), and also lets _run_cmd
use "await stdo_task" for both parsed and unparsed output.
-rw-r--r-- | mesonbuild/mtest.py | 80 |
1 files changed, 44 insertions, 36 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 300eb11..dd7f926 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -976,6 +976,27 @@ def decode(stream: T.Union[None, bytes]) -> str: except UnicodeDecodeError: return stream.decode('iso-8859-1', errors='ignore') +async def read_decode(reader: asyncio.StreamReader) -> str: + return decode(await reader.read(-1)) + +# Extract lines out of the StreamReader. Print them +# along the way if requested, and at the end collect +# them all into a future. +async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]', + console_mode: ConsoleUser) -> T.AsyncIterator[str]: + stdo_lines = [] + try: + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if console_mode is ConsoleUser.STDOUT: + print(line, end='', flush=True) + yield line + except Exception as e: + f.set_exception(e) + finally: + f.set_result(''.join(stdo_lines)) + def run_with_mono(fname: str) -> bool: return fname.endswith('.exe') and not (is_windows() or is_cygwin()) @@ -1033,17 +1054,27 @@ async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: f.result() class TestSubprocess: - def __init__(self, p: asyncio.subprocess.Process, postwait_fn: T.Callable[[], None] = None): + def __init__(self, p: asyncio.subprocess.Process, + stdout: T.Optional[int], stderr: T.Optional[int], + postwait_fn: T.Callable[[], None] = None): self._process = p + self.stdout = stdout + self.stderr = stderr + self.stdo_task = None # type: T.Optional[T.Awaitable[str]] + self.stde_task = None # type: T.Optional[T.Awaitable[str]] self.postwait_fn = postwait_fn # type: T.Callable[[], None] - @property - def stdout(self) -> T.Optional[asyncio.StreamReader]: - return self._process.stdout + def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]: + self.stdo_task = asyncio.get_event_loop().create_future() + return read_decode_lines(self._process.stdout, self.stdo_task, console_mode) - @property - def stderr(self) -> T.Optional[asyncio.StreamReader]: - return self._process.stderr + def communicate(self) -> T.Tuple[T.Optional[T.Awaitable[str]], + T.Optional[T.Awaitable[str]]]: + if self.stdo_task is None and self.stdout is not None: + self.stdo_task = read_decode(self._process.stdout) + if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT: + self.stde_task = read_decode(self._process.stderr) + return self.stdo_task, self.stde_task async def _kill(self) -> T.Optional[str]: # Python does not provide multiplatform support for @@ -1199,7 +1230,8 @@ class SingleTestRunner: env=env, cwd=cwd, preexec_fn=preexec_fn if not is_windows() else None) - return TestSubprocess(p, postwait_fn=postwait_fn if not is_windows() else None) + return TestSubprocess(p, stdout=stdout, stderr=stderr, + postwait_fn=postwait_fn if not is_windows() else None) async def _run_cmd(self, cmd: T.List[str]) -> None: if self.test.extra_paths: @@ -1245,32 +1277,11 @@ class SingleTestRunner: env=self.env, cwd=self.test.workdir) - stdo = stde = '' - stdo_task = stde_task = parse_task = None - - # Extract lines out of the StreamReader and print them - # along the way if requested - async def lines() -> T.AsyncIterator[str]: - stdo_lines = [] - reader = p.stdout - while not reader.at_eof(): - line = decode(await reader.readline()) - stdo_lines.append(line) - if self.console_mode is ConsoleUser.STDOUT: - print(line, end='') - yield line - - nonlocal stdo - stdo = ''.join(stdo_lines) - parse_task = None if self.runobj.needs_parsing: - parse_task = self.runobj.parse(lines()) - elif stdout is not None: - stdo_task = p.stdout.read(-1) - if stderr is not None and stderr != asyncio.subprocess.STDOUT: - stde_task = p.stderr.read(-1) + parse_task = self.runobj.parse(p.stdout_lines(self.console_mode)) + stdo_task, stde_task = p.communicate() returncode, result, additional_error = await p.wait(self.runobj.timeout) if parse_task is not None: @@ -1279,11 +1290,8 @@ class SingleTestRunner: additional_error = join_lines(additional_error, error) result = result or res - if stdo_task is not None: - stdo = decode(await stdo_task) - if stde_task is not None: - stde = decode(await stde_task) - + stdo = await stdo_task if stdo_task else '' + stde = await stde_task if stde_task else '' stde = join_lines(stde, additional_error) self.runobj.complete(returncode, result, stdo, stde) |