From 37094b6dd59f56978b918e79cadf17c6fd5d36e2 Mon Sep 17 00:00:00 2001 From: John Snow Date: Wed, 30 Mar 2022 13:28:10 -0400 Subject: python: rename qemu.aqmp to qemu.qmp Now that we are fully switched over to the new QMP library, move it back over the old namespace. This is being done primarily so that we may upload this package simply as "qemu.qmp" without introducing confusion over whether or not "aqmp" is a new protocol or not. The trade-off is increased confusion inside the QEMU developer tree. Sorry! Note: the 'private' member "_aqmp" in legacy.py also changes to "_qmp"; not out of necessity, but just to remove any traces of the "aqmp" name. Signed-off-by: John Snow Reviewed-by: Beraldo Leal Acked-by: Hanna Reitz Reviewed-by: Vladimir Sementsov-Ogievskiy Message-id: 20220330172812.3427355-8-jsnow@redhat.com Signed-off-by: John Snow --- python/PACKAGE.rst | 4 +- python/README.rst | 4 +- python/qemu/aqmp/__init__.py | 59 -- python/qemu/aqmp/aqmp_tui.py | 652 ---------------------- python/qemu/aqmp/error.py | 50 -- python/qemu/aqmp/events.py | 717 ------------------------ python/qemu/aqmp/legacy.py | 317 ----------- python/qemu/aqmp/message.py | 209 ------- python/qemu/aqmp/models.py | 146 ----- python/qemu/aqmp/protocol.py | 1048 ----------------------------------- python/qemu/aqmp/py.typed | 0 python/qemu/aqmp/qmp_client.py | 655 ---------------------- python/qemu/aqmp/qmp_shell.py | 610 -------------------- python/qemu/aqmp/util.py | 217 -------- python/qemu/machine/machine.py | 4 +- python/qemu/machine/qtest.py | 2 +- python/qemu/qmp/__init__.py | 59 ++ python/qemu/qmp/aqmp_tui.py | 652 ++++++++++++++++++++++ python/qemu/qmp/error.py | 50 ++ python/qemu/qmp/events.py | 717 ++++++++++++++++++++++++ python/qemu/qmp/legacy.py | 317 +++++++++++ python/qemu/qmp/message.py | 209 +++++++ python/qemu/qmp/models.py | 146 +++++ python/qemu/qmp/protocol.py | 1048 +++++++++++++++++++++++++++++++++++ python/qemu/qmp/py.typed | 0 python/qemu/qmp/qmp_client.py | 655 ++++++++++++++++++++++ python/qemu/qmp/qmp_shell.py | 610 ++++++++++++++++++++ python/qemu/qmp/util.py | 217 ++++++++ python/qemu/utils/qemu_ga_client.py | 4 +- python/qemu/utils/qom.py | 2 +- python/qemu/utils/qom_common.py | 4 +- python/qemu/utils/qom_fuse.py | 2 +- python/setup.cfg | 10 +- python/tests/protocol.py | 14 +- 34 files changed, 4705 insertions(+), 4705 deletions(-) delete mode 100644 python/qemu/aqmp/__init__.py delete mode 100644 python/qemu/aqmp/aqmp_tui.py delete mode 100644 python/qemu/aqmp/error.py delete mode 100644 python/qemu/aqmp/events.py delete mode 100644 python/qemu/aqmp/legacy.py delete mode 100644 python/qemu/aqmp/message.py delete mode 100644 python/qemu/aqmp/models.py delete mode 100644 python/qemu/aqmp/protocol.py delete mode 100644 python/qemu/aqmp/py.typed delete mode 100644 python/qemu/aqmp/qmp_client.py delete mode 100644 python/qemu/aqmp/qmp_shell.py delete mode 100644 python/qemu/aqmp/util.py create mode 100644 python/qemu/qmp/__init__.py create mode 100644 python/qemu/qmp/aqmp_tui.py create mode 100644 python/qemu/qmp/error.py create mode 100644 python/qemu/qmp/events.py create mode 100644 python/qemu/qmp/legacy.py create mode 100644 python/qemu/qmp/message.py create mode 100644 python/qemu/qmp/models.py create mode 100644 python/qemu/qmp/protocol.py create mode 100644 python/qemu/qmp/py.typed create mode 100644 python/qemu/qmp/qmp_client.py create mode 100644 python/qemu/qmp/qmp_shell.py create mode 100644 python/qemu/qmp/util.py (limited to 'python') diff --git a/python/PACKAGE.rst b/python/PACKAGE.rst index ddfa9ba..b0b86cc 100644 --- a/python/PACKAGE.rst +++ b/python/PACKAGE.rst @@ -8,11 +8,11 @@ to change at any time. Usage ----- -The ``qemu.aqmp`` subpackage provides a library for communicating with +The ``qemu.qmp`` subpackage provides a library for communicating with QMP servers. The ``qemu.machine`` subpackage offers rudimentary facilities for launching and managing QEMU processes. Refer to each package's documentation -(``>>> help(qemu.aqmp)``, ``>>> help(qemu.machine)``) +(``>>> help(qemu.qmp)``, ``>>> help(qemu.machine)``) for more information. Contributing diff --git a/python/README.rst b/python/README.rst index eb52133..9c1fcea 100644 --- a/python/README.rst +++ b/python/README.rst @@ -3,7 +3,7 @@ QEMU Python Tooling This directory houses Python tooling used by the QEMU project to build, configure, and test QEMU. It is organized by namespace (``qemu``), and -then by package (e.g. ``qemu/machine``, ``qemu/aqmp``, etc). +then by package (e.g. ``qemu/machine``, ``qemu/qmp``, etc). ``setup.py`` is used by ``pip`` to install this tooling to the current environment. ``setup.cfg`` provides the packaging configuration used by @@ -59,7 +59,7 @@ Package installation also normally provides executable console scripts, so that tools like ``qmp-shell`` are always available via $PATH. To invoke them without installation, you can invoke e.g.: -``> PYTHONPATH=~/src/qemu/python python3 -m qemu.aqmp.qmp_shell`` +``> PYTHONPATH=~/src/qemu/python python3 -m qemu.qmp.qmp_shell`` The mappings between console script name and python module path can be found in ``setup.cfg``. diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py deleted file mode 100644 index 2b69b26..0000000 --- a/python/qemu/aqmp/__init__.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -QEMU Monitor Protocol (QMP) development library & tooling. - -This package provides a fairly low-level class for communicating -asynchronously with QMP protocol servers, as implemented by QEMU, the -QEMU Guest Agent, and the QEMU Storage Daemon. - -`QMPClient` provides the main functionality of this package. All errors -raised by this library derive from `QMPError`, see `aqmp.error` for -additional detail. See `aqmp.events` for an in-depth tutorial on -managing QMP events. -""" - -# Copyright (C) 2020-2022 John Snow for Red Hat, Inc. -# -# Authors: -# John Snow -# -# Based on earlier work by Luiz Capitulino . -# -# This work is licensed under the terms of the GNU LGPL, version 2 or -# later. See the COPYING file in the top-level directory. - -import logging - -from .error import QMPError -from .events import EventListener -from .message import Message -from .protocol import ( - ConnectError, - Runstate, - SocketAddrT, - StateError, -) -from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient - - -# Suppress logging unless an application engages it. -logging.getLogger('qemu.aqmp').addHandler(logging.NullHandler()) - - -# The order of these fields impact the Sphinx documentation order. -__all__ = ( - # Classes, most to least important - 'QMPClient', - 'Message', - 'EventListener', - 'Runstate', - - # Exceptions, most generic to most explicit - 'QMPError', - 'StateError', - 'ConnectError', - 'ExecuteError', - 'ExecInterruptedError', - - # Type aliases - 'SocketAddrT', -) diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py deleted file mode 100644 index 59d3036..0000000 --- a/python/qemu/aqmp/aqmp_tui.py +++ /dev/null @@ -1,652 +0,0 @@ -# Copyright (c) 2021 -# -# Authors: -# Niteesh Babu G S -# -# This work is licensed under the terms of the GNU LGPL, version 2 or -# later. See the COPYING file in the top-level directory. -""" -AQMP TUI - -AQMP TUI is an asynchronous interface built on top the of the AQMP library. -It is the successor of QMP-shell and is bought-in as a replacement for it. - -Example Usage: aqmp-tui -Full Usage: aqmp-tui --help -""" - -import argparse -import asyncio -import json -import logging -from logging import Handler, LogRecord -import signal -from typing import ( - List, - Optional, - Tuple, - Type, - Union, - cast, -) - -from pygments import lexers -from pygments import token as Token -import urwid -import urwid_readline - -from .error import ProtocolError -from .legacy import QEMUMonitorProtocol, QMPBadPortError -from .message import DeserializationError, Message, UnexpectedTypeError -from .protocol import ConnectError, Runstate -from .qmp_client import ExecInterruptedError, QMPClient -from .util import create_task, pretty_traceback - - -# The name of the signal that is used to update the history list -UPDATE_MSG: str = 'UPDATE_MSG' - - -palette = [ - (Token.Punctuation, '', '', '', 'h15,bold', 'g7'), - (Token.Text, '', '', '', '', 'g7'), - (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'), - (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'), - (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'), - (Token.Keyword.Constant, '', '', '', '#6af', 'g7'), - ('DEBUG', '', '', '', '#ddf', 'g7'), - ('INFO', '', '', '', 'g100', 'g7'), - ('WARNING', '', '', '', '#ff6', 'g7'), - ('ERROR', '', '', '', '#a00', 'g7'), - ('CRITICAL', '', '', '', '#a00', 'g7'), - ('background', '', 'black', '', '', 'g7'), -] - - -def format_json(msg: str) -> str: - """ - Formats valid/invalid multi-line JSON message into a single-line message. - - Formatting is first tried using the standard json module. If that fails - due to an decoding error then a simple string manipulation is done to - achieve a single line JSON string. - - Converting into single line is more asthetically pleasing when looking - along with error messages. - - Eg: - Input: - [ 1, - true, - 3 ] - The above input is not a valid QMP message and produces the following error - "QMP message is not a JSON object." - When displaying this in TUI in multiline mode we get - - [ 1, - true, - 3 ]: QMP message is not a JSON object. - - whereas in singleline mode we get the following - - [1, true, 3]: QMP message is not a JSON object. - - The single line mode is more asthetically pleasing. - - :param msg: - The message to formatted into single line. - - :return: Formatted singleline message. - """ - try: - msg = json.loads(msg) - return str(json.dumps(msg)) - except json.decoder.JSONDecodeError: - msg = msg.replace('\n', '') - words = msg.split(' ') - words = list(filter(None, words)) - return ' '.join(words) - - -def has_handler_type(logger: logging.Logger, - handler_type: Type[Handler]) -> bool: - """ - The Logger class has no interface to check if a certain type of handler is - installed or not. So we provide an interface to do so. - - :param logger: - Logger object - :param handler_type: - The type of the handler to be checked. - - :return: returns True if handler of type `handler_type`. - """ - for handler in logger.handlers: - if isinstance(handler, handler_type): - return True - return False - - -class App(QMPClient): - """ - Implements the AQMP TUI. - - Initializes the widgets and starts the urwid event loop. - - :param address: - Address of the server to connect to. - :param num_retries: - The number of times to retry before stopping to reconnect. - :param retry_delay: - The delay(sec) before each retry - """ - def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int, - retry_delay: Optional[int]) -> None: - urwid.register_signal(type(self), UPDATE_MSG) - self.window = Window(self) - self.address = address - self.aloop: Optional[asyncio.AbstractEventLoop] = None - self.num_retries = num_retries - self.retry_delay = retry_delay if retry_delay else 2 - self.retry: bool = False - self.exiting: bool = False - super().__init__() - - def add_to_history(self, msg: str, level: Optional[str] = None) -> None: - """ - Appends the msg to the history list. - - :param msg: - The raw message to be appended in string type. - """ - urwid.emit_signal(self, UPDATE_MSG, msg, level) - - def _cb_outbound(self, msg: Message) -> Message: - """ - Callback: outbound message hook. - - Appends the outgoing messages to the history box. - - :param msg: raw outbound message. - :return: final outbound message. - """ - str_msg = str(msg) - - if not has_handler_type(logging.getLogger(), TUILogHandler): - logging.debug('Request: %s', str_msg) - self.add_to_history('<-- ' + str_msg) - return msg - - def _cb_inbound(self, msg: Message) -> Message: - """ - Callback: outbound message hook. - - Appends the incoming messages to the history box. - - :param msg: raw inbound message. - :return: final inbound message. - """ - str_msg = str(msg) - - if not has_handler_type(logging.getLogger(), TUILogHandler): - logging.debug('Request: %s', str_msg) - self.add_to_history('--> ' + str_msg) - return msg - - async def _send_to_server(self, msg: Message) -> None: - """ - This coroutine sends the message to the server. - The message has to be pre-validated. - - :param msg: - Pre-validated message to be to sent to the server. - - :raise Exception: When an unhandled exception is caught. - """ - try: - await self._raw(msg, assign_id='id' not in msg) - except ExecInterruptedError as err: - logging.info('Error server disconnected before reply %s', str(err)) - self.add_to_history('Server disconnected before reply', 'ERROR') - except Exception as err: - logging.error('Exception from _send_to_server: %s', str(err)) - raise err - - def cb_send_to_server(self, raw_msg: str) -> None: - """ - Validates and sends the message to the server. - The raw string message is first converted into a Message object - and is then sent to the server. - - :param raw_msg: - The raw string message to be sent to the server. - - :raise Exception: When an unhandled exception is caught. - """ - try: - msg = Message(bytes(raw_msg, encoding='utf-8')) - create_task(self._send_to_server(msg)) - except (DeserializationError, UnexpectedTypeError) as err: - raw_msg = format_json(raw_msg) - logging.info('Invalid message: %s', err.error_message) - self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR') - - def unhandled_input(self, key: str) -> None: - """ - Handle's keys which haven't been handled by the child widgets. - - :param key: - Unhandled key - """ - if key == 'esc': - self.kill_app() - - def kill_app(self) -> None: - """ - Initiates killing of app. A bridge between asynchronous and synchronous - code. - """ - create_task(self._kill_app()) - - async def _kill_app(self) -> None: - """ - This coroutine initiates the actual disconnect process and calls - urwid.ExitMainLoop() to kill the TUI. - - :raise Exception: When an unhandled exception is caught. - """ - self.exiting = True - await self.disconnect() - logging.debug('Disconnect finished. Exiting app') - raise urwid.ExitMainLoop() - - async def disconnect(self) -> None: - """ - Overrides the disconnect method to handle the errors locally. - """ - try: - await super().disconnect() - except (OSError, EOFError) as err: - logging.info('disconnect: %s', str(err)) - self.retry = True - except ProtocolError as err: - logging.info('disconnect: %s', str(err)) - except Exception as err: - logging.error('disconnect: Unhandled exception %s', str(err)) - raise err - - def _set_status(self, msg: str) -> None: - """ - Sets the message as the status. - - :param msg: - The message to be displayed in the status bar. - """ - self.window.footer.set_text(msg) - - def _get_formatted_address(self) -> str: - """ - Returns a formatted version of the server's address. - - :return: formatted address - """ - if isinstance(self.address, tuple): - host, port = self.address - addr = f'{host}:{port}' - else: - addr = f'{self.address}' - return addr - - async def _initiate_connection(self) -> Optional[ConnectError]: - """ - Tries connecting to a server a number of times with a delay between - each try. If all retries failed then return the error faced during - the last retry. - - :return: Error faced during last retry. - """ - current_retries = 0 - err = None - - # initial try - await self.connect_server() - while self.retry and current_retries < self.num_retries: - logging.info('Connection Failed, retrying in %d', self.retry_delay) - status = f'[Retry #{current_retries} ({self.retry_delay}s)]' - self._set_status(status) - - await asyncio.sleep(self.retry_delay) - - err = await self.connect_server() - current_retries += 1 - # If all retries failed report the last error - if err: - logging.info('All retries failed: %s', err) - return err - return None - - async def manage_connection(self) -> None: - """ - Manage the connection based on the current run state. - - A reconnect is issued when the current state is IDLE and the number - of retries is not exhausted. - A disconnect is issued when the current state is DISCONNECTING. - """ - while not self.exiting: - if self.runstate == Runstate.IDLE: - err = await self._initiate_connection() - # If retry is still true then, we have exhausted all our tries. - if err: - self._set_status(f'[Error: {err.error_message}]') - else: - addr = self._get_formatted_address() - self._set_status(f'[Connected {addr}]') - elif self.runstate == Runstate.DISCONNECTING: - self._set_status('[Disconnected]') - await self.disconnect() - # check if a retry is needed - if self.runstate == Runstate.IDLE: - continue - await self.runstate_changed() - - async def connect_server(self) -> Optional[ConnectError]: - """ - Initiates a connection to the server at address `self.address` - and in case of a failure, sets the status to the respective error. - """ - try: - await self.connect(self.address) - self.retry = False - except ConnectError as err: - logging.info('connect_server: ConnectError %s', str(err)) - self.retry = True - return err - return None - - def run(self, debug: bool = False) -> None: - """ - Starts the long running co-routines and the urwid event loop. - - :param debug: - Enables/Disables asyncio event loop debugging - """ - screen = urwid.raw_display.Screen() - screen.set_terminal_properties(256) - - self.aloop = asyncio.get_event_loop() - self.aloop.set_debug(debug) - - # Gracefully handle SIGTERM and SIGINT signals - cancel_signals = [signal.SIGTERM, signal.SIGINT] - for sig in cancel_signals: - self.aloop.add_signal_handler(sig, self.kill_app) - - event_loop = urwid.AsyncioEventLoop(loop=self.aloop) - main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'), - unhandled_input=self.unhandled_input, - screen=screen, - palette=palette, - handle_mouse=True, - event_loop=event_loop) - - create_task(self.manage_connection(), self.aloop) - try: - main_loop.run() - except Exception as err: - logging.error('%s\n%s\n', str(err), pretty_traceback()) - raise err - - -class StatusBar(urwid.Text): - """ - A simple statusbar modelled using the Text widget. The status can be - set using the set_text function. All text set is aligned to right. - - :param text: Initial text to be displayed. Default is empty str. - """ - def __init__(self, text: str = ''): - super().__init__(text, align='right') - - -class Editor(urwid_readline.ReadlineEdit): - """ - A simple editor modelled using the urwid_readline.ReadlineEdit widget. - Mimcs GNU readline shortcuts and provides history support. - - The readline shortcuts can be found below: - https://github.com/rr-/urwid_readline#features - - Along with the readline features, this editor also has support for - history. Pressing the 'up'/'down' switches between the prev/next messages - available in the history. - - Currently there is no support to save the history to a file. The history of - previous commands is lost on exit. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - super().__init__(caption='> ', multiline=True) - self.parent = parent - self.history: List[str] = [] - self.last_index: int = -1 - self.show_history: bool = False - - def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]: - """ - Handles the keypress on this widget. - - :param size: - The current size of the widget. - :param key: - The key to be handled. - - :return: Unhandled key if any. - """ - msg = self.get_edit_text() - if key == 'up' and not msg: - # Show the history when 'up arrow' is pressed with no input text. - # NOTE: The show_history logic is necessary because in 'multiline' - # mode (which we use) 'up arrow' is used to move between lines. - if not self.history: - return None - self.show_history = True - last_msg = self.history[self.last_index] - self.set_edit_text(last_msg) - self.edit_pos = len(last_msg) - elif key == 'up' and self.show_history: - self.last_index = max(self.last_index - 1, -len(self.history)) - self.set_edit_text(self.history[self.last_index]) - self.edit_pos = len(self.history[self.last_index]) - elif key == 'down' and self.show_history: - if self.last_index == -1: - self.set_edit_text('') - self.show_history = False - else: - self.last_index += 1 - self.set_edit_text(self.history[self.last_index]) - self.edit_pos = len(self.history[self.last_index]) - elif key == 'meta enter': - # When using multiline, enter inserts a new line into the editor - # send the input to the server on alt + enter - self.parent.cb_send_to_server(msg) - self.history.append(msg) - self.set_edit_text('') - self.last_index = -1 - self.show_history = False - else: - self.show_history = False - self.last_index = -1 - return cast(Optional[str], super().keypress(size, key)) - return None - - -class EditorWidget(urwid.Filler): - """ - Wrapper around the editor widget. - - The Editor is a flow widget and has to wrapped inside a box widget. - This class wraps the Editor inside filler widget. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - super().__init__(Editor(parent), valign='top') - - -class HistoryBox(urwid.ListBox): - """ - This widget is modelled using the ListBox widget, contains the list of - all messages both QMP messages and log messsages to be shown in the TUI. - - The messages are urwid.Text widgets. On every append of a message, the - focus is shifted to the last appended message. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - self.history = urwid.SimpleFocusListWalker([]) - super().__init__(self.history) - - def add_to_history(self, - history: Union[str, List[Tuple[str, str]]]) -> None: - """ - Appends a message to the list and set the focus to the last appended - message. - - :param history: - The history item(message/event) to be appended to the list. - """ - self.history.append(urwid.Text(history)) - self.history.set_focus(len(self.history) - 1) - - def mouse_event(self, size: Tuple[int, int], _event: str, button: float, - _x: int, _y: int, focus: bool) -> None: - # Unfortunately there are no urwid constants that represent the mouse - # events. - if button == 4: # Scroll up event - super().keypress(size, 'up') - elif button == 5: # Scroll down event - super().keypress(size, 'down') - - -class HistoryWindow(urwid.Frame): - """ - This window composes the HistoryBox and EditorWidget in a horizontal split. - By default the first focus is given to the history box. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - self.editor_widget = EditorWidget(parent) - self.editor = urwid.LineBox(self.editor_widget) - self.history = HistoryBox(parent) - self.body = urwid.Pile([('weight', 80, self.history), - ('weight', 20, self.editor)]) - super().__init__(self.body) - urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history) - - def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None: - """ - Appends a message to the history box - - :param msg: - The message to be appended to the history box. - :param level: - The log level of the message, if it is a log message. - """ - formatted = [] - if level: - msg = f'[{level}]: {msg}' - formatted.append((level, msg)) - else: - lexer = lexers.JsonLexer() # pylint: disable=no-member - for token in lexer.get_tokens(msg): - formatted.append(token) - self.history.add_to_history(formatted) - - -class Window(urwid.Frame): - """ - This window is the top most widget of the TUI and will contain other - windows. Each child of this widget is responsible for displaying a specific - functionality. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - footer = StatusBar() - body = HistoryWindow(parent) - super().__init__(body, footer=footer) - - -class TUILogHandler(Handler): - """ - This handler routes all the log messages to the TUI screen. - It is installed to the root logger to so that the log message from all - libraries begin used is routed to the screen. - - :param tui: Reference to the TUI object. - """ - def __init__(self, tui: App) -> None: - super().__init__() - self.tui = tui - - def emit(self, record: LogRecord) -> None: - """ - Emits a record to the TUI screen. - - Appends the log message to the TUI screen - """ - level = record.levelname - msg = record.getMessage() - self.tui.add_to_history(msg, level) - - -def main() -> None: - """ - Driver of the whole script, parses arguments, initialize the TUI and - the logger. - """ - parser = argparse.ArgumentParser(description='AQMP TUI') - parser.add_argument('qmp_server', help='Address of the QMP server. ' - 'Format ') - parser.add_argument('--num-retries', type=int, default=10, - help='Number of times to reconnect before giving up.') - parser.add_argument('--retry-delay', type=int, - help='Time(s) to wait before next retry. ' - 'Default action is to wait 2s between each retry.') - parser.add_argument('--log-file', help='The Log file name') - parser.add_argument('--log-level', default='WARNING', - help='Log level ') - parser.add_argument('--asyncio-debug', action='store_true', - help='Enable debug mode for asyncio loop. ' - 'Generates lot of output, makes TUI unusable when ' - 'logs are logged in the TUI. ' - 'Use only when logging to a file.') - args = parser.parse_args() - - try: - address = QEMUMonitorProtocol.parse_address(args.qmp_server) - except QMPBadPortError as err: - parser.error(str(err)) - - app = App(address, args.num_retries, args.retry_delay) - - root_logger = logging.getLogger() - root_logger.setLevel(logging.getLevelName(args.log_level)) - - if args.log_file: - root_logger.addHandler(logging.FileHandler(args.log_file)) - else: - root_logger.addHandler(TUILogHandler(app)) - - app.run(args.asyncio_debug) - - -if __name__ == '__main__': - main() diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py deleted file mode 100644 index 24ba4d5..0000000 --- a/python/qemu/aqmp/error.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -QMP Error Classes - -This package seeks to provide semantic error classes that are intended -to be used directly by clients when they would like to handle particular -semantic failures (e.g. "failed to connect") without needing to know the -enumeration of possible reasons for that failure. - -QMPError serves as the ancestor for all exceptions raised by this -package, and is suitable for use in handling semantic errors from this -library. In most cases, individual public methods will attempt to catch -and re-encapsulate various exceptions to provide a semantic -error-handling interface. - -.. admonition:: QMP Exception Hierarchy Reference - - | `Exception` - | +-- `QMPError` - | +-- `ConnectError` - | +-- `StateError` - | +-- `ExecInterruptedError` - | +-- `ExecuteError` - | +-- `ListenerError` - | +-- `ProtocolError` - | +-- `DeserializationError` - | +-- `UnexpectedTypeError` - | +-- `ServerParseError` - | +-- `BadReplyError` - | +-- `GreetingError` - | +-- `NegotiationError` -""" - - -class QMPError(Exception): - """Abstract error class for all errors originating from this package.""" - - -class ProtocolError(QMPError): - """ - Abstract error class for protocol failures. - - Semantically, these errors are generally the fault of either the - protocol server or as a result of a bug in this library. - - :param error_message: Human-readable string describing the error. - """ - def __init__(self, error_message: str): - super().__init__(error_message) - #: Human-readable error message, without any prefix. - self.error_message: str = error_message diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py deleted file mode 100644 index f3d4e2b..0000000 --- a/python/qemu/aqmp/events.py +++ /dev/null @@ -1,717 +0,0 @@ -""" -AQMP Events and EventListeners - -Asynchronous QMP uses `EventListener` objects to listen for events. An -`EventListener` is a FIFO event queue that can be pre-filtered to listen -for only specific events. Each `EventListener` instance receives its own -copy of events that it hears, so events may be consumed without fear or -worry for depriving other listeners of events they need to hear. - - -EventListener Tutorial ----------------------- - -In all of the following examples, we assume that we have a `QMPClient` -instantiated named ``qmp`` that is already connected. - - -`listener()` context blocks with one name -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The most basic usage is by using the `listener()` context manager to -construct them: - -.. code:: python - - with qmp.listener('STOP') as listener: - await qmp.execute('stop') - await listener.get() - -The listener is active only for the duration of the ‘with’ block. This -instance listens only for ‘STOP’ events. - - -`listener()` context blocks with two or more names -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Multiple events can be selected for by providing any ``Iterable[str]``: - -.. code:: python - - with qmp.listener(('STOP', 'RESUME')) as listener: - await qmp.execute('stop') - event = await listener.get() - assert event['event'] == 'STOP' - - await qmp.execute('cont') - event = await listener.get() - assert event['event'] == 'RESUME' - - -`listener()` context blocks with no names -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -By omitting names entirely, you can listen to ALL events. - -.. code:: python - - with qmp.listener() as listener: - await qmp.execute('stop') - event = await listener.get() - assert event['event'] == 'STOP' - -This isn’t a very good use case for this feature: In a non-trivial -running system, we may not know what event will arrive next. Grabbing -the top of a FIFO queue returning multiple kinds of events may be prone -to error. - - -Using async iterators to retrieve events -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If you’d like to simply watch what events happen to arrive, you can use -the listener as an async iterator: - -.. code:: python - - with qmp.listener() as listener: - async for event in listener: - print(f"Event arrived: {event['event']}") - -This is analogous to the following code: - -.. code:: python - - with qmp.listener() as listener: - while True: - event = listener.get() - print(f"Event arrived: {event['event']}") - -This event stream will never end, so these blocks will never terminate. - - -Using asyncio.Task to concurrently retrieve events -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Since a listener’s event stream will never terminate, it is not likely -useful to use that form in a script. For longer-running clients, we can -create event handlers by using `asyncio.Task` to create concurrent -coroutines: - -.. code:: python - - async def print_events(listener): - try: - async for event in listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - with qmp.listener() as listener: - task = asyncio.Task(print_events(listener)) - await qmp.execute('stop') - await qmp.execute('cont') - task.cancel() - await task - -However, there is no guarantee that these events will be received by the -time we leave this context block. Once the context block is exited, the -listener will cease to hear any new events, and becomes inert. - -Be mindful of the timing: the above example will *probably*– but does -not *guarantee*– that both STOP/RESUMED events will be printed. The -example below outlines how to use listeners outside of a context block. - - -Using `register_listener()` and `remove_listener()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To create a listener with a longer lifetime, beyond the scope of a -single block, create a listener and then call `register_listener()`: - -.. code:: python - - class MyClient: - def __init__(self, qmp): - self.qmp = qmp - self.listener = EventListener() - - async def print_events(self): - try: - async for event in self.listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - async def run(self): - self.task = asyncio.Task(self.print_events) - self.qmp.register_listener(self.listener) - await qmp.execute('stop') - await qmp.execute('cont') - - async def stop(self): - self.task.cancel() - await self.task - self.qmp.remove_listener(self.listener) - -The listener can be deactivated by using `remove_listener()`. When it is -removed, any possible pending events are cleared and it can be -re-registered at a later time. - - -Using the built-in all events listener -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The `QMPClient` object creates its own default listener named -:py:obj:`~Events.events` that can be used for the same purpose without -having to create your own: - -.. code:: python - - async def print_events(listener): - try: - async for event in listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - task = asyncio.Task(print_events(qmp.events)) - - await qmp.execute('stop') - await qmp.execute('cont') - - task.cancel() - await task - - -Using both .get() and async iterators -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The async iterator and `get()` methods pull events from the same FIFO -queue. If you mix the usage of both, be aware: Events are emitted -precisely once per listener. - -If multiple contexts try to pull events from the same listener instance, -events are still emitted only precisely once. - -This restriction can be lifted by creating additional listeners. - - -Creating multiple listeners -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Additional `EventListener` objects can be created at-will. Each one -receives its own copy of events, with separate FIFO event queues. - -.. code:: python - - my_listener = EventListener() - qmp.register_listener(my_listener) - - await qmp.execute('stop') - copy1 = await my_listener.get() - copy2 = await qmp.events.get() - - assert copy1 == copy2 - -In this example, we await an event from both a user-created -`EventListener` and the built-in events listener. Both receive the same -event. - - -Clearing listeners -~~~~~~~~~~~~~~~~~~ - -`EventListener` objects can be cleared, clearing all events seen thus far: - -.. code:: python - - await qmp.execute('stop') - qmp.events.clear() - await qmp.execute('cont') - event = await qmp.events.get() - assert event['event'] == 'RESUME' - -`EventListener` objects are FIFO queues. If events are not consumed, -they will remain in the queue until they are witnessed or discarded via -`clear()`. FIFO queues will be drained automatically upon leaving a -context block, or when calling `remove_listener()`. - - -Accessing listener history -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -`EventListener` objects record their history. Even after being cleared, -you can obtain a record of all events seen so far: - -.. code:: python - - await qmp.execute('stop') - await qmp.execute('cont') - qmp.events.clear() - - assert len(qmp.events.history) == 2 - assert qmp.events.history[0]['event'] == 'STOP' - assert qmp.events.history[1]['event'] == 'RESUME' - -The history is updated immediately and does not require the event to be -witnessed first. - - -Using event filters -~~~~~~~~~~~~~~~~~~~ - -`EventListener` objects can be given complex filtering criteria if names -are not sufficient: - -.. code:: python - - def job1_filter(event) -> bool: - event_data = event.get('data', {}) - event_job_id = event_data.get('id') - return event_job_id == "job1" - - with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: - await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) - async for event in listener: - if event['data']['status'] == 'concluded': - break - -These filters might be most useful when parameterized. `EventListener` -objects expect a function that takes only a single argument (the raw -event, as a `Message`) and returns a bool; True if the event should be -accepted into the stream. You can create a function that adapts this -signature to accept configuration parameters: - -.. code:: python - - def job_filter(job_id: str) -> EventFilter: - def filter(event: Message) -> bool: - return event['data']['id'] == job_id - return filter - - with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: - await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) - async for event in listener: - if event['data']['status'] == 'concluded': - break - - -Activating an existing listener with `listen()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Listeners with complex, long configurations can also be created manually -and activated temporarily by using `listen()` instead of `listener()`: - -.. code:: python - - listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', - 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', - 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) - - with qmp.listen(listener): - await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) - async for event in listener: - print(event) - if event['event'] == 'BLOCK_JOB_COMPLETED': - break - -Any events that are not witnessed by the time the block is left will be -cleared from the queue; entering the block is an implicit -`register_listener()` and leaving the block is an implicit -`remove_listener()`. - - -Activating multiple existing listeners with `listen()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -While `listener()` is only capable of creating a single listener, -`listen()` is capable of activating multiple listeners simultaneously: - -.. code:: python - - def job_filter(job_id: str) -> EventFilter: - def filter(event: Message) -> bool: - return event['data']['id'] == job_id - return filter - - jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) - jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) - - with qmp.listen(jobA, jobB): - qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) - qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) - - async for event in jobA.get(): - if event['data']['status'] == 'concluded': - break - async for event in jobB.get(): - if event['data']['status'] == 'concluded': - break - - -Extending the `EventListener` class -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -In the case that a more specialized `EventListener` is desired to -provide either more functionality or more compact syntax for specialized -cases, it can be extended. - -One of the key methods to extend or override is -:py:meth:`~EventListener.accept()`. The default implementation checks an -incoming message for: - -1. A qualifying name, if any :py:obj:`~EventListener.names` were - specified at initialization time -2. That :py:obj:`~EventListener.event_filter()` returns True. - -This can be modified however you see fit to change the criteria for -inclusion in the stream. - -For convenience, a ``JobListener`` class could be created that simply -bakes in configuration so it does not need to be repeated: - -.. code:: python - - class JobListener(EventListener): - def __init__(self, job_id: str): - super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', - 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', - 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) - self.job_id = job_id - - def accept(self, event) -> bool: - if not super().accept(event): - return False - if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): - return event['data']['id'] == job_id - return event['data']['device'] == job_id - -From here on out, you can conjure up a custom-purpose listener that -listens only for job-related events for a specific job-id easily: - -.. code:: python - - listener = JobListener('job4') - with qmp.listener(listener): - await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) - async for event in listener: - print(event) - if event['event'] == 'BLOCK_JOB_COMPLETED': - break - - -Experimental Interfaces & Design Issues ---------------------------------------- - -These interfaces are not ones I am sure I will keep or otherwise modify -heavily. - -qmp.listener()’s type signature -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -`listener()` does not return anything, because it was assumed the caller -already had a handle to the listener. However, for -``qmp.listener(EventListener())`` forms, the caller will not have saved -a handle to the listener. - -Because this function can accept *many* listeners, I found it hard to -accurately type in a way where it could be used in both “one” or “many” -forms conveniently and in a statically type-safe manner. - -Ultimately, I removed the return altogether, but perhaps with more time -I can work out a way to re-add it. - - -API Reference -------------- - -""" - -import asyncio -from contextlib import contextmanager -import logging -from typing import ( - AsyncIterator, - Callable, - Iterable, - Iterator, - List, - Optional, - Set, - Tuple, - Union, -) - -from .error import QMPError -from .message import Message - - -EventNames = Union[str, Iterable[str], None] -EventFilter = Callable[[Message], bool] - - -class ListenerError(QMPError): - """ - Generic error class for `EventListener`-related problems. - """ - - -class EventListener: - """ - Selectively listens for events with runtime configurable filtering. - - This class is designed to be directly usable for the most common cases, - but it can be extended to provide more rigorous control. - - :param names: - One or more names of events to listen for. - When not provided, listen for ALL events. - :param event_filter: - An optional event filtering function. - When names are also provided, this acts as a secondary filter. - - When ``names`` and ``event_filter`` are both provided, the names - will be filtered first, and then the filter function will be called - second. The event filter function can assume that the format of the - event is a known format. - """ - def __init__( - self, - names: EventNames = None, - event_filter: Optional[EventFilter] = None, - ): - # Queue of 'heard' events yet to be witnessed by a caller. - self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() - - # Intended as a historical record, NOT a processing queue or backlog. - self._history: List[Message] = [] - - #: Primary event filter, based on one or more event names. - self.names: Set[str] = set() - if isinstance(names, str): - self.names.add(names) - elif names is not None: - self.names.update(names) - - #: Optional, secondary event filter. - self.event_filter: Optional[EventFilter] = event_filter - - @property - def history(self) -> Tuple[Message, ...]: - """ - A read-only history of all events seen so far. - - This represents *every* event, including those not yet witnessed - via `get()` or ``async for``. It persists between `clear()` - calls and is immutable. - """ - return tuple(self._history) - - def accept(self, event: Message) -> bool: - """ - Determine if this listener accepts this event. - - This method determines which events will appear in the stream. - The default implementation simply checks the event against the - list of names and the event_filter to decide if this - `EventListener` accepts a given event. It can be - overridden/extended to provide custom listener behavior. - - User code is not expected to need to invoke this method. - - :param event: The event under consideration. - :return: `True`, if this listener accepts this event. - """ - name_ok = (not self.names) or (event['event'] in self.names) - return name_ok and ( - (not self.event_filter) or self.event_filter(event) - ) - - async def put(self, event: Message) -> None: - """ - Conditionally put a new event into the FIFO queue. - - This method is not designed to be invoked from user code, and it - should not need to be overridden. It is a public interface so - that `QMPClient` has an interface by which it can inform - registered listeners of new events. - - The event will be put into the queue if - :py:meth:`~EventListener.accept()` returns `True`. - - :param event: The new event to put into the FIFO queue. - """ - if not self.accept(event): - return - - self._history.append(event) - await self._queue.put(event) - - async def get(self) -> Message: - """ - Wait for the very next event in this stream. - - If one is already available, return that one. - """ - return await self._queue.get() - - def empty(self) -> bool: - """ - Return `True` if there are no pending events. - """ - return self._queue.empty() - - def clear(self) -> List[Message]: - """ - Clear this listener of all pending events. - - Called when an `EventListener` is being unregistered, this clears the - pending FIFO queue synchronously. It can be also be used to - manually clear any pending events, if desired. - - :return: The cleared events, if any. - - .. warning:: - Take care when discarding events. Cleared events will be - silently tossed on the floor. All events that were ever - accepted by this listener are visible in `history()`. - """ - events = [] - while True: - try: - events.append(self._queue.get_nowait()) - except asyncio.QueueEmpty: - break - - return events - - def __aiter__(self) -> AsyncIterator[Message]: - return self - - async def __anext__(self) -> Message: - """ - Enables the `EventListener` to function as an async iterator. - - It may be used like this: - - .. code:: python - - async for event in listener: - print(event) - - These iterators will never terminate of their own accord; you - must provide break conditions or otherwise prepare to run them - in an `asyncio.Task` that can be cancelled. - """ - return await self.get() - - -class Events: - """ - Events is a mix-in class that adds event functionality to the QMP class. - - It's designed specifically as a mix-in for `QMPClient`, and it - relies upon the class it is being mixed into having a 'logger' - property. - """ - def __init__(self) -> None: - self._listeners: List[EventListener] = [] - - #: Default, all-events `EventListener`. - self.events: EventListener = EventListener() - self.register_listener(self.events) - - # Parent class needs to have a logger - self.logger: logging.Logger - - async def _event_dispatch(self, msg: Message) -> None: - """ - Given a new event, propagate it to all of the active listeners. - - :param msg: The event to propagate. - """ - for listener in self._listeners: - await listener.put(msg) - - def register_listener(self, listener: EventListener) -> None: - """ - Register and activate an `EventListener`. - - :param listener: The listener to activate. - :raise ListenerError: If the given listener is already registered. - """ - if listener in self._listeners: - raise ListenerError("Attempted to re-register existing listener") - self.logger.debug("Registering %s.", str(listener)) - self._listeners.append(listener) - - def remove_listener(self, listener: EventListener) -> None: - """ - Unregister and deactivate an `EventListener`. - - The removed listener will have its pending events cleared via - `clear()`. The listener can be re-registered later when - desired. - - :param listener: The listener to deactivate. - :raise ListenerError: If the given listener is not registered. - """ - if listener == self.events: - raise ListenerError("Cannot remove the default listener.") - self.logger.debug("Removing %s.", str(listener)) - listener.clear() - self._listeners.remove(listener) - - @contextmanager - def listen(self, *listeners: EventListener) -> Iterator[None]: - r""" - Context manager: Temporarily listen with an `EventListener`. - - Accepts one or more `EventListener` objects and registers them, - activating them for the duration of the context block. - - `EventListener` objects will have any pending events in their - FIFO queue cleared upon exiting the context block, when they are - deactivated. - - :param \*listeners: One or more EventListeners to activate. - :raise ListenerError: If the given listener(s) are already active. - """ - _added = [] - - try: - for listener in listeners: - self.register_listener(listener) - _added.append(listener) - - yield - - finally: - for listener in _added: - self.remove_listener(listener) - - @contextmanager - def listener( - self, - names: EventNames = (), - event_filter: Optional[EventFilter] = None - ) -> Iterator[EventListener]: - """ - Context manager: Temporarily listen with a new `EventListener`. - - Creates an `EventListener` object and registers it, activating - it for the duration of the context block. - - :param names: - One or more names of events to listen for. - When not provided, listen for ALL events. - :param event_filter: - An optional event filtering function. - When names are also provided, this acts as a secondary filter. - - :return: The newly created and active `EventListener`. - """ - listener = EventListener(names, event_filter) - with self.listen(listener): - yield listener diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py deleted file mode 100644 index dfcd20b..0000000 --- a/python/qemu/aqmp/legacy.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -(Legacy) Sync QMP Wrapper - -This module provides the `QEMUMonitorProtocol` class, which is a -synchronous wrapper around `QMPClient`. - -Its design closely resembles that of the original QEMUMonitorProtocol -class, originally written by Luiz Capitulino. It is provided here for -compatibility with scripts inside the QEMU source tree that expect the -old interface. -""" - -# -# Copyright (C) 2009-2022 Red Hat Inc. -# -# Authors: -# Luiz Capitulino -# John Snow -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. -# - -import asyncio -from types import TracebackType -from typing import ( - Any, - Awaitable, - Dict, - List, - Optional, - Type, - TypeVar, - Union, -) - -from .error import QMPError -from .protocol import Runstate, SocketAddrT -from .qmp_client import QMPClient - - -#: QMPMessage is an entire QMP message of any kind. -QMPMessage = Dict[str, Any] - -#: QMPReturnValue is the 'return' value of a command. -QMPReturnValue = object - -#: QMPObject is any object in a QMP message. -QMPObject = Dict[str, object] - -# QMPMessage can be outgoing commands or incoming events/returns. -# QMPReturnValue is usually a dict/json object, but due to QAPI's -# 'returns-whitelist', it can actually be anything. -# -# {'return': {}} is a QMPMessage, -# {} is the QMPReturnValue. - - -class QMPBadPortError(QMPError): - """ - Unable to parse socket address: Port was non-numerical. - """ - - -class QEMUMonitorProtocol: - """ - Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) - and then allow to handle commands and events. - - :param address: QEMU address, can be either a unix socket path (string) - or a tuple in the form ( address, port ) for a TCP - connection - :param server: Act as the socket server. (See 'accept') - :param nickname: Optional nickname used for logging. - """ - - def __init__(self, address: SocketAddrT, - server: bool = False, - nickname: Optional[str] = None): - - self._aqmp = QMPClient(nickname) - self._aloop = asyncio.get_event_loop() - self._address = address - self._timeout: Optional[float] = None - - if server: - self._sync(self._aqmp.start_server(self._address)) - - _T = TypeVar('_T') - - def _sync( - self, future: Awaitable[_T], timeout: Optional[float] = None - ) -> _T: - return self._aloop.run_until_complete( - asyncio.wait_for(future, timeout=timeout) - ) - - def _get_greeting(self) -> Optional[QMPMessage]: - if self._aqmp.greeting is not None: - # pylint: disable=protected-access - return self._aqmp.greeting._asdict() - return None - - def __enter__(self: _T) -> _T: - # Implement context manager enter function. - return self - - def __exit__(self, - # pylint: disable=duplicate-code - # see https://github.com/PyCQA/pylint/issues/3619 - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType]) -> None: - # Implement context manager exit function. - self.close() - - @classmethod - def parse_address(cls, address: str) -> SocketAddrT: - """ - Parse a string into a QMP address. - - Figure out if the argument is in the port:host form. - If it's not, it's probably a file path. - """ - components = address.split(':') - if len(components) == 2: - try: - port = int(components[1]) - except ValueError: - msg = f"Bad port: '{components[1]}' in '{address}'." - raise QMPBadPortError(msg) from None - return (components[0], port) - - # Treat as filepath. - return address - - def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: - """ - Connect to the QMP Monitor and perform capabilities negotiation. - - :return: QMP greeting dict, or None if negotiate is false - :raise ConnectError: on connection errors - """ - self._aqmp.await_greeting = negotiate - self._aqmp.negotiate = negotiate - - self._sync( - self._aqmp.connect(self._address) - ) - return self._get_greeting() - - def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: - """ - Await connection from QMP Monitor and perform capabilities negotiation. - - :param timeout: - timeout in seconds (nonnegative float number, or None). - If None, there is no timeout, and this may block forever. - - :return: QMP greeting dict - :raise ConnectError: on connection errors - """ - self._aqmp.await_greeting = True - self._aqmp.negotiate = True - - self._sync(self._aqmp.accept(), timeout) - - ret = self._get_greeting() - assert ret is not None - return ret - - def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: - """ - Send a QMP command to the QMP Monitor. - - :param qmp_cmd: QMP command to be sent as a Python dict - :return: QMP response as a Python dict - """ - return dict( - self._sync( - # pylint: disable=protected-access - - # _raw() isn't a public API, because turning off - # automatic ID assignment is discouraged. For - # compatibility with iotests *only*, do it anyway. - self._aqmp._raw(qmp_cmd, assign_id=False), - self._timeout - ) - ) - - def cmd(self, name: str, - args: Optional[Dict[str, object]] = None, - cmd_id: Optional[object] = None) -> QMPMessage: - """ - Build a QMP command and send it to the QMP Monitor. - - :param name: command name (string) - :param args: command arguments (dict) - :param cmd_id: command id (dict, list, string or int) - """ - qmp_cmd: QMPMessage = {'execute': name} - if args: - qmp_cmd['arguments'] = args - if cmd_id: - qmp_cmd['id'] = cmd_id - return self.cmd_obj(qmp_cmd) - - def command(self, cmd: str, **kwds: object) -> QMPReturnValue: - """ - Build and send a QMP command to the monitor, report errors if any - """ - return self._sync( - self._aqmp.execute(cmd, kwds), - self._timeout - ) - - def pull_event(self, - wait: Union[bool, float] = False) -> Optional[QMPMessage]: - """ - Pulls a single event. - - :param wait: - If False or 0, do not wait. Return None if no events ready. - If True, wait forever until the next event. - Otherwise, wait for the specified number of seconds. - - :raise asyncio.TimeoutError: - When a timeout is requested and the timeout period elapses. - - :return: The first available QMP event, or None. - """ - if not wait: - # wait is False/0: "do not wait, do not except." - if self._aqmp.events.empty(): - return None - - # If wait is 'True', wait forever. If wait is False/0, the events - # queue must not be empty; but it still needs some real amount - # of time to complete. - timeout = None - if wait and isinstance(wait, float): - timeout = wait - - return dict( - self._sync( - self._aqmp.events.get(), - timeout - ) - ) - - def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: - """ - Get a list of QMP events and clear all pending events. - - :param wait: - If False or 0, do not wait. Return None if no events ready. - If True, wait until we have at least one event. - Otherwise, wait for up to the specified number of seconds for at - least one event. - - :raise asyncio.TimeoutError: - When a timeout is requested and the timeout period elapses. - - :return: A list of QMP events. - """ - events = [dict(x) for x in self._aqmp.events.clear()] - if events: - return events - - event = self.pull_event(wait) - return [event] if event is not None else [] - - def clear_events(self) -> None: - """Clear current list of pending events.""" - self._aqmp.events.clear() - - def close(self) -> None: - """Close the connection.""" - self._sync( - self._aqmp.disconnect() - ) - - def settimeout(self, timeout: Optional[float]) -> None: - """ - Set the timeout for QMP RPC execution. - - This timeout affects the `cmd`, `cmd_obj`, and `command` methods. - The `accept`, `pull_event` and `get_event` methods have their - own configurable timeouts. - - :param timeout: - timeout in seconds, or None. - None will wait indefinitely. - """ - self._timeout = timeout - - def send_fd_scm(self, fd: int) -> None: - """ - Send a file descriptor to the remote via SCM_RIGHTS. - """ - self._aqmp.send_fd_scm(fd) - - def __del__(self) -> None: - if self._aqmp.runstate == Runstate.IDLE: - return - - if not self._aloop.is_running(): - self.close() - else: - # Garbage collection ran while the event loop was running. - # Nothing we can do about it now, but if we don't raise our - # own error, the user will be treated to a lot of traceback - # they might not understand. - raise QMPError( - "QEMUMonitorProtocol.close()" - " was not called before object was garbage collected" - ) diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py deleted file mode 100644 index f76ccc9..0000000 --- a/python/qemu/aqmp/message.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -QMP Message Format - -This module provides the `Message` class, which represents a single QMP -message sent to or from the server. -""" - -import json -from json import JSONDecodeError -from typing import ( - Dict, - Iterator, - Mapping, - MutableMapping, - Optional, - Union, -) - -from .error import ProtocolError - - -class Message(MutableMapping[str, object]): - """ - Represents a single QMP protocol message. - - QMP uses JSON objects as its basic communicative unit; so this - Python object is a :py:obj:`~collections.abc.MutableMapping`. It may - be instantiated from either another mapping (like a `dict`), or from - raw `bytes` that still need to be deserialized. - - Once instantiated, it may be treated like any other MutableMapping:: - - >>> msg = Message(b'{"hello": "world"}') - >>> assert msg['hello'] == 'world' - >>> msg['id'] = 'foobar' - >>> print(msg) - { - "hello": "world", - "id": "foobar" - } - - It can be converted to `bytes`:: - - >>> msg = Message({"hello": "world"}) - >>> print(bytes(msg)) - b'{"hello":"world","id":"foobar"}' - - Or back into a garden-variety `dict`:: - - >>> dict(msg) - {'hello': 'world'} - - - :param value: Initial value, if any. - :param eager: - When `True`, attempt to serialize or deserialize the initial value - immediately, so that conversion exceptions are raised during - the call to ``__init__()``. - """ - # pylint: disable=too-many-ancestors - - def __init__(self, - value: Union[bytes, Mapping[str, object]] = b'{}', *, - eager: bool = True): - self._data: Optional[bytes] = None - self._obj: Optional[Dict[str, object]] = None - - if isinstance(value, bytes): - self._data = value - if eager: - self._obj = self._deserialize(self._data) - else: - self._obj = dict(value) - if eager: - self._data = self._serialize(self._obj) - - # Methods necessary to implement the MutableMapping interface, see: - # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping - - # We get pop, popitem, clear, update, setdefault, __contains__, - # keys, items, values, get, __eq__ and __ne__ for free. - - def __getitem__(self, key: str) -> object: - return self._object[key] - - def __setitem__(self, key: str, value: object) -> None: - self._object[key] = value - self._data = None - - def __delitem__(self, key: str) -> None: - del self._object[key] - self._data = None - - def __iter__(self) -> Iterator[str]: - return iter(self._object) - - def __len__(self) -> int: - return len(self._object) - - # Dunder methods not related to MutableMapping: - - def __repr__(self) -> str: - if self._obj is not None: - return f"Message({self._object!r})" - return f"Message({bytes(self)!r})" - - def __str__(self) -> str: - """Pretty-printed representation of this QMP message.""" - return json.dumps(self._object, indent=2) - - def __bytes__(self) -> bytes: - """bytes representing this QMP message.""" - if self._data is None: - self._data = self._serialize(self._obj or {}) - return self._data - - # Conversion Methods - - @property - def _object(self) -> Dict[str, object]: - """ - A `dict` representing this QMP message. - - Generated on-demand, if required. This property is private - because it returns an object that could be used to invalidate - the internal state of the `Message` object. - """ - if self._obj is None: - self._obj = self._deserialize(self._data or b'{}') - return self._obj - - @classmethod - def _serialize(cls, value: object) -> bytes: - """ - Serialize a JSON object as `bytes`. - - :raise ValueError: When the object cannot be serialized. - :raise TypeError: When the object cannot be serialized. - - :return: `bytes` ready to be sent over the wire. - """ - return json.dumps(value, separators=(',', ':')).encode('utf-8') - - @classmethod - def _deserialize(cls, data: bytes) -> Dict[str, object]: - """ - Deserialize JSON `bytes` into a native Python `dict`. - - :raise DeserializationError: - If JSON deserialization fails for any reason. - :raise UnexpectedTypeError: - If the data does not represent a JSON object. - - :return: A `dict` representing this QMP message. - """ - try: - obj = json.loads(data) - except JSONDecodeError as err: - emsg = "Failed to deserialize QMP message." - raise DeserializationError(emsg, data) from err - if not isinstance(obj, dict): - raise UnexpectedTypeError( - "QMP message is not a JSON object.", - obj - ) - return obj - - -class DeserializationError(ProtocolError): - """ - A QMP message was not understood as JSON. - - When this Exception is raised, ``__cause__`` will be set to the - `json.JSONDecodeError` Exception, which can be interrogated for - further details. - - :param error_message: Human-readable string describing the error. - :param raw: The raw `bytes` that prompted the failure. - """ - def __init__(self, error_message: str, raw: bytes): - super().__init__(error_message) - #: The raw `bytes` that were not understood as JSON. - self.raw: bytes = raw - - def __str__(self) -> str: - return "\n".join([ - super().__str__(), - f" raw bytes were: {str(self.raw)}", - ]) - - -class UnexpectedTypeError(ProtocolError): - """ - A QMP message was JSON, but not a JSON object. - - :param error_message: Human-readable string describing the error. - :param value: The deserialized JSON value that wasn't an object. - """ - def __init__(self, error_message: str, value: object): - super().__init__(error_message) - #: The JSON value that was expected to be an object. - self.value: object = value - - def __str__(self) -> str: - strval = json.dumps(self.value, indent=2) - return "\n".join([ - super().__str__(), - f" json value was: {strval}", - ]) diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py deleted file mode 100644 index de87f87..0000000 --- a/python/qemu/aqmp/models.py +++ /dev/null @@ -1,146 +0,0 @@ -""" -QMP Data Models - -This module provides simplistic data classes that represent the few -structures that the QMP spec mandates; they are used to verify incoming -data to make sure it conforms to spec. -""" -# pylint: disable=too-few-public-methods - -from collections import abc -import copy -from typing import ( - Any, - Dict, - Mapping, - Optional, - Sequence, -) - - -class Model: - """ - Abstract data model, representing some QMP object of some kind. - - :param raw: The raw object to be validated. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - self._raw = raw - - def _check_key(self, key: str) -> None: - if key not in self._raw: - raise KeyError(f"'{self._name}' object requires '{key}' member") - - def _check_value(self, key: str, type_: type, typestr: str) -> None: - assert key in self._raw - if not isinstance(self._raw[key], type_): - raise TypeError( - f"'{self._name}' member '{key}' must be a {typestr}" - ) - - def _check_member(self, key: str, type_: type, typestr: str) -> None: - self._check_key(key) - self._check_value(key, type_, typestr) - - @property - def _name(self) -> str: - return type(self).__name__ - - def __repr__(self) -> str: - return f"{self._name}({self._raw!r})" - - -class Greeting(Model): - """ - Defined in qmp-spec.txt, section 2.2, "Server Greeting". - - :param raw: The raw Greeting object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'QMP' member - self.QMP: QMPGreeting # pylint: disable=invalid-name - - self._check_member('QMP', abc.Mapping, "JSON object") - self.QMP = QMPGreeting(self._raw['QMP']) - - def _asdict(self) -> Dict[str, object]: - """ - For compatibility with the iotests sync QMP wrapper. - - The legacy QMP interface needs Greetings as a garden-variety Dict. - - This interface is private in the hopes that it will be able to - be dropped again in the near-future. Caller beware! - """ - return dict(copy.deepcopy(self._raw)) - - -class QMPGreeting(Model): - """ - Defined in qmp-spec.txt, section 2.2, "Server Greeting". - - :param raw: The raw QMPGreeting object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'version' member - self.version: Mapping[str, object] - #: 'capabilities' member - self.capabilities: Sequence[object] - - self._check_member('version', abc.Mapping, "JSON object") - self.version = self._raw['version'] - - self._check_member('capabilities', abc.Sequence, "JSON array") - self.capabilities = self._raw['capabilities'] - - -class ErrorResponse(Model): - """ - Defined in qmp-spec.txt, section 2.4.2, "error". - - :param raw: The raw ErrorResponse object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'error' member - self.error: ErrorInfo - #: 'id' member - self.id: Optional[object] = None # pylint: disable=invalid-name - - self._check_member('error', abc.Mapping, "JSON object") - self.error = ErrorInfo(self._raw['error']) - - if 'id' in raw: - self.id = raw['id'] - - -class ErrorInfo(Model): - """ - Defined in qmp-spec.txt, section 2.4.2, "error". - - :param raw: The raw ErrorInfo object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'class' member, with an underscore to avoid conflicts in Python. - self.class_: str - #: 'desc' member - self.desc: str - - self._check_member('class', str, "string") - self.class_ = self._raw['class'] - - self._check_member('desc', str, "string") - self.desc = self._raw['desc'] diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py deleted file mode 100644 index 36fae57..0000000 --- a/python/qemu/aqmp/protocol.py +++ /dev/null @@ -1,1048 +0,0 @@ -""" -Generic Asynchronous Message-based Protocol Support - -This module provides a generic framework for sending and receiving -messages over an asyncio stream. `AsyncProtocol` is an abstract class -that implements the core mechanisms of a simple send/receive protocol, -and is designed to be extended. - -In this package, it is used as the implementation for the `QMPClient` -class. -""" - -# It's all the docstrings ... ! It's long for a good reason ^_^; -# pylint: disable=too-many-lines - -import asyncio -from asyncio import StreamReader, StreamWriter -from enum import Enum -from functools import wraps -import logging -from ssl import SSLContext -from typing import ( - Any, - Awaitable, - Callable, - Generic, - List, - Optional, - Tuple, - TypeVar, - Union, - cast, -) - -from .error import QMPError -from .util import ( - bottom_half, - create_task, - exception_summary, - flush, - is_closing, - pretty_traceback, - upper_half, - wait_closed, -) - - -T = TypeVar('T') -_U = TypeVar('_U') -_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None`` - -InternetAddrT = Tuple[str, int] -UnixAddrT = str -SocketAddrT = Union[UnixAddrT, InternetAddrT] - - -class Runstate(Enum): - """Protocol session runstate.""" - - #: Fully quiesced and disconnected. - IDLE = 0 - #: In the process of connecting or establishing a session. - CONNECTING = 1 - #: Fully connected and active session. - RUNNING = 2 - #: In the process of disconnecting. - #: Runstate may be returned to `IDLE` by calling `disconnect()`. - DISCONNECTING = 3 - - -class ConnectError(QMPError): - """ - Raised when the initial connection process has failed. - - This Exception always wraps a "root cause" exception that can be - interrogated for additional information. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - #: Human-readable error string - self.error_message: str = error_message - #: Wrapped root cause exception - self.exc: Exception = exc - - def __str__(self) -> str: - cause = str(self.exc) - if not cause: - # If there's no error string, use the exception name. - cause = exception_summary(self.exc) - return f"{self.error_message}: {cause}" - - -class StateError(QMPError): - """ - An API command (connect, execute, etc) was issued at an inappropriate time. - - This error is raised when a command like - :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate - time. - - :param error_message: Human-readable string describing the state violation. - :param state: The actual `Runstate` seen at the time of the violation. - :param required: The `Runstate` required to process this command. - """ - def __init__(self, error_message: str, - state: Runstate, required: Runstate): - super().__init__(error_message) - self.error_message = error_message - self.state = state - self.required = required - - -F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name - - -# Don't Panic. -def require(required_state: Runstate) -> Callable[[F], F]: - """ - Decorator: protect a method so it can only be run in a certain `Runstate`. - - :param required_state: The `Runstate` required to invoke this method. - :raise StateError: When the required `Runstate` is not met. - """ - def _decorator(func: F) -> F: - # _decorator is the decorator that is built by calling the - # require() decorator factory; e.g.: - # - # @require(Runstate.IDLE) def foo(): ... - # will replace 'foo' with the result of '_decorator(foo)'. - - @wraps(func) - def _wrapper(proto: 'AsyncProtocol[Any]', - *args: Any, **kwargs: Any) -> Any: - # _wrapper is the function that gets executed prior to the - # decorated method. - - name = type(proto).__name__ - - if proto.runstate != required_state: - if proto.runstate == Runstate.CONNECTING: - emsg = f"{name} is currently connecting." - elif proto.runstate == Runstate.DISCONNECTING: - emsg = (f"{name} is disconnecting." - " Call disconnect() to return to IDLE state.") - elif proto.runstate == Runstate.RUNNING: - emsg = f"{name} is already connected and running." - elif proto.runstate == Runstate.IDLE: - emsg = f"{name} is disconnected and idle." - else: - assert False - raise StateError(emsg, proto.runstate, required_state) - # No StateError, so call the wrapped method. - return func(proto, *args, **kwargs) - - # Return the decorated method; - # Transforming Func to Decorated[Func]. - return cast(F, _wrapper) - - # Return the decorator instance from the decorator factory. Phew! - return _decorator - - -class AsyncProtocol(Generic[T]): - """ - AsyncProtocol implements a generic async message-based protocol. - - This protocol assumes the basic unit of information transfer between - client and server is a "message", the details of which are left up - to the implementation. It assumes the sending and receiving of these - messages is full-duplex and not necessarily correlated; i.e. it - supports asynchronous inbound messages. - - It is designed to be extended by a specific protocol which provides - the implementations for how to read and send messages. These must be - defined in `_do_recv()` and `_do_send()`, respectively. - - Other callbacks have a default implementation, but are intended to be - either extended or overridden: - - - `_establish_session`: - The base implementation starts the reader/writer tasks. - A protocol implementation can override this call, inserting - actions to be taken prior to starting the reader/writer tasks - before the super() call; actions needing to occur afterwards - can be written after the super() call. - - `_on_message`: - Actions to be performed when a message is received. - - `_cb_outbound`: - Logging/Filtering hook for all outbound messages. - - `_cb_inbound`: - Logging/Filtering hook for all inbound messages. - This hook runs *before* `_on_message()`. - - :param name: - Name used for logging messages, if any. By default, messages - will log to 'qemu.aqmp.protocol', but each individual connection - can be given its own logger by giving it a name; messages will - then log to 'qemu.aqmp.protocol.${name}'. - """ - # pylint: disable=too-many-instance-attributes - - #: Logger object for debugging messages from this connection. - logger = logging.getLogger(__name__) - - # Maximum allowable size of read buffer - _limit = (64 * 1024) - - # ------------------------- - # Section: Public interface - # ------------------------- - - def __init__(self, name: Optional[str] = None) -> None: - #: The nickname for this connection, if any. - self.name: Optional[str] = name - if self.name is not None: - self.logger = self.logger.getChild(self.name) - - # stream I/O - self._reader: Optional[StreamReader] = None - self._writer: Optional[StreamWriter] = None - - # Outbound Message queue - self._outgoing: asyncio.Queue[T] - - # Special, long-running tasks: - self._reader_task: Optional[asyncio.Future[None]] = None - self._writer_task: Optional[asyncio.Future[None]] = None - - # Aggregate of the above two tasks, used for Exception management. - self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None - - #: Disconnect task. The disconnect implementation runs in a task - #: so that asynchronous disconnects (initiated by the - #: reader/writer) are allowed to wait for the reader/writers to - #: exit. - self._dc_task: Optional[asyncio.Future[None]] = None - - self._runstate = Runstate.IDLE - self._runstate_changed: Optional[asyncio.Event] = None - - # Server state for start_server() and _incoming() - self._server: Optional[asyncio.AbstractServer] = None - self._accepted: Optional[asyncio.Event] = None - - def __repr__(self) -> str: - cls_name = type(self).__name__ - tokens = [] - if self.name is not None: - tokens.append(f"name={self.name!r}") - tokens.append(f"runstate={self.runstate.name}") - return f"<{cls_name} {' '.join(tokens)}>" - - @property # @upper_half - def runstate(self) -> Runstate: - """The current `Runstate` of the connection.""" - return self._runstate - - @upper_half - async def runstate_changed(self) -> Runstate: - """ - Wait for the `runstate` to change, then return that runstate. - """ - await self._runstate_event.wait() - return self.runstate - - @upper_half - @require(Runstate.IDLE) - async def start_server_and_accept( - self, address: SocketAddrT, - ssl: Optional[SSLContext] = None - ) -> None: - """ - Accept a connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - This method is precisely equivalent to calling `start_server()` - followed by `accept()`. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - await self.start_server(address, ssl) - await self.accept() - assert self.runstate == Runstate.RUNNING - - @upper_half - @require(Runstate.IDLE) - async def start_server(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Start listening for an incoming connection, but do not wait for a peer. - - This method starts listening for an incoming connection, but - does not block waiting for a peer. This call will return - immediately after binding and listening on a socket. A later - call to `accept()` must be made in order to finalize the - incoming connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When the server could not start listening on this address. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError`. - """ - await self._session_guard( - self._do_start_server(address, ssl), - 'Failed to establish connection') - assert self.runstate == Runstate.CONNECTING - - @upper_half - @require(Runstate.CONNECTING) - async def accept(self) -> None: - """ - Accept an incoming connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :raise StateError: When the `Runstate` is not `CONNECTING`. - :raise QMPError: When `start_server()` was not called yet. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - if self._accepted is None: - raise QMPError("Cannot call accept() before start_server().") - await self._session_guard( - self._do_accept(), - 'Failed to establish connection') - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - assert self.runstate == Runstate.RUNNING - - @upper_half - @require(Runstate.IDLE) - async def connect(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Connect to the server and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - await self._session_guard( - self._do_connect(address, ssl), - 'Failed to establish connection') - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - assert self.runstate == Runstate.RUNNING - - @upper_half - async def disconnect(self) -> None: - """ - Disconnect and wait for all tasks to fully stop. - - If there was an exception that caused the reader/writers to - terminate prematurely, it will be raised here. - - :raise Exception: When the reader or writer terminate unexpectedly. - """ - self.logger.debug("disconnect() called.") - self._schedule_disconnect() - await self._wait_disconnect() - - # -------------------------- - # Section: Session machinery - # -------------------------- - - async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None: - """ - Async guard function used to roll back to `IDLE` on any error. - - On any Exception, the state machine will be reset back to - `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but - `BaseException` events will be left alone (This includes - asyncio.CancelledError, even prior to Python 3.8). - - :param error_message: - Human-readable string describing what connection phase failed. - - :raise BaseException: - When `BaseException` occurs in the guarded block. - :raise ConnectError: - When any other error is encountered in the guarded block. - """ - # Note: After Python 3.6 support is removed, this should be an - # @asynccontextmanager instead of accepting a callback. - try: - await coro - except BaseException as err: - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - try: - # Reset the runstate back to IDLE. - await self.disconnect() - except: - # We don't expect any Exceptions from the disconnect function - # here, because we failed to connect in the first place. - # The disconnect() function is intended to perform - # only cannot-fail cleanup here, but you never know. - emsg = ( - "Unexpected bottom half exception. " - "This is a bug in the QMP library. " - "Please report it to and " - "CC: John Snow ." - ) - self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) - raise - - # CancelledError is an Exception with special semantic meaning; - # We do NOT want to wrap it up under ConnectError. - # NB: CancelledError is not a BaseException before Python 3.8 - if isinstance(err, asyncio.CancelledError): - raise - - # Any other kind of error can be treated as some kind of connection - # failure broadly. Inspect the 'exc' field to explore the root - # cause in greater detail. - if isinstance(err, Exception): - raise ConnectError(emsg, err) from err - - # Raise BaseExceptions un-wrapped, they're more important. - raise - - @property - def _runstate_event(self) -> asyncio.Event: - # asyncio.Event() objects should not be created prior to entrance into - # an event loop, so we can ensure we create it in the correct context. - # Create it on-demand *only* at the behest of an 'async def' method. - if not self._runstate_changed: - self._runstate_changed = asyncio.Event() - return self._runstate_changed - - @upper_half - @bottom_half - def _set_state(self, state: Runstate) -> None: - """ - Change the `Runstate` of the protocol connection. - - Signals the `runstate_changed` event. - """ - if state == self._runstate: - return - - self.logger.debug("Transitioning from '%s' to '%s'.", - str(self._runstate), str(state)) - self._runstate = state - self._runstate_event.set() - self._runstate_event.clear() - - @bottom_half - async def _stop_server(self) -> None: - """ - Stop listening for / accepting new incoming connections. - """ - if self._server is None: - return - - try: - self.logger.debug("Stopping server.") - self._server.close() - await self._server.wait_closed() - self.logger.debug("Server stopped.") - finally: - self._server = None - - @bottom_half # However, it does not run from the R/W tasks. - async def _incoming(self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - """ - Accept an incoming connection and signal the upper_half. - - This method does the minimum necessary to accept a single - incoming connection. It signals back to the upper_half ASAP so - that any errors during session initialization can occur - naturally in the caller's stack. - - :param reader: Incoming `asyncio.StreamReader` - :param writer: Incoming `asyncio.StreamWriter` - """ - peer = writer.get_extra_info('peername', 'Unknown peer') - self.logger.debug("Incoming connection from %s", peer) - - if self._reader or self._writer: - # Sadly, we can have more than one pending connection - # because of https://bugs.python.org/issue46715 - # Close any extra connections we don't actually want. - self.logger.warning("Extraneous connection inadvertently accepted") - writer.close() - return - - # A connection has been accepted; stop listening for new ones. - assert self._accepted is not None - await self._stop_server() - self._reader, self._writer = (reader, writer) - self._accepted.set() - - @upper_half - async def _do_start_server(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Start listening for an incoming connection, but do not wait for a peer. - - This method starts listening for an incoming connection, but does not - block waiting for a peer. This call will return immediately after - binding and listening to a socket. A later call to accept() must be - made in order to finalize the incoming connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - self.logger.debug("Awaiting connection on %s ...", address) - self._accepted = asyncio.Event() - - if isinstance(address, tuple): - coro = asyncio.start_server( - self._incoming, - host=address[0], - port=address[1], - ssl=ssl, - backlog=1, - limit=self._limit, - ) - else: - coro = asyncio.start_unix_server( - self._incoming, - path=address, - ssl=ssl, - backlog=1, - limit=self._limit, - ) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - # This will start the server (bind(2), listen(2)). It will also - # call accept(2) if we yield, but we don't block on that here. - self._server = await coro - self.logger.debug("Server listening on %s", address) - - @upper_half - async def _do_accept(self) -> None: - """ - Wait for and accept an incoming connection. - - Requires that we have not yet accepted an incoming connection - from the upper_half, but it's OK if the server is no longer - running because the bottom_half has already accepted the - connection. - """ - assert self._accepted is not None - await self._accepted.wait() - assert self._server is None - self._accepted = None - - self.logger.debug("Connection accepted.") - - @upper_half - async def _do_connect(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Acting as the transport client, initiate a connection to a server. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - self.logger.debug("Connecting to %s ...", address) - - if isinstance(address, tuple): - connect = asyncio.open_connection( - address[0], - address[1], - ssl=ssl, - limit=self._limit, - ) - else: - connect = asyncio.open_unix_connection( - path=address, - ssl=ssl, - limit=self._limit, - ) - self._reader, self._writer = await connect - - self.logger.debug("Connected.") - - @upper_half - async def _establish_session(self) -> None: - """ - Establish a new session. - - Starts the readers/writer tasks; subclasses may perform their - own negotiations here. The Runstate will be RUNNING upon - successful conclusion. - """ - assert self.runstate == Runstate.CONNECTING - - self._outgoing = asyncio.Queue() - - reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') - writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') - - self._reader_task = create_task(reader_coro) - self._writer_task = create_task(writer_coro) - - self._bh_tasks = asyncio.gather( - self._reader_task, - self._writer_task, - ) - - self._set_state(Runstate.RUNNING) - await asyncio.sleep(0) # Allow runstate_event to process - - @upper_half - @bottom_half - def _schedule_disconnect(self) -> None: - """ - Initiate a disconnect; idempotent. - - This method is used both in the upper-half as a direct - consequence of `disconnect()`, and in the bottom-half in the - case of unhandled exceptions in the reader/writer tasks. - - It can be invoked no matter what the `runstate` is. - """ - if not self._dc_task: - self._set_state(Runstate.DISCONNECTING) - self.logger.debug("Scheduling disconnect.") - self._dc_task = create_task(self._bh_disconnect()) - - @upper_half - async def _wait_disconnect(self) -> None: - """ - Waits for a previously scheduled disconnect to finish. - - This method will gather any bottom half exceptions and re-raise - the one that occurred first; presuming it to be the root cause - of any subsequent Exceptions. It is intended to be used in the - upper half of the call chain. - - :raise Exception: - Arbitrary exception re-raised on behalf of the reader/writer. - """ - assert self.runstate == Runstate.DISCONNECTING - assert self._dc_task - - aws: List[Awaitable[object]] = [self._dc_task] - if self._bh_tasks: - aws.insert(0, self._bh_tasks) - all_defined_tasks = asyncio.gather(*aws) - - # Ensure disconnect is done; Exception (if any) is not raised here: - await asyncio.wait((self._dc_task,)) - - try: - await all_defined_tasks # Raise Exceptions from the bottom half. - finally: - self._cleanup() - self._set_state(Runstate.IDLE) - - @upper_half - def _cleanup(self) -> None: - """ - Fully reset this object to a clean state and return to `IDLE`. - """ - def _paranoid_task_erase(task: Optional['asyncio.Future[_U]'] - ) -> Optional['asyncio.Future[_U]']: - # Help to erase a task, ENSURING it is fully quiesced first. - assert (task is None) or task.done() - return None if (task and task.done()) else task - - assert self.runstate == Runstate.DISCONNECTING - self._dc_task = _paranoid_task_erase(self._dc_task) - self._reader_task = _paranoid_task_erase(self._reader_task) - self._writer_task = _paranoid_task_erase(self._writer_task) - self._bh_tasks = _paranoid_task_erase(self._bh_tasks) - - self._reader = None - self._writer = None - self._accepted = None - - # NB: _runstate_changed cannot be cleared because we still need it to - # send the final runstate changed event ...! - - # ---------------------------- - # Section: Bottom Half methods - # ---------------------------- - - @bottom_half - async def _bh_disconnect(self) -> None: - """ - Disconnect and cancel all outstanding tasks. - - It is designed to be called from its task context, - :py:obj:`~AsyncProtocol._dc_task`. By running in its own task, - it is free to wait on any pending actions that may still need to - occur in either the reader or writer tasks. - """ - assert self.runstate == Runstate.DISCONNECTING - - def _done(task: Optional['asyncio.Future[Any]']) -> bool: - return task is not None and task.done() - - # If the server is running, stop it. - await self._stop_server() - - # Are we already in an error pathway? If either of the tasks are - # already done, or if we have no tasks but a reader/writer; we - # must be. - # - # NB: We can't use _bh_tasks to check for premature task - # completion, because it may not yet have had a chance to run - # and gather itself. - tasks = tuple(filter(None, (self._writer_task, self._reader_task))) - error_pathway = _done(self._reader_task) or _done(self._writer_task) - if not tasks: - error_pathway |= bool(self._reader) or bool(self._writer) - - try: - # Try to flush the writer, if possible. - # This *may* cause an error and force us over into the error path. - if not error_pathway: - await self._bh_flush_writer() - except BaseException as err: - error_pathway = True - emsg = "Failed to flush the writer" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - finally: - # Cancel any still-running tasks (Won't raise): - if self._writer_task is not None and not self._writer_task.done(): - self.logger.debug("Cancelling writer task.") - self._writer_task.cancel() - if self._reader_task is not None and not self._reader_task.done(): - self.logger.debug("Cancelling reader task.") - self._reader_task.cancel() - - # Close out the tasks entirely (Won't raise): - if tasks: - self.logger.debug("Waiting for tasks to complete ...") - await asyncio.wait(tasks) - - # Lastly, close the stream itself. (*May raise*!): - await self._bh_close_stream(error_pathway) - self.logger.debug("Disconnected.") - - @bottom_half - async def _bh_flush_writer(self) -> None: - if not self._writer_task: - return - - self.logger.debug("Draining the outbound queue ...") - await self._outgoing.join() - if self._writer is not None: - self.logger.debug("Flushing the StreamWriter ...") - await flush(self._writer) - - @bottom_half - async def _bh_close_stream(self, error_pathway: bool = False) -> None: - # NB: Closing the writer also implcitly closes the reader. - if not self._writer: - return - - if not is_closing(self._writer): - self.logger.debug("Closing StreamWriter.") - self._writer.close() - - self.logger.debug("Waiting for StreamWriter to close ...") - try: - await wait_closed(self._writer) - except Exception: # pylint: disable=broad-except - # It's hard to tell if the Stream is already closed or - # not. Even if one of the tasks has failed, it may have - # failed for a higher-layered protocol reason. The - # stream could still be open and perfectly fine. - # I don't know how to discern its health here. - - if error_pathway: - # We already know that *something* went wrong. Let's - # just trust that the Exception we already have is the - # better one to present to the user, even if we don't - # genuinely *know* the relationship between the two. - self.logger.debug( - "Discarding Exception from wait_closed:\n%s\n", - pretty_traceback(), - ) - else: - # Oops, this is a brand-new error! - raise - finally: - self.logger.debug("StreamWriter closed.") - - @bottom_half - async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: - """ - Run one of the bottom-half methods in a loop forever. - - If the bottom half ever raises any exception, schedule a - disconnect that will terminate the entire loop. - - :param async_fn: The bottom-half method to run in a loop. - :param name: The name of this task, used for logging. - """ - try: - while True: - await async_fn() - except asyncio.CancelledError: - # We have been cancelled by _bh_disconnect, exit gracefully. - self.logger.debug("Task.%s: cancelled.", name) - return - except BaseException as err: - self.logger.log( - logging.INFO if isinstance(err, EOFError) else logging.ERROR, - "Task.%s: %s", - name, exception_summary(err) - ) - self.logger.debug("Task.%s: failure:\n%s\n", - name, pretty_traceback()) - self._schedule_disconnect() - raise - finally: - self.logger.debug("Task.%s: exiting.", name) - - @bottom_half - async def _bh_send_message(self) -> None: - """ - Wait for an outgoing message, then send it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._outgoing.get() - try: - await self._send(msg) - finally: - self._outgoing.task_done() - - @bottom_half - async def _bh_recv_message(self) -> None: - """ - Wait for an incoming message and call `_on_message` to route it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._recv() - await self._on_message(msg) - - # -------------------- - # Section: Message I/O - # -------------------- - - @upper_half - @bottom_half - def _cb_outbound(self, msg: T) -> T: - """ - Callback: outbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate outgoing messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - :param msg: raw outbound message - :return: final outbound message - """ - self.logger.debug("--> %s", str(msg)) - return msg - - @upper_half - @bottom_half - def _cb_inbound(self, msg: T) -> T: - """ - Callback: inbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate incoming messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - This method does not "handle" incoming messages; it is a filter. - The actual "endpoint" for incoming messages is `_on_message()`. - - :param msg: raw inbound message - :return: processed inbound message - """ - self.logger.debug("<-- %s", str(msg)) - return msg - - @upper_half - @bottom_half - async def _readline(self) -> bytes: - """ - Wait for a newline from the incoming reader. - - This method is provided as a convenience for upper-layer - protocols, as many are line-based. - - This method *may* return a sequence of bytes without a trailing - newline if EOF occurs, but *some* bytes were received. In this - case, the next call will raise `EOFError`. It is assumed that - the layer 5 protocol will decide if there is anything meaningful - to be done with a partial message. - - :raise OSError: For stream-related errors. - :raise EOFError: - If the reader stream is at EOF and there are no bytes to return. - :return: bytes, including the newline. - """ - assert self._reader is not None - msg_bytes = await self._reader.readline() - - if not msg_bytes: - if self._reader.at_eof(): - raise EOFError - - return msg_bytes - - @upper_half - @bottom_half - async def _do_recv(self) -> T: - """ - Abstract: Read from the stream and return a message. - - Very low-level; intended to only be called by `_recv()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _recv(self) -> T: - """ - Read an arbitrary protocol message. - - .. warning:: - This method is intended primarily for `_bh_recv_message()` - to use in an asynchronous task loop. Using it outside of - this loop will "steal" messages from the normal routing - mechanism. It is safe to use prior to `_establish_session()`, - but should not be used otherwise. - - This method uses `_do_recv()` to retrieve the raw message, and - then transforms it using `_cb_inbound()`. - - :return: A single (filtered, processed) protocol message. - """ - message = await self._do_recv() - return self._cb_inbound(message) - - @upper_half - @bottom_half - def _do_send(self, msg: T) -> None: - """ - Abstract: Write a message to the stream. - - Very low-level; intended to only be called by `_send()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _send(self, msg: T) -> None: - """ - Send an arbitrary protocol message. - - This method will transform any outgoing messages according to - `_cb_outbound()`. - - .. warning:: - Like `_recv()`, this method is intended to be called by - the writer task loop that processes outgoing - messages. Calling it directly may circumvent logic - implemented by the caller meant to correlate outgoing and - incoming messages. - - :raise OSError: For problems with the underlying stream. - """ - msg = self._cb_outbound(msg) - self._do_send(msg) - - @bottom_half - async def _on_message(self, msg: T) -> None: - """ - Called to handle the receipt of a new message. - - .. caution:: - This is executed from within the reader loop, so be advised - that waiting on either the reader or writer task will lead - to deadlock. Additionally, any unhandled exceptions will - directly cause the loop to halt, so logic may be best-kept - to a minimum if at all possible. - - :param msg: The incoming message, already logged/filtered. - """ - # Nothing to do in the abstract case. diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py deleted file mode 100644 index 90a8737..0000000 --- a/python/qemu/aqmp/qmp_client.py +++ /dev/null @@ -1,655 +0,0 @@ -""" -QMP Protocol Implementation - -This module provides the `QMPClient` class, which can be used to connect -and send commands to a QMP server such as QEMU. The QMP class can be -used to either connect to a listening server, or used to listen and -accept an incoming connection from that server. -""" - -import asyncio -import logging -import socket -import struct -from typing import ( - Dict, - List, - Mapping, - Optional, - Union, - cast, -) - -from .error import ProtocolError, QMPError -from .events import Events -from .message import Message -from .models import ErrorResponse, Greeting -from .protocol import AsyncProtocol, Runstate, require -from .util import ( - bottom_half, - exception_summary, - pretty_traceback, - upper_half, -) - - -class _WrappedProtocolError(ProtocolError): - """ - Abstract exception class for Protocol errors that wrap an Exception. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - self.exc = exc - - def __str__(self) -> str: - return f"{self.error_message}: {self.exc!s}" - - -class GreetingError(_WrappedProtocolError): - """ - An exception occurred during the Greeting phase. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - - -class NegotiationError(_WrappedProtocolError): - """ - An exception occurred during the Negotiation phase. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - - -class ExecuteError(QMPError): - """ - Exception raised by `QMPClient.execute()` on RPC failure. - - :param error_response: The RPC error response object. - :param sent: The sent RPC message that caused the failure. - :param received: The raw RPC error reply received. - """ - def __init__(self, error_response: ErrorResponse, - sent: Message, received: Message): - super().__init__(error_response.error.desc) - #: The sent `Message` that caused the failure - self.sent: Message = sent - #: The received `Message` that indicated failure - self.received: Message = received - #: The parsed error response - self.error: ErrorResponse = error_response - #: The QMP error class - self.error_class: str = error_response.error.class_ - - -class ExecInterruptedError(QMPError): - """ - Exception raised by `execute()` (et al) when an RPC is interrupted. - - This error is raised when an `execute()` statement could not be - completed. This can occur because the connection itself was - terminated before a reply was received. - - The true cause of the interruption will be available via `disconnect()`. - """ - - -class _MsgProtocolError(ProtocolError): - """ - Abstract error class for protocol errors that have a `Message` object. - - This Exception class is used for protocol errors where the `Message` - was mechanically understood, but was found to be inappropriate or - malformed. - - :param error_message: Human-readable string describing the error. - :param msg: The QMP `Message` that caused the error. - """ - def __init__(self, error_message: str, msg: Message): - super().__init__(error_message) - #: The received `Message` that caused the error. - self.msg: Message = msg - - def __str__(self) -> str: - return "\n".join([ - super().__str__(), - f" Message was: {str(self.msg)}\n", - ]) - - -class ServerParseError(_MsgProtocolError): - """ - The Server sent a `Message` indicating parsing failure. - - i.e. A reply has arrived from the server, but it is missing the "ID" - field, indicating a parsing error. - - :param error_message: Human-readable string describing the error. - :param msg: The QMP `Message` that caused the error. - """ - - -class BadReplyError(_MsgProtocolError): - """ - An execution reply was successfully routed, but not understood. - - If a QMP message is received with an 'id' field to allow it to be - routed, but is otherwise malformed, this exception will be raised. - - A reply message is malformed if it is missing either the 'return' or - 'error' keys, or if the 'error' value has missing keys or members of - the wrong type. - - :param error_message: Human-readable string describing the error. - :param msg: The malformed reply that was received. - :param sent: The message that was sent that prompted the error. - """ - def __init__(self, error_message: str, msg: Message, sent: Message): - super().__init__(error_message, msg) - #: The sent `Message` that caused the failure - self.sent = sent - - -class QMPClient(AsyncProtocol[Message], Events): - """ - Implements a QMP client connection. - - QMP can be used to establish a connection as either the transport - client or server, though this class always acts as the QMP client. - - :param name: Optional nickname for the connection, used for logging. - - Basic script-style usage looks like this:: - - qmp = QMPClient('my_virtual_machine_name') - await qmp.connect(('127.0.0.1', 1234)) - ... - res = await qmp.execute('block-query') - ... - await qmp.disconnect() - - Basic async client-style usage looks like this:: - - class Client: - def __init__(self, name: str): - self.qmp = QMPClient(name) - - async def watch_events(self): - try: - async for event in self.qmp.events: - print(f"Event: {event['event']}") - except asyncio.CancelledError: - return - - async def run(self, address='/tmp/qemu.socket'): - await self.qmp.connect(address) - asyncio.create_task(self.watch_events()) - await self.qmp.runstate_changed.wait() - await self.disconnect() - - See `aqmp.events` for more detail on event handling patterns. - """ - #: Logger object used for debugging messages. - logger = logging.getLogger(__name__) - - # Read buffer limit; large enough to accept query-qmp-schema - _limit = (256 * 1024) - - # Type alias for pending execute() result items - _PendingT = Union[Message, ExecInterruptedError] - - def __init__(self, name: Optional[str] = None) -> None: - super().__init__(name) - Events.__init__(self) - - #: Whether or not to await a greeting after establishing a connection. - self.await_greeting: bool = True - - #: Whether or not to perform capabilities negotiation upon connection. - #: Implies `await_greeting`. - self.negotiate: bool = True - - # Cached Greeting, if one was awaited. - self._greeting: Optional[Greeting] = None - - # Command ID counter - self._execute_id = 0 - - # Incoming RPC reply messages. - self._pending: Dict[ - Union[str, None], - 'asyncio.Queue[QMPClient._PendingT]' - ] = {} - - @property - def greeting(self) -> Optional[Greeting]: - """The `Greeting` from the QMP server, if any.""" - return self._greeting - - @upper_half - async def _establish_session(self) -> None: - """ - Initiate the QMP session. - - Wait for the QMP greeting and perform capabilities negotiation. - - :raise GreetingError: When the greeting is not understood. - :raise NegotiationError: If the negotiation fails. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - """ - self._greeting = None - self._pending = {} - - if self.await_greeting or self.negotiate: - self._greeting = await self._get_greeting() - - if self.negotiate: - await self._negotiate() - - # This will start the reader/writers: - await super()._establish_session() - - @upper_half - async def _get_greeting(self) -> Greeting: - """ - :raise GreetingError: When the greeting is not understood. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - - :return: the Greeting object given by the server. - """ - self.logger.debug("Awaiting greeting ...") - - try: - msg = await self._recv() - return Greeting(msg) - except (ProtocolError, KeyError, TypeError) as err: - emsg = "Did not understand Greeting" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise GreetingError(emsg, err) from err - except BaseException as err: - # EOFError, OSError, or something unexpected. - emsg = "Failed to receive Greeting" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - - @upper_half - async def _negotiate(self) -> None: - """ - Perform QMP capabilities negotiation. - - :raise NegotiationError: When negotiation fails. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - """ - self.logger.debug("Negotiating capabilities ...") - - arguments: Dict[str, List[str]] = {} - if self._greeting and 'oob' in self._greeting.QMP.capabilities: - arguments.setdefault('enable', []).append('oob') - msg = self.make_execute_msg('qmp_capabilities', arguments=arguments) - - # It's not safe to use execute() here, because the reader/writers - # aren't running. AsyncProtocol *requires* that a new session - # does not fail after the reader/writers are running! - try: - await self._send(msg) - reply = await self._recv() - assert 'return' in reply - assert 'error' not in reply - except (ProtocolError, AssertionError) as err: - emsg = "Negotiation failed" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise NegotiationError(emsg, err) from err - except BaseException as err: - # EOFError, OSError, or something unexpected. - emsg = "Negotiation failed" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - - @bottom_half - async def _bh_disconnect(self) -> None: - try: - await super()._bh_disconnect() - finally: - if self._pending: - self.logger.debug("Cancelling pending executions") - keys = self._pending.keys() - for key in keys: - self.logger.debug("Cancelling execution '%s'", key) - self._pending[key].put_nowait( - ExecInterruptedError("Disconnected") - ) - - self.logger.debug("QMP Disconnected.") - - @upper_half - def _cleanup(self) -> None: - super()._cleanup() - assert not self._pending - - @bottom_half - async def _on_message(self, msg: Message) -> None: - """ - Add an incoming message to the appropriate queue/handler. - - :raise ServerParseError: When Message indicates server parse failure. - """ - # Incoming messages are not fully parsed/validated here; - # do only light peeking to know how to route the messages. - - if 'event' in msg: - await self._event_dispatch(msg) - return - - # Below, we assume everything left is an execute/exec-oob response. - - exec_id = cast(Optional[str], msg.get('id')) - - if exec_id in self._pending: - await self._pending[exec_id].put(msg) - return - - # We have a message we can't route back to a caller. - - is_error = 'error' in msg - has_id = 'id' in msg - - if is_error and not has_id: - # This is very likely a server parsing error. - # It doesn't inherently belong to any pending execution. - # Instead of performing clever recovery, just terminate. - # See "NOTE" in qmp-spec.txt, section 2.4.2 - raise ServerParseError( - ("Server sent an error response without an ID, " - "but there are no ID-less executions pending. " - "Assuming this is a server parser failure."), - msg - ) - - # qmp-spec.txt, section 2.4: - # 'Clients should drop all the responses - # that have an unknown "id" field.' - self.logger.log( - logging.ERROR if is_error else logging.WARNING, - "Unknown ID '%s', message dropped.", - exec_id, - ) - self.logger.debug("Unroutable message: %s", str(msg)) - - @upper_half - @bottom_half - async def _do_recv(self) -> Message: - """ - :raise OSError: When a stream error is encountered. - :raise EOFError: When the stream is at EOF. - :raise ProtocolError: - When the Message is not understood. - See also `Message._deserialize`. - - :return: A single QMP `Message`. - """ - msg_bytes = await self._readline() - msg = Message(msg_bytes, eager=True) - return msg - - @upper_half - @bottom_half - def _do_send(self, msg: Message) -> None: - """ - :raise ValueError: JSON serialization failure - :raise TypeError: JSON serialization failure - :raise OSError: When a stream error is encountered. - """ - assert self._writer is not None - self._writer.write(bytes(msg)) - - @upper_half - def _get_exec_id(self) -> str: - exec_id = f"__aqmp#{self._execute_id:05d}" - self._execute_id += 1 - return exec_id - - @upper_half - async def _issue(self, msg: Message) -> Union[None, str]: - """ - Issue a QMP `Message` and do not wait for a reply. - - :param msg: The QMP `Message` to send to the server. - - :return: The ID of the `Message` sent. - """ - msg_id: Optional[str] = None - if 'id' in msg: - assert isinstance(msg['id'], str) - msg_id = msg['id'] - - self._pending[msg_id] = asyncio.Queue(maxsize=1) - try: - await self._outgoing.put(msg) - except: - del self._pending[msg_id] - raise - - return msg_id - - @upper_half - async def _reply(self, msg_id: Union[str, None]) -> Message: - """ - Await a reply to a previously issued QMP message. - - :param msg_id: The ID of the previously issued message. - - :return: The reply from the server. - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - """ - queue = self._pending[msg_id] - - try: - result = await queue.get() - if isinstance(result, ExecInterruptedError): - raise result - return result - finally: - del self._pending[msg_id] - - @upper_half - async def _execute(self, msg: Message, assign_id: bool = True) -> Message: - """ - Send a QMP `Message` to the server and await a reply. - - This method *assumes* you are sending some kind of an execute - statement that *will* receive a reply. - - An execution ID will be assigned if assign_id is `True`. It can be - disabled, but this requires that an ID is manually assigned - instead. For manually assigned IDs, you must not use the string - '__aqmp#' anywhere in the ID. - - :param msg: The QMP `Message` to execute. - :param assign_id: If True, assign a new execution ID. - - :return: Execution reply from the server. - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - """ - if assign_id: - msg['id'] = self._get_exec_id() - elif 'id' in msg: - assert isinstance(msg['id'], str) - assert '__aqmp#' not in msg['id'] - - exec_id = await self._issue(msg) - return await self._reply(exec_id) - - @upper_half - @require(Runstate.RUNNING) - async def _raw( - self, - msg: Union[Message, Mapping[str, object], bytes], - assign_id: bool = True, - ) -> Message: - """ - Issue a raw `Message` to the QMP server and await a reply. - - :param msg: - A Message to send to the server. It may be a `Message`, any - Mapping (including Dict), or raw bytes. - :param assign_id: - Assign an arbitrary execution ID to this message. If - `False`, the existing id must either be absent (and no other - such pending execution may omit an ID) or a string. If it is - a string, it must not start with '__aqmp#' and no other such - pending execution may currently be using that ID. - - :return: Execution reply from the server. - - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - :raise TypeError: - When assign_id is `False`, an ID is given, and it is not a string. - :raise ValueError: - When assign_id is `False`, but the ID is not usable; - Either because it starts with '__aqmp#' or it is already in-use. - """ - # 1. convert generic Mapping or bytes to a QMP Message - # 2. copy Message objects so that we assign an ID only to the copy. - msg = Message(msg) - - exec_id = msg.get('id') - if not assign_id and 'id' in msg: - if not isinstance(exec_id, str): - raise TypeError(f"ID ('{exec_id}') must be a string.") - if exec_id.startswith('__aqmp#'): - raise ValueError( - f"ID ('{exec_id}') must not start with '__aqmp#'." - ) - - if not assign_id and exec_id in self._pending: - raise ValueError( - f"ID '{exec_id}' is in-use and cannot be used." - ) - - return await self._execute(msg, assign_id=assign_id) - - @upper_half - @require(Runstate.RUNNING) - async def execute_msg(self, msg: Message) -> object: - """ - Execute a QMP command and return its value. - - :param msg: The QMP `Message` to execute. - - :return: - The command execution return value from the server. The type of - object returned depends on the command that was issued, - though most in QEMU return a `dict`. - :raise ValueError: - If the QMP `Message` does not have either the 'execute' or - 'exec-oob' fields set. - :raise ExecuteError: When the server returns an error response. - :raise ExecInterruptedError: if the connection was terminated early. - """ - if not ('execute' in msg or 'exec-oob' in msg): - raise ValueError("Requires 'execute' or 'exec-oob' message") - - # Copy the Message so that the ID assigned by _execute() is - # local to this method; allowing the ID to be seen in raised - # Exceptions but without modifying the caller's held copy. - msg = Message(msg) - reply = await self._execute(msg) - - if 'error' in reply: - try: - error_response = ErrorResponse(reply) - except (KeyError, TypeError) as err: - # Error response was malformed. - raise BadReplyError( - "QMP error reply is malformed", reply, msg, - ) from err - - raise ExecuteError(error_response, msg, reply) - - if 'return' not in reply: - raise BadReplyError( - "QMP reply is missing a 'error' or 'return' member", - reply, msg, - ) - - return reply['return'] - - @classmethod - def make_execute_msg(cls, cmd: str, - arguments: Optional[Mapping[str, object]] = None, - oob: bool = False) -> Message: - """ - Create an executable message to be sent by `execute_msg` later. - - :param cmd: QMP command name. - :param arguments: Arguments (if any). Must be JSON-serializable. - :param oob: If `True`, execute "out of band". - - :return: An executable QMP `Message`. - """ - msg = Message({'exec-oob' if oob else 'execute': cmd}) - if arguments is not None: - msg['arguments'] = arguments - return msg - - @upper_half - async def execute(self, cmd: str, - arguments: Optional[Mapping[str, object]] = None, - oob: bool = False) -> object: - """ - Execute a QMP command and return its value. - - :param cmd: QMP command name. - :param arguments: Arguments (if any). Must be JSON-serializable. - :param oob: If `True`, execute "out of band". - - :return: - The command execution return value from the server. The type of - object returned depends on the command that was issued, - though most in QEMU return a `dict`. - :raise ExecuteError: When the server returns an error response. - :raise ExecInterruptedError: if the connection was terminated early. - """ - msg = self.make_execute_msg(cmd, arguments, oob=oob) - return await self.execute_msg(msg) - - @upper_half - @require(Runstate.RUNNING) - def send_fd_scm(self, fd: int) -> None: - """ - Send a file descriptor to the remote via SCM_RIGHTS. - """ - assert self._writer is not None - sock = self._writer.transport.get_extra_info('socket') - - if sock.family != socket.AF_UNIX: - raise QMPError("Sending file descriptors requires a UNIX socket.") - - if not hasattr(sock, 'sendmsg'): - # We need to void the warranty sticker. - # Access to sendmsg is scheduled for removal in Python 3.11. - # Find the real backing socket to use it anyway. - sock = sock._sock # pylint: disable=protected-access - - sock.sendmsg( - [b' '], - [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))] - ) diff --git a/python/qemu/aqmp/qmp_shell.py b/python/qemu/aqmp/qmp_shell.py deleted file mode 100644 index c23f1b1..0000000 --- a/python/qemu/aqmp/qmp_shell.py +++ /dev/null @@ -1,610 +0,0 @@ -# -# Copyright (C) 2009-2022 Red Hat Inc. -# -# Authors: -# Luiz Capitulino -# John Snow -# -# This work is licensed under the terms of the GNU LGPL, version 2 or -# later. See the COPYING file in the top-level directory. -# - -""" -Low-level QEMU shell on top of QMP. - -usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server - -positional arguments: - qmp_server < UNIX socket path | TCP address:port > - -optional arguments: - -h, --help show this help message and exit - -H, --hmp Use HMP interface - -N, --skip-negotiation - Skip negotiate (for qemu-ga) - -v, --verbose Verbose (echo commands sent and received) - -p, --pretty Pretty-print JSON - - -Start QEMU with: - -# qemu [...] -qmp unix:./qmp-sock,server - -Run the shell: - -$ qmp-shell ./qmp-sock - -Commands have the following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - -For example: - -(QEMU) device_add driver=e1000 id=net1 -{'return': {}} -(QEMU) - -key=value pairs also support Python or JSON object literal subset notations, -without spaces. Dictionaries/objects {} are supported as are arrays []. - - example-command arg-name1={'key':'value','obj'={'prop':"value"}} - -Both JSON and Python formatting should work, including both styles of -string literal quotes. Both paradigms of literal values should work, -including null/true/false for JSON and None/True/False for Python. - - -Transactions have the following multi-line format: - - transaction( - action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ... - action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ) - -One line transactions are also supported: - - transaction( action-name1 ... ) - -For example: - - (QEMU) transaction( - TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 - TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 - TRANS> ) - {"return": {}} - (QEMU) - -Use the -v and -p options to activate the verbose and pretty-print options, -which will echo back the properly formatted JSON-compliant QMP that is being -sent to QEMU, which is useful for debugging and documentation generation. -""" - -import argparse -import ast -import json -import logging -import os -import re -import readline -from subprocess import Popen -import sys -from typing import ( - IO, - Iterator, - List, - NoReturn, - Optional, - Sequence, -) - -from qemu.aqmp import ConnectError, QMPError, SocketAddrT -from qemu.aqmp.legacy import ( - QEMUMonitorProtocol, - QMPBadPortError, - QMPMessage, - QMPObject, -) - - -LOG = logging.getLogger(__name__) - - -class QMPCompleter: - """ - QMPCompleter provides a readline library tab-complete behavior. - """ - # NB: Python 3.9+ will probably allow us to subclass list[str] directly, - # but pylint as of today does not know that List[str] is simply 'list'. - def __init__(self) -> None: - self._matches: List[str] = [] - - def append(self, value: str) -> None: - """Append a new valid completion to the list of possibilities.""" - return self._matches.append(value) - - def complete(self, text: str, state: int) -> Optional[str]: - """readline.set_completer() callback implementation.""" - for cmd in self._matches: - if cmd.startswith(text): - if state == 0: - return cmd - state -= 1 - return None - - -class QMPShellError(QMPError): - """ - QMP Shell Base error class. - """ - - -class FuzzyJSON(ast.NodeTransformer): - """ - This extension of ast.NodeTransformer filters literal "true/false/null" - values in a Python AST and replaces them by proper "True/False/None" values - that Python can properly evaluate. - """ - - @classmethod - def visit_Name(cls, # pylint: disable=invalid-name - node: ast.Name) -> ast.AST: - """ - Transform Name nodes with certain values into Constant (keyword) nodes. - """ - if node.id == 'true': - return ast.Constant(value=True) - if node.id == 'false': - return ast.Constant(value=False) - if node.id == 'null': - return ast.Constant(value=None) - return node - - -class QMPShell(QEMUMonitorProtocol): - """ - QMPShell provides a basic readline-based QMP shell. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: SocketAddrT, - pretty: bool = False, - verbose: bool = False, - server: bool = False, - logfile: Optional[str] = None): - super().__init__(address, server=server) - self._greeting: Optional[QMPMessage] = None - self._completer = QMPCompleter() - self._transmode = False - self._actions: List[QMPMessage] = [] - self._histfile = os.path.join(os.path.expanduser('~'), - '.qmp-shell_history') - self.pretty = pretty - self.verbose = verbose - self.logfile = None - - if logfile is not None: - self.logfile = open(logfile, "w", encoding='utf-8') - - def close(self) -> None: - # Hook into context manager of parent to save shell history. - self._save_history() - super().close() - - def _fill_completion(self) -> None: - cmds = self.cmd('query-commands') - if 'error' in cmds: - return - for cmd in cmds['return']: - self._completer.append(cmd['name']) - - def _completer_setup(self) -> None: - self._completer = QMPCompleter() - self._fill_completion() - readline.set_history_length(1024) - readline.set_completer(self._completer.complete) - readline.parse_and_bind("tab: complete") - # NB: default delimiters conflict with some command names - # (eg. query-), clearing everything as it doesn't seem to matter - readline.set_completer_delims('') - try: - readline.read_history_file(self._histfile) - except FileNotFoundError: - pass - except IOError as err: - msg = f"Failed to read history '{self._histfile}': {err!s}" - LOG.warning(msg) - - def _save_history(self) -> None: - try: - readline.write_history_file(self._histfile) - except IOError as err: - msg = f"Failed to save history file '{self._histfile}': {err!s}" - LOG.warning(msg) - - @classmethod - def _parse_value(cls, val: str) -> object: - try: - return int(val) - except ValueError: - pass - - if val.lower() == 'true': - return True - if val.lower() == 'false': - return False - if val.startswith(('{', '[')): - # Try first as pure JSON: - try: - return json.loads(val) - except ValueError: - pass - # Try once again as FuzzyJSON: - try: - tree = ast.parse(val, mode='eval') - transformed = FuzzyJSON().visit(tree) - return ast.literal_eval(transformed) - except (SyntaxError, ValueError): - pass - return val - - def _cli_expr(self, - tokens: Sequence[str], - parent: QMPObject) -> None: - for arg in tokens: - (key, sep, val) = arg.partition('=') - if sep != '=': - raise QMPShellError( - f"Expected a key=value pair, got '{arg!s}'" - ) - - value = self._parse_value(val) - optpath = key.split('.') - curpath = [] - for path in optpath[:-1]: - curpath.append(path) - obj = parent.get(path, {}) - if not isinstance(obj, dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - parent[path] = obj - parent = obj - if optpath[-1] in parent: - if isinstance(parent[optpath[-1]], dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - raise QMPShellError(f'Cannot set "{key}" multiple times') - parent[optpath[-1]] = value - - def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: - """ - Build a QMP input object from a user provided command-line in the - following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - """ - argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' - cmdargs = re.findall(argument_regex, cmdline) - qmpcmd: QMPMessage - - # Transactional CLI entry: - if cmdargs and cmdargs[0] == 'transaction(': - self._transmode = True - self._actions = [] - cmdargs.pop(0) - - # Transactional CLI exit: - if cmdargs and cmdargs[0] == ')' and self._transmode: - self._transmode = False - if len(cmdargs) > 1: - msg = 'Unexpected input after close of Transaction sub-shell' - raise QMPShellError(msg) - qmpcmd = { - 'execute': 'transaction', - 'arguments': {'actions': self._actions} - } - return qmpcmd - - # No args, or no args remaining - if not cmdargs: - return None - - if self._transmode: - # Parse and cache this Transactional Action - finalize = False - action = {'type': cmdargs[0], 'data': {}} - if cmdargs[-1] == ')': - cmdargs.pop(-1) - finalize = True - self._cli_expr(cmdargs[1:], action['data']) - self._actions.append(action) - return self._build_cmd(')') if finalize else None - - # Standard command: parse and return it to be executed. - qmpcmd = {'execute': cmdargs[0], 'arguments': {}} - self._cli_expr(cmdargs[1:], qmpcmd['arguments']) - return qmpcmd - - def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None: - jsobj = json.dumps(qmp_message, - indent=4 if self.pretty else None, - sort_keys=self.pretty) - print(str(jsobj), file=fh) - - def _execute_cmd(self, cmdline: str) -> bool: - try: - qmpcmd = self._build_cmd(cmdline) - except QMPShellError as err: - print( - f"Error while parsing command line: {err!s}\n" - "command format: " - "[arg-name1=arg1] ... [arg-nameN=argN", - file=sys.stderr - ) - return True - # For transaction mode, we may have just cached the action: - if qmpcmd is None: - return True - if self.verbose: - self._print(qmpcmd) - resp = self.cmd_obj(qmpcmd) - if resp is None: - print('Disconnected') - return False - self._print(resp) - if self.logfile is not None: - cmd = {**qmpcmd, **resp} - self._print(cmd, fh=self.logfile) - return True - - def connect(self, negotiate: bool = True) -> None: - self._greeting = super().connect(negotiate) - self._completer_setup() - - def show_banner(self, - msg: str = 'Welcome to the QMP low-level shell!') -> None: - """ - Print to stdio a greeting, and the QEMU version if available. - """ - print(msg) - if not self._greeting: - print('Connected') - return - version = self._greeting['QMP']['version']['qemu'] - print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) - - @property - def prompt(self) -> str: - """ - Return the current shell prompt, including a trailing space. - """ - if self._transmode: - return 'TRANS> ' - return '(QEMU) ' - - def read_exec_command(self) -> bool: - """ - Read and execute a command. - - @return True if execution was ok, return False if disconnected. - """ - try: - cmdline = input(self.prompt) - except EOFError: - print() - return False - - if cmdline == '': - for event in self.get_events(): - print(event) - return True - - return self._execute_cmd(cmdline) - - def repl(self) -> Iterator[None]: - """ - Return an iterator that implements the REPL. - """ - self.show_banner() - while self.read_exec_command(): - yield - self.close() - - -class HMPShell(QMPShell): - """ - HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: SocketAddrT, - pretty: bool = False, - verbose: bool = False, - server: bool = False, - logfile: Optional[str] = None): - super().__init__(address, pretty, verbose, server, logfile) - self._cpu_index = 0 - - def _cmd_completion(self) -> None: - for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): - if cmd and cmd[0] != '[' and cmd[0] != '\t': - name = cmd.split()[0] # drop help text - if name == 'info': - continue - if name.find('|') != -1: - # Command in the form 'foobar|f' or 'f|foobar', take the - # full name - opt = name.split('|') - if len(opt[0]) == 1: - name = opt[1] - else: - name = opt[0] - self._completer.append(name) - self._completer.append('help ' + name) # help completion - - def _info_completion(self) -> None: - for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): - if cmd: - self._completer.append('info ' + cmd.split()[1]) - - def _other_completion(self) -> None: - # special cases - self._completer.append('help info') - - def _fill_completion(self) -> None: - self._cmd_completion() - self._info_completion() - self._other_completion() - - def _cmd_passthrough(self, cmdline: str, - cpu_index: int = 0) -> QMPMessage: - return self.cmd_obj({ - 'execute': 'human-monitor-command', - 'arguments': { - 'command-line': cmdline, - 'cpu-index': cpu_index - } - }) - - def _execute_cmd(self, cmdline: str) -> bool: - if cmdline.split()[0] == "cpu": - # trap the cpu command, it requires special setting - try: - idx = int(cmdline.split()[1]) - if 'return' not in self._cmd_passthrough('info version', idx): - print('bad CPU index') - return True - self._cpu_index = idx - except ValueError: - print('cpu command takes an integer argument') - return True - resp = self._cmd_passthrough(cmdline, self._cpu_index) - if resp is None: - print('Disconnected') - return False - assert 'return' in resp or 'error' in resp - if 'return' in resp: - # Success - if len(resp['return']) > 0: - print(resp['return'], end=' ') - else: - # Error - print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) - return True - - def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: - QMPShell.show_banner(self, msg) - - -def die(msg: str) -> NoReturn: - """Write an error to stderr, then exit with a return code of 1.""" - sys.stderr.write('ERROR: %s\n' % msg) - sys.exit(1) - - -def main() -> None: - """ - qmp-shell entry point: parse command line arguments and start the REPL. - """ - parser = argparse.ArgumentParser() - parser.add_argument('-H', '--hmp', action='store_true', - help='Use HMP interface') - parser.add_argument('-N', '--skip-negotiation', action='store_true', - help='Skip negotiate (for qemu-ga)') - parser.add_argument('-v', '--verbose', action='store_true', - help='Verbose (echo commands sent and received)') - parser.add_argument('-p', '--pretty', action='store_true', - help='Pretty-print JSON') - parser.add_argument('-l', '--logfile', - help='Save log of all QMP messages to PATH') - - default_server = os.environ.get('QMP_SOCKET') - parser.add_argument('qmp_server', action='store', - default=default_server, - help='< UNIX socket path | TCP address:port >') - - args = parser.parse_args() - if args.qmp_server is None: - parser.error("QMP socket or TCP address must be specified") - - shell_class = HMPShell if args.hmp else QMPShell - - try: - address = shell_class.parse_address(args.qmp_server) - except QMPBadPortError: - parser.error(f"Bad port number: {args.qmp_server}") - return # pycharm doesn't know error() is noreturn - - with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu: - try: - qemu.connect(negotiate=not args.skip_negotiation) - except ConnectError as err: - if isinstance(err.exc, OSError): - die(f"Couldn't connect to {args.qmp_server}: {err!s}") - die(str(err)) - - for _ in qemu.repl(): - pass - - -def main_wrap() -> None: - """ - qmp-shell-wrap entry point: parse command line arguments and - start the REPL. - """ - parser = argparse.ArgumentParser() - parser.add_argument('-H', '--hmp', action='store_true', - help='Use HMP interface') - parser.add_argument('-v', '--verbose', action='store_true', - help='Verbose (echo commands sent and received)') - parser.add_argument('-p', '--pretty', action='store_true', - help='Pretty-print JSON') - parser.add_argument('-l', '--logfile', - help='Save log of all QMP messages to PATH') - - parser.add_argument('command', nargs=argparse.REMAINDER, - help='QEMU command line to invoke') - - args = parser.parse_args() - - cmd = args.command - if len(cmd) != 0 and cmd[0] == '--': - cmd = cmd[1:] - if len(cmd) == 0: - cmd = ["qemu-system-x86_64"] - - sockpath = "qmp-shell-wrap-%d" % os.getpid() - cmd += ["-qmp", "unix:%s" % sockpath] - - shell_class = HMPShell if args.hmp else QMPShell - - try: - address = shell_class.parse_address(sockpath) - except QMPBadPortError: - parser.error(f"Bad port number: {sockpath}") - return # pycharm doesn't know error() is noreturn - - try: - with shell_class(address, args.pretty, args.verbose, - True, args.logfile) as qemu: - with Popen(cmd): - - try: - qemu.accept() - except ConnectError as err: - if isinstance(err.exc, OSError): - die(f"Couldn't connect to {args.qmp_server}: {err!s}") - die(str(err)) - - for _ in qemu.repl(): - pass - finally: - os.unlink(sockpath) - - -if __name__ == '__main__': - main() diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py deleted file mode 100644 index eaa5fc7..0000000 --- a/python/qemu/aqmp/util.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -Miscellaneous Utilities - -This module provides asyncio utilities and compatibility wrappers for -Python 3.6 to provide some features that otherwise become available in -Python 3.7+. - -Various logging and debugging utilities are also provided, such as -`exception_summary()` and `pretty_traceback()`, used primarily for -adding information into the logging stream. -""" - -import asyncio -import sys -import traceback -from typing import ( - Any, - Coroutine, - Optional, - TypeVar, - cast, -) - - -T = TypeVar('T') - - -# -------------------------- -# Section: Utility Functions -# -------------------------- - - -async def flush(writer: asyncio.StreamWriter) -> None: - """ - Utility function to ensure a StreamWriter is *fully* drained. - - `asyncio.StreamWriter.drain` only promises we will return to below - the "high-water mark". This function ensures we flush the entire - buffer -- by setting the high water mark to 0 and then calling - drain. The flow control limits are restored after the call is - completed. - """ - transport = cast(asyncio.WriteTransport, writer.transport) - - # https://github.com/python/typeshed/issues/5779 - low, high = transport.get_write_buffer_limits() # type: ignore - transport.set_write_buffer_limits(0, 0) - try: - await writer.drain() - finally: - transport.set_write_buffer_limits(high, low) - - -def upper_half(func: T) -> T: - """ - Do-nothing decorator that annotates a method as an "upper-half" method. - - These methods must not call bottom-half functions directly, but can - schedule them to run. - """ - return func - - -def bottom_half(func: T) -> T: - """ - Do-nothing decorator that annotates a method as a "bottom-half" method. - - These methods must take great care to handle their own exceptions whenever - possible. If they go unhandled, they will cause termination of the loop. - - These methods do not, in general, have the ability to directly - report information to a caller’s context and will usually be - collected as a Task result instead. - - They must not call upper-half functions directly. - """ - return func - - -# ------------------------------- -# Section: Compatibility Wrappers -# ------------------------------- - - -def create_task(coro: Coroutine[Any, Any, T], - loop: Optional[asyncio.AbstractEventLoop] = None - ) -> 'asyncio.Future[T]': - """ - Python 3.6-compatible `asyncio.create_task` wrapper. - - :param coro: The coroutine to execute in a task. - :param loop: Optionally, the loop to create the task in. - - :return: An `asyncio.Future` object. - """ - if sys.version_info >= (3, 7): - if loop is not None: - return loop.create_task(coro) - return asyncio.create_task(coro) # pylint: disable=no-member - - # Python 3.6: - return asyncio.ensure_future(coro, loop=loop) - - -def is_closing(writer: asyncio.StreamWriter) -> bool: - """ - Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. - - :param writer: The `asyncio.StreamWriter` object. - :return: `True` if the writer is closing, or closed. - """ - if sys.version_info >= (3, 7): - return writer.is_closing() - - # Python 3.6: - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - return transport.is_closing() - - -async def wait_closed(writer: asyncio.StreamWriter) -> None: - """ - Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. - - :param writer: The `asyncio.StreamWriter` to wait on. - """ - if sys.version_info >= (3, 7): - await writer.wait_closed() - return - - # Python 3.6 - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - - while not transport.is_closing(): - await asyncio.sleep(0) - - # This is an ugly workaround, but it's the best I can come up with. - sock = transport.get_extra_info('socket') - - if sock is None: - # Our transport doesn't have a socket? ... - # Nothing we can reasonably do. - return - - while sock.fileno() != -1: - await asyncio.sleep(0) - - -def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: - """ - Python 3.6-compatible `asyncio.run` wrapper. - - :param coro: A coroutine to execute now. - :return: The return value from the coroutine. - """ - if sys.version_info >= (3, 7): - return asyncio.run(coro, debug=debug) - - # Python 3.6 - loop = asyncio.get_event_loop() - loop.set_debug(debug) - ret = loop.run_until_complete(coro) - loop.close() - - return ret - - -# ---------------------------- -# Section: Logging & Debugging -# ---------------------------- - - -def exception_summary(exc: BaseException) -> str: - """ - Return a summary string of an arbitrary exception. - - It will be of the form "ExceptionType: Error Message", if the error - string is non-empty, and just "ExceptionType" otherwise. - """ - name = type(exc).__qualname__ - smod = type(exc).__module__ - if smod not in ("__main__", "builtins"): - name = smod + '.' + name - - error = str(exc) - if error: - return f"{name}: {error}" - return name - - -def pretty_traceback(prefix: str = " | ") -> str: - """ - Formats the current traceback, indented to provide visual distinction. - - This is useful for printing a traceback within a traceback for - debugging purposes when encapsulating errors to deliver them up the - stack; when those errors are printed, this helps provide a nice - visual grouping to quickly identify the parts of the error that - belong to the inner exception. - - :param prefix: The prefix to append to each line of the traceback. - :return: A string, formatted something like the following:: - - | Traceback (most recent call last): - | File "foobar.py", line 42, in arbitrary_example - | foo.baz() - | ArbitraryError: [Errno 42] Something bad happened! - """ - output = "".join(traceback.format_exception(*sys.exc_info())) - - exc_lines = [] - for line in output.split('\n'): - exc_lines.append(prefix + line) - - # The last line is always empty, omit it - return "\n".join(exc_lines[:-1]) diff --git a/python/qemu/machine/machine.py b/python/qemu/machine/machine.py index 41be025..07ac5a7 100644 --- a/python/qemu/machine/machine.py +++ b/python/qemu/machine/machine.py @@ -40,8 +40,8 @@ from typing import ( TypeVar, ) -from qemu.aqmp import SocketAddrT -from qemu.aqmp.legacy import ( +from qemu.qmp import SocketAddrT +from qemu.qmp.legacy import ( QEMUMonitorProtocol, QMPMessage, QMPReturnValue, diff --git a/python/qemu/machine/qtest.py b/python/qemu/machine/qtest.py index 13e0aaf..1a1fc6c 100644 --- a/python/qemu/machine/qtest.py +++ b/python/qemu/machine/qtest.py @@ -26,7 +26,7 @@ from typing import ( TextIO, ) -from qemu.aqmp import SocketAddrT +from qemu.qmp import SocketAddrT from .machine import QEMUMachine diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py new file mode 100644 index 0000000..69190d0 --- /dev/null +++ b/python/qemu/qmp/__init__.py @@ -0,0 +1,59 @@ +""" +QEMU Monitor Protocol (QMP) development library & tooling. + +This package provides a fairly low-level class for communicating +asynchronously with QMP protocol servers, as implemented by QEMU, the +QEMU Guest Agent, and the QEMU Storage Daemon. + +`QMPClient` provides the main functionality of this package. All errors +raised by this library derive from `QMPError`, see `qmp.error` for +additional detail. See `qmp.events` for an in-depth tutorial on +managing QMP events. +""" + +# Copyright (C) 2020-2022 John Snow for Red Hat, Inc. +# +# Authors: +# John Snow +# +# Based on earlier work by Luiz Capitulino . +# +# This work is licensed under the terms of the GNU LGPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import logging + +from .error import QMPError +from .events import EventListener +from .message import Message +from .protocol import ( + ConnectError, + Runstate, + SocketAddrT, + StateError, +) +from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient + + +# Suppress logging unless an application engages it. +logging.getLogger('qemu.qmp').addHandler(logging.NullHandler()) + + +# The order of these fields impact the Sphinx documentation order. +__all__ = ( + # Classes, most to least important + 'QMPClient', + 'Message', + 'EventListener', + 'Runstate', + + # Exceptions, most generic to most explicit + 'QMPError', + 'StateError', + 'ConnectError', + 'ExecuteError', + 'ExecInterruptedError', + + # Type aliases + 'SocketAddrT', +) diff --git a/python/qemu/qmp/aqmp_tui.py b/python/qemu/qmp/aqmp_tui.py new file mode 100644 index 0000000..59d3036 --- /dev/null +++ b/python/qemu/qmp/aqmp_tui.py @@ -0,0 +1,652 @@ +# Copyright (c) 2021 +# +# Authors: +# Niteesh Babu G S +# +# This work is licensed under the terms of the GNU LGPL, version 2 or +# later. See the COPYING file in the top-level directory. +""" +AQMP TUI + +AQMP TUI is an asynchronous interface built on top the of the AQMP library. +It is the successor of QMP-shell and is bought-in as a replacement for it. + +Example Usage: aqmp-tui +Full Usage: aqmp-tui --help +""" + +import argparse +import asyncio +import json +import logging +from logging import Handler, LogRecord +import signal +from typing import ( + List, + Optional, + Tuple, + Type, + Union, + cast, +) + +from pygments import lexers +from pygments import token as Token +import urwid +import urwid_readline + +from .error import ProtocolError +from .legacy import QEMUMonitorProtocol, QMPBadPortError +from .message import DeserializationError, Message, UnexpectedTypeError +from .protocol import ConnectError, Runstate +from .qmp_client import ExecInterruptedError, QMPClient +from .util import create_task, pretty_traceback + + +# The name of the signal that is used to update the history list +UPDATE_MSG: str = 'UPDATE_MSG' + + +palette = [ + (Token.Punctuation, '', '', '', 'h15,bold', 'g7'), + (Token.Text, '', '', '', '', 'g7'), + (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'), + (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'), + (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'), + (Token.Keyword.Constant, '', '', '', '#6af', 'g7'), + ('DEBUG', '', '', '', '#ddf', 'g7'), + ('INFO', '', '', '', 'g100', 'g7'), + ('WARNING', '', '', '', '#ff6', 'g7'), + ('ERROR', '', '', '', '#a00', 'g7'), + ('CRITICAL', '', '', '', '#a00', 'g7'), + ('background', '', 'black', '', '', 'g7'), +] + + +def format_json(msg: str) -> str: + """ + Formats valid/invalid multi-line JSON message into a single-line message. + + Formatting is first tried using the standard json module. If that fails + due to an decoding error then a simple string manipulation is done to + achieve a single line JSON string. + + Converting into single line is more asthetically pleasing when looking + along with error messages. + + Eg: + Input: + [ 1, + true, + 3 ] + The above input is not a valid QMP message and produces the following error + "QMP message is not a JSON object." + When displaying this in TUI in multiline mode we get + + [ 1, + true, + 3 ]: QMP message is not a JSON object. + + whereas in singleline mode we get the following + + [1, true, 3]: QMP message is not a JSON object. + + The single line mode is more asthetically pleasing. + + :param msg: + The message to formatted into single line. + + :return: Formatted singleline message. + """ + try: + msg = json.loads(msg) + return str(json.dumps(msg)) + except json.decoder.JSONDecodeError: + msg = msg.replace('\n', '') + words = msg.split(' ') + words = list(filter(None, words)) + return ' '.join(words) + + +def has_handler_type(logger: logging.Logger, + handler_type: Type[Handler]) -> bool: + """ + The Logger class has no interface to check if a certain type of handler is + installed or not. So we provide an interface to do so. + + :param logger: + Logger object + :param handler_type: + The type of the handler to be checked. + + :return: returns True if handler of type `handler_type`. + """ + for handler in logger.handlers: + if isinstance(handler, handler_type): + return True + return False + + +class App(QMPClient): + """ + Implements the AQMP TUI. + + Initializes the widgets and starts the urwid event loop. + + :param address: + Address of the server to connect to. + :param num_retries: + The number of times to retry before stopping to reconnect. + :param retry_delay: + The delay(sec) before each retry + """ + def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int, + retry_delay: Optional[int]) -> None: + urwid.register_signal(type(self), UPDATE_MSG) + self.window = Window(self) + self.address = address + self.aloop: Optional[asyncio.AbstractEventLoop] = None + self.num_retries = num_retries + self.retry_delay = retry_delay if retry_delay else 2 + self.retry: bool = False + self.exiting: bool = False + super().__init__() + + def add_to_history(self, msg: str, level: Optional[str] = None) -> None: + """ + Appends the msg to the history list. + + :param msg: + The raw message to be appended in string type. + """ + urwid.emit_signal(self, UPDATE_MSG, msg, level) + + def _cb_outbound(self, msg: Message) -> Message: + """ + Callback: outbound message hook. + + Appends the outgoing messages to the history box. + + :param msg: raw outbound message. + :return: final outbound message. + """ + str_msg = str(msg) + + if not has_handler_type(logging.getLogger(), TUILogHandler): + logging.debug('Request: %s', str_msg) + self.add_to_history('<-- ' + str_msg) + return msg + + def _cb_inbound(self, msg: Message) -> Message: + """ + Callback: outbound message hook. + + Appends the incoming messages to the history box. + + :param msg: raw inbound message. + :return: final inbound message. + """ + str_msg = str(msg) + + if not has_handler_type(logging.getLogger(), TUILogHandler): + logging.debug('Request: %s', str_msg) + self.add_to_history('--> ' + str_msg) + return msg + + async def _send_to_server(self, msg: Message) -> None: + """ + This coroutine sends the message to the server. + The message has to be pre-validated. + + :param msg: + Pre-validated message to be to sent to the server. + + :raise Exception: When an unhandled exception is caught. + """ + try: + await self._raw(msg, assign_id='id' not in msg) + except ExecInterruptedError as err: + logging.info('Error server disconnected before reply %s', str(err)) + self.add_to_history('Server disconnected before reply', 'ERROR') + except Exception as err: + logging.error('Exception from _send_to_server: %s', str(err)) + raise err + + def cb_send_to_server(self, raw_msg: str) -> None: + """ + Validates and sends the message to the server. + The raw string message is first converted into a Message object + and is then sent to the server. + + :param raw_msg: + The raw string message to be sent to the server. + + :raise Exception: When an unhandled exception is caught. + """ + try: + msg = Message(bytes(raw_msg, encoding='utf-8')) + create_task(self._send_to_server(msg)) + except (DeserializationError, UnexpectedTypeError) as err: + raw_msg = format_json(raw_msg) + logging.info('Invalid message: %s', err.error_message) + self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR') + + def unhandled_input(self, key: str) -> None: + """ + Handle's keys which haven't been handled by the child widgets. + + :param key: + Unhandled key + """ + if key == 'esc': + self.kill_app() + + def kill_app(self) -> None: + """ + Initiates killing of app. A bridge between asynchronous and synchronous + code. + """ + create_task(self._kill_app()) + + async def _kill_app(self) -> None: + """ + This coroutine initiates the actual disconnect process and calls + urwid.ExitMainLoop() to kill the TUI. + + :raise Exception: When an unhandled exception is caught. + """ + self.exiting = True + await self.disconnect() + logging.debug('Disconnect finished. Exiting app') + raise urwid.ExitMainLoop() + + async def disconnect(self) -> None: + """ + Overrides the disconnect method to handle the errors locally. + """ + try: + await super().disconnect() + except (OSError, EOFError) as err: + logging.info('disconnect: %s', str(err)) + self.retry = True + except ProtocolError as err: + logging.info('disconnect: %s', str(err)) + except Exception as err: + logging.error('disconnect: Unhandled exception %s', str(err)) + raise err + + def _set_status(self, msg: str) -> None: + """ + Sets the message as the status. + + :param msg: + The message to be displayed in the status bar. + """ + self.window.footer.set_text(msg) + + def _get_formatted_address(self) -> str: + """ + Returns a formatted version of the server's address. + + :return: formatted address + """ + if isinstance(self.address, tuple): + host, port = self.address + addr = f'{host}:{port}' + else: + addr = f'{self.address}' + return addr + + async def _initiate_connection(self) -> Optional[ConnectError]: + """ + Tries connecting to a server a number of times with a delay between + each try. If all retries failed then return the error faced during + the last retry. + + :return: Error faced during last retry. + """ + current_retries = 0 + err = None + + # initial try + await self.connect_server() + while self.retry and current_retries < self.num_retries: + logging.info('Connection Failed, retrying in %d', self.retry_delay) + status = f'[Retry #{current_retries} ({self.retry_delay}s)]' + self._set_status(status) + + await asyncio.sleep(self.retry_delay) + + err = await self.connect_server() + current_retries += 1 + # If all retries failed report the last error + if err: + logging.info('All retries failed: %s', err) + return err + return None + + async def manage_connection(self) -> None: + """ + Manage the connection based on the current run state. + + A reconnect is issued when the current state is IDLE and the number + of retries is not exhausted. + A disconnect is issued when the current state is DISCONNECTING. + """ + while not self.exiting: + if self.runstate == Runstate.IDLE: + err = await self._initiate_connection() + # If retry is still true then, we have exhausted all our tries. + if err: + self._set_status(f'[Error: {err.error_message}]') + else: + addr = self._get_formatted_address() + self._set_status(f'[Connected {addr}]') + elif self.runstate == Runstate.DISCONNECTING: + self._set_status('[Disconnected]') + await self.disconnect() + # check if a retry is needed + if self.runstate == Runstate.IDLE: + continue + await self.runstate_changed() + + async def connect_server(self) -> Optional[ConnectError]: + """ + Initiates a connection to the server at address `self.address` + and in case of a failure, sets the status to the respective error. + """ + try: + await self.connect(self.address) + self.retry = False + except ConnectError as err: + logging.info('connect_server: ConnectError %s', str(err)) + self.retry = True + return err + return None + + def run(self, debug: bool = False) -> None: + """ + Starts the long running co-routines and the urwid event loop. + + :param debug: + Enables/Disables asyncio event loop debugging + """ + screen = urwid.raw_display.Screen() + screen.set_terminal_properties(256) + + self.aloop = asyncio.get_event_loop() + self.aloop.set_debug(debug) + + # Gracefully handle SIGTERM and SIGINT signals + cancel_signals = [signal.SIGTERM, signal.SIGINT] + for sig in cancel_signals: + self.aloop.add_signal_handler(sig, self.kill_app) + + event_loop = urwid.AsyncioEventLoop(loop=self.aloop) + main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'), + unhandled_input=self.unhandled_input, + screen=screen, + palette=palette, + handle_mouse=True, + event_loop=event_loop) + + create_task(self.manage_connection(), self.aloop) + try: + main_loop.run() + except Exception as err: + logging.error('%s\n%s\n', str(err), pretty_traceback()) + raise err + + +class StatusBar(urwid.Text): + """ + A simple statusbar modelled using the Text widget. The status can be + set using the set_text function. All text set is aligned to right. + + :param text: Initial text to be displayed. Default is empty str. + """ + def __init__(self, text: str = ''): + super().__init__(text, align='right') + + +class Editor(urwid_readline.ReadlineEdit): + """ + A simple editor modelled using the urwid_readline.ReadlineEdit widget. + Mimcs GNU readline shortcuts and provides history support. + + The readline shortcuts can be found below: + https://github.com/rr-/urwid_readline#features + + Along with the readline features, this editor also has support for + history. Pressing the 'up'/'down' switches between the prev/next messages + available in the history. + + Currently there is no support to save the history to a file. The history of + previous commands is lost on exit. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + super().__init__(caption='> ', multiline=True) + self.parent = parent + self.history: List[str] = [] + self.last_index: int = -1 + self.show_history: bool = False + + def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]: + """ + Handles the keypress on this widget. + + :param size: + The current size of the widget. + :param key: + The key to be handled. + + :return: Unhandled key if any. + """ + msg = self.get_edit_text() + if key == 'up' and not msg: + # Show the history when 'up arrow' is pressed with no input text. + # NOTE: The show_history logic is necessary because in 'multiline' + # mode (which we use) 'up arrow' is used to move between lines. + if not self.history: + return None + self.show_history = True + last_msg = self.history[self.last_index] + self.set_edit_text(last_msg) + self.edit_pos = len(last_msg) + elif key == 'up' and self.show_history: + self.last_index = max(self.last_index - 1, -len(self.history)) + self.set_edit_text(self.history[self.last_index]) + self.edit_pos = len(self.history[self.last_index]) + elif key == 'down' and self.show_history: + if self.last_index == -1: + self.set_edit_text('') + self.show_history = False + else: + self.last_index += 1 + self.set_edit_text(self.history[self.last_index]) + self.edit_pos = len(self.history[self.last_index]) + elif key == 'meta enter': + # When using multiline, enter inserts a new line into the editor + # send the input to the server on alt + enter + self.parent.cb_send_to_server(msg) + self.history.append(msg) + self.set_edit_text('') + self.last_index = -1 + self.show_history = False + else: + self.show_history = False + self.last_index = -1 + return cast(Optional[str], super().keypress(size, key)) + return None + + +class EditorWidget(urwid.Filler): + """ + Wrapper around the editor widget. + + The Editor is a flow widget and has to wrapped inside a box widget. + This class wraps the Editor inside filler widget. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + super().__init__(Editor(parent), valign='top') + + +class HistoryBox(urwid.ListBox): + """ + This widget is modelled using the ListBox widget, contains the list of + all messages both QMP messages and log messsages to be shown in the TUI. + + The messages are urwid.Text widgets. On every append of a message, the + focus is shifted to the last appended message. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + self.history = urwid.SimpleFocusListWalker([]) + super().__init__(self.history) + + def add_to_history(self, + history: Union[str, List[Tuple[str, str]]]) -> None: + """ + Appends a message to the list and set the focus to the last appended + message. + + :param history: + The history item(message/event) to be appended to the list. + """ + self.history.append(urwid.Text(history)) + self.history.set_focus(len(self.history) - 1) + + def mouse_event(self, size: Tuple[int, int], _event: str, button: float, + _x: int, _y: int, focus: bool) -> None: + # Unfortunately there are no urwid constants that represent the mouse + # events. + if button == 4: # Scroll up event + super().keypress(size, 'up') + elif button == 5: # Scroll down event + super().keypress(size, 'down') + + +class HistoryWindow(urwid.Frame): + """ + This window composes the HistoryBox and EditorWidget in a horizontal split. + By default the first focus is given to the history box. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + self.editor_widget = EditorWidget(parent) + self.editor = urwid.LineBox(self.editor_widget) + self.history = HistoryBox(parent) + self.body = urwid.Pile([('weight', 80, self.history), + ('weight', 20, self.editor)]) + super().__init__(self.body) + urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history) + + def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None: + """ + Appends a message to the history box + + :param msg: + The message to be appended to the history box. + :param level: + The log level of the message, if it is a log message. + """ + formatted = [] + if level: + msg = f'[{level}]: {msg}' + formatted.append((level, msg)) + else: + lexer = lexers.JsonLexer() # pylint: disable=no-member + for token in lexer.get_tokens(msg): + formatted.append(token) + self.history.add_to_history(formatted) + + +class Window(urwid.Frame): + """ + This window is the top most widget of the TUI and will contain other + windows. Each child of this widget is responsible for displaying a specific + functionality. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + footer = StatusBar() + body = HistoryWindow(parent) + super().__init__(body, footer=footer) + + +class TUILogHandler(Handler): + """ + This handler routes all the log messages to the TUI screen. + It is installed to the root logger to so that the log message from all + libraries begin used is routed to the screen. + + :param tui: Reference to the TUI object. + """ + def __init__(self, tui: App) -> None: + super().__init__() + self.tui = tui + + def emit(self, record: LogRecord) -> None: + """ + Emits a record to the TUI screen. + + Appends the log message to the TUI screen + """ + level = record.levelname + msg = record.getMessage() + self.tui.add_to_history(msg, level) + + +def main() -> None: + """ + Driver of the whole script, parses arguments, initialize the TUI and + the logger. + """ + parser = argparse.ArgumentParser(description='AQMP TUI') + parser.add_argument('qmp_server', help='Address of the QMP server. ' + 'Format ') + parser.add_argument('--num-retries', type=int, default=10, + help='Number of times to reconnect before giving up.') + parser.add_argument('--retry-delay', type=int, + help='Time(s) to wait before next retry. ' + 'Default action is to wait 2s between each retry.') + parser.add_argument('--log-file', help='The Log file name') + parser.add_argument('--log-level', default='WARNING', + help='Log level ') + parser.add_argument('--asyncio-debug', action='store_true', + help='Enable debug mode for asyncio loop. ' + 'Generates lot of output, makes TUI unusable when ' + 'logs are logged in the TUI. ' + 'Use only when logging to a file.') + args = parser.parse_args() + + try: + address = QEMUMonitorProtocol.parse_address(args.qmp_server) + except QMPBadPortError as err: + parser.error(str(err)) + + app = App(address, args.num_retries, args.retry_delay) + + root_logger = logging.getLogger() + root_logger.setLevel(logging.getLevelName(args.log_level)) + + if args.log_file: + root_logger.addHandler(logging.FileHandler(args.log_file)) + else: + root_logger.addHandler(TUILogHandler(app)) + + app.run(args.asyncio_debug) + + +if __name__ == '__main__': + main() diff --git a/python/qemu/qmp/error.py b/python/qemu/qmp/error.py new file mode 100644 index 0000000..24ba4d5 --- /dev/null +++ b/python/qemu/qmp/error.py @@ -0,0 +1,50 @@ +""" +QMP Error Classes + +This package seeks to provide semantic error classes that are intended +to be used directly by clients when they would like to handle particular +semantic failures (e.g. "failed to connect") without needing to know the +enumeration of possible reasons for that failure. + +QMPError serves as the ancestor for all exceptions raised by this +package, and is suitable for use in handling semantic errors from this +library. In most cases, individual public methods will attempt to catch +and re-encapsulate various exceptions to provide a semantic +error-handling interface. + +.. admonition:: QMP Exception Hierarchy Reference + + | `Exception` + | +-- `QMPError` + | +-- `ConnectError` + | +-- `StateError` + | +-- `ExecInterruptedError` + | +-- `ExecuteError` + | +-- `ListenerError` + | +-- `ProtocolError` + | +-- `DeserializationError` + | +-- `UnexpectedTypeError` + | +-- `ServerParseError` + | +-- `BadReplyError` + | +-- `GreetingError` + | +-- `NegotiationError` +""" + + +class QMPError(Exception): + """Abstract error class for all errors originating from this package.""" + + +class ProtocolError(QMPError): + """ + Abstract error class for protocol failures. + + Semantically, these errors are generally the fault of either the + protocol server or as a result of a bug in this library. + + :param error_message: Human-readable string describing the error. + """ + def __init__(self, error_message: str): + super().__init__(error_message) + #: Human-readable error message, without any prefix. + self.error_message: str = error_message diff --git a/python/qemu/qmp/events.py b/python/qemu/qmp/events.py new file mode 100644 index 0000000..6199776 --- /dev/null +++ b/python/qemu/qmp/events.py @@ -0,0 +1,717 @@ +""" +QMP Events and EventListeners + +Asynchronous QMP uses `EventListener` objects to listen for events. An +`EventListener` is a FIFO event queue that can be pre-filtered to listen +for only specific events. Each `EventListener` instance receives its own +copy of events that it hears, so events may be consumed without fear or +worry for depriving other listeners of events they need to hear. + + +EventListener Tutorial +---------------------- + +In all of the following examples, we assume that we have a `QMPClient` +instantiated named ``qmp`` that is already connected. + + +`listener()` context blocks with one name +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The most basic usage is by using the `listener()` context manager to +construct them: + +.. code:: python + + with qmp.listener('STOP') as listener: + await qmp.execute('stop') + await listener.get() + +The listener is active only for the duration of the ‘with’ block. This +instance listens only for ‘STOP’ events. + + +`listener()` context blocks with two or more names +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Multiple events can be selected for by providing any ``Iterable[str]``: + +.. code:: python + + with qmp.listener(('STOP', 'RESUME')) as listener: + await qmp.execute('stop') + event = await listener.get() + assert event['event'] == 'STOP' + + await qmp.execute('cont') + event = await listener.get() + assert event['event'] == 'RESUME' + + +`listener()` context blocks with no names +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +By omitting names entirely, you can listen to ALL events. + +.. code:: python + + with qmp.listener() as listener: + await qmp.execute('stop') + event = await listener.get() + assert event['event'] == 'STOP' + +This isn’t a very good use case for this feature: In a non-trivial +running system, we may not know what event will arrive next. Grabbing +the top of a FIFO queue returning multiple kinds of events may be prone +to error. + + +Using async iterators to retrieve events +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you’d like to simply watch what events happen to arrive, you can use +the listener as an async iterator: + +.. code:: python + + with qmp.listener() as listener: + async for event in listener: + print(f"Event arrived: {event['event']}") + +This is analogous to the following code: + +.. code:: python + + with qmp.listener() as listener: + while True: + event = listener.get() + print(f"Event arrived: {event['event']}") + +This event stream will never end, so these blocks will never terminate. + + +Using asyncio.Task to concurrently retrieve events +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Since a listener’s event stream will never terminate, it is not likely +useful to use that form in a script. For longer-running clients, we can +create event handlers by using `asyncio.Task` to create concurrent +coroutines: + +.. code:: python + + async def print_events(listener): + try: + async for event in listener: + print(f"Event arrived: {event['event']}") + except asyncio.CancelledError: + return + + with qmp.listener() as listener: + task = asyncio.Task(print_events(listener)) + await qmp.execute('stop') + await qmp.execute('cont') + task.cancel() + await task + +However, there is no guarantee that these events will be received by the +time we leave this context block. Once the context block is exited, the +listener will cease to hear any new events, and becomes inert. + +Be mindful of the timing: the above example will *probably*– but does +not *guarantee*– that both STOP/RESUMED events will be printed. The +example below outlines how to use listeners outside of a context block. + + +Using `register_listener()` and `remove_listener()` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To create a listener with a longer lifetime, beyond the scope of a +single block, create a listener and then call `register_listener()`: + +.. code:: python + + class MyClient: + def __init__(self, qmp): + self.qmp = qmp + self.listener = EventListener() + + async def print_events(self): + try: + async for event in self.listener: + print(f"Event arrived: {event['event']}") + except asyncio.CancelledError: + return + + async def run(self): + self.task = asyncio.Task(self.print_events) + self.qmp.register_listener(self.listener) + await qmp.execute('stop') + await qmp.execute('cont') + + async def stop(self): + self.task.cancel() + await self.task + self.qmp.remove_listener(self.listener) + +The listener can be deactivated by using `remove_listener()`. When it is +removed, any possible pending events are cleared and it can be +re-registered at a later time. + + +Using the built-in all events listener +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The `QMPClient` object creates its own default listener named +:py:obj:`~Events.events` that can be used for the same purpose without +having to create your own: + +.. code:: python + + async def print_events(listener): + try: + async for event in listener: + print(f"Event arrived: {event['event']}") + except asyncio.CancelledError: + return + + task = asyncio.Task(print_events(qmp.events)) + + await qmp.execute('stop') + await qmp.execute('cont') + + task.cancel() + await task + + +Using both .get() and async iterators +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The async iterator and `get()` methods pull events from the same FIFO +queue. If you mix the usage of both, be aware: Events are emitted +precisely once per listener. + +If multiple contexts try to pull events from the same listener instance, +events are still emitted only precisely once. + +This restriction can be lifted by creating additional listeners. + + +Creating multiple listeners +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Additional `EventListener` objects can be created at-will. Each one +receives its own copy of events, with separate FIFO event queues. + +.. code:: python + + my_listener = EventListener() + qmp.register_listener(my_listener) + + await qmp.execute('stop') + copy1 = await my_listener.get() + copy2 = await qmp.events.get() + + assert copy1 == copy2 + +In this example, we await an event from both a user-created +`EventListener` and the built-in events listener. Both receive the same +event. + + +Clearing listeners +~~~~~~~~~~~~~~~~~~ + +`EventListener` objects can be cleared, clearing all events seen thus far: + +.. code:: python + + await qmp.execute('stop') + qmp.events.clear() + await qmp.execute('cont') + event = await qmp.events.get() + assert event['event'] == 'RESUME' + +`EventListener` objects are FIFO queues. If events are not consumed, +they will remain in the queue until they are witnessed or discarded via +`clear()`. FIFO queues will be drained automatically upon leaving a +context block, or when calling `remove_listener()`. + + +Accessing listener history +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +`EventListener` objects record their history. Even after being cleared, +you can obtain a record of all events seen so far: + +.. code:: python + + await qmp.execute('stop') + await qmp.execute('cont') + qmp.events.clear() + + assert len(qmp.events.history) == 2 + assert qmp.events.history[0]['event'] == 'STOP' + assert qmp.events.history[1]['event'] == 'RESUME' + +The history is updated immediately and does not require the event to be +witnessed first. + + +Using event filters +~~~~~~~~~~~~~~~~~~~ + +`EventListener` objects can be given complex filtering criteria if names +are not sufficient: + +.. code:: python + + def job1_filter(event) -> bool: + event_data = event.get('data', {}) + event_job_id = event_data.get('id') + return event_job_id == "job1" + + with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: + await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) + async for event in listener: + if event['data']['status'] == 'concluded': + break + +These filters might be most useful when parameterized. `EventListener` +objects expect a function that takes only a single argument (the raw +event, as a `Message`) and returns a bool; True if the event should be +accepted into the stream. You can create a function that adapts this +signature to accept configuration parameters: + +.. code:: python + + def job_filter(job_id: str) -> EventFilter: + def filter(event: Message) -> bool: + return event['data']['id'] == job_id + return filter + + with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: + await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) + async for event in listener: + if event['data']['status'] == 'concluded': + break + + +Activating an existing listener with `listen()` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Listeners with complex, long configurations can also be created manually +and activated temporarily by using `listen()` instead of `listener()`: + +.. code:: python + + listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', + 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', + 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) + + with qmp.listen(listener): + await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) + async for event in listener: + print(event) + if event['event'] == 'BLOCK_JOB_COMPLETED': + break + +Any events that are not witnessed by the time the block is left will be +cleared from the queue; entering the block is an implicit +`register_listener()` and leaving the block is an implicit +`remove_listener()`. + + +Activating multiple existing listeners with `listen()` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +While `listener()` is only capable of creating a single listener, +`listen()` is capable of activating multiple listeners simultaneously: + +.. code:: python + + def job_filter(job_id: str) -> EventFilter: + def filter(event: Message) -> bool: + return event['data']['id'] == job_id + return filter + + jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) + jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) + + with qmp.listen(jobA, jobB): + qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) + qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) + + async for event in jobA.get(): + if event['data']['status'] == 'concluded': + break + async for event in jobB.get(): + if event['data']['status'] == 'concluded': + break + + +Extending the `EventListener` class +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In the case that a more specialized `EventListener` is desired to +provide either more functionality or more compact syntax for specialized +cases, it can be extended. + +One of the key methods to extend or override is +:py:meth:`~EventListener.accept()`. The default implementation checks an +incoming message for: + +1. A qualifying name, if any :py:obj:`~EventListener.names` were + specified at initialization time +2. That :py:obj:`~EventListener.event_filter()` returns True. + +This can be modified however you see fit to change the criteria for +inclusion in the stream. + +For convenience, a ``JobListener`` class could be created that simply +bakes in configuration so it does not need to be repeated: + +.. code:: python + + class JobListener(EventListener): + def __init__(self, job_id: str): + super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', + 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', + 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) + self.job_id = job_id + + def accept(self, event) -> bool: + if not super().accept(event): + return False + if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): + return event['data']['id'] == job_id + return event['data']['device'] == job_id + +From here on out, you can conjure up a custom-purpose listener that +listens only for job-related events for a specific job-id easily: + +.. code:: python + + listener = JobListener('job4') + with qmp.listener(listener): + await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) + async for event in listener: + print(event) + if event['event'] == 'BLOCK_JOB_COMPLETED': + break + + +Experimental Interfaces & Design Issues +--------------------------------------- + +These interfaces are not ones I am sure I will keep or otherwise modify +heavily. + +qmp.listener()’s type signature +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +`listener()` does not return anything, because it was assumed the caller +already had a handle to the listener. However, for +``qmp.listener(EventListener())`` forms, the caller will not have saved +a handle to the listener. + +Because this function can accept *many* listeners, I found it hard to +accurately type in a way where it could be used in both “one” or “many” +forms conveniently and in a statically type-safe manner. + +Ultimately, I removed the return altogether, but perhaps with more time +I can work out a way to re-add it. + + +API Reference +------------- + +""" + +import asyncio +from contextlib import contextmanager +import logging +from typing import ( + AsyncIterator, + Callable, + Iterable, + Iterator, + List, + Optional, + Set, + Tuple, + Union, +) + +from .error import QMPError +from .message import Message + + +EventNames = Union[str, Iterable[str], None] +EventFilter = Callable[[Message], bool] + + +class ListenerError(QMPError): + """ + Generic error class for `EventListener`-related problems. + """ + + +class EventListener: + """ + Selectively listens for events with runtime configurable filtering. + + This class is designed to be directly usable for the most common cases, + but it can be extended to provide more rigorous control. + + :param names: + One or more names of events to listen for. + When not provided, listen for ALL events. + :param event_filter: + An optional event filtering function. + When names are also provided, this acts as a secondary filter. + + When ``names`` and ``event_filter`` are both provided, the names + will be filtered first, and then the filter function will be called + second. The event filter function can assume that the format of the + event is a known format. + """ + def __init__( + self, + names: EventNames = None, + event_filter: Optional[EventFilter] = None, + ): + # Queue of 'heard' events yet to be witnessed by a caller. + self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() + + # Intended as a historical record, NOT a processing queue or backlog. + self._history: List[Message] = [] + + #: Primary event filter, based on one or more event names. + self.names: Set[str] = set() + if isinstance(names, str): + self.names.add(names) + elif names is not None: + self.names.update(names) + + #: Optional, secondary event filter. + self.event_filter: Optional[EventFilter] = event_filter + + @property + def history(self) -> Tuple[Message, ...]: + """ + A read-only history of all events seen so far. + + This represents *every* event, including those not yet witnessed + via `get()` or ``async for``. It persists between `clear()` + calls and is immutable. + """ + return tuple(self._history) + + def accept(self, event: Message) -> bool: + """ + Determine if this listener accepts this event. + + This method determines which events will appear in the stream. + The default implementation simply checks the event against the + list of names and the event_filter to decide if this + `EventListener` accepts a given event. It can be + overridden/extended to provide custom listener behavior. + + User code is not expected to need to invoke this method. + + :param event: The event under consideration. + :return: `True`, if this listener accepts this event. + """ + name_ok = (not self.names) or (event['event'] in self.names) + return name_ok and ( + (not self.event_filter) or self.event_filter(event) + ) + + async def put(self, event: Message) -> None: + """ + Conditionally put a new event into the FIFO queue. + + This method is not designed to be invoked from user code, and it + should not need to be overridden. It is a public interface so + that `QMPClient` has an interface by which it can inform + registered listeners of new events. + + The event will be put into the queue if + :py:meth:`~EventListener.accept()` returns `True`. + + :param event: The new event to put into the FIFO queue. + """ + if not self.accept(event): + return + + self._history.append(event) + await self._queue.put(event) + + async def get(self) -> Message: + """ + Wait for the very next event in this stream. + + If one is already available, return that one. + """ + return await self._queue.get() + + def empty(self) -> bool: + """ + Return `True` if there are no pending events. + """ + return self._queue.empty() + + def clear(self) -> List[Message]: + """ + Clear this listener of all pending events. + + Called when an `EventListener` is being unregistered, this clears the + pending FIFO queue synchronously. It can be also be used to + manually clear any pending events, if desired. + + :return: The cleared events, if any. + + .. warning:: + Take care when discarding events. Cleared events will be + silently tossed on the floor. All events that were ever + accepted by this listener are visible in `history()`. + """ + events = [] + while True: + try: + events.append(self._queue.get_nowait()) + except asyncio.QueueEmpty: + break + + return events + + def __aiter__(self) -> AsyncIterator[Message]: + return self + + async def __anext__(self) -> Message: + """ + Enables the `EventListener` to function as an async iterator. + + It may be used like this: + + .. code:: python + + async for event in listener: + print(event) + + These iterators will never terminate of their own accord; you + must provide break conditions or otherwise prepare to run them + in an `asyncio.Task` that can be cancelled. + """ + return await self.get() + + +class Events: + """ + Events is a mix-in class that adds event functionality to the QMP class. + + It's designed specifically as a mix-in for `QMPClient`, and it + relies upon the class it is being mixed into having a 'logger' + property. + """ + def __init__(self) -> None: + self._listeners: List[EventListener] = [] + + #: Default, all-events `EventListener`. + self.events: EventListener = EventListener() + self.register_listener(self.events) + + # Parent class needs to have a logger + self.logger: logging.Logger + + async def _event_dispatch(self, msg: Message) -> None: + """ + Given a new event, propagate it to all of the active listeners. + + :param msg: The event to propagate. + """ + for listener in self._listeners: + await listener.put(msg) + + def register_listener(self, listener: EventListener) -> None: + """ + Register and activate an `EventListener`. + + :param listener: The listener to activate. + :raise ListenerError: If the given listener is already registered. + """ + if listener in self._listeners: + raise ListenerError("Attempted to re-register existing listener") + self.logger.debug("Registering %s.", str(listener)) + self._listeners.append(listener) + + def remove_listener(self, listener: EventListener) -> None: + """ + Unregister and deactivate an `EventListener`. + + The removed listener will have its pending events cleared via + `clear()`. The listener can be re-registered later when + desired. + + :param listener: The listener to deactivate. + :raise ListenerError: If the given listener is not registered. + """ + if listener == self.events: + raise ListenerError("Cannot remove the default listener.") + self.logger.debug("Removing %s.", str(listener)) + listener.clear() + self._listeners.remove(listener) + + @contextmanager + def listen(self, *listeners: EventListener) -> Iterator[None]: + r""" + Context manager: Temporarily listen with an `EventListener`. + + Accepts one or more `EventListener` objects and registers them, + activating them for the duration of the context block. + + `EventListener` objects will have any pending events in their + FIFO queue cleared upon exiting the context block, when they are + deactivated. + + :param \*listeners: One or more EventListeners to activate. + :raise ListenerError: If the given listener(s) are already active. + """ + _added = [] + + try: + for listener in listeners: + self.register_listener(listener) + _added.append(listener) + + yield + + finally: + for listener in _added: + self.remove_listener(listener) + + @contextmanager + def listener( + self, + names: EventNames = (), + event_filter: Optional[EventFilter] = None + ) -> Iterator[EventListener]: + """ + Context manager: Temporarily listen with a new `EventListener`. + + Creates an `EventListener` object and registers it, activating + it for the duration of the context block. + + :param names: + One or more names of events to listen for. + When not provided, listen for ALL events. + :param event_filter: + An optional event filtering function. + When names are also provided, this acts as a secondary filter. + + :return: The newly created and active `EventListener`. + """ + listener = EventListener(names, event_filter) + with self.listen(listener): + yield listener diff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py new file mode 100644 index 0000000..a8629b4 --- /dev/null +++ b/python/qemu/qmp/legacy.py @@ -0,0 +1,317 @@ +""" +(Legacy) Sync QMP Wrapper + +This module provides the `QEMUMonitorProtocol` class, which is a +synchronous wrapper around `QMPClient`. + +Its design closely resembles that of the original QEMUMonitorProtocol +class, originally written by Luiz Capitulino. It is provided here for +compatibility with scripts inside the QEMU source tree that expect the +old interface. +""" + +# +# Copyright (C) 2009-2022 Red Hat Inc. +# +# Authors: +# Luiz Capitulino +# John Snow +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. +# + +import asyncio +from types import TracebackType +from typing import ( + Any, + Awaitable, + Dict, + List, + Optional, + Type, + TypeVar, + Union, +) + +from .error import QMPError +from .protocol import Runstate, SocketAddrT +from .qmp_client import QMPClient + + +#: QMPMessage is an entire QMP message of any kind. +QMPMessage = Dict[str, Any] + +#: QMPReturnValue is the 'return' value of a command. +QMPReturnValue = object + +#: QMPObject is any object in a QMP message. +QMPObject = Dict[str, object] + +# QMPMessage can be outgoing commands or incoming events/returns. +# QMPReturnValue is usually a dict/json object, but due to QAPI's +# 'returns-whitelist', it can actually be anything. +# +# {'return': {}} is a QMPMessage, +# {} is the QMPReturnValue. + + +class QMPBadPortError(QMPError): + """ + Unable to parse socket address: Port was non-numerical. + """ + + +class QEMUMonitorProtocol: + """ + Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) + and then allow to handle commands and events. + + :param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + :param server: Act as the socket server. (See 'accept') + :param nickname: Optional nickname used for logging. + """ + + def __init__(self, address: SocketAddrT, + server: bool = False, + nickname: Optional[str] = None): + + self._qmp = QMPClient(nickname) + self._aloop = asyncio.get_event_loop() + self._address = address + self._timeout: Optional[float] = None + + if server: + self._sync(self._qmp.start_server(self._address)) + + _T = TypeVar('_T') + + def _sync( + self, future: Awaitable[_T], timeout: Optional[float] = None + ) -> _T: + return self._aloop.run_until_complete( + asyncio.wait_for(future, timeout=timeout) + ) + + def _get_greeting(self) -> Optional[QMPMessage]: + if self._qmp.greeting is not None: + # pylint: disable=protected-access + return self._qmp.greeting._asdict() + return None + + def __enter__(self: _T) -> _T: + # Implement context manager enter function. + return self + + def __exit__(self, + # pylint: disable=duplicate-code + # see https://github.com/PyCQA/pylint/issues/3619 + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType]) -> None: + # Implement context manager exit function. + self.close() + + @classmethod + def parse_address(cls, address: str) -> SocketAddrT: + """ + Parse a string into a QMP address. + + Figure out if the argument is in the port:host form. + If it's not, it's probably a file path. + """ + components = address.split(':') + if len(components) == 2: + try: + port = int(components[1]) + except ValueError: + msg = f"Bad port: '{components[1]}' in '{address}'." + raise QMPBadPortError(msg) from None + return (components[0], port) + + # Treat as filepath. + return address + + def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: + """ + Connect to the QMP Monitor and perform capabilities negotiation. + + :return: QMP greeting dict, or None if negotiate is false + :raise ConnectError: on connection errors + """ + self._qmp.await_greeting = negotiate + self._qmp.negotiate = negotiate + + self._sync( + self._qmp.connect(self._address) + ) + return self._get_greeting() + + def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: + """ + Await connection from QMP Monitor and perform capabilities negotiation. + + :param timeout: + timeout in seconds (nonnegative float number, or None). + If None, there is no timeout, and this may block forever. + + :return: QMP greeting dict + :raise ConnectError: on connection errors + """ + self._qmp.await_greeting = True + self._qmp.negotiate = True + + self._sync(self._qmp.accept(), timeout) + + ret = self._get_greeting() + assert ret is not None + return ret + + def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: + """ + Send a QMP command to the QMP Monitor. + + :param qmp_cmd: QMP command to be sent as a Python dict + :return: QMP response as a Python dict + """ + return dict( + self._sync( + # pylint: disable=protected-access + + # _raw() isn't a public API, because turning off + # automatic ID assignment is discouraged. For + # compatibility with iotests *only*, do it anyway. + self._qmp._raw(qmp_cmd, assign_id=False), + self._timeout + ) + ) + + def cmd(self, name: str, + args: Optional[Dict[str, object]] = None, + cmd_id: Optional[object] = None) -> QMPMessage: + """ + Build a QMP command and send it to the QMP Monitor. + + :param name: command name (string) + :param args: command arguments (dict) + :param cmd_id: command id (dict, list, string or int) + """ + qmp_cmd: QMPMessage = {'execute': name} + if args: + qmp_cmd['arguments'] = args + if cmd_id: + qmp_cmd['id'] = cmd_id + return self.cmd_obj(qmp_cmd) + + def command(self, cmd: str, **kwds: object) -> QMPReturnValue: + """ + Build and send a QMP command to the monitor, report errors if any + """ + return self._sync( + self._qmp.execute(cmd, kwds), + self._timeout + ) + + def pull_event(self, + wait: Union[bool, float] = False) -> Optional[QMPMessage]: + """ + Pulls a single event. + + :param wait: + If False or 0, do not wait. Return None if no events ready. + If True, wait forever until the next event. + Otherwise, wait for the specified number of seconds. + + :raise asyncio.TimeoutError: + When a timeout is requested and the timeout period elapses. + + :return: The first available QMP event, or None. + """ + if not wait: + # wait is False/0: "do not wait, do not except." + if self._qmp.events.empty(): + return None + + # If wait is 'True', wait forever. If wait is False/0, the events + # queue must not be empty; but it still needs some real amount + # of time to complete. + timeout = None + if wait and isinstance(wait, float): + timeout = wait + + return dict( + self._sync( + self._qmp.events.get(), + timeout + ) + ) + + def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: + """ + Get a list of QMP events and clear all pending events. + + :param wait: + If False or 0, do not wait. Return None if no events ready. + If True, wait until we have at least one event. + Otherwise, wait for up to the specified number of seconds for at + least one event. + + :raise asyncio.TimeoutError: + When a timeout is requested and the timeout period elapses. + + :return: A list of QMP events. + """ + events = [dict(x) for x in self._qmp.events.clear()] + if events: + return events + + event = self.pull_event(wait) + return [event] if event is not None else [] + + def clear_events(self) -> None: + """Clear current list of pending events.""" + self._qmp.events.clear() + + def close(self) -> None: + """Close the connection.""" + self._sync( + self._qmp.disconnect() + ) + + def settimeout(self, timeout: Optional[float]) -> None: + """ + Set the timeout for QMP RPC execution. + + This timeout affects the `cmd`, `cmd_obj`, and `command` methods. + The `accept`, `pull_event` and `get_event` methods have their + own configurable timeouts. + + :param timeout: + timeout in seconds, or None. + None will wait indefinitely. + """ + self._timeout = timeout + + def send_fd_scm(self, fd: int) -> None: + """ + Send a file descriptor to the remote via SCM_RIGHTS. + """ + self._qmp.send_fd_scm(fd) + + def __del__(self) -> None: + if self._qmp.runstate == Runstate.IDLE: + return + + if not self._aloop.is_running(): + self.close() + else: + # Garbage collection ran while the event loop was running. + # Nothing we can do about it now, but if we don't raise our + # own error, the user will be treated to a lot of traceback + # they might not understand. + raise QMPError( + "QEMUMonitorProtocol.close()" + " was not called before object was garbage collected" + ) diff --git a/python/qemu/qmp/message.py b/python/qemu/qmp/message.py new file mode 100644 index 0000000..f76ccc9 --- /dev/null +++ b/python/qemu/qmp/message.py @@ -0,0 +1,209 @@ +""" +QMP Message Format + +This module provides the `Message` class, which represents a single QMP +message sent to or from the server. +""" + +import json +from json import JSONDecodeError +from typing import ( + Dict, + Iterator, + Mapping, + MutableMapping, + Optional, + Union, +) + +from .error import ProtocolError + + +class Message(MutableMapping[str, object]): + """ + Represents a single QMP protocol message. + + QMP uses JSON objects as its basic communicative unit; so this + Python object is a :py:obj:`~collections.abc.MutableMapping`. It may + be instantiated from either another mapping (like a `dict`), or from + raw `bytes` that still need to be deserialized. + + Once instantiated, it may be treated like any other MutableMapping:: + + >>> msg = Message(b'{"hello": "world"}') + >>> assert msg['hello'] == 'world' + >>> msg['id'] = 'foobar' + >>> print(msg) + { + "hello": "world", + "id": "foobar" + } + + It can be converted to `bytes`:: + + >>> msg = Message({"hello": "world"}) + >>> print(bytes(msg)) + b'{"hello":"world","id":"foobar"}' + + Or back into a garden-variety `dict`:: + + >>> dict(msg) + {'hello': 'world'} + + + :param value: Initial value, if any. + :param eager: + When `True`, attempt to serialize or deserialize the initial value + immediately, so that conversion exceptions are raised during + the call to ``__init__()``. + """ + # pylint: disable=too-many-ancestors + + def __init__(self, + value: Union[bytes, Mapping[str, object]] = b'{}', *, + eager: bool = True): + self._data: Optional[bytes] = None + self._obj: Optional[Dict[str, object]] = None + + if isinstance(value, bytes): + self._data = value + if eager: + self._obj = self._deserialize(self._data) + else: + self._obj = dict(value) + if eager: + self._data = self._serialize(self._obj) + + # Methods necessary to implement the MutableMapping interface, see: + # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping + + # We get pop, popitem, clear, update, setdefault, __contains__, + # keys, items, values, get, __eq__ and __ne__ for free. + + def __getitem__(self, key: str) -> object: + return self._object[key] + + def __setitem__(self, key: str, value: object) -> None: + self._object[key] = value + self._data = None + + def __delitem__(self, key: str) -> None: + del self._object[key] + self._data = None + + def __iter__(self) -> Iterator[str]: + return iter(self._object) + + def __len__(self) -> int: + return len(self._object) + + # Dunder methods not related to MutableMapping: + + def __repr__(self) -> str: + if self._obj is not None: + return f"Message({self._object!r})" + return f"Message({bytes(self)!r})" + + def __str__(self) -> str: + """Pretty-printed representation of this QMP message.""" + return json.dumps(self._object, indent=2) + + def __bytes__(self) -> bytes: + """bytes representing this QMP message.""" + if self._data is None: + self._data = self._serialize(self._obj or {}) + return self._data + + # Conversion Methods + + @property + def _object(self) -> Dict[str, object]: + """ + A `dict` representing this QMP message. + + Generated on-demand, if required. This property is private + because it returns an object that could be used to invalidate + the internal state of the `Message` object. + """ + if self._obj is None: + self._obj = self._deserialize(self._data or b'{}') + return self._obj + + @classmethod + def _serialize(cls, value: object) -> bytes: + """ + Serialize a JSON object as `bytes`. + + :raise ValueError: When the object cannot be serialized. + :raise TypeError: When the object cannot be serialized. + + :return: `bytes` ready to be sent over the wire. + """ + return json.dumps(value, separators=(',', ':')).encode('utf-8') + + @classmethod + def _deserialize(cls, data: bytes) -> Dict[str, object]: + """ + Deserialize JSON `bytes` into a native Python `dict`. + + :raise DeserializationError: + If JSON deserialization fails for any reason. + :raise UnexpectedTypeError: + If the data does not represent a JSON object. + + :return: A `dict` representing this QMP message. + """ + try: + obj = json.loads(data) + except JSONDecodeError as err: + emsg = "Failed to deserialize QMP message." + raise DeserializationError(emsg, data) from err + if not isinstance(obj, dict): + raise UnexpectedTypeError( + "QMP message is not a JSON object.", + obj + ) + return obj + + +class DeserializationError(ProtocolError): + """ + A QMP message was not understood as JSON. + + When this Exception is raised, ``__cause__`` will be set to the + `json.JSONDecodeError` Exception, which can be interrogated for + further details. + + :param error_message: Human-readable string describing the error. + :param raw: The raw `bytes` that prompted the failure. + """ + def __init__(self, error_message: str, raw: bytes): + super().__init__(error_message) + #: The raw `bytes` that were not understood as JSON. + self.raw: bytes = raw + + def __str__(self) -> str: + return "\n".join([ + super().__str__(), + f" raw bytes were: {str(self.raw)}", + ]) + + +class UnexpectedTypeError(ProtocolError): + """ + A QMP message was JSON, but not a JSON object. + + :param error_message: Human-readable string describing the error. + :param value: The deserialized JSON value that wasn't an object. + """ + def __init__(self, error_message: str, value: object): + super().__init__(error_message) + #: The JSON value that was expected to be an object. + self.value: object = value + + def __str__(self) -> str: + strval = json.dumps(self.value, indent=2) + return "\n".join([ + super().__str__(), + f" json value was: {strval}", + ]) diff --git a/python/qemu/qmp/models.py b/python/qemu/qmp/models.py new file mode 100644 index 0000000..de87f87 --- /dev/null +++ b/python/qemu/qmp/models.py @@ -0,0 +1,146 @@ +""" +QMP Data Models + +This module provides simplistic data classes that represent the few +structures that the QMP spec mandates; they are used to verify incoming +data to make sure it conforms to spec. +""" +# pylint: disable=too-few-public-methods + +from collections import abc +import copy +from typing import ( + Any, + Dict, + Mapping, + Optional, + Sequence, +) + + +class Model: + """ + Abstract data model, representing some QMP object of some kind. + + :param raw: The raw object to be validated. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + self._raw = raw + + def _check_key(self, key: str) -> None: + if key not in self._raw: + raise KeyError(f"'{self._name}' object requires '{key}' member") + + def _check_value(self, key: str, type_: type, typestr: str) -> None: + assert key in self._raw + if not isinstance(self._raw[key], type_): + raise TypeError( + f"'{self._name}' member '{key}' must be a {typestr}" + ) + + def _check_member(self, key: str, type_: type, typestr: str) -> None: + self._check_key(key) + self._check_value(key, type_, typestr) + + @property + def _name(self) -> str: + return type(self).__name__ + + def __repr__(self) -> str: + return f"{self._name}({self._raw!r})" + + +class Greeting(Model): + """ + Defined in qmp-spec.txt, section 2.2, "Server Greeting". + + :param raw: The raw Greeting object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'QMP' member + self.QMP: QMPGreeting # pylint: disable=invalid-name + + self._check_member('QMP', abc.Mapping, "JSON object") + self.QMP = QMPGreeting(self._raw['QMP']) + + def _asdict(self) -> Dict[str, object]: + """ + For compatibility with the iotests sync QMP wrapper. + + The legacy QMP interface needs Greetings as a garden-variety Dict. + + This interface is private in the hopes that it will be able to + be dropped again in the near-future. Caller beware! + """ + return dict(copy.deepcopy(self._raw)) + + +class QMPGreeting(Model): + """ + Defined in qmp-spec.txt, section 2.2, "Server Greeting". + + :param raw: The raw QMPGreeting object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'version' member + self.version: Mapping[str, object] + #: 'capabilities' member + self.capabilities: Sequence[object] + + self._check_member('version', abc.Mapping, "JSON object") + self.version = self._raw['version'] + + self._check_member('capabilities', abc.Sequence, "JSON array") + self.capabilities = self._raw['capabilities'] + + +class ErrorResponse(Model): + """ + Defined in qmp-spec.txt, section 2.4.2, "error". + + :param raw: The raw ErrorResponse object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'error' member + self.error: ErrorInfo + #: 'id' member + self.id: Optional[object] = None # pylint: disable=invalid-name + + self._check_member('error', abc.Mapping, "JSON object") + self.error = ErrorInfo(self._raw['error']) + + if 'id' in raw: + self.id = raw['id'] + + +class ErrorInfo(Model): + """ + Defined in qmp-spec.txt, section 2.4.2, "error". + + :param raw: The raw ErrorInfo object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'class' member, with an underscore to avoid conflicts in Python. + self.class_: str + #: 'desc' member + self.desc: str + + self._check_member('class', str, "string") + self.class_ = self._raw['class'] + + self._check_member('desc', str, "string") + self.desc = self._raw['desc'] diff --git a/python/qemu/qmp/protocol.py b/python/qemu/qmp/protocol.py new file mode 100644 index 0000000..6ea8665 --- /dev/null +++ b/python/qemu/qmp/protocol.py @@ -0,0 +1,1048 @@ +""" +Generic Asynchronous Message-based Protocol Support + +This module provides a generic framework for sending and receiving +messages over an asyncio stream. `AsyncProtocol` is an abstract class +that implements the core mechanisms of a simple send/receive protocol, +and is designed to be extended. + +In this package, it is used as the implementation for the `QMPClient` +class. +""" + +# It's all the docstrings ... ! It's long for a good reason ^_^; +# pylint: disable=too-many-lines + +import asyncio +from asyncio import StreamReader, StreamWriter +from enum import Enum +from functools import wraps +import logging +from ssl import SSLContext +from typing import ( + Any, + Awaitable, + Callable, + Generic, + List, + Optional, + Tuple, + TypeVar, + Union, + cast, +) + +from .error import QMPError +from .util import ( + bottom_half, + create_task, + exception_summary, + flush, + is_closing, + pretty_traceback, + upper_half, + wait_closed, +) + + +T = TypeVar('T') +_U = TypeVar('_U') +_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None`` + +InternetAddrT = Tuple[str, int] +UnixAddrT = str +SocketAddrT = Union[UnixAddrT, InternetAddrT] + + +class Runstate(Enum): + """Protocol session runstate.""" + + #: Fully quiesced and disconnected. + IDLE = 0 + #: In the process of connecting or establishing a session. + CONNECTING = 1 + #: Fully connected and active session. + RUNNING = 2 + #: In the process of disconnecting. + #: Runstate may be returned to `IDLE` by calling `disconnect()`. + DISCONNECTING = 3 + + +class ConnectError(QMPError): + """ + Raised when the initial connection process has failed. + + This Exception always wraps a "root cause" exception that can be + interrogated for additional information. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + def __init__(self, error_message: str, exc: Exception): + super().__init__(error_message) + #: Human-readable error string + self.error_message: str = error_message + #: Wrapped root cause exception + self.exc: Exception = exc + + def __str__(self) -> str: + cause = str(self.exc) + if not cause: + # If there's no error string, use the exception name. + cause = exception_summary(self.exc) + return f"{self.error_message}: {cause}" + + +class StateError(QMPError): + """ + An API command (connect, execute, etc) was issued at an inappropriate time. + + This error is raised when a command like + :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate + time. + + :param error_message: Human-readable string describing the state violation. + :param state: The actual `Runstate` seen at the time of the violation. + :param required: The `Runstate` required to process this command. + """ + def __init__(self, error_message: str, + state: Runstate, required: Runstate): + super().__init__(error_message) + self.error_message = error_message + self.state = state + self.required = required + + +F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name + + +# Don't Panic. +def require(required_state: Runstate) -> Callable[[F], F]: + """ + Decorator: protect a method so it can only be run in a certain `Runstate`. + + :param required_state: The `Runstate` required to invoke this method. + :raise StateError: When the required `Runstate` is not met. + """ + def _decorator(func: F) -> F: + # _decorator is the decorator that is built by calling the + # require() decorator factory; e.g.: + # + # @require(Runstate.IDLE) def foo(): ... + # will replace 'foo' with the result of '_decorator(foo)'. + + @wraps(func) + def _wrapper(proto: 'AsyncProtocol[Any]', + *args: Any, **kwargs: Any) -> Any: + # _wrapper is the function that gets executed prior to the + # decorated method. + + name = type(proto).__name__ + + if proto.runstate != required_state: + if proto.runstate == Runstate.CONNECTING: + emsg = f"{name} is currently connecting." + elif proto.runstate == Runstate.DISCONNECTING: + emsg = (f"{name} is disconnecting." + " Call disconnect() to return to IDLE state.") + elif proto.runstate == Runstate.RUNNING: + emsg = f"{name} is already connected and running." + elif proto.runstate == Runstate.IDLE: + emsg = f"{name} is disconnected and idle." + else: + assert False + raise StateError(emsg, proto.runstate, required_state) + # No StateError, so call the wrapped method. + return func(proto, *args, **kwargs) + + # Return the decorated method; + # Transforming Func to Decorated[Func]. + return cast(F, _wrapper) + + # Return the decorator instance from the decorator factory. Phew! + return _decorator + + +class AsyncProtocol(Generic[T]): + """ + AsyncProtocol implements a generic async message-based protocol. + + This protocol assumes the basic unit of information transfer between + client and server is a "message", the details of which are left up + to the implementation. It assumes the sending and receiving of these + messages is full-duplex and not necessarily correlated; i.e. it + supports asynchronous inbound messages. + + It is designed to be extended by a specific protocol which provides + the implementations for how to read and send messages. These must be + defined in `_do_recv()` and `_do_send()`, respectively. + + Other callbacks have a default implementation, but are intended to be + either extended or overridden: + + - `_establish_session`: + The base implementation starts the reader/writer tasks. + A protocol implementation can override this call, inserting + actions to be taken prior to starting the reader/writer tasks + before the super() call; actions needing to occur afterwards + can be written after the super() call. + - `_on_message`: + Actions to be performed when a message is received. + - `_cb_outbound`: + Logging/Filtering hook for all outbound messages. + - `_cb_inbound`: + Logging/Filtering hook for all inbound messages. + This hook runs *before* `_on_message()`. + + :param name: + Name used for logging messages, if any. By default, messages + will log to 'qemu.qmp.protocol', but each individual connection + can be given its own logger by giving it a name; messages will + then log to 'qemu.qmp.protocol.${name}'. + """ + # pylint: disable=too-many-instance-attributes + + #: Logger object for debugging messages from this connection. + logger = logging.getLogger(__name__) + + # Maximum allowable size of read buffer + _limit = (64 * 1024) + + # ------------------------- + # Section: Public interface + # ------------------------- + + def __init__(self, name: Optional[str] = None) -> None: + #: The nickname for this connection, if any. + self.name: Optional[str] = name + if self.name is not None: + self.logger = self.logger.getChild(self.name) + + # stream I/O + self._reader: Optional[StreamReader] = None + self._writer: Optional[StreamWriter] = None + + # Outbound Message queue + self._outgoing: asyncio.Queue[T] + + # Special, long-running tasks: + self._reader_task: Optional[asyncio.Future[None]] = None + self._writer_task: Optional[asyncio.Future[None]] = None + + # Aggregate of the above two tasks, used for Exception management. + self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None + + #: Disconnect task. The disconnect implementation runs in a task + #: so that asynchronous disconnects (initiated by the + #: reader/writer) are allowed to wait for the reader/writers to + #: exit. + self._dc_task: Optional[asyncio.Future[None]] = None + + self._runstate = Runstate.IDLE + self._runstate_changed: Optional[asyncio.Event] = None + + # Server state for start_server() and _incoming() + self._server: Optional[asyncio.AbstractServer] = None + self._accepted: Optional[asyncio.Event] = None + + def __repr__(self) -> str: + cls_name = type(self).__name__ + tokens = [] + if self.name is not None: + tokens.append(f"name={self.name!r}") + tokens.append(f"runstate={self.runstate.name}") + return f"<{cls_name} {' '.join(tokens)}>" + + @property # @upper_half + def runstate(self) -> Runstate: + """The current `Runstate` of the connection.""" + return self._runstate + + @upper_half + async def runstate_changed(self) -> Runstate: + """ + Wait for the `runstate` to change, then return that runstate. + """ + await self._runstate_event.wait() + return self.runstate + + @upper_half + @require(Runstate.IDLE) + async def start_server_and_accept( + self, address: SocketAddrT, + ssl: Optional[SSLContext] = None + ) -> None: + """ + Accept a connection and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + This method is precisely equivalent to calling `start_server()` + followed by `accept()`. + + :param address: + Address to listen on; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. + """ + await self.start_server(address, ssl) + await self.accept() + assert self.runstate == Runstate.RUNNING + + @upper_half + @require(Runstate.IDLE) + async def start_server(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: + """ + Start listening for an incoming connection, but do not wait for a peer. + + This method starts listening for an incoming connection, but + does not block waiting for a peer. This call will return + immediately after binding and listening on a socket. A later + call to `accept()` must be made in order to finalize the + incoming connection. + + :param address: + Address to listen on; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: + When the server could not start listening on this address. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError`. + """ + await self._session_guard( + self._do_start_server(address, ssl), + 'Failed to establish connection') + assert self.runstate == Runstate.CONNECTING + + @upper_half + @require(Runstate.CONNECTING) + async def accept(self) -> None: + """ + Accept an incoming connection and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + + :raise StateError: When the `Runstate` is not `CONNECTING`. + :raise QMPError: When `start_server()` was not called yet. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. + """ + if self._accepted is None: + raise QMPError("Cannot call accept() before start_server().") + await self._session_guard( + self._do_accept(), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING + + @upper_half + @require(Runstate.IDLE) + async def connect(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: + """ + Connect to the server and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + + :param address: + Address to connect to; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. + """ + await self._session_guard( + self._do_connect(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING + + @upper_half + async def disconnect(self) -> None: + """ + Disconnect and wait for all tasks to fully stop. + + If there was an exception that caused the reader/writers to + terminate prematurely, it will be raised here. + + :raise Exception: When the reader or writer terminate unexpectedly. + """ + self.logger.debug("disconnect() called.") + self._schedule_disconnect() + await self._wait_disconnect() + + # -------------------------- + # Section: Session machinery + # -------------------------- + + async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None: + """ + Async guard function used to roll back to `IDLE` on any error. + + On any Exception, the state machine will be reset back to + `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but + `BaseException` events will be left alone (This includes + asyncio.CancelledError, even prior to Python 3.8). + + :param error_message: + Human-readable string describing what connection phase failed. + + :raise BaseException: + When `BaseException` occurs in the guarded block. + :raise ConnectError: + When any other error is encountered in the guarded block. + """ + # Note: After Python 3.6 support is removed, this should be an + # @asynccontextmanager instead of accepting a callback. + try: + await coro + except BaseException as err: + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + try: + # Reset the runstate back to IDLE. + await self.disconnect() + except: + # We don't expect any Exceptions from the disconnect function + # here, because we failed to connect in the first place. + # The disconnect() function is intended to perform + # only cannot-fail cleanup here, but you never know. + emsg = ( + "Unexpected bottom half exception. " + "This is a bug in the QMP library. " + "Please report it to and " + "CC: John Snow ." + ) + self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) + raise + + # CancelledError is an Exception with special semantic meaning; + # We do NOT want to wrap it up under ConnectError. + # NB: CancelledError is not a BaseException before Python 3.8 + if isinstance(err, asyncio.CancelledError): + raise + + # Any other kind of error can be treated as some kind of connection + # failure broadly. Inspect the 'exc' field to explore the root + # cause in greater detail. + if isinstance(err, Exception): + raise ConnectError(emsg, err) from err + + # Raise BaseExceptions un-wrapped, they're more important. + raise + + @property + def _runstate_event(self) -> asyncio.Event: + # asyncio.Event() objects should not be created prior to entrance into + # an event loop, so we can ensure we create it in the correct context. + # Create it on-demand *only* at the behest of an 'async def' method. + if not self._runstate_changed: + self._runstate_changed = asyncio.Event() + return self._runstate_changed + + @upper_half + @bottom_half + def _set_state(self, state: Runstate) -> None: + """ + Change the `Runstate` of the protocol connection. + + Signals the `runstate_changed` event. + """ + if state == self._runstate: + return + + self.logger.debug("Transitioning from '%s' to '%s'.", + str(self._runstate), str(state)) + self._runstate = state + self._runstate_event.set() + self._runstate_event.clear() + + @bottom_half + async def _stop_server(self) -> None: + """ + Stop listening for / accepting new incoming connections. + """ + if self._server is None: + return + + try: + self.logger.debug("Stopping server.") + self._server.close() + await self._server.wait_closed() + self.logger.debug("Server stopped.") + finally: + self._server = None + + @bottom_half # However, it does not run from the R/W tasks. + async def _incoming(self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter) -> None: + """ + Accept an incoming connection and signal the upper_half. + + This method does the minimum necessary to accept a single + incoming connection. It signals back to the upper_half ASAP so + that any errors during session initialization can occur + naturally in the caller's stack. + + :param reader: Incoming `asyncio.StreamReader` + :param writer: Incoming `asyncio.StreamWriter` + """ + peer = writer.get_extra_info('peername', 'Unknown peer') + self.logger.debug("Incoming connection from %s", peer) + + if self._reader or self._writer: + # Sadly, we can have more than one pending connection + # because of https://bugs.python.org/issue46715 + # Close any extra connections we don't actually want. + self.logger.warning("Extraneous connection inadvertently accepted") + writer.close() + return + + # A connection has been accepted; stop listening for new ones. + assert self._accepted is not None + await self._stop_server() + self._reader, self._writer = (reader, writer) + self._accepted.set() + + @upper_half + async def _do_start_server(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: + """ + Start listening for an incoming connection, but do not wait for a peer. + + This method starts listening for an incoming connection, but does not + block waiting for a peer. This call will return immediately after + binding and listening to a socket. A later call to accept() must be + made in order to finalize the incoming connection. + + :param address: + Address to listen on; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise OSError: For stream-related errors. + """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + + self.logger.debug("Awaiting connection on %s ...", address) + self._accepted = asyncio.Event() + + if isinstance(address, tuple): + coro = asyncio.start_server( + self._incoming, + host=address[0], + port=address[1], + ssl=ssl, + backlog=1, + limit=self._limit, + ) + else: + coro = asyncio.start_unix_server( + self._incoming, + path=address, + ssl=ssl, + backlog=1, + limit=self._limit, + ) + + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + + # This will start the server (bind(2), listen(2)). It will also + # call accept(2) if we yield, but we don't block on that here. + self._server = await coro + self.logger.debug("Server listening on %s", address) + + @upper_half + async def _do_accept(self) -> None: + """ + Wait for and accept an incoming connection. + + Requires that we have not yet accepted an incoming connection + from the upper_half, but it's OK if the server is no longer + running because the bottom_half has already accepted the + connection. + """ + assert self._accepted is not None + await self._accepted.wait() + assert self._server is None + self._accepted = None + + self.logger.debug("Connection accepted.") + + @upper_half + async def _do_connect(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: + """ + Acting as the transport client, initiate a connection to a server. + + :param address: + Address to connect to; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise OSError: For stream-related errors. + """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + + self.logger.debug("Connecting to %s ...", address) + + if isinstance(address, tuple): + connect = asyncio.open_connection( + address[0], + address[1], + ssl=ssl, + limit=self._limit, + ) + else: + connect = asyncio.open_unix_connection( + path=address, + ssl=ssl, + limit=self._limit, + ) + self._reader, self._writer = await connect + + self.logger.debug("Connected.") + + @upper_half + async def _establish_session(self) -> None: + """ + Establish a new session. + + Starts the readers/writer tasks; subclasses may perform their + own negotiations here. The Runstate will be RUNNING upon + successful conclusion. + """ + assert self.runstate == Runstate.CONNECTING + + self._outgoing = asyncio.Queue() + + reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') + writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') + + self._reader_task = create_task(reader_coro) + self._writer_task = create_task(writer_coro) + + self._bh_tasks = asyncio.gather( + self._reader_task, + self._writer_task, + ) + + self._set_state(Runstate.RUNNING) + await asyncio.sleep(0) # Allow runstate_event to process + + @upper_half + @bottom_half + def _schedule_disconnect(self) -> None: + """ + Initiate a disconnect; idempotent. + + This method is used both in the upper-half as a direct + consequence of `disconnect()`, and in the bottom-half in the + case of unhandled exceptions in the reader/writer tasks. + + It can be invoked no matter what the `runstate` is. + """ + if not self._dc_task: + self._set_state(Runstate.DISCONNECTING) + self.logger.debug("Scheduling disconnect.") + self._dc_task = create_task(self._bh_disconnect()) + + @upper_half + async def _wait_disconnect(self) -> None: + """ + Waits for a previously scheduled disconnect to finish. + + This method will gather any bottom half exceptions and re-raise + the one that occurred first; presuming it to be the root cause + of any subsequent Exceptions. It is intended to be used in the + upper half of the call chain. + + :raise Exception: + Arbitrary exception re-raised on behalf of the reader/writer. + """ + assert self.runstate == Runstate.DISCONNECTING + assert self._dc_task + + aws: List[Awaitable[object]] = [self._dc_task] + if self._bh_tasks: + aws.insert(0, self._bh_tasks) + all_defined_tasks = asyncio.gather(*aws) + + # Ensure disconnect is done; Exception (if any) is not raised here: + await asyncio.wait((self._dc_task,)) + + try: + await all_defined_tasks # Raise Exceptions from the bottom half. + finally: + self._cleanup() + self._set_state(Runstate.IDLE) + + @upper_half + def _cleanup(self) -> None: + """ + Fully reset this object to a clean state and return to `IDLE`. + """ + def _paranoid_task_erase(task: Optional['asyncio.Future[_U]'] + ) -> Optional['asyncio.Future[_U]']: + # Help to erase a task, ENSURING it is fully quiesced first. + assert (task is None) or task.done() + return None if (task and task.done()) else task + + assert self.runstate == Runstate.DISCONNECTING + self._dc_task = _paranoid_task_erase(self._dc_task) + self._reader_task = _paranoid_task_erase(self._reader_task) + self._writer_task = _paranoid_task_erase(self._writer_task) + self._bh_tasks = _paranoid_task_erase(self._bh_tasks) + + self._reader = None + self._writer = None + self._accepted = None + + # NB: _runstate_changed cannot be cleared because we still need it to + # send the final runstate changed event ...! + + # ---------------------------- + # Section: Bottom Half methods + # ---------------------------- + + @bottom_half + async def _bh_disconnect(self) -> None: + """ + Disconnect and cancel all outstanding tasks. + + It is designed to be called from its task context, + :py:obj:`~AsyncProtocol._dc_task`. By running in its own task, + it is free to wait on any pending actions that may still need to + occur in either the reader or writer tasks. + """ + assert self.runstate == Runstate.DISCONNECTING + + def _done(task: Optional['asyncio.Future[Any]']) -> bool: + return task is not None and task.done() + + # If the server is running, stop it. + await self._stop_server() + + # Are we already in an error pathway? If either of the tasks are + # already done, or if we have no tasks but a reader/writer; we + # must be. + # + # NB: We can't use _bh_tasks to check for premature task + # completion, because it may not yet have had a chance to run + # and gather itself. + tasks = tuple(filter(None, (self._writer_task, self._reader_task))) + error_pathway = _done(self._reader_task) or _done(self._writer_task) + if not tasks: + error_pathway |= bool(self._reader) or bool(self._writer) + + try: + # Try to flush the writer, if possible. + # This *may* cause an error and force us over into the error path. + if not error_pathway: + await self._bh_flush_writer() + except BaseException as err: + error_pathway = True + emsg = "Failed to flush the writer" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise + finally: + # Cancel any still-running tasks (Won't raise): + if self._writer_task is not None and not self._writer_task.done(): + self.logger.debug("Cancelling writer task.") + self._writer_task.cancel() + if self._reader_task is not None and not self._reader_task.done(): + self.logger.debug("Cancelling reader task.") + self._reader_task.cancel() + + # Close out the tasks entirely (Won't raise): + if tasks: + self.logger.debug("Waiting for tasks to complete ...") + await asyncio.wait(tasks) + + # Lastly, close the stream itself. (*May raise*!): + await self._bh_close_stream(error_pathway) + self.logger.debug("Disconnected.") + + @bottom_half + async def _bh_flush_writer(self) -> None: + if not self._writer_task: + return + + self.logger.debug("Draining the outbound queue ...") + await self._outgoing.join() + if self._writer is not None: + self.logger.debug("Flushing the StreamWriter ...") + await flush(self._writer) + + @bottom_half + async def _bh_close_stream(self, error_pathway: bool = False) -> None: + # NB: Closing the writer also implcitly closes the reader. + if not self._writer: + return + + if not is_closing(self._writer): + self.logger.debug("Closing StreamWriter.") + self._writer.close() + + self.logger.debug("Waiting for StreamWriter to close ...") + try: + await wait_closed(self._writer) + except Exception: # pylint: disable=broad-except + # It's hard to tell if the Stream is already closed or + # not. Even if one of the tasks has failed, it may have + # failed for a higher-layered protocol reason. The + # stream could still be open and perfectly fine. + # I don't know how to discern its health here. + + if error_pathway: + # We already know that *something* went wrong. Let's + # just trust that the Exception we already have is the + # better one to present to the user, even if we don't + # genuinely *know* the relationship between the two. + self.logger.debug( + "Discarding Exception from wait_closed:\n%s\n", + pretty_traceback(), + ) + else: + # Oops, this is a brand-new error! + raise + finally: + self.logger.debug("StreamWriter closed.") + + @bottom_half + async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: + """ + Run one of the bottom-half methods in a loop forever. + + If the bottom half ever raises any exception, schedule a + disconnect that will terminate the entire loop. + + :param async_fn: The bottom-half method to run in a loop. + :param name: The name of this task, used for logging. + """ + try: + while True: + await async_fn() + except asyncio.CancelledError: + # We have been cancelled by _bh_disconnect, exit gracefully. + self.logger.debug("Task.%s: cancelled.", name) + return + except BaseException as err: + self.logger.log( + logging.INFO if isinstance(err, EOFError) else logging.ERROR, + "Task.%s: %s", + name, exception_summary(err) + ) + self.logger.debug("Task.%s: failure:\n%s\n", + name, pretty_traceback()) + self._schedule_disconnect() + raise + finally: + self.logger.debug("Task.%s: exiting.", name) + + @bottom_half + async def _bh_send_message(self) -> None: + """ + Wait for an outgoing message, then send it. + + Designed to be run in `_bh_loop_forever()`. + """ + msg = await self._outgoing.get() + try: + await self._send(msg) + finally: + self._outgoing.task_done() + + @bottom_half + async def _bh_recv_message(self) -> None: + """ + Wait for an incoming message and call `_on_message` to route it. + + Designed to be run in `_bh_loop_forever()`. + """ + msg = await self._recv() + await self._on_message(msg) + + # -------------------- + # Section: Message I/O + # -------------------- + + @upper_half + @bottom_half + def _cb_outbound(self, msg: T) -> T: + """ + Callback: outbound message hook. + + This is intended for subclasses to be able to add arbitrary + hooks to filter or manipulate outgoing messages. The base + implementation does nothing but log the message without any + manipulation of the message. + + :param msg: raw outbound message + :return: final outbound message + """ + self.logger.debug("--> %s", str(msg)) + return msg + + @upper_half + @bottom_half + def _cb_inbound(self, msg: T) -> T: + """ + Callback: inbound message hook. + + This is intended for subclasses to be able to add arbitrary + hooks to filter or manipulate incoming messages. The base + implementation does nothing but log the message without any + manipulation of the message. + + This method does not "handle" incoming messages; it is a filter. + The actual "endpoint" for incoming messages is `_on_message()`. + + :param msg: raw inbound message + :return: processed inbound message + """ + self.logger.debug("<-- %s", str(msg)) + return msg + + @upper_half + @bottom_half + async def _readline(self) -> bytes: + """ + Wait for a newline from the incoming reader. + + This method is provided as a convenience for upper-layer + protocols, as many are line-based. + + This method *may* return a sequence of bytes without a trailing + newline if EOF occurs, but *some* bytes were received. In this + case, the next call will raise `EOFError`. It is assumed that + the layer 5 protocol will decide if there is anything meaningful + to be done with a partial message. + + :raise OSError: For stream-related errors. + :raise EOFError: + If the reader stream is at EOF and there are no bytes to return. + :return: bytes, including the newline. + """ + assert self._reader is not None + msg_bytes = await self._reader.readline() + + if not msg_bytes: + if self._reader.at_eof(): + raise EOFError + + return msg_bytes + + @upper_half + @bottom_half + async def _do_recv(self) -> T: + """ + Abstract: Read from the stream and return a message. + + Very low-level; intended to only be called by `_recv()`. + """ + raise NotImplementedError + + @upper_half + @bottom_half + async def _recv(self) -> T: + """ + Read an arbitrary protocol message. + + .. warning:: + This method is intended primarily for `_bh_recv_message()` + to use in an asynchronous task loop. Using it outside of + this loop will "steal" messages from the normal routing + mechanism. It is safe to use prior to `_establish_session()`, + but should not be used otherwise. + + This method uses `_do_recv()` to retrieve the raw message, and + then transforms it using `_cb_inbound()`. + + :return: A single (filtered, processed) protocol message. + """ + message = await self._do_recv() + return self._cb_inbound(message) + + @upper_half + @bottom_half + def _do_send(self, msg: T) -> None: + """ + Abstract: Write a message to the stream. + + Very low-level; intended to only be called by `_send()`. + """ + raise NotImplementedError + + @upper_half + @bottom_half + async def _send(self, msg: T) -> None: + """ + Send an arbitrary protocol message. + + This method will transform any outgoing messages according to + `_cb_outbound()`. + + .. warning:: + Like `_recv()`, this method is intended to be called by + the writer task loop that processes outgoing + messages. Calling it directly may circumvent logic + implemented by the caller meant to correlate outgoing and + incoming messages. + + :raise OSError: For problems with the underlying stream. + """ + msg = self._cb_outbound(msg) + self._do_send(msg) + + @bottom_half + async def _on_message(self, msg: T) -> None: + """ + Called to handle the receipt of a new message. + + .. caution:: + This is executed from within the reader loop, so be advised + that waiting on either the reader or writer task will lead + to deadlock. Additionally, any unhandled exceptions will + directly cause the loop to halt, so logic may be best-kept + to a minimum if at all possible. + + :param msg: The incoming message, already logged/filtered. + """ + # Nothing to do in the abstract case. diff --git a/python/qemu/qmp/py.typed b/python/qemu/qmp/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py new file mode 100644 index 0000000..5dcda04 --- /dev/null +++ b/python/qemu/qmp/qmp_client.py @@ -0,0 +1,655 @@ +""" +QMP Protocol Implementation + +This module provides the `QMPClient` class, which can be used to connect +and send commands to a QMP server such as QEMU. The QMP class can be +used to either connect to a listening server, or used to listen and +accept an incoming connection from that server. +""" + +import asyncio +import logging +import socket +import struct +from typing import ( + Dict, + List, + Mapping, + Optional, + Union, + cast, +) + +from .error import ProtocolError, QMPError +from .events import Events +from .message import Message +from .models import ErrorResponse, Greeting +from .protocol import AsyncProtocol, Runstate, require +from .util import ( + bottom_half, + exception_summary, + pretty_traceback, + upper_half, +) + + +class _WrappedProtocolError(ProtocolError): + """ + Abstract exception class for Protocol errors that wrap an Exception. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + def __init__(self, error_message: str, exc: Exception): + super().__init__(error_message) + self.exc = exc + + def __str__(self) -> str: + return f"{self.error_message}: {self.exc!s}" + + +class GreetingError(_WrappedProtocolError): + """ + An exception occurred during the Greeting phase. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + + +class NegotiationError(_WrappedProtocolError): + """ + An exception occurred during the Negotiation phase. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + + +class ExecuteError(QMPError): + """ + Exception raised by `QMPClient.execute()` on RPC failure. + + :param error_response: The RPC error response object. + :param sent: The sent RPC message that caused the failure. + :param received: The raw RPC error reply received. + """ + def __init__(self, error_response: ErrorResponse, + sent: Message, received: Message): + super().__init__(error_response.error.desc) + #: The sent `Message` that caused the failure + self.sent: Message = sent + #: The received `Message` that indicated failure + self.received: Message = received + #: The parsed error response + self.error: ErrorResponse = error_response + #: The QMP error class + self.error_class: str = error_response.error.class_ + + +class ExecInterruptedError(QMPError): + """ + Exception raised by `execute()` (et al) when an RPC is interrupted. + + This error is raised when an `execute()` statement could not be + completed. This can occur because the connection itself was + terminated before a reply was received. + + The true cause of the interruption will be available via `disconnect()`. + """ + + +class _MsgProtocolError(ProtocolError): + """ + Abstract error class for protocol errors that have a `Message` object. + + This Exception class is used for protocol errors where the `Message` + was mechanically understood, but was found to be inappropriate or + malformed. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + def __init__(self, error_message: str, msg: Message): + super().__init__(error_message) + #: The received `Message` that caused the error. + self.msg: Message = msg + + def __str__(self) -> str: + return "\n".join([ + super().__str__(), + f" Message was: {str(self.msg)}\n", + ]) + + +class ServerParseError(_MsgProtocolError): + """ + The Server sent a `Message` indicating parsing failure. + + i.e. A reply has arrived from the server, but it is missing the "ID" + field, indicating a parsing error. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + + +class BadReplyError(_MsgProtocolError): + """ + An execution reply was successfully routed, but not understood. + + If a QMP message is received with an 'id' field to allow it to be + routed, but is otherwise malformed, this exception will be raised. + + A reply message is malformed if it is missing either the 'return' or + 'error' keys, or if the 'error' value has missing keys or members of + the wrong type. + + :param error_message: Human-readable string describing the error. + :param msg: The malformed reply that was received. + :param sent: The message that was sent that prompted the error. + """ + def __init__(self, error_message: str, msg: Message, sent: Message): + super().__init__(error_message, msg) + #: The sent `Message` that caused the failure + self.sent = sent + + +class QMPClient(AsyncProtocol[Message], Events): + """ + Implements a QMP client connection. + + QMP can be used to establish a connection as either the transport + client or server, though this class always acts as the QMP client. + + :param name: Optional nickname for the connection, used for logging. + + Basic script-style usage looks like this:: + + qmp = QMPClient('my_virtual_machine_name') + await qmp.connect(('127.0.0.1', 1234)) + ... + res = await qmp.execute('block-query') + ... + await qmp.disconnect() + + Basic async client-style usage looks like this:: + + class Client: + def __init__(self, name: str): + self.qmp = QMPClient(name) + + async def watch_events(self): + try: + async for event in self.qmp.events: + print(f"Event: {event['event']}") + except asyncio.CancelledError: + return + + async def run(self, address='/tmp/qemu.socket'): + await self.qmp.connect(address) + asyncio.create_task(self.watch_events()) + await self.qmp.runstate_changed.wait() + await self.disconnect() + + See `qmp.events` for more detail on event handling patterns. + """ + #: Logger object used for debugging messages. + logger = logging.getLogger(__name__) + + # Read buffer limit; large enough to accept query-qmp-schema + _limit = (256 * 1024) + + # Type alias for pending execute() result items + _PendingT = Union[Message, ExecInterruptedError] + + def __init__(self, name: Optional[str] = None) -> None: + super().__init__(name) + Events.__init__(self) + + #: Whether or not to await a greeting after establishing a connection. + self.await_greeting: bool = True + + #: Whether or not to perform capabilities negotiation upon connection. + #: Implies `await_greeting`. + self.negotiate: bool = True + + # Cached Greeting, if one was awaited. + self._greeting: Optional[Greeting] = None + + # Command ID counter + self._execute_id = 0 + + # Incoming RPC reply messages. + self._pending: Dict[ + Union[str, None], + 'asyncio.Queue[QMPClient._PendingT]' + ] = {} + + @property + def greeting(self) -> Optional[Greeting]: + """The `Greeting` from the QMP server, if any.""" + return self._greeting + + @upper_half + async def _establish_session(self) -> None: + """ + Initiate the QMP session. + + Wait for the QMP greeting and perform capabilities negotiation. + + :raise GreetingError: When the greeting is not understood. + :raise NegotiationError: If the negotiation fails. + :raise EOFError: When the server unexpectedly hangs up. + :raise OSError: For underlying stream errors. + """ + self._greeting = None + self._pending = {} + + if self.await_greeting or self.negotiate: + self._greeting = await self._get_greeting() + + if self.negotiate: + await self._negotiate() + + # This will start the reader/writers: + await super()._establish_session() + + @upper_half + async def _get_greeting(self) -> Greeting: + """ + :raise GreetingError: When the greeting is not understood. + :raise EOFError: When the server unexpectedly hangs up. + :raise OSError: For underlying stream errors. + + :return: the Greeting object given by the server. + """ + self.logger.debug("Awaiting greeting ...") + + try: + msg = await self._recv() + return Greeting(msg) + except (ProtocolError, KeyError, TypeError) as err: + emsg = "Did not understand Greeting" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise GreetingError(emsg, err) from err + except BaseException as err: + # EOFError, OSError, or something unexpected. + emsg = "Failed to receive Greeting" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise + + @upper_half + async def _negotiate(self) -> None: + """ + Perform QMP capabilities negotiation. + + :raise NegotiationError: When negotiation fails. + :raise EOFError: When the server unexpectedly hangs up. + :raise OSError: For underlying stream errors. + """ + self.logger.debug("Negotiating capabilities ...") + + arguments: Dict[str, List[str]] = {} + if self._greeting and 'oob' in self._greeting.QMP.capabilities: + arguments.setdefault('enable', []).append('oob') + msg = self.make_execute_msg('qmp_capabilities', arguments=arguments) + + # It's not safe to use execute() here, because the reader/writers + # aren't running. AsyncProtocol *requires* that a new session + # does not fail after the reader/writers are running! + try: + await self._send(msg) + reply = await self._recv() + assert 'return' in reply + assert 'error' not in reply + except (ProtocolError, AssertionError) as err: + emsg = "Negotiation failed" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise NegotiationError(emsg, err) from err + except BaseException as err: + # EOFError, OSError, or something unexpected. + emsg = "Negotiation failed" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise + + @bottom_half + async def _bh_disconnect(self) -> None: + try: + await super()._bh_disconnect() + finally: + if self._pending: + self.logger.debug("Cancelling pending executions") + keys = self._pending.keys() + for key in keys: + self.logger.debug("Cancelling execution '%s'", key) + self._pending[key].put_nowait( + ExecInterruptedError("Disconnected") + ) + + self.logger.debug("QMP Disconnected.") + + @upper_half + def _cleanup(self) -> None: + super()._cleanup() + assert not self._pending + + @bottom_half + async def _on_message(self, msg: Message) -> None: + """ + Add an incoming message to the appropriate queue/handler. + + :raise ServerParseError: When Message indicates server parse failure. + """ + # Incoming messages are not fully parsed/validated here; + # do only light peeking to know how to route the messages. + + if 'event' in msg: + await self._event_dispatch(msg) + return + + # Below, we assume everything left is an execute/exec-oob response. + + exec_id = cast(Optional[str], msg.get('id')) + + if exec_id in self._pending: + await self._pending[exec_id].put(msg) + return + + # We have a message we can't route back to a caller. + + is_error = 'error' in msg + has_id = 'id' in msg + + if is_error and not has_id: + # This is very likely a server parsing error. + # It doesn't inherently belong to any pending execution. + # Instead of performing clever recovery, just terminate. + # See "NOTE" in qmp-spec.txt, section 2.4.2 + raise ServerParseError( + ("Server sent an error response without an ID, " + "but there are no ID-less executions pending. " + "Assuming this is a server parser failure."), + msg + ) + + # qmp-spec.txt, section 2.4: + # 'Clients should drop all the responses + # that have an unknown "id" field.' + self.logger.log( + logging.ERROR if is_error else logging.WARNING, + "Unknown ID '%s', message dropped.", + exec_id, + ) + self.logger.debug("Unroutable message: %s", str(msg)) + + @upper_half + @bottom_half + async def _do_recv(self) -> Message: + """ + :raise OSError: When a stream error is encountered. + :raise EOFError: When the stream is at EOF. + :raise ProtocolError: + When the Message is not understood. + See also `Message._deserialize`. + + :return: A single QMP `Message`. + """ + msg_bytes = await self._readline() + msg = Message(msg_bytes, eager=True) + return msg + + @upper_half + @bottom_half + def _do_send(self, msg: Message) -> None: + """ + :raise ValueError: JSON serialization failure + :raise TypeError: JSON serialization failure + :raise OSError: When a stream error is encountered. + """ + assert self._writer is not None + self._writer.write(bytes(msg)) + + @upper_half + def _get_exec_id(self) -> str: + exec_id = f"__qmp#{self._execute_id:05d}" + self._execute_id += 1 + return exec_id + + @upper_half + async def _issue(self, msg: Message) -> Union[None, str]: + """ + Issue a QMP `Message` and do not wait for a reply. + + :param msg: The QMP `Message` to send to the server. + + :return: The ID of the `Message` sent. + """ + msg_id: Optional[str] = None + if 'id' in msg: + assert isinstance(msg['id'], str) + msg_id = msg['id'] + + self._pending[msg_id] = asyncio.Queue(maxsize=1) + try: + await self._outgoing.put(msg) + except: + del self._pending[msg_id] + raise + + return msg_id + + @upper_half + async def _reply(self, msg_id: Union[str, None]) -> Message: + """ + Await a reply to a previously issued QMP message. + + :param msg_id: The ID of the previously issued message. + + :return: The reply from the server. + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + """ + queue = self._pending[msg_id] + + try: + result = await queue.get() + if isinstance(result, ExecInterruptedError): + raise result + return result + finally: + del self._pending[msg_id] + + @upper_half + async def _execute(self, msg: Message, assign_id: bool = True) -> Message: + """ + Send a QMP `Message` to the server and await a reply. + + This method *assumes* you are sending some kind of an execute + statement that *will* receive a reply. + + An execution ID will be assigned if assign_id is `True`. It can be + disabled, but this requires that an ID is manually assigned + instead. For manually assigned IDs, you must not use the string + '__qmp#' anywhere in the ID. + + :param msg: The QMP `Message` to execute. + :param assign_id: If True, assign a new execution ID. + + :return: Execution reply from the server. + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + """ + if assign_id: + msg['id'] = self._get_exec_id() + elif 'id' in msg: + assert isinstance(msg['id'], str) + assert '__qmp#' not in msg['id'] + + exec_id = await self._issue(msg) + return await self._reply(exec_id) + + @upper_half + @require(Runstate.RUNNING) + async def _raw( + self, + msg: Union[Message, Mapping[str, object], bytes], + assign_id: bool = True, + ) -> Message: + """ + Issue a raw `Message` to the QMP server and await a reply. + + :param msg: + A Message to send to the server. It may be a `Message`, any + Mapping (including Dict), or raw bytes. + :param assign_id: + Assign an arbitrary execution ID to this message. If + `False`, the existing id must either be absent (and no other + such pending execution may omit an ID) or a string. If it is + a string, it must not start with '__qmp#' and no other such + pending execution may currently be using that ID. + + :return: Execution reply from the server. + + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + :raise TypeError: + When assign_id is `False`, an ID is given, and it is not a string. + :raise ValueError: + When assign_id is `False`, but the ID is not usable; + Either because it starts with '__qmp#' or it is already in-use. + """ + # 1. convert generic Mapping or bytes to a QMP Message + # 2. copy Message objects so that we assign an ID only to the copy. + msg = Message(msg) + + exec_id = msg.get('id') + if not assign_id and 'id' in msg: + if not isinstance(exec_id, str): + raise TypeError(f"ID ('{exec_id}') must be a string.") + if exec_id.startswith('__qmp#'): + raise ValueError( + f"ID ('{exec_id}') must not start with '__qmp#'." + ) + + if not assign_id and exec_id in self._pending: + raise ValueError( + f"ID '{exec_id}' is in-use and cannot be used." + ) + + return await self._execute(msg, assign_id=assign_id) + + @upper_half + @require(Runstate.RUNNING) + async def execute_msg(self, msg: Message) -> object: + """ + Execute a QMP command and return its value. + + :param msg: The QMP `Message` to execute. + + :return: + The command execution return value from the server. The type of + object returned depends on the command that was issued, + though most in QEMU return a `dict`. + :raise ValueError: + If the QMP `Message` does not have either the 'execute' or + 'exec-oob' fields set. + :raise ExecuteError: When the server returns an error response. + :raise ExecInterruptedError: if the connection was terminated early. + """ + if not ('execute' in msg or 'exec-oob' in msg): + raise ValueError("Requires 'execute' or 'exec-oob' message") + + # Copy the Message so that the ID assigned by _execute() is + # local to this method; allowing the ID to be seen in raised + # Exceptions but without modifying the caller's held copy. + msg = Message(msg) + reply = await self._execute(msg) + + if 'error' in reply: + try: + error_response = ErrorResponse(reply) + except (KeyError, TypeError) as err: + # Error response was malformed. + raise BadReplyError( + "QMP error reply is malformed", reply, msg, + ) from err + + raise ExecuteError(error_response, msg, reply) + + if 'return' not in reply: + raise BadReplyError( + "QMP reply is missing a 'error' or 'return' member", + reply, msg, + ) + + return reply['return'] + + @classmethod + def make_execute_msg(cls, cmd: str, + arguments: Optional[Mapping[str, object]] = None, + oob: bool = False) -> Message: + """ + Create an executable message to be sent by `execute_msg` later. + + :param cmd: QMP command name. + :param arguments: Arguments (if any). Must be JSON-serializable. + :param oob: If `True`, execute "out of band". + + :return: An executable QMP `Message`. + """ + msg = Message({'exec-oob' if oob else 'execute': cmd}) + if arguments is not None: + msg['arguments'] = arguments + return msg + + @upper_half + async def execute(self, cmd: str, + arguments: Optional[Mapping[str, object]] = None, + oob: bool = False) -> object: + """ + Execute a QMP command and return its value. + + :param cmd: QMP command name. + :param arguments: Arguments (if any). Must be JSON-serializable. + :param oob: If `True`, execute "out of band". + + :return: + The command execution return value from the server. The type of + object returned depends on the command that was issued, + though most in QEMU return a `dict`. + :raise ExecuteError: When the server returns an error response. + :raise ExecInterruptedError: if the connection was terminated early. + """ + msg = self.make_execute_msg(cmd, arguments, oob=oob) + return await self.execute_msg(msg) + + @upper_half + @require(Runstate.RUNNING) + def send_fd_scm(self, fd: int) -> None: + """ + Send a file descriptor to the remote via SCM_RIGHTS. + """ + assert self._writer is not None + sock = self._writer.transport.get_extra_info('socket') + + if sock.family != socket.AF_UNIX: + raise QMPError("Sending file descriptors requires a UNIX socket.") + + if not hasattr(sock, 'sendmsg'): + # We need to void the warranty sticker. + # Access to sendmsg is scheduled for removal in Python 3.11. + # Find the real backing socket to use it anyway. + sock = sock._sock # pylint: disable=protected-access + + sock.sendmsg( + [b' '], + [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))] + ) diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py new file mode 100644 index 0000000..619ab42 --- /dev/null +++ b/python/qemu/qmp/qmp_shell.py @@ -0,0 +1,610 @@ +# +# Copyright (C) 2009-2022 Red Hat Inc. +# +# Authors: +# Luiz Capitulino +# John Snow +# +# This work is licensed under the terms of the GNU LGPL, version 2 or +# later. See the COPYING file in the top-level directory. +# + +""" +Low-level QEMU shell on top of QMP. + +usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server + +positional arguments: + qmp_server < UNIX socket path | TCP address:port > + +optional arguments: + -h, --help show this help message and exit + -H, --hmp Use HMP interface + -N, --skip-negotiation + Skip negotiate (for qemu-ga) + -v, --verbose Verbose (echo commands sent and received) + -p, --pretty Pretty-print JSON + + +Start QEMU with: + +# qemu [...] -qmp unix:./qmp-sock,server + +Run the shell: + +$ qmp-shell ./qmp-sock + +Commands have the following format: + + < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] + +For example: + +(QEMU) device_add driver=e1000 id=net1 +{'return': {}} +(QEMU) + +key=value pairs also support Python or JSON object literal subset notations, +without spaces. Dictionaries/objects {} are supported as are arrays []. + + example-command arg-name1={'key':'value','obj'={'prop':"value"}} + +Both JSON and Python formatting should work, including both styles of +string literal quotes. Both paradigms of literal values should work, +including null/true/false for JSON and None/True/False for Python. + + +Transactions have the following multi-line format: + + transaction( + action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] + ... + action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] + ) + +One line transactions are also supported: + + transaction( action-name1 ... ) + +For example: + + (QEMU) transaction( + TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 + TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 + TRANS> ) + {"return": {}} + (QEMU) + +Use the -v and -p options to activate the verbose and pretty-print options, +which will echo back the properly formatted JSON-compliant QMP that is being +sent to QEMU, which is useful for debugging and documentation generation. +""" + +import argparse +import ast +import json +import logging +import os +import re +import readline +from subprocess import Popen +import sys +from typing import ( + IO, + Iterator, + List, + NoReturn, + Optional, + Sequence, +) + +from qemu.qmp import ConnectError, QMPError, SocketAddrT +from qemu.qmp.legacy import ( + QEMUMonitorProtocol, + QMPBadPortError, + QMPMessage, + QMPObject, +) + + +LOG = logging.getLogger(__name__) + + +class QMPCompleter: + """ + QMPCompleter provides a readline library tab-complete behavior. + """ + # NB: Python 3.9+ will probably allow us to subclass list[str] directly, + # but pylint as of today does not know that List[str] is simply 'list'. + def __init__(self) -> None: + self._matches: List[str] = [] + + def append(self, value: str) -> None: + """Append a new valid completion to the list of possibilities.""" + return self._matches.append(value) + + def complete(self, text: str, state: int) -> Optional[str]: + """readline.set_completer() callback implementation.""" + for cmd in self._matches: + if cmd.startswith(text): + if state == 0: + return cmd + state -= 1 + return None + + +class QMPShellError(QMPError): + """ + QMP Shell Base error class. + """ + + +class FuzzyJSON(ast.NodeTransformer): + """ + This extension of ast.NodeTransformer filters literal "true/false/null" + values in a Python AST and replaces them by proper "True/False/None" values + that Python can properly evaluate. + """ + + @classmethod + def visit_Name(cls, # pylint: disable=invalid-name + node: ast.Name) -> ast.AST: + """ + Transform Name nodes with certain values into Constant (keyword) nodes. + """ + if node.id == 'true': + return ast.Constant(value=True) + if node.id == 'false': + return ast.Constant(value=False) + if node.id == 'null': + return ast.Constant(value=None) + return node + + +class QMPShell(QEMUMonitorProtocol): + """ + QMPShell provides a basic readline-based QMP shell. + + :param address: Address of the QMP server. + :param pretty: Pretty-print QMP messages. + :param verbose: Echo outgoing QMP messages to console. + """ + def __init__(self, address: SocketAddrT, + pretty: bool = False, + verbose: bool = False, + server: bool = False, + logfile: Optional[str] = None): + super().__init__(address, server=server) + self._greeting: Optional[QMPMessage] = None + self._completer = QMPCompleter() + self._transmode = False + self._actions: List[QMPMessage] = [] + self._histfile = os.path.join(os.path.expanduser('~'), + '.qmp-shell_history') + self.pretty = pretty + self.verbose = verbose + self.logfile = None + + if logfile is not None: + self.logfile = open(logfile, "w", encoding='utf-8') + + def close(self) -> None: + # Hook into context manager of parent to save shell history. + self._save_history() + super().close() + + def _fill_completion(self) -> None: + cmds = self.cmd('query-commands') + if 'error' in cmds: + return + for cmd in cmds['return']: + self._completer.append(cmd['name']) + + def _completer_setup(self) -> None: + self._completer = QMPCompleter() + self._fill_completion() + readline.set_history_length(1024) + readline.set_completer(self._completer.complete) + readline.parse_and_bind("tab: complete") + # NB: default delimiters conflict with some command names + # (eg. query-), clearing everything as it doesn't seem to matter + readline.set_completer_delims('') + try: + readline.read_history_file(self._histfile) + except FileNotFoundError: + pass + except IOError as err: + msg = f"Failed to read history '{self._histfile}': {err!s}" + LOG.warning(msg) + + def _save_history(self) -> None: + try: + readline.write_history_file(self._histfile) + except IOError as err: + msg = f"Failed to save history file '{self._histfile}': {err!s}" + LOG.warning(msg) + + @classmethod + def _parse_value(cls, val: str) -> object: + try: + return int(val) + except ValueError: + pass + + if val.lower() == 'true': + return True + if val.lower() == 'false': + return False + if val.startswith(('{', '[')): + # Try first as pure JSON: + try: + return json.loads(val) + except ValueError: + pass + # Try once again as FuzzyJSON: + try: + tree = ast.parse(val, mode='eval') + transformed = FuzzyJSON().visit(tree) + return ast.literal_eval(transformed) + except (SyntaxError, ValueError): + pass + return val + + def _cli_expr(self, + tokens: Sequence[str], + parent: QMPObject) -> None: + for arg in tokens: + (key, sep, val) = arg.partition('=') + if sep != '=': + raise QMPShellError( + f"Expected a key=value pair, got '{arg!s}'" + ) + + value = self._parse_value(val) + optpath = key.split('.') + curpath = [] + for path in optpath[:-1]: + curpath.append(path) + obj = parent.get(path, {}) + if not isinstance(obj, dict): + msg = 'Cannot use "{:s}" as both leaf and non-leaf key' + raise QMPShellError(msg.format('.'.join(curpath))) + parent[path] = obj + parent = obj + if optpath[-1] in parent: + if isinstance(parent[optpath[-1]], dict): + msg = 'Cannot use "{:s}" as both leaf and non-leaf key' + raise QMPShellError(msg.format('.'.join(curpath))) + raise QMPShellError(f'Cannot set "{key}" multiple times') + parent[optpath[-1]] = value + + def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: + """ + Build a QMP input object from a user provided command-line in the + following format: + + < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] + """ + argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' + cmdargs = re.findall(argument_regex, cmdline) + qmpcmd: QMPMessage + + # Transactional CLI entry: + if cmdargs and cmdargs[0] == 'transaction(': + self._transmode = True + self._actions = [] + cmdargs.pop(0) + + # Transactional CLI exit: + if cmdargs and cmdargs[0] == ')' and self._transmode: + self._transmode = False + if len(cmdargs) > 1: + msg = 'Unexpected input after close of Transaction sub-shell' + raise QMPShellError(msg) + qmpcmd = { + 'execute': 'transaction', + 'arguments': {'actions': self._actions} + } + return qmpcmd + + # No args, or no args remaining + if not cmdargs: + return None + + if self._transmode: + # Parse and cache this Transactional Action + finalize = False + action = {'type': cmdargs[0], 'data': {}} + if cmdargs[-1] == ')': + cmdargs.pop(-1) + finalize = True + self._cli_expr(cmdargs[1:], action['data']) + self._actions.append(action) + return self._build_cmd(')') if finalize else None + + # Standard command: parse and return it to be executed. + qmpcmd = {'execute': cmdargs[0], 'arguments': {}} + self._cli_expr(cmdargs[1:], qmpcmd['arguments']) + return qmpcmd + + def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None: + jsobj = json.dumps(qmp_message, + indent=4 if self.pretty else None, + sort_keys=self.pretty) + print(str(jsobj), file=fh) + + def _execute_cmd(self, cmdline: str) -> bool: + try: + qmpcmd = self._build_cmd(cmdline) + except QMPShellError as err: + print( + f"Error while parsing command line: {err!s}\n" + "command format: " + "[arg-name1=arg1] ... [arg-nameN=argN", + file=sys.stderr + ) + return True + # For transaction mode, we may have just cached the action: + if qmpcmd is None: + return True + if self.verbose: + self._print(qmpcmd) + resp = self.cmd_obj(qmpcmd) + if resp is None: + print('Disconnected') + return False + self._print(resp) + if self.logfile is not None: + cmd = {**qmpcmd, **resp} + self._print(cmd, fh=self.logfile) + return True + + def connect(self, negotiate: bool = True) -> None: + self._greeting = super().connect(negotiate) + self._completer_setup() + + def show_banner(self, + msg: str = 'Welcome to the QMP low-level shell!') -> None: + """ + Print to stdio a greeting, and the QEMU version if available. + """ + print(msg) + if not self._greeting: + print('Connected') + return + version = self._greeting['QMP']['version']['qemu'] + print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) + + @property + def prompt(self) -> str: + """ + Return the current shell prompt, including a trailing space. + """ + if self._transmode: + return 'TRANS> ' + return '(QEMU) ' + + def read_exec_command(self) -> bool: + """ + Read and execute a command. + + @return True if execution was ok, return False if disconnected. + """ + try: + cmdline = input(self.prompt) + except EOFError: + print() + return False + + if cmdline == '': + for event in self.get_events(): + print(event) + return True + + return self._execute_cmd(cmdline) + + def repl(self) -> Iterator[None]: + """ + Return an iterator that implements the REPL. + """ + self.show_banner() + while self.read_exec_command(): + yield + self.close() + + +class HMPShell(QMPShell): + """ + HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. + + :param address: Address of the QMP server. + :param pretty: Pretty-print QMP messages. + :param verbose: Echo outgoing QMP messages to console. + """ + def __init__(self, address: SocketAddrT, + pretty: bool = False, + verbose: bool = False, + server: bool = False, + logfile: Optional[str] = None): + super().__init__(address, pretty, verbose, server, logfile) + self._cpu_index = 0 + + def _cmd_completion(self) -> None: + for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): + if cmd and cmd[0] != '[' and cmd[0] != '\t': + name = cmd.split()[0] # drop help text + if name == 'info': + continue + if name.find('|') != -1: + # Command in the form 'foobar|f' or 'f|foobar', take the + # full name + opt = name.split('|') + if len(opt[0]) == 1: + name = opt[1] + else: + name = opt[0] + self._completer.append(name) + self._completer.append('help ' + name) # help completion + + def _info_completion(self) -> None: + for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): + if cmd: + self._completer.append('info ' + cmd.split()[1]) + + def _other_completion(self) -> None: + # special cases + self._completer.append('help info') + + def _fill_completion(self) -> None: + self._cmd_completion() + self._info_completion() + self._other_completion() + + def _cmd_passthrough(self, cmdline: str, + cpu_index: int = 0) -> QMPMessage: + return self.cmd_obj({ + 'execute': 'human-monitor-command', + 'arguments': { + 'command-line': cmdline, + 'cpu-index': cpu_index + } + }) + + def _execute_cmd(self, cmdline: str) -> bool: + if cmdline.split()[0] == "cpu": + # trap the cpu command, it requires special setting + try: + idx = int(cmdline.split()[1]) + if 'return' not in self._cmd_passthrough('info version', idx): + print('bad CPU index') + return True + self._cpu_index = idx + except ValueError: + print('cpu command takes an integer argument') + return True + resp = self._cmd_passthrough(cmdline, self._cpu_index) + if resp is None: + print('Disconnected') + return False + assert 'return' in resp or 'error' in resp + if 'return' in resp: + # Success + if len(resp['return']) > 0: + print(resp['return'], end=' ') + else: + # Error + print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) + return True + + def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: + QMPShell.show_banner(self, msg) + + +def die(msg: str) -> NoReturn: + """Write an error to stderr, then exit with a return code of 1.""" + sys.stderr.write('ERROR: %s\n' % msg) + sys.exit(1) + + +def main() -> None: + """ + qmp-shell entry point: parse command line arguments and start the REPL. + """ + parser = argparse.ArgumentParser() + parser.add_argument('-H', '--hmp', action='store_true', + help='Use HMP interface') + parser.add_argument('-N', '--skip-negotiation', action='store_true', + help='Skip negotiate (for qemu-ga)') + parser.add_argument('-v', '--verbose', action='store_true', + help='Verbose (echo commands sent and received)') + parser.add_argument('-p', '--pretty', action='store_true', + help='Pretty-print JSON') + parser.add_argument('-l', '--logfile', + help='Save log of all QMP messages to PATH') + + default_server = os.environ.get('QMP_SOCKET') + parser.add_argument('qmp_server', action='store', + default=default_server, + help='< UNIX socket path | TCP address:port >') + + args = parser.parse_args() + if args.qmp_server is None: + parser.error("QMP socket or TCP address must be specified") + + shell_class = HMPShell if args.hmp else QMPShell + + try: + address = shell_class.parse_address(args.qmp_server) + except QMPBadPortError: + parser.error(f"Bad port number: {args.qmp_server}") + return # pycharm doesn't know error() is noreturn + + with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu: + try: + qemu.connect(negotiate=not args.skip_negotiation) + except ConnectError as err: + if isinstance(err.exc, OSError): + die(f"Couldn't connect to {args.qmp_server}: {err!s}") + die(str(err)) + + for _ in qemu.repl(): + pass + + +def main_wrap() -> None: + """ + qmp-shell-wrap entry point: parse command line arguments and + start the REPL. + """ + parser = argparse.ArgumentParser() + parser.add_argument('-H', '--hmp', action='store_true', + help='Use HMP interface') + parser.add_argument('-v', '--verbose', action='store_true', + help='Verbose (echo commands sent and received)') + parser.add_argument('-p', '--pretty', action='store_true', + help='Pretty-print JSON') + parser.add_argument('-l', '--logfile', + help='Save log of all QMP messages to PATH') + + parser.add_argument('command', nargs=argparse.REMAINDER, + help='QEMU command line to invoke') + + args = parser.parse_args() + + cmd = args.command + if len(cmd) != 0 and cmd[0] == '--': + cmd = cmd[1:] + if len(cmd) == 0: + cmd = ["qemu-system-x86_64"] + + sockpath = "qmp-shell-wrap-%d" % os.getpid() + cmd += ["-qmp", "unix:%s" % sockpath] + + shell_class = HMPShell if args.hmp else QMPShell + + try: + address = shell_class.parse_address(sockpath) + except QMPBadPortError: + parser.error(f"Bad port number: {sockpath}") + return # pycharm doesn't know error() is noreturn + + try: + with shell_class(address, args.pretty, args.verbose, + True, args.logfile) as qemu: + with Popen(cmd): + + try: + qemu.accept() + except ConnectError as err: + if isinstance(err.exc, OSError): + die(f"Couldn't connect to {args.qmp_server}: {err!s}") + die(str(err)) + + for _ in qemu.repl(): + pass + finally: + os.unlink(sockpath) + + +if __name__ == '__main__': + main() diff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py new file mode 100644 index 0000000..eaa5fc7 --- /dev/null +++ b/python/qemu/qmp/util.py @@ -0,0 +1,217 @@ +""" +Miscellaneous Utilities + +This module provides asyncio utilities and compatibility wrappers for +Python 3.6 to provide some features that otherwise become available in +Python 3.7+. + +Various logging and debugging utilities are also provided, such as +`exception_summary()` and `pretty_traceback()`, used primarily for +adding information into the logging stream. +""" + +import asyncio +import sys +import traceback +from typing import ( + Any, + Coroutine, + Optional, + TypeVar, + cast, +) + + +T = TypeVar('T') + + +# -------------------------- +# Section: Utility Functions +# -------------------------- + + +async def flush(writer: asyncio.StreamWriter) -> None: + """ + Utility function to ensure a StreamWriter is *fully* drained. + + `asyncio.StreamWriter.drain` only promises we will return to below + the "high-water mark". This function ensures we flush the entire + buffer -- by setting the high water mark to 0 and then calling + drain. The flow control limits are restored after the call is + completed. + """ + transport = cast(asyncio.WriteTransport, writer.transport) + + # https://github.com/python/typeshed/issues/5779 + low, high = transport.get_write_buffer_limits() # type: ignore + transport.set_write_buffer_limits(0, 0) + try: + await writer.drain() + finally: + transport.set_write_buffer_limits(high, low) + + +def upper_half(func: T) -> T: + """ + Do-nothing decorator that annotates a method as an "upper-half" method. + + These methods must not call bottom-half functions directly, but can + schedule them to run. + """ + return func + + +def bottom_half(func: T) -> T: + """ + Do-nothing decorator that annotates a method as a "bottom-half" method. + + These methods must take great care to handle their own exceptions whenever + possible. If they go unhandled, they will cause termination of the loop. + + These methods do not, in general, have the ability to directly + report information to a caller’s context and will usually be + collected as a Task result instead. + + They must not call upper-half functions directly. + """ + return func + + +# ------------------------------- +# Section: Compatibility Wrappers +# ------------------------------- + + +def create_task(coro: Coroutine[Any, Any, T], + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> 'asyncio.Future[T]': + """ + Python 3.6-compatible `asyncio.create_task` wrapper. + + :param coro: The coroutine to execute in a task. + :param loop: Optionally, the loop to create the task in. + + :return: An `asyncio.Future` object. + """ + if sys.version_info >= (3, 7): + if loop is not None: + return loop.create_task(coro) + return asyncio.create_task(coro) # pylint: disable=no-member + + # Python 3.6: + return asyncio.ensure_future(coro, loop=loop) + + +def is_closing(writer: asyncio.StreamWriter) -> bool: + """ + Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. + + :param writer: The `asyncio.StreamWriter` object. + :return: `True` if the writer is closing, or closed. + """ + if sys.version_info >= (3, 7): + return writer.is_closing() + + # Python 3.6: + transport = writer.transport + assert isinstance(transport, asyncio.WriteTransport) + return transport.is_closing() + + +async def wait_closed(writer: asyncio.StreamWriter) -> None: + """ + Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. + + :param writer: The `asyncio.StreamWriter` to wait on. + """ + if sys.version_info >= (3, 7): + await writer.wait_closed() + return + + # Python 3.6 + transport = writer.transport + assert isinstance(transport, asyncio.WriteTransport) + + while not transport.is_closing(): + await asyncio.sleep(0) + + # This is an ugly workaround, but it's the best I can come up with. + sock = transport.get_extra_info('socket') + + if sock is None: + # Our transport doesn't have a socket? ... + # Nothing we can reasonably do. + return + + while sock.fileno() != -1: + await asyncio.sleep(0) + + +def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: + """ + Python 3.6-compatible `asyncio.run` wrapper. + + :param coro: A coroutine to execute now. + :return: The return value from the coroutine. + """ + if sys.version_info >= (3, 7): + return asyncio.run(coro, debug=debug) + + # Python 3.6 + loop = asyncio.get_event_loop() + loop.set_debug(debug) + ret = loop.run_until_complete(coro) + loop.close() + + return ret + + +# ---------------------------- +# Section: Logging & Debugging +# ---------------------------- + + +def exception_summary(exc: BaseException) -> str: + """ + Return a summary string of an arbitrary exception. + + It will be of the form "ExceptionType: Error Message", if the error + string is non-empty, and just "ExceptionType" otherwise. + """ + name = type(exc).__qualname__ + smod = type(exc).__module__ + if smod not in ("__main__", "builtins"): + name = smod + '.' + name + + error = str(exc) + if error: + return f"{name}: {error}" + return name + + +def pretty_traceback(prefix: str = " | ") -> str: + """ + Formats the current traceback, indented to provide visual distinction. + + This is useful for printing a traceback within a traceback for + debugging purposes when encapsulating errors to deliver them up the + stack; when those errors are printed, this helps provide a nice + visual grouping to quickly identify the parts of the error that + belong to the inner exception. + + :param prefix: The prefix to append to each line of the traceback. + :return: A string, formatted something like the following:: + + | Traceback (most recent call last): + | File "foobar.py", line 42, in arbitrary_example + | foo.baz() + | ArbitraryError: [Errno 42] Something bad happened! + """ + output = "".join(traceback.format_exception(*sys.exc_info())) + + exc_lines = [] + for line in output.split('\n'): + exc_lines.append(prefix + line) + + # The last line is always empty, omit it + return "\n".join(exc_lines[:-1]) diff --git a/python/qemu/utils/qemu_ga_client.py b/python/qemu/utils/qemu_ga_client.py index 15ed430..8c38a7a 100644 --- a/python/qemu/utils/qemu_ga_client.py +++ b/python/qemu/utils/qemu_ga_client.py @@ -50,8 +50,8 @@ from typing import ( Sequence, ) -from qemu.aqmp import ConnectError, SocketAddrT -from qemu.aqmp.legacy import QEMUMonitorProtocol +from qemu.qmp import ConnectError, SocketAddrT +from qemu.qmp.legacy import QEMUMonitorProtocol # This script has not seen many patches or careful attention in quite diff --git a/python/qemu/utils/qom.py b/python/qemu/utils/qom.py index bb5d1a7..bcf192f 100644 --- a/python/qemu/utils/qom.py +++ b/python/qemu/utils/qom.py @@ -32,7 +32,7 @@ QOM commands: import argparse -from qemu.aqmp import ExecuteError +from qemu.qmp import ExecuteError from .qom_common import QOMCommand diff --git a/python/qemu/utils/qom_common.py b/python/qemu/utils/qom_common.py index e034a6f..80da1b2 100644 --- a/python/qemu/utils/qom_common.py +++ b/python/qemu/utils/qom_common.py @@ -27,8 +27,8 @@ from typing import ( TypeVar, ) -from qemu.aqmp import QMPError -from qemu.aqmp.legacy import QEMUMonitorProtocol +from qemu.qmp import QMPError +from qemu.qmp.legacy import QEMUMonitorProtocol class ObjectPropertyInfo: diff --git a/python/qemu/utils/qom_fuse.py b/python/qemu/utils/qom_fuse.py index 653a76b..8dcd59f 100644 --- a/python/qemu/utils/qom_fuse.py +++ b/python/qemu/utils/qom_fuse.py @@ -48,7 +48,7 @@ from typing import ( import fuse from fuse import FUSE, FuseOSError, Operations -from qemu.aqmp import ExecuteError +from qemu.qmp import ExecuteError from .qom_common import QOMCommand diff --git a/python/setup.cfg b/python/setup.cfg index 49e3c28..773e51b 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -24,7 +24,7 @@ classifiers = [options] python_requires = >= 3.6 packages = - qemu.aqmp + qemu.qmp qemu.machine qemu.utils @@ -66,9 +66,9 @@ console_scripts = qom-tree = qemu.utils.qom:QOMTree.entry_point qom-fuse = qemu.utils.qom_fuse:QOMFuse.entry_point [fuse] qemu-ga-client = qemu.utils.qemu_ga_client:main - qmp-shell = qemu.aqmp.qmp_shell:main - qmp-shell-wrap = qemu.aqmp.qmp_shell:main_wrap - aqmp-tui = qemu.aqmp.aqmp_tui:main [tui] + qmp-shell = qemu.qmp.qmp_shell:main + qmp-shell-wrap = qemu.qmp.qmp_shell:main_wrap + aqmp-tui = qemu.qmp.aqmp_tui:main [tui] [flake8] extend-ignore = E722 # Prefer pylint's bare-except checks to flake8's @@ -84,7 +84,7 @@ namespace_packages = True # fusepy has no type stubs: allow_subclassing_any = True -[mypy-qemu.aqmp.aqmp_tui] +[mypy-qemu.qmp.aqmp_tui] # urwid and urwid_readline have no type stubs: allow_subclassing_any = True diff --git a/python/tests/protocol.py b/python/tests/protocol.py index d6849ad..56c4d44 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -6,9 +6,9 @@ from tempfile import TemporaryDirectory import avocado -from qemu.aqmp import ConnectError, Runstate -from qemu.aqmp.protocol import AsyncProtocol, StateError -from qemu.aqmp.util import asyncio_run, create_task +from qemu.qmp import ConnectError, Runstate +from qemu.qmp.protocol import AsyncProtocol, StateError +from qemu.qmp.util import asyncio_run, create_task class NullProtocol(AsyncProtocol[None]): @@ -183,7 +183,7 @@ class Smoke(avocado.Test): def testLogger(self): self.assertEqual( self.proto.logger.name, - 'qemu.aqmp.protocol' + 'qemu.qmp.protocol' ) def testName(self): @@ -196,7 +196,7 @@ class Smoke(avocado.Test): self.assertEqual( self.proto.logger.name, - 'qemu.aqmp.protocol.Steve' + 'qemu.qmp.protocol.Steve' ) self.assertEqual( @@ -431,7 +431,7 @@ class Accept(Connect): await self.proto.start_server_and_accept('/dev/null') async def _hanging_connection(self): - with TemporaryDirectory(suffix='.aqmp') as tmpdir: + with TemporaryDirectory(suffix='.qmp') as tmpdir: sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") await self.proto.start_server_and_accept(sock) @@ -587,7 +587,7 @@ class SimpleSession(TestBase): @TestBase.async_test async def testSmoke(self): - with TemporaryDirectory(suffix='.aqmp') as tmpdir: + with TemporaryDirectory(suffix='.qmp') as tmpdir: sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") server_task = create_task(self.server.start_server_and_accept(sock)) -- cgit v1.1