aboutsummaryrefslogtreecommitdiff
path: root/python/qemu
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu')
-rw-r--r--python/qemu/machine/qtest.py2
-rw-r--r--python/qemu/qmp/__init__.py3
-rw-r--r--python/qemu/qmp/error.py7
-rw-r--r--python/qemu/qmp/events.py50
-rw-r--r--python/qemu/qmp/legacy.py46
-rw-r--r--python/qemu/qmp/message.py22
-rw-r--r--python/qemu/qmp/models.py8
-rw-r--r--python/qemu/qmp/protocol.py194
-rw-r--r--python/qemu/qmp/qmp_client.py155
-rw-r--r--python/qemu/qmp/qmp_shell.py159
-rw-r--r--python/qemu/qmp/qmp_tui.py30
-rw-r--r--python/qemu/qmp/util.py143
-rw-r--r--python/qemu/utils/__init__.py8
-rw-r--r--python/qemu/utils/accel.py9
-rw-r--r--python/qemu/utils/qom.py45
-rw-r--r--python/qemu/utils/qom_common.py55
16 files changed, 606 insertions, 330 deletions
diff --git a/python/qemu/machine/qtest.py b/python/qemu/machine/qtest.py
index 4f5ede8..781f674 100644
--- a/python/qemu/machine/qtest.py
+++ b/python/qemu/machine/qtest.py
@@ -177,6 +177,8 @@ class QEMUQtestMachine(QEMUMachine):
self._qtest_sock_pair[0].close()
self._qtest_sock_pair[1].close()
self._qtest_sock_pair = None
+ if self._qtest is not None:
+ self._qtest.close()
super()._post_shutdown()
def qtest(self, cmd: str) -> str:
diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py
index 69190d0..058139d 100644
--- a/python/qemu/qmp/__init__.py
+++ b/python/qemu/qmp/__init__.py
@@ -39,7 +39,8 @@ from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
logging.getLogger('qemu.qmp').addHandler(logging.NullHandler())
-# The order of these fields impact the Sphinx documentation order.
+# IMPORTANT: When modifying this list, update the Sphinx overview docs.
+# Anything visible in the qemu.qmp namespace should be on the overview page.
__all__ = (
# Classes, most to least important
'QMPClient',
diff --git a/python/qemu/qmp/error.py b/python/qemu/qmp/error.py
index 24ba4d5..c87b078 100644
--- a/python/qemu/qmp/error.py
+++ b/python/qemu/qmp/error.py
@@ -44,7 +44,10 @@ class ProtocolError(QMPError):
:param error_message: Human-readable string describing the error.
"""
- def __init__(self, error_message: str):
- super().__init__(error_message)
+ def __init__(self, error_message: str, *args: object):
+ super().__init__(error_message, *args)
#: Human-readable error message, without any prefix.
self.error_message: str = error_message
+
+ def __str__(self) -> str:
+ return self.error_message
diff --git a/python/qemu/qmp/events.py b/python/qemu/qmp/events.py
index 6199776..cfb5f0a 100644
--- a/python/qemu/qmp/events.py
+++ b/python/qemu/qmp/events.py
@@ -12,7 +12,14 @@ EventListener Tutorial
----------------------
In all of the following examples, we assume that we have a `QMPClient`
-instantiated named ``qmp`` that is already connected.
+instantiated named ``qmp`` that is already connected. For example:
+
+.. code:: python
+
+ from qemu.qmp import QMPClient
+
+ qmp = QMPClient('example-vm')
+ await qmp.connect('127.0.0.1', 1234)
`listener()` context blocks with one name
@@ -87,7 +94,9 @@ This is analogous to the following code:
event = listener.get()
print(f"Event arrived: {event['event']}")
-This event stream will never end, so these blocks will never terminate.
+This event stream will never end, so these blocks will never
+terminate. Even if the QMP connection errors out prematurely, this
+listener will go silent without raising an error.
Using asyncio.Task to concurrently retrieve events
@@ -227,16 +236,20 @@ Clearing listeners
.. code:: python
await qmp.execute('stop')
- qmp.events.clear()
+ discarded = qmp.events.clear()
await qmp.execute('cont')
event = await qmp.events.get()
assert event['event'] == 'RESUME'
+ assert discarded[0]['event'] == 'STOP'
`EventListener` objects are FIFO queues. If events are not consumed,
they will remain in the queue until they are witnessed or discarded via
`clear()`. FIFO queues will be drained automatically upon leaving a
context block, or when calling `remove_listener()`.
+Any events removed from the queue in this fashion will be returned by
+the clear call.
+
Accessing listener history
~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -350,6 +363,12 @@ While `listener()` is only capable of creating a single listener,
break
+Note that in the above example, we explicitly wait on jobA to conclude
+first, and then wait for jobB to do the same. All we have guaranteed is
+that the code that waits for jobA will not accidentally consume the
+event intended for the jobB waiter.
+
+
Extending the `EventListener` class
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -407,13 +426,13 @@ Experimental Interfaces & Design Issues
These interfaces are not ones I am sure I will keep or otherwise modify
heavily.
-qmp.listener()’s type signature
+qmp.listen()’s type signature
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-`listener()` does not return anything, because it was assumed the caller
+`listen()` does not return anything, because it was assumed the caller
already had a handle to the listener. However, for
-``qmp.listener(EventListener())`` forms, the caller will not have saved
-a handle to the listener.
+``qmp.listen(EventListener())`` forms, the caller will not have saved a
+handle to the listener.
Because this function can accept *many* listeners, I found it hard to
accurately type in a way where it could be used in both “one” or “many”
@@ -497,6 +516,21 @@ class EventListener:
#: Optional, secondary event filter.
self.event_filter: Optional[EventFilter] = event_filter
+ def __repr__(self) -> str:
+ args: List[str] = []
+ if self.names:
+ args.append(f"names={self.names!r}")
+ if self.event_filter:
+ args.append(f"event_filter={self.event_filter!r}")
+
+ if self._queue.qsize():
+ state = f"<pending={self._queue.qsize()}>"
+ else:
+ state = ''
+
+ argstr = ", ".join(args)
+ return f"{type(self).__name__}{state}({argstr})"
+
@property
def history(self) -> Tuple[Message, ...]:
"""
@@ -618,7 +652,7 @@ class Events:
def __init__(self) -> None:
self._listeners: List[EventListener] = []
- #: Default, all-events `EventListener`.
+ #: Default, all-events `EventListener`. See `qmp.events` for more info.
self.events: EventListener = EventListener()
self.register_listener(self.events)
diff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py
index 22a2b56..060ed0e 100644
--- a/python/qemu/qmp/legacy.py
+++ b/python/qemu/qmp/legacy.py
@@ -38,6 +38,7 @@ from typing import (
from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient
+from .util import get_or_create_event_loop
#: QMPMessage is an entire QMP message of any kind.
@@ -86,10 +87,13 @@ class QEMUMonitorProtocol:
"server argument should be False when passing a socket")
self._qmp = QMPClient(nickname)
- self._aloop = asyncio.get_event_loop()
self._address = address
self._timeout: Optional[float] = None
+ # This is a sync shim intended for use in fully synchronous
+ # programs. Create and set an event loop if necessary.
+ self._aloop = get_or_create_event_loop()
+
if server:
assert not isinstance(self._address, socket.socket)
self._sync(self._qmp.start_server(self._address))
@@ -231,6 +235,9 @@ class QEMUMonitorProtocol:
:return: The first available QMP event, or None.
"""
+ # Kick the event loop to allow events to accumulate
+ self._sync(asyncio.sleep(0))
+
if not wait:
# wait is False/0: "do not wait, do not except."
if self._qmp.events.empty():
@@ -286,8 +293,8 @@ class QEMUMonitorProtocol:
"""
Set the timeout for QMP RPC execution.
- This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
- The `accept`, `pull_event` and `get_event` methods have their
+ This timeout affects the `cmd`, `cmd_obj`, and `cmd_raw` methods.
+ The `accept`, `pull_event` and `get_events` methods have their
own configurable timeouts.
:param timeout:
@@ -303,17 +310,30 @@ class QEMUMonitorProtocol:
self._qmp.send_fd_scm(fd)
def __del__(self) -> None:
- if self._qmp.runstate == Runstate.IDLE:
- return
+ if self._qmp.runstate != Runstate.IDLE:
+ self._qmp.logger.warning(
+ "QEMUMonitorProtocol object garbage collected without a prior "
+ "call to close()"
+ )
if not self._aloop.is_running():
- self.close()
- else:
- # Garbage collection ran while the event loop was running.
- # Nothing we can do about it now, but if we don't raise our
- # own error, the user will be treated to a lot of traceback
- # they might not understand.
+ if self._qmp.runstate != Runstate.IDLE:
+ # If the user neglected to close the QMP session and we
+ # are not currently running in an asyncio context, we
+ # have the opportunity to close the QMP session. If we
+ # do not do this, the error messages presented over
+ # dangling async resources may not make any sense to the
+ # user.
+ self.close()
+
+ if self._qmp.runstate != Runstate.IDLE:
+ # If QMP is still not quiesced, it means that the garbage
+ # collector ran from a context within the event loop and we
+ # are simply too late to take any corrective action. Raise
+ # our own error to give meaningful feedback to the user in
+ # order to prevent pages of asyncio stacktrace jargon.
raise QMPError(
- "QEMUMonitorProtocol.close()"
- " was not called before object was garbage collected"
+ "QEMUMonitorProtocol.close() was not called before object was "
+ "garbage collected, and could not be closed due to GC running "
+ "in the event loop"
)
diff --git a/python/qemu/qmp/message.py b/python/qemu/qmp/message.py
index f76ccc9..dabb8ec 100644
--- a/python/qemu/qmp/message.py
+++ b/python/qemu/qmp/message.py
@@ -28,7 +28,8 @@ class Message(MutableMapping[str, object]):
be instantiated from either another mapping (like a `dict`), or from
raw `bytes` that still need to be deserialized.
- Once instantiated, it may be treated like any other MutableMapping::
+ Once instantiated, it may be treated like any other
+ :py:obj:`~collections.abc.MutableMapping`::
>>> msg = Message(b'{"hello": "world"}')
>>> assert msg['hello'] == 'world'
@@ -50,12 +51,19 @@ class Message(MutableMapping[str, object]):
>>> dict(msg)
{'hello': 'world'}
+ Or pretty-printed::
+
+ >>> print(str(msg))
+ {
+ "hello": "world"
+ }
:param value: Initial value, if any.
:param eager:
When `True`, attempt to serialize or deserialize the initial value
immediately, so that conversion exceptions are raised during
the call to ``__init__()``.
+
"""
# pylint: disable=too-many-ancestors
@@ -178,15 +186,15 @@ class DeserializationError(ProtocolError):
:param raw: The raw `bytes` that prompted the failure.
"""
def __init__(self, error_message: str, raw: bytes):
- super().__init__(error_message)
+ super().__init__(error_message, raw)
#: The raw `bytes` that were not understood as JSON.
self.raw: bytes = raw
def __str__(self) -> str:
- return "\n".join([
+ return "\n".join((
super().__str__(),
f" raw bytes were: {str(self.raw)}",
- ])
+ ))
class UnexpectedTypeError(ProtocolError):
@@ -197,13 +205,13 @@ class UnexpectedTypeError(ProtocolError):
:param value: The deserialized JSON value that wasn't an object.
"""
def __init__(self, error_message: str, value: object):
- super().__init__(error_message)
+ super().__init__(error_message, value)
#: The JSON value that was expected to be an object.
self.value: object = value
def __str__(self) -> str:
strval = json.dumps(self.value, indent=2)
- return "\n".join([
+ return "\n".join((
super().__str__(),
f" json value was: {strval}",
- ])
+ ))
diff --git a/python/qemu/qmp/models.py b/python/qemu/qmp/models.py
index da52848..7e0d0ba 100644
--- a/python/qemu/qmp/models.py
+++ b/python/qemu/qmp/models.py
@@ -54,7 +54,7 @@ class Model:
class Greeting(Model):
"""
- Defined in qmp-spec.rst, section "Server Greeting".
+ Defined in `interop/qmp-spec`, "Server Greeting" section.
:param raw: The raw Greeting object.
:raise KeyError: If any required fields are absent.
@@ -82,7 +82,7 @@ class Greeting(Model):
class QMPGreeting(Model):
"""
- Defined in qmp-spec.rst, section "Server Greeting".
+ Defined in `interop/qmp-spec`, "Server Greeting" section.
:param raw: The raw QMPGreeting object.
:raise KeyError: If any required fields are absent.
@@ -104,7 +104,7 @@ class QMPGreeting(Model):
class ErrorResponse(Model):
"""
- Defined in qmp-spec.rst, section "Error".
+ Defined in `interop/qmp-spec`, "Error" section.
:param raw: The raw ErrorResponse object.
:raise KeyError: If any required fields are absent.
@@ -126,7 +126,7 @@ class ErrorResponse(Model):
class ErrorInfo(Model):
"""
- Defined in qmp-spec.rst, section "Error".
+ Defined in `interop/qmp-spec`, "Error" section.
:param raw: The raw ErrorInfo object.
:raise KeyError: If any required fields are absent.
diff --git a/python/qemu/qmp/protocol.py b/python/qemu/qmp/protocol.py
index a4ffdfa..219d092 100644
--- a/python/qemu/qmp/protocol.py
+++ b/python/qemu/qmp/protocol.py
@@ -15,13 +15,16 @@ class.
import asyncio
from asyncio import StreamReader, StreamWriter
+from contextlib import asynccontextmanager
from enum import Enum
from functools import wraps
+from inspect import iscoroutinefunction
import logging
import socket
from ssl import SSLContext
from typing import (
Any,
+ AsyncGenerator,
Awaitable,
Callable,
Generic,
@@ -36,13 +39,10 @@ from typing import (
from .error import QMPError
from .util import (
bottom_half,
- create_task,
exception_summary,
flush,
- is_closing,
pretty_traceback,
upper_half,
- wait_closed,
)
@@ -54,6 +54,9 @@ InternetAddrT = Tuple[str, int]
UnixAddrT = str
SocketAddrT = Union[UnixAddrT, InternetAddrT]
+# Maximum allowable size of read buffer, default
+_DEFAULT_READBUFLEN = 64 * 1024
+
class Runstate(Enum):
"""Protocol session runstate."""
@@ -76,11 +79,17 @@ class ConnectError(QMPError):
This Exception always wraps a "root cause" exception that can be
interrogated for additional information.
+ For example, when connecting to a non-existent socket::
+
+ await qmp.connect('not_found.sock')
+ # ConnectError: Failed to establish connection:
+ # [Errno 2] No such file or directory
+
:param error_message: Human-readable string describing the error.
:param exc: The root-cause exception.
"""
def __init__(self, error_message: str, exc: Exception):
- super().__init__(error_message)
+ super().__init__(error_message, exc)
#: Human-readable error string
self.error_message: str = error_message
#: Wrapped root cause exception
@@ -99,8 +108,8 @@ class StateError(QMPError):
An API command (connect, execute, etc) was issued at an inappropriate time.
This error is raised when a command like
- :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
- time.
+ :py:meth:`~AsyncProtocol.connect()` is called when the client is
+ already connected.
:param error_message: Human-readable string describing the state violation.
:param state: The actual `Runstate` seen at the time of the violation.
@@ -108,11 +117,14 @@ class StateError(QMPError):
"""
def __init__(self, error_message: str,
state: Runstate, required: Runstate):
- super().__init__(error_message)
+ super().__init__(error_message, state, required)
self.error_message = error_message
self.state = state
self.required = required
+ def __str__(self) -> str:
+ return self.error_message
+
F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name
@@ -125,6 +137,25 @@ def require(required_state: Runstate) -> Callable[[F], F]:
:param required_state: The `Runstate` required to invoke this method.
:raise StateError: When the required `Runstate` is not met.
"""
+ def _check(proto: 'AsyncProtocol[Any]') -> None:
+ name = type(proto).__name__
+ if proto.runstate == required_state:
+ return
+
+ if proto.runstate == Runstate.CONNECTING:
+ emsg = f"{name} is currently connecting."
+ elif proto.runstate == Runstate.DISCONNECTING:
+ emsg = (f"{name} is disconnecting."
+ " Call disconnect() to return to IDLE state.")
+ elif proto.runstate == Runstate.RUNNING:
+ emsg = f"{name} is already connected and running."
+ elif proto.runstate == Runstate.IDLE:
+ emsg = f"{name} is disconnected and idle."
+ else:
+ assert False
+
+ raise StateError(emsg, proto.runstate, required_state)
+
def _decorator(func: F) -> F:
# _decorator is the decorator that is built by calling the
# require() decorator factory; e.g.:
@@ -135,29 +166,20 @@ def require(required_state: Runstate) -> Callable[[F], F]:
@wraps(func)
def _wrapper(proto: 'AsyncProtocol[Any]',
*args: Any, **kwargs: Any) -> Any:
- # _wrapper is the function that gets executed prior to the
- # decorated method.
-
- name = type(proto).__name__
-
- if proto.runstate != required_state:
- if proto.runstate == Runstate.CONNECTING:
- emsg = f"{name} is currently connecting."
- elif proto.runstate == Runstate.DISCONNECTING:
- emsg = (f"{name} is disconnecting."
- " Call disconnect() to return to IDLE state.")
- elif proto.runstate == Runstate.RUNNING:
- emsg = f"{name} is already connected and running."
- elif proto.runstate == Runstate.IDLE:
- emsg = f"{name} is disconnected and idle."
- else:
- assert False
- raise StateError(emsg, proto.runstate, required_state)
- # No StateError, so call the wrapped method.
+ _check(proto)
return func(proto, *args, **kwargs)
- # Return the decorated method;
- # Transforming Func to Decorated[Func].
+ @wraps(func)
+ async def _async_wrapper(proto: 'AsyncProtocol[Any]',
+ *args: Any, **kwargs: Any) -> Any:
+ _check(proto)
+ return await func(proto, *args, **kwargs)
+
+ # Return the decorated method; F => Decorated[F]
+ # Use an async version when applicable, which
+ # preserves async signature generation in sphinx.
+ if iscoroutinefunction(func):
+ return cast(F, _async_wrapper)
return cast(F, _wrapper)
# Return the decorator instance from the decorator factory. Phew!
@@ -200,24 +222,26 @@ class AsyncProtocol(Generic[T]):
will log to 'qemu.qmp.protocol', but each individual connection
can be given its own logger by giving it a name; messages will
then log to 'qemu.qmp.protocol.${name}'.
+ :param readbuflen:
+ The maximum read buffer length of the underlying StreamReader
+ instance.
"""
# pylint: disable=too-many-instance-attributes
#: Logger object for debugging messages from this connection.
logger = logging.getLogger(__name__)
- # Maximum allowable size of read buffer
- _limit = 64 * 1024
-
# -------------------------
# Section: Public interface
# -------------------------
- def __init__(self, name: Optional[str] = None) -> None:
- #: The nickname for this connection, if any.
- self.name: Optional[str] = name
- if self.name is not None:
- self.logger = self.logger.getChild(self.name)
+ def __init__(
+ self, name: Optional[str] = None,
+ readbuflen: int = _DEFAULT_READBUFLEN
+ ) -> None:
+ self._name: Optional[str]
+ self.name = name
+ self.readbuflen = readbuflen
# stream I/O
self._reader: Optional[StreamReader] = None
@@ -254,6 +278,24 @@ class AsyncProtocol(Generic[T]):
tokens.append(f"runstate={self.runstate.name}")
return f"<{cls_name} {' '.join(tokens)}>"
+ @property
+ def name(self) -> Optional[str]:
+ """
+ The nickname for this connection, if any.
+
+ This name is used for differentiating instances in debug output.
+ """
+ return self._name
+
+ @name.setter
+ def name(self, name: Optional[str]) -> None:
+ logger = logging.getLogger(__name__)
+ if name:
+ self.logger = logger.getChild(name)
+ else:
+ self.logger = logger
+ self._name = name
+
@property # @upper_half
def runstate(self) -> Runstate:
"""The current `Runstate` of the connection."""
@@ -262,7 +304,7 @@ class AsyncProtocol(Generic[T]):
@upper_half
async def runstate_changed(self) -> Runstate:
"""
- Wait for the `runstate` to change, then return that runstate.
+ Wait for the `runstate` to change, then return that `Runstate`.
"""
await self._runstate_event.wait()
return self.runstate
@@ -276,9 +318,9 @@ class AsyncProtocol(Generic[T]):
"""
Accept a connection and begin processing message queues.
- If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
- This method is precisely equivalent to calling `start_server()`
- followed by `accept()`.
+ If this call fails, `runstate` is guaranteed to be set back to
+ `IDLE`. This method is precisely equivalent to calling
+ `start_server()` followed by :py:meth:`~AsyncProtocol.accept()`.
:param address:
Address to listen on; UNIX socket path or TCP address/port.
@@ -291,7 +333,8 @@ class AsyncProtocol(Generic[T]):
This exception will wrap a more concrete one. In most cases,
the wrapped exception will be `OSError` or `EOFError`. If a
protocol-level failure occurs while establishing a new
- session, the wrapped error may also be an `QMPError`.
+ session, the wrapped error may also be a `QMPError`.
+
"""
await self.start_server(address, ssl)
await self.accept()
@@ -307,8 +350,8 @@ class AsyncProtocol(Generic[T]):
This method starts listening for an incoming connection, but
does not block waiting for a peer. This call will return
immediately after binding and listening on a socket. A later
- call to `accept()` must be made in order to finalize the
- incoming connection.
+ call to :py:meth:`~AsyncProtocol.accept()` must be made in order
+ to finalize the incoming connection.
:param address:
Address to listen on; UNIX socket path or TCP address/port.
@@ -321,9 +364,8 @@ class AsyncProtocol(Generic[T]):
This exception will wrap a more concrete one. In most cases,
the wrapped exception will be `OSError`.
"""
- await self._session_guard(
- self._do_start_server(address, ssl),
- 'Failed to establish connection')
+ async with self._session_guard('Failed to establish connection'):
+ await self._do_start_server(address, ssl)
assert self.runstate == Runstate.CONNECTING
@upper_half
@@ -332,10 +374,12 @@ class AsyncProtocol(Generic[T]):
"""
Accept an incoming connection and begin processing message queues.
- If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+ Used after a previous call to `start_server()` to accept an
+ incoming connection. If this call fails, `runstate` is
+ guaranteed to be set back to `IDLE`.
:raise StateError: When the `Runstate` is not `CONNECTING`.
- :raise QMPError: When `start_server()` was not called yet.
+ :raise QMPError: When `start_server()` was not called first.
:raise ConnectError:
When a connection or session cannot be established.
@@ -346,12 +390,10 @@ class AsyncProtocol(Generic[T]):
"""
if self._accepted is None:
raise QMPError("Cannot call accept() before start_server().")
- await self._session_guard(
- self._do_accept(),
- 'Failed to establish connection')
- await self._session_guard(
- self._establish_session(),
- 'Failed to establish session')
+ async with self._session_guard('Failed to establish connection'):
+ await self._do_accept()
+ async with self._session_guard('Failed to establish session'):
+ await self._establish_session()
assert self.runstate == Runstate.RUNNING
@upper_half
@@ -376,12 +418,10 @@ class AsyncProtocol(Generic[T]):
protocol-level failure occurs while establishing a new
session, the wrapped error may also be an `QMPError`.
"""
- await self._session_guard(
- self._do_connect(address, ssl),
- 'Failed to establish connection')
- await self._session_guard(
- self._establish_session(),
- 'Failed to establish session')
+ async with self._session_guard('Failed to establish connection'):
+ await self._do_connect(address, ssl)
+ async with self._session_guard('Failed to establish session'):
+ await self._establish_session()
assert self.runstate == Runstate.RUNNING
@upper_half
@@ -392,7 +432,11 @@ class AsyncProtocol(Generic[T]):
If there was an exception that caused the reader/writers to
terminate prematurely, it will be raised here.
- :raise Exception: When the reader or writer terminate unexpectedly.
+ :raise Exception:
+ When the reader or writer terminate unexpectedly. You can
+ expect to see `EOFError` if the server hangs up, or
+ `OSError` for connection-related issues. If there was a QMP
+ protocol-level problem, `ProtocolError` will be seen.
"""
self.logger.debug("disconnect() called.")
self._schedule_disconnect()
@@ -402,7 +446,8 @@ class AsyncProtocol(Generic[T]):
# Section: Session machinery
# --------------------------
- async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None:
+ @asynccontextmanager
+ async def _session_guard(self, emsg: str) -> AsyncGenerator[None, None]:
"""
Async guard function used to roll back to `IDLE` on any error.
@@ -419,10 +464,9 @@ class AsyncProtocol(Generic[T]):
:raise ConnectError:
When any other error is encountered in the guarded block.
"""
- # Note: After Python 3.6 support is removed, this should be an
- # @asynccontextmanager instead of accepting a callback.
try:
- await coro
+ # Caller's code runs here.
+ yield
except BaseException as err:
self.logger.error("%s: %s", emsg, exception_summary(err))
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
@@ -561,7 +605,7 @@ class AsyncProtocol(Generic[T]):
port=address[1],
ssl=ssl,
backlog=1,
- limit=self._limit,
+ limit=self.readbuflen,
)
else:
coro = asyncio.start_unix_server(
@@ -569,7 +613,7 @@ class AsyncProtocol(Generic[T]):
path=address,
ssl=ssl,
backlog=1,
- limit=self._limit,
+ limit=self.readbuflen,
)
# Allow runstate watchers to witness 'CONNECTING' state; some
@@ -624,7 +668,7 @@ class AsyncProtocol(Generic[T]):
"fd=%d, family=%r, type=%r",
address.fileno(), address.family, address.type)
connect = asyncio.open_connection(
- limit=self._limit,
+ limit=self.readbuflen,
ssl=ssl,
sock=address,
)
@@ -634,14 +678,14 @@ class AsyncProtocol(Generic[T]):
address[0],
address[1],
ssl=ssl,
- limit=self._limit,
+ limit=self.readbuflen,
)
else:
self.logger.debug("Connecting to file://%s ...", address)
connect = asyncio.open_unix_connection(
path=address,
ssl=ssl,
- limit=self._limit,
+ limit=self.readbuflen,
)
self._reader, self._writer = await connect
@@ -663,8 +707,8 @@ class AsyncProtocol(Generic[T]):
reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
- self._reader_task = create_task(reader_coro)
- self._writer_task = create_task(writer_coro)
+ self._reader_task = asyncio.create_task(reader_coro)
+ self._writer_task = asyncio.create_task(writer_coro)
self._bh_tasks = asyncio.gather(
self._reader_task,
@@ -689,7 +733,7 @@ class AsyncProtocol(Generic[T]):
if not self._dc_task:
self._set_state(Runstate.DISCONNECTING)
self.logger.debug("Scheduling disconnect.")
- self._dc_task = create_task(self._bh_disconnect())
+ self._dc_task = asyncio.create_task(self._bh_disconnect())
@upper_half
async def _wait_disconnect(self) -> None:
@@ -825,13 +869,13 @@ class AsyncProtocol(Generic[T]):
if not self._writer:
return
- if not is_closing(self._writer):
+ if not self._writer.is_closing():
self.logger.debug("Closing StreamWriter.")
self._writer.close()
self.logger.debug("Waiting for StreamWriter to close ...")
try:
- await wait_closed(self._writer)
+ await self._writer.wait_closed()
except Exception: # pylint: disable=broad-except
# It's hard to tell if the Stream is already closed or
# not. Even if one of the tasks has failed, it may have
diff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py
index 2a817f9..8beccfe 100644
--- a/python/qemu/qmp/qmp_client.py
+++ b/python/qemu/qmp/qmp_client.py
@@ -41,7 +41,7 @@ class _WrappedProtocolError(ProtocolError):
:param exc: The root-cause exception.
"""
def __init__(self, error_message: str, exc: Exception):
- super().__init__(error_message)
+ super().__init__(error_message, exc)
self.exc = exc
def __str__(self) -> str:
@@ -70,21 +70,38 @@ class ExecuteError(QMPError):
"""
Exception raised by `QMPClient.execute()` on RPC failure.
+ This exception is raised when the server received, interpreted, and
+ replied to a command successfully; but the command itself returned a
+ failure status.
+
+ For example::
+
+ await qmp.execute('block-dirty-bitmap-add',
+ {'node': 'foo', 'name': 'my_bitmap'})
+ # qemu.qmp.qmp_client.ExecuteError:
+ # Cannot find device='foo' nor node-name='foo'
+
:param error_response: The RPC error response object.
:param sent: The sent RPC message that caused the failure.
:param received: The raw RPC error reply received.
"""
def __init__(self, error_response: ErrorResponse,
sent: Message, received: Message):
- super().__init__(error_response.error.desc)
+ super().__init__(error_response, sent, received)
#: The sent `Message` that caused the failure
self.sent: Message = sent
#: The received `Message` that indicated failure
self.received: Message = received
#: The parsed error response
self.error: ErrorResponse = error_response
- #: The QMP error class
- self.error_class: str = error_response.error.class_
+
+ @property
+ def error_class(self) -> str:
+ """The QMP error class"""
+ return self.error.error.class_
+
+ def __str__(self) -> str:
+ return self.error.error.desc
class ExecInterruptedError(QMPError):
@@ -93,9 +110,22 @@ class ExecInterruptedError(QMPError):
This error is raised when an `execute()` statement could not be
completed. This can occur because the connection itself was
- terminated before a reply was received.
+ terminated before a reply was received. The true cause of the
+ interruption will be available via `disconnect()`.
- The true cause of the interruption will be available via `disconnect()`.
+ The QMP protocol does not make it possible to know if a command
+ succeeded or failed after such an event; the client will need to
+ query the server to determine the state of the server on a
+ case-by-case basis.
+
+ For example, ECONNRESET might look like this::
+
+ try:
+ await qmp.execute('query-block')
+ # ExecInterruptedError: Disconnected
+ except ExecInterruptedError:
+ await qmp.disconnect()
+ # ConnectionResetError: [Errno 104] Connection reset by peer
"""
@@ -110,8 +140,8 @@ class _MsgProtocolError(ProtocolError):
:param error_message: Human-readable string describing the error.
:param msg: The QMP `Message` that caused the error.
"""
- def __init__(self, error_message: str, msg: Message):
- super().__init__(error_message)
+ def __init__(self, error_message: str, msg: Message, *args: object):
+ super().__init__(error_message, msg, *args)
#: The received `Message` that caused the error.
self.msg: Message = msg
@@ -150,30 +180,44 @@ class BadReplyError(_MsgProtocolError):
:param sent: The message that was sent that prompted the error.
"""
def __init__(self, error_message: str, msg: Message, sent: Message):
- super().__init__(error_message, msg)
+ super().__init__(error_message, msg, sent)
#: The sent `Message` that caused the failure
self.sent = sent
class QMPClient(AsyncProtocol[Message], Events):
- """
- Implements a QMP client connection.
+ """Implements a QMP client connection.
+
+ `QMPClient` can be used to either connect or listen to a QMP server,
+ but always acts as the QMP client.
- QMP can be used to establish a connection as either the transport
- client or server, though this class always acts as the QMP client.
+ :param name:
+ Optional nickname for the connection, used to differentiate
+ instances when logging.
- :param name: Optional nickname for the connection, used for logging.
+ :param readbuflen:
+ The maximum buffer length for reads and writes to and from the QMP
+ server, in bytes. Default is 10MB. If `QMPClient` is used to
+ connect to a guest agent to transfer files via ``guest-file-read``/
+ ``guest-file-write``, increasing this value may be required.
Basic script-style usage looks like this::
- qmp = QMPClient('my_virtual_machine_name')
- await qmp.connect(('127.0.0.1', 1234))
- ...
- res = await qmp.execute('block-query')
- ...
- await qmp.disconnect()
+ import asyncio
+ from qemu.qmp import QMPClient
+
+ async def main():
+ qmp = QMPClient('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('query-block')
+ ...
+ await qmp.disconnect()
- Basic async client-style usage looks like this::
+ asyncio.run(main())
+
+ A more advanced example that starts to take advantage of asyncio
+ might look like this::
class Client:
def __init__(self, name: str):
@@ -193,25 +237,32 @@ class QMPClient(AsyncProtocol[Message], Events):
await self.disconnect()
See `qmp.events` for more detail on event handling patterns.
+
"""
#: Logger object used for debugging messages.
logger = logging.getLogger(__name__)
- # Read buffer limit; 10MB like libvirt default
- _limit = 10 * 1024 * 1024
+ # Read buffer default limit; 10MB like libvirt default
+ _readbuflen = 10 * 1024 * 1024
# Type alias for pending execute() result items
_PendingT = Union[Message, ExecInterruptedError]
- def __init__(self, name: Optional[str] = None) -> None:
- super().__init__(name)
+ def __init__(
+ self,
+ name: Optional[str] = None,
+ readbuflen: int = _readbuflen
+ ) -> None:
+ super().__init__(name, readbuflen)
Events.__init__(self)
#: Whether or not to await a greeting after establishing a connection.
+ #: Defaults to True; QGA servers expect this to be False.
self.await_greeting: bool = True
- #: Whether or not to perform capabilities negotiation upon connection.
- #: Implies `await_greeting`.
+ #: Whether or not to perform capabilities negotiation upon
+ #: connection. Implies `await_greeting`. Defaults to True; QGA
+ #: servers expect this to be False.
self.negotiate: bool = True
# Cached Greeting, if one was awaited.
@@ -228,7 +279,13 @@ class QMPClient(AsyncProtocol[Message], Events):
@property
def greeting(self) -> Optional[Greeting]:
- """The `Greeting` from the QMP server, if any."""
+ """
+ The `Greeting` from the QMP server, if any.
+
+ Defaults to ``None``, and will be set after a greeting is
+ received during the connection process. It is reset at the start
+ of each connection attempt.
+ """
return self._greeting
@upper_half
@@ -369,7 +426,7 @@ class QMPClient(AsyncProtocol[Message], Events):
# This is very likely a server parsing error.
# It doesn't inherently belong to any pending execution.
# Instead of performing clever recovery, just terminate.
- # See "NOTE" in qmp-spec.rst, section "Error".
+ # See "NOTE" in interop/qmp-spec, "Error" section.
raise ServerParseError(
("Server sent an error response without an ID, "
"but there are no ID-less executions pending. "
@@ -377,7 +434,7 @@ class QMPClient(AsyncProtocol[Message], Events):
msg
)
- # qmp-spec.rst, section "Commands Responses":
+ # qmp-spec.rst, "Commands Responses" section:
# 'Clients should drop all the responses
# that have an unknown "id" field.'
self.logger.log(
@@ -550,7 +607,7 @@ class QMPClient(AsyncProtocol[Message], Events):
@require(Runstate.RUNNING)
async def execute_msg(self, msg: Message) -> object:
"""
- Execute a QMP command and return its value.
+ Execute a QMP command on the server and return its value.
:param msg: The QMP `Message` to execute.
@@ -562,7 +619,9 @@ class QMPClient(AsyncProtocol[Message], Events):
If the QMP `Message` does not have either the 'execute' or
'exec-oob' fields set.
:raise ExecuteError: When the server returns an error response.
- :raise ExecInterruptedError: if the connection was terminated early.
+ :raise ExecInterruptedError:
+ If the connection was disrupted before
+ receiving a reply from the server.
"""
if not ('execute' in msg or 'exec-oob' in msg):
raise ValueError("Requires 'execute' or 'exec-oob' message")
@@ -601,9 +660,11 @@ class QMPClient(AsyncProtocol[Message], Events):
:param cmd: QMP command name.
:param arguments: Arguments (if any). Must be JSON-serializable.
- :param oob: If `True`, execute "out of band".
+ :param oob:
+ If `True`, execute "out of band". See `interop/qmp-spec`
+ section "Out-of-band execution".
- :return: An executable QMP `Message`.
+ :return: A QMP `Message` that can be executed with `execute_msg()`.
"""
msg = Message({'exec-oob' if oob else 'execute': cmd})
if arguments is not None:
@@ -615,18 +676,22 @@ class QMPClient(AsyncProtocol[Message], Events):
arguments: Optional[Mapping[str, object]] = None,
oob: bool = False) -> object:
"""
- Execute a QMP command and return its value.
+ Execute a QMP command on the server and return its value.
:param cmd: QMP command name.
:param arguments: Arguments (if any). Must be JSON-serializable.
- :param oob: If `True`, execute "out of band".
+ :param oob:
+ If `True`, execute "out of band". See `interop/qmp-spec`
+ section "Out-of-band execution".
:return:
The command execution return value from the server. The type of
object returned depends on the command that was issued,
though most in QEMU return a `dict`.
:raise ExecuteError: When the server returns an error response.
- :raise ExecInterruptedError: if the connection was terminated early.
+ :raise ExecInterruptedError:
+ If the connection was disrupted before
+ receiving a reply from the server.
"""
msg = self.make_execute_msg(cmd, arguments, oob=oob)
return await self.execute_msg(msg)
@@ -634,8 +699,20 @@ class QMPClient(AsyncProtocol[Message], Events):
@upper_half
@require(Runstate.RUNNING)
def send_fd_scm(self, fd: int) -> None:
- """
- Send a file descriptor to the remote via SCM_RIGHTS.
+ """Send a file descriptor to the remote via SCM_RIGHTS.
+
+ This method does not close the file descriptor.
+
+ :param fd: The file descriptor to send to QEMU.
+
+ This is an advanced feature of QEMU where file descriptors can
+ be passed from client to server. This is usually used as a
+ security measure to isolate the QEMU process from being able to
+ open its own files. See the QMP commands ``getfd`` and
+ ``add-fd`` for more information.
+
+ See `socket.socket.sendmsg` for more information on the Python
+ implementation for sending file descriptors over a UNIX socket.
"""
assert self._writer is not None
sock = self._writer.transport.get_extra_info('socket')
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
index 98e684e..f818800 100644
--- a/python/qemu/qmp/qmp_shell.py
+++ b/python/qemu/qmp/qmp_shell.py
@@ -10,9 +10,15 @@
#
"""
-Low-level QEMU shell on top of QMP.
+qmp-shell - An interactive QEMU shell powered by QMP
-usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
+qmp-shell offers a simple shell with a convenient shorthand syntax as an
+alternative to typing JSON by hand. This syntax is not standardized and
+is not meant to be used as a scriptable interface. This shorthand *may*
+change incompatibly in the future, and it is strongly encouraged to use
+the QMP library to provide API-stable scripting when needed.
+
+usage: qmp-shell [-h] [-H] [-v] [-p] [-l LOGFILE] [-N] qmp_server
positional arguments:
qmp_server < UNIX socket path | TCP address:port >
@@ -20,41 +26,52 @@ positional arguments:
optional arguments:
-h, --help show this help message and exit
-H, --hmp Use HMP interface
- -N, --skip-negotiation
- Skip negotiate (for qemu-ga)
-v, --verbose Verbose (echo commands sent and received)
-p, --pretty Pretty-print JSON
+ -l LOGFILE, --logfile LOGFILE
+ Save log of all QMP messages to PATH
+ -N, --skip-negotiation
+ Skip negotiate (for qemu-ga)
+
+Usage
+-----
+First, start QEMU with::
-Start QEMU with:
+ > qemu [...] -qmp unix:./qmp-sock,server=on[,wait=off]
-# qemu [...] -qmp unix:./qmp-sock,server
+Then run the shell, passing the address of the socket::
-Run the shell:
+ > qmp-shell ./qmp-sock
-$ qmp-shell ./qmp-sock
+Syntax
+------
-Commands have the following format:
+Commands have the following format::
- < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+ < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
-For example:
+For example, to add a network device::
-(QEMU) device_add driver=e1000 id=net1
-{'return': {}}
-(QEMU)
+ (QEMU) device_add driver=e1000 id=net1
+ {'return': {}}
+ (QEMU)
-key=value pairs also support Python or JSON object literal subset notations,
-without spaces. Dictionaries/objects {} are supported as are arrays [].
+key=value pairs support either Python or JSON object literal notations,
+**without spaces**. Dictionaries/objects ``{}`` are supported, as are
+arrays ``[]``::
- example-command arg-name1={'key':'value','obj'={'prop':"value"}}
+ example-command arg-name1={'key':'value','obj'={'prop':"value"}}
-Both JSON and Python formatting should work, including both styles of
-string literal quotes. Both paradigms of literal values should work,
-including null/true/false for JSON and None/True/False for Python.
+Either JSON or Python formatting for compound values works, including
+both styles of string literal quotes (either single or double
+quotes). Both paradigms of literal values are accepted, including
+``null/true/false`` for JSON and ``None/True/False`` for Python.
+Transactions
+------------
-Transactions have the following multi-line format:
+Transactions have the following multi-line format::
transaction(
action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
@@ -62,11 +79,11 @@ Transactions have the following multi-line format:
action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
)
-One line transactions are also supported:
+One line transactions are also supported::
transaction( action-name1 ... )
-For example:
+For example::
(QEMU) transaction(
TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
@@ -75,9 +92,35 @@ For example:
{"return": {}}
(QEMU)
-Use the -v and -p options to activate the verbose and pretty-print options,
-which will echo back the properly formatted JSON-compliant QMP that is being
-sent to QEMU, which is useful for debugging and documentation generation.
+Commands
+--------
+
+Autocomplete of command names using <tab> is supported. Pressing <tab>
+at a blank CLI prompt will show you a list of all available commands
+that the connected QEMU instance supports.
+
+For documentation on QMP commands and their arguments, please see
+`qmp ref`.
+
+Events
+------
+
+qmp-shell will display events received from the server, but this version
+does not do so asynchronously. To check for new events from the server,
+press <enter> on a blank line::
+
+ (QEMU) ⏎
+ {'timestamp': {'seconds': 1660071944, 'microseconds': 184667},
+ 'event': 'STOP'}
+
+Display options
+---------------
+
+Use the -v and -p options to activate the verbose and pretty-print
+options, which will echo back the properly formatted JSON-compliant QMP
+that is being sent to QEMU. This is useful for debugging to see the
+wire-level QMP data being exchanged, and generating output for use in
+writing documentation for QEMU.
"""
import argparse
@@ -514,21 +557,29 @@ def die(msg: str) -> NoReturn:
sys.exit(1)
-def main() -> None:
- """
- qmp-shell entry point: parse command line arguments and start the REPL.
- """
+def common_parser() -> argparse.ArgumentParser:
+ """Build common parsing options used by qmp-shell and qmp-shell-wrap."""
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--hmp', action='store_true',
help='Use HMP interface')
- parser.add_argument('-N', '--skip-negotiation', action='store_true',
- help='Skip negotiate (for qemu-ga)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose (echo commands sent and received)')
parser.add_argument('-p', '--pretty', action='store_true',
help='Pretty-print JSON')
parser.add_argument('-l', '--logfile',
help='Save log of all QMP messages to PATH')
+ # NOTE: When changing arguments, update both this module docstring
+ # and the manpage synopsis in docs/man/qmp_shell.rst.
+ return parser
+
+
+def main() -> None:
+ """
+ qmp-shell entry point: parse command line arguments and start the REPL.
+ """
+ parser = common_parser()
+ parser.add_argument('-N', '--skip-negotiation', action='store_true',
+ help='Skip negotiate (for qemu-ga)')
default_server = os.environ.get('QMP_SOCKET')
parser.add_argument('qmp_server', action='store',
@@ -561,19 +612,37 @@ def main() -> None:
def main_wrap() -> None:
"""
- qmp-shell-wrap entry point: parse command line arguments and
- start the REPL.
- """
- parser = argparse.ArgumentParser()
- parser.add_argument('-H', '--hmp', action='store_true',
- help='Use HMP interface')
- parser.add_argument('-v', '--verbose', action='store_true',
- help='Verbose (echo commands sent and received)')
- parser.add_argument('-p', '--pretty', action='store_true',
- help='Pretty-print JSON')
- parser.add_argument('-l', '--logfile',
- help='Save log of all QMP messages to PATH')
+ qmp-shell-wrap - QEMU + qmp-shell launcher utility
+
+ Launch QEMU and connect to it with `qmp-shell` in a single command.
+ CLI arguments will be forwarded to qemu, with additional arguments
+ added to allow `qmp-shell` to then connect to the recently launched
+ QEMU instance.
+
+ usage: qmp-shell-wrap [-h] [-H] [-v] [-p] [-l LOGFILE] ...
+ positional arguments:
+ command QEMU command line to invoke
+
+ optional arguments:
+ -h, --help show this help message and exit
+ -H, --hmp Use HMP interface
+ -v, --verbose Verbose (echo commands sent and received)
+ -p, --pretty Pretty-print JSON
+ -l LOGFILE, --logfile LOGFILE
+ Save log of all QMP messages to PATH
+
+ Usage
+ -----
+
+ Prepend "qmp-shell-wrap" to your usual QEMU command line::
+
+ > qmp-shell-wrap qemu-system-x86_64 -M q35 -m 4096 -display none
+ Welcome to the QMP low-level shell!
+ Connected
+ (QEMU)
+ """
+ parser = common_parser()
parser.add_argument('command', nargs=argparse.REMAINDER,
help='QEMU command line to invoke')
@@ -610,6 +679,8 @@ def main_wrap() -> None:
for _ in qemu.repl():
pass
+ except FileNotFoundError:
+ sys.stderr.write(f"ERROR: QEMU executable '{cmd[0]}' not found.\n")
finally:
os.unlink(sockpath)
diff --git a/python/qemu/qmp/qmp_tui.py b/python/qemu/qmp/qmp_tui.py
index 2d9ebbd..d946c20 100644
--- a/python/qemu/qmp/qmp_tui.py
+++ b/python/qemu/qmp/qmp_tui.py
@@ -21,6 +21,7 @@ import json
import logging
from logging import Handler, LogRecord
import signal
+import sys
from typing import (
List,
Optional,
@@ -30,17 +31,27 @@ from typing import (
cast,
)
-from pygments import lexers
-from pygments import token as Token
-import urwid
-import urwid_readline
+
+try:
+ from pygments import lexers
+ from pygments import token as Token
+ import urwid
+ import urwid_readline
+except ModuleNotFoundError as exc:
+ print(
+ f"Module '{exc.name}' not found.",
+ "You need the optional 'tui' group: pip install qemu.qmp[tui]",
+ sep='\n',
+ file=sys.stderr,
+ )
+ sys.exit(1)
from .error import ProtocolError
from .legacy import QEMUMonitorProtocol, QMPBadPortError
from .message import DeserializationError, Message, UnexpectedTypeError
from .protocol import ConnectError, Runstate
from .qmp_client import ExecInterruptedError, QMPClient
-from .util import create_task, pretty_traceback
+from .util import get_or_create_event_loop, pretty_traceback
# The name of the signal that is used to update the history list
@@ -225,7 +236,7 @@ class App(QMPClient):
"""
try:
msg = Message(bytes(raw_msg, encoding='utf-8'))
- create_task(self._send_to_server(msg))
+ asyncio.create_task(self._send_to_server(msg))
except (DeserializationError, UnexpectedTypeError) as err:
raw_msg = format_json(raw_msg)
logging.info('Invalid message: %s', err.error_message)
@@ -246,7 +257,7 @@ class App(QMPClient):
Initiates killing of app. A bridge between asynchronous and synchronous
code.
"""
- create_task(self._kill_app())
+ asyncio.create_task(self._kill_app())
async def _kill_app(self) -> None:
"""
@@ -376,8 +387,7 @@ class App(QMPClient):
"""
screen = urwid.raw_display.Screen()
screen.set_terminal_properties(256)
-
- self.aloop = asyncio.get_event_loop()
+ self.aloop = get_or_create_event_loop()
self.aloop.set_debug(debug)
# Gracefully handle SIGTERM and SIGINT signals
@@ -393,7 +403,7 @@ class App(QMPClient):
handle_mouse=True,
event_loop=event_loop)
- create_task(self.manage_connection(), self.aloop)
+ self.aloop.create_task(self.manage_connection())
try:
main_loop.run()
except Exception as err:
diff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py
index ca6225e..a8229e5 100644
--- a/python/qemu/qmp/util.py
+++ b/python/qemu/qmp/util.py
@@ -1,25 +1,16 @@
"""
Miscellaneous Utilities
-This module provides asyncio utilities and compatibility wrappers for
-Python 3.6 to provide some features that otherwise become available in
-Python 3.7+.
-
-Various logging and debugging utilities are also provided, such as
-`exception_summary()` and `pretty_traceback()`, used primarily for
-adding information into the logging stream.
+This module provides asyncio and various logging and debugging
+utilities, such as `exception_summary()` and `pretty_traceback()`, used
+primarily for adding information into the logging stream.
"""
import asyncio
import sys
import traceback
-from typing import (
- Any,
- Coroutine,
- Optional,
- TypeVar,
- cast,
-)
+from typing import TypeVar, cast
+import warnings
T = TypeVar('T')
@@ -30,9 +21,35 @@ T = TypeVar('T')
# --------------------------
+def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
+ """
+ Return this thread's current event loop, or create a new one.
+
+ This function behaves similarly to asyncio.get_event_loop() in
+ Python<=3.13, where if there is no event loop currently associated
+ with the current context, it will create and register one. It should
+ generally not be used in any asyncio-native applications.
+ """
+ try:
+ with warnings.catch_warnings():
+ # Python <= 3.13 will trigger deprecation warnings if no
+ # event loop is set, but will create and set a new loop.
+ warnings.simplefilter("ignore")
+ loop = asyncio.get_event_loop()
+ except RuntimeError:
+ # Python 3.14+: No event loop set for this thread,
+ # create and set one.
+ loop = asyncio.new_event_loop()
+ # Set this loop as the current thread's loop, to be returned
+ # by calls to get_event_loop() in the future.
+ asyncio.set_event_loop(loop)
+
+ return loop
+
+
async def flush(writer: asyncio.StreamWriter) -> None:
"""
- Utility function to ensure a StreamWriter is *fully* drained.
+ Utility function to ensure an `asyncio.StreamWriter` is *fully* drained.
`asyncio.StreamWriter.drain` only promises we will return to below
the "high-water mark". This function ensures we flush the entire
@@ -72,102 +89,13 @@ def bottom_half(func: T) -> T:
These methods do not, in general, have the ability to directly
report information to a caller’s context and will usually be
- collected as a Task result instead.
+ collected as an `asyncio.Task` result instead.
They must not call upper-half functions directly.
"""
return func
-# -------------------------------
-# Section: Compatibility Wrappers
-# -------------------------------
-
-
-def create_task(coro: Coroutine[Any, Any, T],
- loop: Optional[asyncio.AbstractEventLoop] = None
- ) -> 'asyncio.Future[T]':
- """
- Python 3.6-compatible `asyncio.create_task` wrapper.
-
- :param coro: The coroutine to execute in a task.
- :param loop: Optionally, the loop to create the task in.
-
- :return: An `asyncio.Future` object.
- """
- if sys.version_info >= (3, 7):
- if loop is not None:
- return loop.create_task(coro)
- return asyncio.create_task(coro) # pylint: disable=no-member
-
- # Python 3.6:
- return asyncio.ensure_future(coro, loop=loop)
-
-
-def is_closing(writer: asyncio.StreamWriter) -> bool:
- """
- Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
-
- :param writer: The `asyncio.StreamWriter` object.
- :return: `True` if the writer is closing, or closed.
- """
- if sys.version_info >= (3, 7):
- return writer.is_closing()
-
- # Python 3.6:
- transport = writer.transport
- assert isinstance(transport, asyncio.WriteTransport)
- return transport.is_closing()
-
-
-async def wait_closed(writer: asyncio.StreamWriter) -> None:
- """
- Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
-
- :param writer: The `asyncio.StreamWriter` to wait on.
- """
- if sys.version_info >= (3, 7):
- await writer.wait_closed()
- return
-
- # Python 3.6
- transport = writer.transport
- assert isinstance(transport, asyncio.WriteTransport)
-
- while not transport.is_closing():
- await asyncio.sleep(0)
-
- # This is an ugly workaround, but it's the best I can come up with.
- sock = transport.get_extra_info('socket')
-
- if sock is None:
- # Our transport doesn't have a socket? ...
- # Nothing we can reasonably do.
- return
-
- while sock.fileno() != -1:
- await asyncio.sleep(0)
-
-
-def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
- """
- Python 3.6-compatible `asyncio.run` wrapper.
-
- :param coro: A coroutine to execute now.
- :return: The return value from the coroutine.
- """
- if sys.version_info >= (3, 7):
- return asyncio.run(coro, debug=debug)
-
- # Python 3.6
- loop = asyncio.get_event_loop()
- loop.set_debug(debug)
- ret = loop.run_until_complete(coro)
- loop.close()
-
- return ret
-
-
# ----------------------------
# Section: Logging & Debugging
# ----------------------------
@@ -177,8 +105,11 @@ def exception_summary(exc: BaseException) -> str:
"""
Return a summary string of an arbitrary exception.
- It will be of the form "ExceptionType: Error Message", if the error
+ It will be of the form "ExceptionType: Error Message" if the error
string is non-empty, and just "ExceptionType" otherwise.
+
+ This code is based on CPython's implementation of
+ `traceback.TracebackException.format_exception_only`.
"""
name = type(exc).__qualname__
smod = type(exc).__module__
diff --git a/python/qemu/utils/__init__.py b/python/qemu/utils/__init__.py
index 017cfdc..be5daa8 100644
--- a/python/qemu/utils/__init__.py
+++ b/python/qemu/utils/__init__.py
@@ -23,13 +23,19 @@ import textwrap
from typing import Optional
# pylint: disable=import-error
-from .accel import kvm_available, list_accel, tcg_available
+from .accel import (
+ hvf_available,
+ kvm_available,
+ list_accel,
+ tcg_available,
+)
__all__ = (
'VerboseProcessError',
'add_visual_margin',
'get_info_usernet_hostfwd_port',
+ 'hvf_available',
'kvm_available',
'list_accel',
'tcg_available',
diff --git a/python/qemu/utils/accel.py b/python/qemu/utils/accel.py
index 386ff64..f915b64 100644
--- a/python/qemu/utils/accel.py
+++ b/python/qemu/utils/accel.py
@@ -82,3 +82,12 @@ def tcg_available(qemu_bin: str) -> bool:
@param qemu_bin (str): path to the QEMU binary
"""
return 'tcg' in list_accel(qemu_bin)
+
+
+def hvf_available(qemu_bin: str) -> bool:
+ """
+ Check if HVF is available.
+
+ @param qemu_bin (str): path to the QEMU binary
+ """
+ return 'hvf' in list_accel(qemu_bin)
diff --git a/python/qemu/utils/qom.py b/python/qemu/utils/qom.py
index 426a0f2..05e5f14 100644
--- a/python/qemu/utils/qom.py
+++ b/python/qemu/utils/qom.py
@@ -31,8 +31,7 @@ QOM commands:
##
import argparse
-
-from qemu.qmp import ExecuteError
+from typing import List
from .qom_common import QOMCommand
@@ -224,28 +223,34 @@ class QOMTree(QOMCommand):
super().__init__(args)
self.path = args.path
- def _list_node(self, path: str) -> None:
- print(path)
- items = self.qom_list(path)
- for item in items:
- if item.child:
- continue
- try:
- rsp = self.qmp.cmd('qom-get', path=path,
- property=item.name)
- print(f" {item.name}: {rsp} ({item.type})")
- except ExecuteError as err:
- print(f" {item.name}: <EXCEPTION: {err!s}> ({item.type})")
- print('')
- for item in items:
- if not item.child:
- continue
+ def _list_nodes(self, paths: List[str]) -> None:
+ all_paths_props = self.qom_list_get(paths)
+ i = 0
+
+ for props in all_paths_props:
+ path = paths[i]
+ i = i + 1
+ print(path)
if path == '/':
path = ''
- self._list_node(f"{path}/{item.name}")
+ newpaths = []
+
+ for item in props.properties:
+ if item.child:
+ newpaths += [f"{path}/{item.name}"]
+ else:
+ value = item.value
+ if value is None:
+ value = "<EXCEPTION: property could not be read>"
+ print(f" {item.name}: {value} ({item.type})")
+
+ print('')
+
+ if newpaths:
+ self._list_nodes(newpaths)
def run(self) -> int:
- self._list_node(self.path)
+ self._list_nodes([self.path])
return 0
diff --git a/python/qemu/utils/qom_common.py b/python/qemu/utils/qom_common.py
index dd2c8b1..ab21a4d 100644
--- a/python/qemu/utils/qom_common.py
+++ b/python/qemu/utils/qom_common.py
@@ -65,6 +65,52 @@ class ObjectPropertyInfo:
return self.type.startswith('link<')
+class ObjectPropertyValue:
+ """
+ Represents a property return from e.g. qom-tree-get
+ """
+ def __init__(self, name: str, type_: str, value: object):
+ self.name = name
+ self.type = type_
+ self.value = value
+
+ @classmethod
+ def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyValue':
+ """
+ Build an ObjectPropertyValue from a Dict with an unknown shape.
+ """
+ assert value.keys() >= {'name', 'type'}
+ assert value.keys() <= {'name', 'type', 'value'}
+ return cls(value['name'], value['type'], value.get('value'))
+
+ @property
+ def child(self) -> bool:
+ """Is this property a child property?"""
+ return self.type.startswith('child<')
+
+
+class ObjectPropertiesValues:
+ """
+ Represents the return type from e.g. qom-list-get
+ """
+ # pylint: disable=too-few-public-methods
+
+ def __init__(self, properties: List[ObjectPropertyValue]) -> None:
+ self.properties = properties
+
+ @classmethod
+ def make(cls, value: Dict[str, Any]) -> 'ObjectPropertiesValues':
+ """
+ Build an ObjectPropertiesValues from a Dict with an unknown shape.
+ """
+ assert value.keys() == {'properties'}
+ props = [ObjectPropertyValue(item['name'],
+ item['type'],
+ item.get('value'))
+ for item in value['properties']]
+ return cls(props)
+
+
CommandT = TypeVar('CommandT', bound='QOMCommand')
@@ -145,6 +191,15 @@ class QOMCommand:
assert isinstance(rsp, list)
return [ObjectPropertyInfo.make(x) for x in rsp]
+ def qom_list_get(self, paths: List[str]) -> List[ObjectPropertiesValues]:
+ """
+ :return: a strongly typed list from the 'qom-list-get' command.
+ """
+ rsp = self.qmp.cmd('qom-list-get', paths=paths)
+ # qom-list-get returns List[ObjectPropertiesValues]
+ assert isinstance(rsp, list)
+ return [ObjectPropertiesValues.make(x) for x in rsp]
+
@classmethod
def command_runner(
cls: Type[CommandT],