From 577737be55a3e9e5d592ad31d4b7c71ebccf3dc8 Mon Sep 17 00:00:00 2001 From: John Snow Date: Wed, 15 Sep 2021 12:29:47 -0400 Subject: python/aqmp: Add message routing to QMP protocol Add the ability to handle and route messages in qmp_protocol.py. The interface for actually sending anything still isn't added until next commit. Signed-off-by: John Snow Message-id: 20210915162955.333025-20-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/qmp_client.py | 122 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) (limited to 'python/qemu') diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py index 000ff59..fa0cc7c 100644 --- a/python/qemu/aqmp/qmp_client.py +++ b/python/qemu/aqmp/qmp_client.py @@ -7,15 +7,19 @@ used to either connect to a listening server, or used to listen and accept an incoming connection from that server. """ +# The import workarounds here are fixed in the next commit. +import asyncio # pylint: disable=unused-import # noqa import logging from typing import ( Dict, List, Mapping, Optional, + Union, + cast, ) -from .error import ProtocolError +from .error import AQMPError, ProtocolError from .events import Events from .message import Message from .models import Greeting @@ -61,6 +65,53 @@ class NegotiationError(_WrappedProtocolError): """ +class ExecInterruptedError(AQMPError): + """ + Exception raised when an RPC is interrupted. + + This error is raised when an execute() statement could not be + completed. This can occur because the connection itself was + terminated before a reply was received. + + The true cause of the interruption will be available via `disconnect()`. + """ + + +class _MsgProtocolError(ProtocolError): + """ + Abstract error class for protocol errors that have a `Message` object. + + This Exception class is used for protocol errors where the `Message` + was mechanically understood, but was found to be inappropriate or + malformed. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + def __init__(self, error_message: str, msg: Message): + super().__init__(error_message) + #: The received `Message` that caused the error. + self.msg: Message = msg + + def __str__(self) -> str: + return "\n".join([ + super().__str__(), + f" Message was: {str(self.msg)}\n", + ]) + + +class ServerParseError(_MsgProtocolError): + """ + The Server sent a `Message` indicating parsing failure. + + i.e. A reply has arrived from the server, but it is missing the "ID" + field, indicating a parsing error. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + + class QMPClient(AsyncProtocol[Message], Events): """ Implements a QMP client connection. @@ -106,6 +157,9 @@ class QMPClient(AsyncProtocol[Message], Events): # Read buffer limit; large enough to accept query-qmp-schema _limit = (256 * 1024) + # Type alias for pending execute() result items + _PendingT = Union[Message, ExecInterruptedError] + def __init__(self, name: Optional[str] = None) -> None: super().__init__(name) Events.__init__(self) @@ -120,6 +174,12 @@ class QMPClient(AsyncProtocol[Message], Events): # Cached Greeting, if one was awaited. self._greeting: Optional[Greeting] = None + # Incoming RPC reply messages. + self._pending: Dict[ + Union[str, None], + 'asyncio.Queue[QMPClient._PendingT]' + ] = {} + @upper_half async def _establish_session(self) -> None: """ @@ -132,6 +192,9 @@ class QMPClient(AsyncProtocol[Message], Events): :raise EOFError: When the server unexpectedly hangs up. :raise OSError: For underlying stream errors. """ + self._greeting = None + self._pending = {} + if self.await_greeting or self.negotiate: self._greeting = await self._get_greeting() @@ -204,9 +267,32 @@ class QMPClient(AsyncProtocol[Message], Events): raise @bottom_half + async def _bh_disconnect(self) -> None: + try: + await super()._bh_disconnect() + finally: + if self._pending: + self.logger.debug("Cancelling pending executions") + keys = self._pending.keys() + for key in keys: + self.logger.debug("Cancelling execution '%s'", key) + self._pending[key].put_nowait( + ExecInterruptedError("Disconnected") + ) + + self.logger.debug("QMP Disconnected.") + + @upper_half + def _cleanup(self) -> None: + super()._cleanup() + assert not self._pending + + @bottom_half async def _on_message(self, msg: Message) -> None: """ Add an incoming message to the appropriate queue/handler. + + :raise ServerParseError: When Message indicates server parse failure. """ # Incoming messages are not fully parsed/validated here; # do only light peeking to know how to route the messages. @@ -216,7 +302,39 @@ class QMPClient(AsyncProtocol[Message], Events): return # Below, we assume everything left is an execute/exec-oob response. - # ... Which we'll implement in the next commit! + + exec_id = cast(Optional[str], msg.get('id')) + + if exec_id in self._pending: + await self._pending[exec_id].put(msg) + return + + # We have a message we can't route back to a caller. + + is_error = 'error' in msg + has_id = 'id' in msg + + if is_error and not has_id: + # This is very likely a server parsing error. + # It doesn't inherently belong to any pending execution. + # Instead of performing clever recovery, just terminate. + # See "NOTE" in qmp-spec.txt, section 2.4.2 + raise ServerParseError( + ("Server sent an error response without an ID, " + "but there are no ID-less executions pending. " + "Assuming this is a server parser failure."), + msg + ) + + # qmp-spec.txt, section 2.4: + # 'Clients should drop all the responses + # that have an unknown "id" field.' + self.logger.log( + logging.ERROR if is_error else logging.WARNING, + "Unknown ID '%s', message dropped.", + exec_id, + ) + self.logger.debug("Unroutable message: %s", str(msg)) @upper_half @bottom_half -- cgit v1.1