aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2021-02-10 09:49:48 +0100
committerPaolo Bonzini <pbonzini@redhat.com>2021-02-10 12:01:55 +0100
commit6c40b134df49ae8dea373cd5c75a1348c15397e1 (patch)
tree78be5b114933c9fc5ef844a9d83c49b63fff4fa7
parentc6b135c1f99e4eb427970d94c2ddf8f1b04a3514 (diff)
downloadmeson-6c40b134df49ae8dea373cd5c75a1348c15397e1.zip
meson-6c40b134df49ae8dea373cd5c75a1348c15397e1.tar.gz
meson-6c40b134df49ae8dea373cd5c75a1348c15397e1.tar.bz2
mtest: cancel stdout/stderr tasks on timeout
Avoid that the tasks linger and SingleTestRunner.run() never terminates. In order to do this, we need read_decode() and read_decode_lines() to be cancellable, and to handle the CancelledError gracefully while returning the output they have collected so far. For read_decode(), this means always operating on a line-by-line basis, even if console_mode is not ConsoleUser.STDOUT. For read_decode_lines(), instead, we cannot return an iterator. Rather, read_decode_lines() returns the output directly (similar to read_decode) and communication with the parser is mediated by an asyncio.Queue. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
-rw-r--r--mesonbuild/mtest.py53
1 files changed, 35 insertions, 18 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
index 1b7c774..2b5eb65 100644
--- a/mesonbuild/mtest.py
+++ b/mesonbuild/mtest.py
@@ -1078,21 +1078,22 @@ def decode(stream: T.Union[None, bytes]) -> str:
return stream.decode('iso-8859-1', errors='ignore')
async def read_decode(reader: asyncio.StreamReader, console_mode: ConsoleUser) -> str:
- if console_mode is not ConsoleUser.STDOUT:
- return decode(await reader.read(-1))
-
stdo_lines = []
- while not reader.at_eof():
- line = decode(await reader.readline())
- stdo_lines.append(line)
- print(line, end='', flush=True)
- return ''.join(stdo_lines)
+ try:
+ while not reader.at_eof():
+ line = decode(await reader.readline())
+ stdo_lines.append(line)
+ if console_mode is ConsoleUser.STDOUT:
+ print(line, end='', flush=True)
+ return ''.join(stdo_lines)
+ except asyncio.CancelledError:
+ return ''.join(stdo_lines)
# Extract lines out of the StreamReader. Print them
# along the way if requested, and at the end collect
# them all into a future.
-async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]',
- console_mode: ConsoleUser) -> T.AsyncIterator[str]:
+async def read_decode_lines(reader: asyncio.StreamReader, q: 'asyncio.Queue[T.Optional[str]]',
+ console_mode: ConsoleUser) -> str:
stdo_lines = []
try:
while not reader.at_eof():
@@ -1100,11 +1101,12 @@ async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str
stdo_lines.append(line)
if console_mode is ConsoleUser.STDOUT:
print(line, end='', flush=True)
- yield line
- except Exception as e:
- f.set_exception(e)
+ await q.put(line)
+ return ''.join(stdo_lines)
+ except asyncio.CancelledError:
+ return ''.join(stdo_lines)
finally:
- f.set_result(''.join(stdo_lines))
+ await q.put(None)
def run_with_mono(fname: str) -> bool:
return fname.endswith('.exe') and not (is_windows() or is_cygwin())
@@ -1130,6 +1132,14 @@ async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, floa
except asyncio.TimeoutError:
pass
+async def queue_iter(q: 'asyncio.Queue[T.Optional[str]]') -> T.AsyncIterator[str]:
+ while True:
+ item = await q.get()
+ q.task_done()
+ if item is None:
+ break
+ yield item
+
async def complete(future: asyncio.Future) -> None:
"""Wait for completion of the given future, ignoring cancellation."""
try:
@@ -1153,13 +1163,15 @@ class TestSubprocess:
self._process = p
self.stdout = stdout
self.stderr = stderr
- self.stdo_task = None # type: T.Optional[T.Awaitable[str]]
- self.stde_task = None # type: T.Optional[T.Awaitable[str]]
+ self.stdo_task = None # type: T.Optional[asyncio.Future[str]]
+ self.stde_task = None # type: T.Optional[asyncio.Future[str]]
self.postwait_fn = postwait_fn # type: T.Callable[[], None]
def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]:
- self.stdo_task = asyncio.get_event_loop().create_future()
- return read_decode_lines(self._process.stdout, self.stdo_task, console_mode)
+ q = asyncio.Queue() # type: asyncio.Queue[T.Optional[str]]
+ decode_coro = read_decode_lines(self._process.stdout, q, console_mode)
+ self.stdo_task = asyncio.ensure_future(decode_coro)
+ return queue_iter(q)
def communicate(self, console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]],
T.Optional[T.Awaitable[str]]]:
@@ -1213,6 +1225,11 @@ class TestSubprocess:
# for the event loop to pick that up.
await p.wait()
return None
+ finally:
+ if self.stdo_task:
+ self.stdo_task.cancel()
+ if self.stde_task:
+ self.stde_task.cancel()
async def wait(self, timeout: T.Optional[int]) -> T.Tuple[int, TestResult, T.Optional[str]]:
p = self._process