aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2021-01-14 10:03:16 +0100
committerPaolo Bonzini <pbonzini@redhat.com>2021-01-15 15:58:53 +0100
commite7c85555751399506e3f967e044f3d327abd3a14 (patch)
treed8bc9cb14424cf10591ff9a8b0e823f10523f28d
parent401464c61a7dce3819539398a59ff3fcce88d0ca (diff)
downloadmeson-e7c85555751399506e3f967e044f3d327abd3a14.zip
meson-e7c85555751399506e3f967e044f3d327abd3a14.tar.gz
meson-e7c85555751399506e3f967e044f3d327abd3a14.tar.bz2
mtest: move I/O handling to TestSubprocess
Move the logic to start the read/decode tasks to TestSubprocess and keep SingleTestRunner simple. The lines() inner function is tweaked to produce stdout as a future. This removes the nonlocal access (which is not possible anymore when the code is moved out of _run_cmd), and also lets _run_cmd use "await stdo_task" for both parsed and unparsed output.
-rw-r--r--mesonbuild/mtest.py80
1 files changed, 44 insertions, 36 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index 300eb11..dd7f926 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -976,6 +976,27 @@ def decode(stream: T.Union[None, bytes]) -> str:
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
+async def read_decode(reader: asyncio.StreamReader) -> str:
+ return decode(await reader.read(-1))
+
+# Extract lines out of the StreamReader. Print them
+# along the way if requested, and at the end collect
+# them all into a future.
+async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]',
+ console_mode: ConsoleUser) -> T.AsyncIterator[str]:
+ stdo_lines = []
+ try:
+ while not reader.at_eof():
+ line = decode(await reader.readline())
+ stdo_lines.append(line)
+ if console_mode is ConsoleUser.STDOUT:
+ print(line, end='', flush=True)
+ yield line
+ except Exception as e:
+ f.set_exception(e)
+ finally:
+ f.set_result(''.join(stdo_lines))
+
def run_with_mono(fname: str) -> bool:
return fname.endswith('.exe') and not (is_windows() or is_cygwin())
@@ -1033,17 +1054,27 @@ async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
f.result()
class TestSubprocess:
- def __init__(self, p: asyncio.subprocess.Process, postwait_fn: T.Callable[[], None] = None):
+ def __init__(self, p: asyncio.subprocess.Process,
+ stdout: T.Optional[int], stderr: T.Optional[int],
+ postwait_fn: T.Callable[[], None] = None):
self._process = p
+ self.stdout = stdout
+ self.stderr = stderr
+ self.stdo_task = None # type: T.Optional[T.Awaitable[str]]
+ self.stde_task = None # type: T.Optional[T.Awaitable[str]]
self.postwait_fn = postwait_fn # type: T.Callable[[], None]
- @property
- def stdout(self) -> T.Optional[asyncio.StreamReader]:
- return self._process.stdout
+ def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]:
+ self.stdo_task = asyncio.get_event_loop().create_future()
+ return read_decode_lines(self._process.stdout, self.stdo_task, console_mode)
- @property
- def stderr(self) -> T.Optional[asyncio.StreamReader]:
- return self._process.stderr
+ def communicate(self) -> T.Tuple[T.Optional[T.Awaitable[str]],
+ T.Optional[T.Awaitable[str]]]:
+ if self.stdo_task is None and self.stdout is not None:
+ self.stdo_task = read_decode(self._process.stdout)
+ if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT:
+ self.stde_task = read_decode(self._process.stderr)
+ return self.stdo_task, self.stde_task
async def _kill(self) -> T.Optional[str]:
# Python does not provide multiplatform support for
@@ -1199,7 +1230,8 @@ class SingleTestRunner:
env=env,
cwd=cwd,
preexec_fn=preexec_fn if not is_windows() else None)
- return TestSubprocess(p, postwait_fn=postwait_fn if not is_windows() else None)
+ return TestSubprocess(p, stdout=stdout, stderr=stderr,
+ postwait_fn=postwait_fn if not is_windows() else None)
async def _run_cmd(self, cmd: T.List[str]) -> None:
if self.test.extra_paths:
@@ -1245,32 +1277,11 @@ class SingleTestRunner:
env=self.env,
cwd=self.test.workdir)
- stdo = stde = ''
- stdo_task = stde_task = parse_task = None
-
- # Extract lines out of the StreamReader and print them
- # along the way if requested
- async def lines() -> T.AsyncIterator[str]:
- stdo_lines = []
- reader = p.stdout
- while not reader.at_eof():
- line = decode(await reader.readline())
- stdo_lines.append(line)
- if self.console_mode is ConsoleUser.STDOUT:
- print(line, end='')
- yield line
-
- nonlocal stdo
- stdo = ''.join(stdo_lines)
-
parse_task = None
if self.runobj.needs_parsing:
- parse_task = self.runobj.parse(lines())
- elif stdout is not None:
- stdo_task = p.stdout.read(-1)
- if stderr is not None and stderr != asyncio.subprocess.STDOUT:
- stde_task = p.stderr.read(-1)
+ parse_task = self.runobj.parse(p.stdout_lines(self.console_mode))
+ stdo_task, stde_task = p.communicate()
returncode, result, additional_error = await p.wait(self.runobj.timeout)
if parse_task is not None:
@@ -1279,11 +1290,8 @@ class SingleTestRunner:
additional_error = join_lines(additional_error, error)
result = result or res
- if stdo_task is not None:
- stdo = decode(await stdo_task)
- if stde_task is not None:
- stde = decode(await stde_task)
-
+ stdo = await stdo_task if stdo_task else ''
+ stde = await stde_task if stde_task else ''
stde = join_lines(stde, additional_error)
self.runobj.complete(returncode, result, stdo, stde)