aboutsummaryrefslogtreecommitdiff
path: root/python/qemu
diff options
context:
space:
mode:
authorJohn Snow <jsnow@redhat.com>2021-09-15 12:29:47 -0400
committerJohn Snow <jsnow@redhat.com>2021-09-27 12:10:29 -0400
commit577737be55a3e9e5d592ad31d4b7c71ebccf3dc8 (patch)
treef16dd9f27566bd70e431295a283c814bc09b539a /python/qemu
parent4cd17f375daaa73f0f6fd214e08e2290d86c24be (diff)
downloadqemu-577737be55a3e9e5d592ad31d4b7c71ebccf3dc8.zip
qemu-577737be55a3e9e5d592ad31d4b7c71ebccf3dc8.tar.gz
qemu-577737be55a3e9e5d592ad31d4b7c71ebccf3dc8.tar.bz2
python/aqmp: Add message routing to QMP protocol
Add the ability to handle and route messages in qmp_protocol.py. The interface for actually sending anything still isn't added until next commit. Signed-off-by: John Snow <jsnow@redhat.com> Message-id: 20210915162955.333025-20-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
Diffstat (limited to 'python/qemu')
-rw-r--r--python/qemu/aqmp/qmp_client.py122
1 files changed, 120 insertions, 2 deletions
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
index 000ff59..fa0cc7c 100644
--- a/python/qemu/aqmp/qmp_client.py
+++ b/python/qemu/aqmp/qmp_client.py
@@ -7,15 +7,19 @@ used to either connect to a listening server, or used to listen and
accept an incoming connection from that server.
"""
+# The import workarounds here are fixed in the next commit.
+import asyncio # pylint: disable=unused-import # noqa
import logging
from typing import (
Dict,
List,
Mapping,
Optional,
+ Union,
+ cast,
)
-from .error import ProtocolError
+from .error import AQMPError, ProtocolError
from .events import Events
from .message import Message
from .models import Greeting
@@ -61,6 +65,53 @@ class NegotiationError(_WrappedProtocolError):
"""
+class ExecInterruptedError(AQMPError):
+ """
+ Exception raised when an RPC is interrupted.
+
+ This error is raised when an execute() statement could not be
+ completed. This can occur because the connection itself was
+ terminated before a reply was received.
+
+ The true cause of the interruption will be available via `disconnect()`.
+ """
+
+
+class _MsgProtocolError(ProtocolError):
+ """
+ Abstract error class for protocol errors that have a `Message` object.
+
+ This Exception class is used for protocol errors where the `Message`
+ was mechanically understood, but was found to be inappropriate or
+ malformed.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+ def __init__(self, error_message: str, msg: Message):
+ super().__init__(error_message)
+ #: The received `Message` that caused the error.
+ self.msg: Message = msg
+
+ def __str__(self) -> str:
+ return "\n".join([
+ super().__str__(),
+ f" Message was: {str(self.msg)}\n",
+ ])
+
+
+class ServerParseError(_MsgProtocolError):
+ """
+ The Server sent a `Message` indicating parsing failure.
+
+ i.e. A reply has arrived from the server, but it is missing the "ID"
+ field, indicating a parsing error.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+
+
class QMPClient(AsyncProtocol[Message], Events):
"""
Implements a QMP client connection.
@@ -106,6 +157,9 @@ class QMPClient(AsyncProtocol[Message], Events):
# Read buffer limit; large enough to accept query-qmp-schema
_limit = (256 * 1024)
+ # Type alias for pending execute() result items
+ _PendingT = Union[Message, ExecInterruptedError]
+
def __init__(self, name: Optional[str] = None) -> None:
super().__init__(name)
Events.__init__(self)
@@ -120,6 +174,12 @@ class QMPClient(AsyncProtocol[Message], Events):
# Cached Greeting, if one was awaited.
self._greeting: Optional[Greeting] = None
+ # Incoming RPC reply messages.
+ self._pending: Dict[
+ Union[str, None],
+ 'asyncio.Queue[QMPClient._PendingT]'
+ ] = {}
+
@upper_half
async def _establish_session(self) -> None:
"""
@@ -132,6 +192,9 @@ class QMPClient(AsyncProtocol[Message], Events):
:raise EOFError: When the server unexpectedly hangs up.
:raise OSError: For underlying stream errors.
"""
+ self._greeting = None
+ self._pending = {}
+
if self.await_greeting or self.negotiate:
self._greeting = await self._get_greeting()
@@ -204,9 +267,32 @@ class QMPClient(AsyncProtocol[Message], Events):
raise
@bottom_half
+ async def _bh_disconnect(self) -> None:
+ try:
+ await super()._bh_disconnect()
+ finally:
+ if self._pending:
+ self.logger.debug("Cancelling pending executions")
+ keys = self._pending.keys()
+ for key in keys:
+ self.logger.debug("Cancelling execution '%s'", key)
+ self._pending[key].put_nowait(
+ ExecInterruptedError("Disconnected")
+ )
+
+ self.logger.debug("QMP Disconnected.")
+
+ @upper_half
+ def _cleanup(self) -> None:
+ super()._cleanup()
+ assert not self._pending
+
+ @bottom_half
async def _on_message(self, msg: Message) -> None:
"""
Add an incoming message to the appropriate queue/handler.
+
+ :raise ServerParseError: When Message indicates server parse failure.
"""
# Incoming messages are not fully parsed/validated here;
# do only light peeking to know how to route the messages.
@@ -216,7 +302,39 @@ class QMPClient(AsyncProtocol[Message], Events):
return
# Below, we assume everything left is an execute/exec-oob response.
- # ... Which we'll implement in the next commit!
+
+ exec_id = cast(Optional[str], msg.get('id'))
+
+ if exec_id in self._pending:
+ await self._pending[exec_id].put(msg)
+ return
+
+ # We have a message we can't route back to a caller.
+
+ is_error = 'error' in msg
+ has_id = 'id' in msg
+
+ if is_error and not has_id:
+ # This is very likely a server parsing error.
+ # It doesn't inherently belong to any pending execution.
+ # Instead of performing clever recovery, just terminate.
+ # See "NOTE" in qmp-spec.txt, section 2.4.2
+ raise ServerParseError(
+ ("Server sent an error response without an ID, "
+ "but there are no ID-less executions pending. "
+ "Assuming this is a server parser failure."),
+ msg
+ )
+
+ # qmp-spec.txt, section 2.4:
+ # 'Clients should drop all the responses
+ # that have an unknown "id" field.'
+ self.logger.log(
+ logging.ERROR if is_error else logging.WARNING,
+ "Unknown ID '%s', message dropped.",
+ exec_id,
+ )
+ self.logger.debug("Unroutable message: %s", str(msg))
@upper_half
@bottom_half