aboutsummaryrefslogtreecommitdiff
path: root/python/qemu/qmp/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/qmp/legacy.py')
-rw-r--r--python/qemu/qmp/legacy.py317
1 files changed, 317 insertions, 0 deletions
diff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py
new file mode 100644
index 0000000..a8629b4
--- /dev/null
+++ b/python/qemu/qmp/legacy.py
@@ -0,0 +1,317 @@
+"""
+(Legacy) Sync QMP Wrapper
+
+This module provides the `QEMUMonitorProtocol` class, which is a
+synchronous wrapper around `QMPClient`.
+
+Its design closely resembles that of the original QEMUMonitorProtocol
+class, originally written by Luiz Capitulino. It is provided here for
+compatibility with scripts inside the QEMU source tree that expect the
+old interface.
+"""
+
+#
+# Copyright (C) 2009-2022 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+# John Snow <jsnow@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+import asyncio
+from types import TracebackType
+from typing import (
+ Any,
+ Awaitable,
+ Dict,
+ List,
+ Optional,
+ Type,
+ TypeVar,
+ Union,
+)
+
+from .error import QMPError
+from .protocol import Runstate, SocketAddrT
+from .qmp_client import QMPClient
+
+
+#: QMPMessage is an entire QMP message of any kind.
+QMPMessage = Dict[str, Any]
+
+#: QMPReturnValue is the 'return' value of a command.
+QMPReturnValue = object
+
+#: QMPObject is any object in a QMP message.
+QMPObject = Dict[str, object]
+
+# QMPMessage can be outgoing commands or incoming events/returns.
+# QMPReturnValue is usually a dict/json object, but due to QAPI's
+# 'returns-whitelist', it can actually be anything.
+#
+# {'return': {}} is a QMPMessage,
+# {} is the QMPReturnValue.
+
+
+class QMPBadPortError(QMPError):
+ """
+ Unable to parse socket address: Port was non-numerical.
+ """
+
+
+class QEMUMonitorProtocol:
+ """
+ Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
+ and then allow to handle commands and events.
+
+ :param address: QEMU address, can be either a unix socket path (string)
+ or a tuple in the form ( address, port ) for a TCP
+ connection
+ :param server: Act as the socket server. (See 'accept')
+ :param nickname: Optional nickname used for logging.
+ """
+
+ def __init__(self, address: SocketAddrT,
+ server: bool = False,
+ nickname: Optional[str] = None):
+
+ self._qmp = QMPClient(nickname)
+ self._aloop = asyncio.get_event_loop()
+ self._address = address
+ self._timeout: Optional[float] = None
+
+ if server:
+ self._sync(self._qmp.start_server(self._address))
+
+ _T = TypeVar('_T')
+
+ def _sync(
+ self, future: Awaitable[_T], timeout: Optional[float] = None
+ ) -> _T:
+ return self._aloop.run_until_complete(
+ asyncio.wait_for(future, timeout=timeout)
+ )
+
+ def _get_greeting(self) -> Optional[QMPMessage]:
+ if self._qmp.greeting is not None:
+ # pylint: disable=protected-access
+ return self._qmp.greeting._asdict()
+ return None
+
+ def __enter__(self: _T) -> _T:
+ # Implement context manager enter function.
+ return self
+
+ def __exit__(self,
+ # pylint: disable=duplicate-code
+ # see https://github.com/PyCQA/pylint/issues/3619
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType]) -> None:
+ # Implement context manager exit function.
+ self.close()
+
+ @classmethod
+ def parse_address(cls, address: str) -> SocketAddrT:
+ """
+ Parse a string into a QMP address.
+
+ Figure out if the argument is in the port:host form.
+ If it's not, it's probably a file path.
+ """
+ components = address.split(':')
+ if len(components) == 2:
+ try:
+ port = int(components[1])
+ except ValueError:
+ msg = f"Bad port: '{components[1]}' in '{address}'."
+ raise QMPBadPortError(msg) from None
+ return (components[0], port)
+
+ # Treat as filepath.
+ return address
+
+ def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
+ """
+ Connect to the QMP Monitor and perform capabilities negotiation.
+
+ :return: QMP greeting dict, or None if negotiate is false
+ :raise ConnectError: on connection errors
+ """
+ self._qmp.await_greeting = negotiate
+ self._qmp.negotiate = negotiate
+
+ self._sync(
+ self._qmp.connect(self._address)
+ )
+ return self._get_greeting()
+
+ def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
+ """
+ Await connection from QMP Monitor and perform capabilities negotiation.
+
+ :param timeout:
+ timeout in seconds (nonnegative float number, or None).
+ If None, there is no timeout, and this may block forever.
+
+ :return: QMP greeting dict
+ :raise ConnectError: on connection errors
+ """
+ self._qmp.await_greeting = True
+ self._qmp.negotiate = True
+
+ self._sync(self._qmp.accept(), timeout)
+
+ ret = self._get_greeting()
+ assert ret is not None
+ return ret
+
+ def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
+ """
+ Send a QMP command to the QMP Monitor.
+
+ :param qmp_cmd: QMP command to be sent as a Python dict
+ :return: QMP response as a Python dict
+ """
+ return dict(
+ self._sync(
+ # pylint: disable=protected-access
+
+ # _raw() isn't a public API, because turning off
+ # automatic ID assignment is discouraged. For
+ # compatibility with iotests *only*, do it anyway.
+ self._qmp._raw(qmp_cmd, assign_id=False),
+ self._timeout
+ )
+ )
+
+ def cmd(self, name: str,
+ args: Optional[Dict[str, object]] = None,
+ cmd_id: Optional[object] = None) -> QMPMessage:
+ """
+ Build a QMP command and send it to the QMP Monitor.
+
+ :param name: command name (string)
+ :param args: command arguments (dict)
+ :param cmd_id: command id (dict, list, string or int)
+ """
+ qmp_cmd: QMPMessage = {'execute': name}
+ if args:
+ qmp_cmd['arguments'] = args
+ if cmd_id:
+ qmp_cmd['id'] = cmd_id
+ return self.cmd_obj(qmp_cmd)
+
+ def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
+ """
+ Build and send a QMP command to the monitor, report errors if any
+ """
+ return self._sync(
+ self._qmp.execute(cmd, kwds),
+ self._timeout
+ )
+
+ def pull_event(self,
+ wait: Union[bool, float] = False) -> Optional[QMPMessage]:
+ """
+ Pulls a single event.
+
+ :param wait:
+ If False or 0, do not wait. Return None if no events ready.
+ If True, wait forever until the next event.
+ Otherwise, wait for the specified number of seconds.
+
+ :raise asyncio.TimeoutError:
+ When a timeout is requested and the timeout period elapses.
+
+ :return: The first available QMP event, or None.
+ """
+ if not wait:
+ # wait is False/0: "do not wait, do not except."
+ if self._qmp.events.empty():
+ return None
+
+ # If wait is 'True', wait forever. If wait is False/0, the events
+ # queue must not be empty; but it still needs some real amount
+ # of time to complete.
+ timeout = None
+ if wait and isinstance(wait, float):
+ timeout = wait
+
+ return dict(
+ self._sync(
+ self._qmp.events.get(),
+ timeout
+ )
+ )
+
+ def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
+ """
+ Get a list of QMP events and clear all pending events.
+
+ :param wait:
+ If False or 0, do not wait. Return None if no events ready.
+ If True, wait until we have at least one event.
+ Otherwise, wait for up to the specified number of seconds for at
+ least one event.
+
+ :raise asyncio.TimeoutError:
+ When a timeout is requested and the timeout period elapses.
+
+ :return: A list of QMP events.
+ """
+ events = [dict(x) for x in self._qmp.events.clear()]
+ if events:
+ return events
+
+ event = self.pull_event(wait)
+ return [event] if event is not None else []
+
+ def clear_events(self) -> None:
+ """Clear current list of pending events."""
+ self._qmp.events.clear()
+
+ def close(self) -> None:
+ """Close the connection."""
+ self._sync(
+ self._qmp.disconnect()
+ )
+
+ def settimeout(self, timeout: Optional[float]) -> None:
+ """
+ Set the timeout for QMP RPC execution.
+
+ This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
+ The `accept`, `pull_event` and `get_event` methods have their
+ own configurable timeouts.
+
+ :param timeout:
+ timeout in seconds, or None.
+ None will wait indefinitely.
+ """
+ self._timeout = timeout
+
+ def send_fd_scm(self, fd: int) -> None:
+ """
+ Send a file descriptor to the remote via SCM_RIGHTS.
+ """
+ self._qmp.send_fd_scm(fd)
+
+ def __del__(self) -> None:
+ if self._qmp.runstate == Runstate.IDLE:
+ return
+
+ if not self._aloop.is_running():
+ self.close()
+ else:
+ # Garbage collection ran while the event loop was running.
+ # Nothing we can do about it now, but if we don't raise our
+ # own error, the user will be treated to a lot of traceback
+ # they might not understand.
+ raise QMPError(
+ "QEMUMonitorProtocol.close()"
+ " was not called before object was garbage collected"
+ )