diff options
-rw-r--r-- | python/.gitignore | 5 | ||||
-rw-r--r-- | python/Makefile | 9 | ||||
-rw-r--r-- | python/Pipfile.lock | 28 | ||||
-rw-r--r-- | python/avocado.cfg | 3 | ||||
-rw-r--r-- | python/qemu/aqmp/__init__.py | 59 | ||||
-rw-r--r-- | python/qemu/aqmp/aqmp_tui.py | 652 | ||||
-rw-r--r-- | python/qemu/aqmp/error.py | 50 | ||||
-rw-r--r-- | python/qemu/aqmp/events.py | 706 | ||||
-rw-r--r-- | python/qemu/aqmp/message.py | 209 | ||||
-rw-r--r-- | python/qemu/aqmp/models.py | 133 | ||||
-rw-r--r-- | python/qemu/aqmp/protocol.py | 902 | ||||
-rw-r--r-- | python/qemu/aqmp/py.typed | 0 | ||||
-rw-r--r-- | python/qemu/aqmp/qmp_client.py | 621 | ||||
-rw-r--r-- | python/qemu/aqmp/util.py | 217 | ||||
-rw-r--r-- | python/setup.cfg | 43 | ||||
-rw-r--r-- | python/tests/protocol.py | 583 |
16 files changed, 4214 insertions, 6 deletions
diff --git a/python/.gitignore b/python/.gitignore index c8b0e67..904f324 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -15,3 +15,8 @@ qemu.egg-info/ .venv/ .tox/ .dev-venv/ + +# Coverage.py reports +.coverage +.coverage.* +htmlcov/ diff --git a/python/Makefile b/python/Makefile index fe27a3e..3334311 100644 --- a/python/Makefile +++ b/python/Makefile @@ -92,6 +92,13 @@ check: check-tox: @tox $(QEMU_TOX_EXTRA_ARGS) +.PHONY: check-coverage +check-coverage: + @coverage run -m avocado --config avocado.cfg run tests/*.py + @coverage combine + @coverage html + @coverage report + .PHONY: clean clean: python3 setup.py clean --all @@ -100,3 +107,5 @@ clean: .PHONY: distclean distclean: clean rm -rf qemu.egg-info/ .venv/ .tox/ $(QEMU_VENV_DIR) dist/ + rm -f .coverage .coverage.* + rm -rf htmlcov/ diff --git a/python/Pipfile.lock b/python/Pipfile.lock index 8ab41a3..d2a7dbd 100644 --- a/python/Pipfile.lock +++ b/python/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "eff562a688ebc6f3ffe67494dbb804b883e2159ad81c4d55d96da9f7aec13e91" + "sha256": "784b327272db32403d5a488507853b5afba850ba26a5948e5b6a90c1baef2d9c" }, "pipfile-spec": 6, "requires": { @@ -39,11 +39,11 @@ }, "avocado-framework": { "hashes": [ - "sha256:3fca7226d7d164f124af8a741e7fa658ff4345a0738ddc32907631fd688b38ed", - "sha256:48ac254c0ae2ef0c0ceeb38e3d3df0388718eda8f48b3ab55b30b252839f42b1" + "sha256:244cb569f8eb4e50a22ac82e1a2b2bba2458999f4281efbe2651bd415d59c65b", + "sha256:6f15998b67ecd0e7dde790c4de4dd249d6df52dfe6d5cc4e2dd6596df51c3583" ], "index": "pypi", - "version": "==87.0" + "version": "==90.0" }, "distlib": { "hashes": [ @@ -200,6 +200,14 @@ ], "version": "==2.0.0" }, + "pygments": { + "hashes": [ + "sha256:a18f47b506a429f6f4b9df81bb02beab9ca21d0a5fee38ed15aef65f0545519f", + "sha256:d66e804411278594d764fc69ec36ec13d9ae9147193a1740cd34d272ca383b8e" + ], + "markers": "python_version >= '3.5'", + "version": "==2.9.0" + }, "pylint": { "hashes": [ "sha256:082a6d461b54f90eea49ca90fff4ee8b6e45e8029e5dbd72f6107ef84f3779c0", @@ -289,6 +297,18 @@ "markers": "python_version < '3.8'", "version": "==3.10.0.0" }, + "urwid": { + "hashes": [ + "sha256:588bee9c1cb208d0906a9f73c613d2bd32c3ed3702012f51efe318a3f2127eae" + ], + "version": "==2.1.2" + }, + "urwid-readline": { + "hashes": [ + "sha256:018020cbc864bb5ed87be17dc26b069eae2755cb29f3a9c569aac3bded1efaf4" + ], + "version": "==0.13" + }, "virtualenv": { "hashes": [ "sha256:14fdf849f80dbb29a4eb6caa9875d476ee2a5cf76a5f5415fa2f1606010ab467", diff --git a/python/avocado.cfg b/python/avocado.cfg index 10dc6fb..c7722e7 100644 --- a/python/avocado.cfg +++ b/python/avocado.cfg @@ -1,3 +1,6 @@ +[run] +test_runner = runner + [simpletests] # Don't show stdout/stderr in the test *summary* status.failure_fields = ['status'] diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py new file mode 100644 index 0000000..ab17829 --- /dev/null +++ b/python/qemu/aqmp/__init__.py @@ -0,0 +1,59 @@ +""" +QEMU Monitor Protocol (QMP) development library & tooling. + +This package provides a fairly low-level class for communicating +asynchronously with QMP protocol servers, as implemented by QEMU, the +QEMU Guest Agent, and the QEMU Storage Daemon. + +`QMPClient` provides the main functionality of this package. All errors +raised by this library dervive from `AQMPError`, see `aqmp.error` for +additional detail. See `aqmp.events` for an in-depth tutorial on +managing QMP events. +""" + +# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc. +# +# Authors: +# John Snow <jsnow@redhat.com> +# +# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>. +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. + +import warnings + +from .error import AQMPError +from .events import EventListener +from .message import Message +from .protocol import ConnectError, Runstate, StateError +from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient + + +_WMSG = """ + +The Asynchronous QMP library is currently in development and its API +should be considered highly fluid and subject to change. It should +not be used by any other scripts checked into the QEMU tree. + +Proceed with caution! +""" + +warnings.warn(_WMSG, FutureWarning) + + +# The order of these fields impact the Sphinx documentation order. +__all__ = ( + # Classes, most to least important + 'QMPClient', + 'Message', + 'EventListener', + 'Runstate', + + # Exceptions, most generic to most explicit + 'AQMPError', + 'StateError', + 'ConnectError', + 'ExecuteError', + 'ExecInterruptedError', +) diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py new file mode 100644 index 0000000..a2929f7 --- /dev/null +++ b/python/qemu/aqmp/aqmp_tui.py @@ -0,0 +1,652 @@ +# Copyright (c) 2021 +# +# Authors: +# Niteesh Babu G S <niteesh.gs@gmail.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. +""" +AQMP TUI + +AQMP TUI is an asynchronous interface built on top the of the AQMP library. +It is the successor of QMP-shell and is bought-in as a replacement for it. + +Example Usage: aqmp-tui <SOCKET | TCP IP:PORT> +Full Usage: aqmp-tui --help +""" + +import argparse +import asyncio +import json +import logging +from logging import Handler, LogRecord +import signal +from typing import ( + List, + Optional, + Tuple, + Type, + Union, + cast, +) + +from pygments import lexers +from pygments import token as Token +import urwid +import urwid_readline + +from ..qmp import QEMUMonitorProtocol, QMPBadPortError +from .error import ProtocolError +from .message import DeserializationError, Message, UnexpectedTypeError +from .protocol import ConnectError, Runstate +from .qmp_client import ExecInterruptedError, QMPClient +from .util import create_task, pretty_traceback + + +# The name of the signal that is used to update the history list +UPDATE_MSG: str = 'UPDATE_MSG' + + +palette = [ + (Token.Punctuation, '', '', '', 'h15,bold', 'g7'), + (Token.Text, '', '', '', '', 'g7'), + (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'), + (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'), + (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'), + (Token.Keyword.Constant, '', '', '', '#6af', 'g7'), + ('DEBUG', '', '', '', '#ddf', 'g7'), + ('INFO', '', '', '', 'g100', 'g7'), + ('WARNING', '', '', '', '#ff6', 'g7'), + ('ERROR', '', '', '', '#a00', 'g7'), + ('CRITICAL', '', '', '', '#a00', 'g7'), + ('background', '', 'black', '', '', 'g7'), +] + + +def format_json(msg: str) -> str: + """ + Formats valid/invalid multi-line JSON message into a single-line message. + + Formatting is first tried using the standard json module. If that fails + due to an decoding error then a simple string manipulation is done to + achieve a single line JSON string. + + Converting into single line is more asthetically pleasing when looking + along with error messages. + + Eg: + Input: + [ 1, + true, + 3 ] + The above input is not a valid QMP message and produces the following error + "QMP message is not a JSON object." + When displaying this in TUI in multiline mode we get + + [ 1, + true, + 3 ]: QMP message is not a JSON object. + + whereas in singleline mode we get the following + + [1, true, 3]: QMP message is not a JSON object. + + The single line mode is more asthetically pleasing. + + :param msg: + The message to formatted into single line. + + :return: Formatted singleline message. + """ + try: + msg = json.loads(msg) + return str(json.dumps(msg)) + except json.decoder.JSONDecodeError: + msg = msg.replace('\n', '') + words = msg.split(' ') + words = list(filter(None, words)) + return ' '.join(words) + + +def has_handler_type(logger: logging.Logger, + handler_type: Type[Handler]) -> bool: + """ + The Logger class has no interface to check if a certain type of handler is + installed or not. So we provide an interface to do so. + + :param logger: + Logger object + :param handler_type: + The type of the handler to be checked. + + :return: returns True if handler of type `handler_type`. + """ + for handler in logger.handlers: + if isinstance(handler, handler_type): + return True + return False + + +class App(QMPClient): + """ + Implements the AQMP TUI. + + Initializes the widgets and starts the urwid event loop. + + :param address: + Address of the server to connect to. + :param num_retries: + The number of times to retry before stopping to reconnect. + :param retry_delay: + The delay(sec) before each retry + """ + def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int, + retry_delay: Optional[int]) -> None: + urwid.register_signal(type(self), UPDATE_MSG) + self.window = Window(self) + self.address = address + self.aloop: Optional[asyncio.AbstractEventLoop] = None + self.num_retries = num_retries + self.retry_delay = retry_delay if retry_delay else 2 + self.retry: bool = False + self.exiting: bool = False + super().__init__() + + def add_to_history(self, msg: str, level: Optional[str] = None) -> None: + """ + Appends the msg to the history list. + + :param msg: + The raw message to be appended in string type. + """ + urwid.emit_signal(self, UPDATE_MSG, msg, level) + + def _cb_outbound(self, msg: Message) -> Message: + """ + Callback: outbound message hook. + + Appends the outgoing messages to the history box. + + :param msg: raw outbound message. + :return: final outbound message. + """ + str_msg = str(msg) + + if not has_handler_type(logging.getLogger(), TUILogHandler): + logging.debug('Request: %s', str_msg) + self.add_to_history('<-- ' + str_msg) + return msg + + def _cb_inbound(self, msg: Message) -> Message: + """ + Callback: outbound message hook. + + Appends the incoming messages to the history box. + + :param msg: raw inbound message. + :return: final inbound message. + """ + str_msg = str(msg) + + if not has_handler_type(logging.getLogger(), TUILogHandler): + logging.debug('Request: %s', str_msg) + self.add_to_history('--> ' + str_msg) + return msg + + async def _send_to_server(self, msg: Message) -> None: + """ + This coroutine sends the message to the server. + The message has to be pre-validated. + + :param msg: + Pre-validated message to be to sent to the server. + + :raise Exception: When an unhandled exception is caught. + """ + try: + await self._raw(msg, assign_id='id' not in msg) + except ExecInterruptedError as err: + logging.info('Error server disconnected before reply %s', str(err)) + self.add_to_history('Server disconnected before reply', 'ERROR') + except Exception as err: + logging.error('Exception from _send_to_server: %s', str(err)) + raise err + + def cb_send_to_server(self, raw_msg: str) -> None: + """ + Validates and sends the message to the server. + The raw string message is first converted into a Message object + and is then sent to the server. + + :param raw_msg: + The raw string message to be sent to the server. + + :raise Exception: When an unhandled exception is caught. + """ + try: + msg = Message(bytes(raw_msg, encoding='utf-8')) + create_task(self._send_to_server(msg)) + except (DeserializationError, UnexpectedTypeError) as err: + raw_msg = format_json(raw_msg) + logging.info('Invalid message: %s', err.error_message) + self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR') + + def unhandled_input(self, key: str) -> None: + """ + Handle's keys which haven't been handled by the child widgets. + + :param key: + Unhandled key + """ + if key == 'esc': + self.kill_app() + + def kill_app(self) -> None: + """ + Initiates killing of app. A bridge between asynchronous and synchronous + code. + """ + create_task(self._kill_app()) + + async def _kill_app(self) -> None: + """ + This coroutine initiates the actual disconnect process and calls + urwid.ExitMainLoop() to kill the TUI. + + :raise Exception: When an unhandled exception is caught. + """ + self.exiting = True + await self.disconnect() + logging.debug('Disconnect finished. Exiting app') + raise urwid.ExitMainLoop() + + async def disconnect(self) -> None: + """ + Overrides the disconnect method to handle the errors locally. + """ + try: + await super().disconnect() + except (OSError, EOFError) as err: + logging.info('disconnect: %s', str(err)) + self.retry = True + except ProtocolError as err: + logging.info('disconnect: %s', str(err)) + except Exception as err: + logging.error('disconnect: Unhandled exception %s', str(err)) + raise err + + def _set_status(self, msg: str) -> None: + """ + Sets the message as the status. + + :param msg: + The message to be displayed in the status bar. + """ + self.window.footer.set_text(msg) + + def _get_formatted_address(self) -> str: + """ + Returns a formatted version of the server's address. + + :return: formatted address + """ + if isinstance(self.address, tuple): + host, port = self.address + addr = f'{host}:{port}' + else: + addr = f'{self.address}' + return addr + + async def _initiate_connection(self) -> Optional[ConnectError]: + """ + Tries connecting to a server a number of times with a delay between + each try. If all retries failed then return the error faced during + the last retry. + + :return: Error faced during last retry. + """ + current_retries = 0 + err = None + + # initial try + await self.connect_server() + while self.retry and current_retries < self.num_retries: + logging.info('Connection Failed, retrying in %d', self.retry_delay) + status = f'[Retry #{current_retries} ({self.retry_delay}s)]' + self._set_status(status) + + await asyncio.sleep(self.retry_delay) + + err = await self.connect_server() + current_retries += 1 + # If all retries failed report the last error + if err: + logging.info('All retries failed: %s', err) + return err + return None + + async def manage_connection(self) -> None: + """ + Manage the connection based on the current run state. + + A reconnect is issued when the current state is IDLE and the number + of retries is not exhausted. + A disconnect is issued when the current state is DISCONNECTING. + """ + while not self.exiting: + if self.runstate == Runstate.IDLE: + err = await self._initiate_connection() + # If retry is still true then, we have exhausted all our tries. + if err: + self._set_status(f'[Error: {err.error_message}]') + else: + addr = self._get_formatted_address() + self._set_status(f'[Connected {addr}]') + elif self.runstate == Runstate.DISCONNECTING: + self._set_status('[Disconnected]') + await self.disconnect() + # check if a retry is needed + if self.runstate == Runstate.IDLE: + continue + await self.runstate_changed() + + async def connect_server(self) -> Optional[ConnectError]: + """ + Initiates a connection to the server at address `self.address` + and in case of a failure, sets the status to the respective error. + """ + try: + await self.connect(self.address) + self.retry = False + except ConnectError as err: + logging.info('connect_server: ConnectError %s', str(err)) + self.retry = True + return err + return None + + def run(self, debug: bool = False) -> None: + """ + Starts the long running co-routines and the urwid event loop. + + :param debug: + Enables/Disables asyncio event loop debugging + """ + screen = urwid.raw_display.Screen() + screen.set_terminal_properties(256) + + self.aloop = asyncio.get_event_loop() + self.aloop.set_debug(debug) + + # Gracefully handle SIGTERM and SIGINT signals + cancel_signals = [signal.SIGTERM, signal.SIGINT] + for sig in cancel_signals: + self.aloop.add_signal_handler(sig, self.kill_app) + + event_loop = urwid.AsyncioEventLoop(loop=self.aloop) + main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'), + unhandled_input=self.unhandled_input, + screen=screen, + palette=palette, + handle_mouse=True, + event_loop=event_loop) + + create_task(self.manage_connection(), self.aloop) + try: + main_loop.run() + except Exception as err: + logging.error('%s\n%s\n', str(err), pretty_traceback()) + raise err + + +class StatusBar(urwid.Text): + """ + A simple statusbar modelled using the Text widget. The status can be + set using the set_text function. All text set is aligned to right. + + :param text: Initial text to be displayed. Default is empty str. + """ + def __init__(self, text: str = ''): + super().__init__(text, align='right') + + +class Editor(urwid_readline.ReadlineEdit): + """ + A simple editor modelled using the urwid_readline.ReadlineEdit widget. + Mimcs GNU readline shortcuts and provides history support. + + The readline shortcuts can be found below: + https://github.com/rr-/urwid_readline#features + + Along with the readline features, this editor also has support for + history. Pressing the 'up'/'down' switches between the prev/next messages + available in the history. + + Currently there is no support to save the history to a file. The history of + previous commands is lost on exit. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + super().__init__(caption='> ', multiline=True) + self.parent = parent + self.history: List[str] = [] + self.last_index: int = -1 + self.show_history: bool = False + + def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]: + """ + Handles the keypress on this widget. + + :param size: + The current size of the widget. + :param key: + The key to be handled. + + :return: Unhandled key if any. + """ + msg = self.get_edit_text() + if key == 'up' and not msg: + # Show the history when 'up arrow' is pressed with no input text. + # NOTE: The show_history logic is necessary because in 'multiline' + # mode (which we use) 'up arrow' is used to move between lines. + if not self.history: + return None + self.show_history = True + last_msg = self.history[self.last_index] + self.set_edit_text(last_msg) + self.edit_pos = len(last_msg) + elif key == 'up' and self.show_history: + self.last_index = max(self.last_index - 1, -len(self.history)) + self.set_edit_text(self.history[self.last_index]) + self.edit_pos = len(self.history[self.last_index]) + elif key == 'down' and self.show_history: + if self.last_index == -1: + self.set_edit_text('') + self.show_history = False + else: + self.last_index += 1 + self.set_edit_text(self.history[self.last_index]) + self.edit_pos = len(self.history[self.last_index]) + elif key == 'meta enter': + # When using multiline, enter inserts a new line into the editor + # send the input to the server on alt + enter + self.parent.cb_send_to_server(msg) + self.history.append(msg) + self.set_edit_text('') + self.last_index = -1 + self.show_history = False + else: + self.show_history = False + self.last_index = -1 + return cast(Optional[str], super().keypress(size, key)) + return None + + +class EditorWidget(urwid.Filler): + """ + Wrapper around the editor widget. + + The Editor is a flow widget and has to wrapped inside a box widget. + This class wraps the Editor inside filler widget. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + super().__init__(Editor(parent), valign='top') + + +class HistoryBox(urwid.ListBox): + """ + This widget is modelled using the ListBox widget, contains the list of + all messages both QMP messages and log messsages to be shown in the TUI. + + The messages are urwid.Text widgets. On every append of a message, the + focus is shifted to the last appended message. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + self.history = urwid.SimpleFocusListWalker([]) + super().__init__(self.history) + + def add_to_history(self, + history: Union[str, List[Tuple[str, str]]]) -> None: + """ + Appends a message to the list and set the focus to the last appended + message. + + :param history: + The history item(message/event) to be appended to the list. + """ + self.history.append(urwid.Text(history)) + self.history.set_focus(len(self.history) - 1) + + def mouse_event(self, size: Tuple[int, int], _event: str, button: float, + _x: int, _y: int, focus: bool) -> None: + # Unfortunately there are no urwid constants that represent the mouse + # events. + if button == 4: # Scroll up event + super().keypress(size, 'up') + elif button == 5: # Scroll down event + super().keypress(size, 'down') + + +class HistoryWindow(urwid.Frame): + """ + This window composes the HistoryBox and EditorWidget in a horizontal split. + By default the first focus is given to the history box. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + self.editor_widget = EditorWidget(parent) + self.editor = urwid.LineBox(self.editor_widget) + self.history = HistoryBox(parent) + self.body = urwid.Pile([('weight', 80, self.history), + ('weight', 20, self.editor)]) + super().__init__(self.body) + urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history) + + def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None: + """ + Appends a message to the history box + + :param msg: + The message to be appended to the history box. + :param level: + The log level of the message, if it is a log message. + """ + formatted = [] + if level: + msg = f'[{level}]: {msg}' + formatted.append((level, msg)) + else: + lexer = lexers.JsonLexer() # pylint: disable=no-member + for token in lexer.get_tokens(msg): + formatted.append(token) + self.history.add_to_history(formatted) + + +class Window(urwid.Frame): + """ + This window is the top most widget of the TUI and will contain other + windows. Each child of this widget is responsible for displaying a specific + functionality. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + footer = StatusBar() + body = HistoryWindow(parent) + super().__init__(body, footer=footer) + + +class TUILogHandler(Handler): + """ + This handler routes all the log messages to the TUI screen. + It is installed to the root logger to so that the log message from all + libraries begin used is routed to the screen. + + :param tui: Reference to the TUI object. + """ + def __init__(self, tui: App) -> None: + super().__init__() + self.tui = tui + + def emit(self, record: LogRecord) -> None: + """ + Emits a record to the TUI screen. + + Appends the log message to the TUI screen + """ + level = record.levelname + msg = record.getMessage() + self.tui.add_to_history(msg, level) + + +def main() -> None: + """ + Driver of the whole script, parses arguments, initialize the TUI and + the logger. + """ + parser = argparse.ArgumentParser(description='AQMP TUI') + parser.add_argument('qmp_server', help='Address of the QMP server. ' + 'Format <UNIX socket path | TCP addr:port>') + parser.add_argument('--num-retries', type=int, default=10, + help='Number of times to reconnect before giving up.') + parser.add_argument('--retry-delay', type=int, + help='Time(s) to wait before next retry. ' + 'Default action is to wait 2s between each retry.') + parser.add_argument('--log-file', help='The Log file name') + parser.add_argument('--log-level', default='WARNING', + help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>') + parser.add_argument('--asyncio-debug', action='store_true', + help='Enable debug mode for asyncio loop. ' + 'Generates lot of output, makes TUI unusable when ' + 'logs are logged in the TUI. ' + 'Use only when logging to a file.') + args = parser.parse_args() + + try: + address = QEMUMonitorProtocol.parse_address(args.qmp_server) + except QMPBadPortError as err: + parser.error(str(err)) + + app = App(address, args.num_retries, args.retry_delay) + + root_logger = logging.getLogger() + root_logger.setLevel(logging.getLevelName(args.log_level)) + + if args.log_file: + root_logger.addHandler(logging.FileHandler(args.log_file)) + else: + root_logger.addHandler(TUILogHandler(app)) + + app.run(args.asyncio_debug) + + +if __name__ == '__main__': + main() diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py new file mode 100644 index 0000000..781f49b --- /dev/null +++ b/python/qemu/aqmp/error.py @@ -0,0 +1,50 @@ +""" +AQMP Error Classes + +This package seeks to provide semantic error classes that are intended +to be used directly by clients when they would like to handle particular +semantic failures (e.g. "failed to connect") without needing to know the +enumeration of possible reasons for that failure. + +AQMPError serves as the ancestor for all exceptions raised by this +package, and is suitable for use in handling semantic errors from this +library. In most cases, individual public methods will attempt to catch +and re-encapsulate various exceptions to provide a semantic +error-handling interface. + +.. admonition:: AQMP Exception Hierarchy Reference + + | `Exception` + | +-- `AQMPError` + | +-- `ConnectError` + | +-- `StateError` + | +-- `ExecInterruptedError` + | +-- `ExecuteError` + | +-- `ListenerError` + | +-- `ProtocolError` + | +-- `DeserializationError` + | +-- `UnexpectedTypeError` + | +-- `ServerParseError` + | +-- `BadReplyError` + | +-- `GreetingError` + | +-- `NegotiationError` +""" + + +class AQMPError(Exception): + """Abstract error class for all errors originating from this package.""" + + +class ProtocolError(AQMPError): + """ + Abstract error class for protocol failures. + + Semantically, these errors are generally the fault of either the + protocol server or as a result of a bug in this library. + + :param error_message: Human-readable string describing the error. + """ + def __init__(self, error_message: str): + super().__init__(error_message) + #: Human-readable error message, without any prefix. + self.error_message: str = error_message diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py new file mode 100644 index 0000000..fb81d21 --- /dev/null +++ b/python/qemu/aqmp/events.py @@ -0,0 +1,706 @@ +""" +AQMP Events and EventListeners + +Asynchronous QMP uses `EventListener` objects to listen for events. An +`EventListener` is a FIFO event queue that can be pre-filtered to listen +for only specific events. Each `EventListener` instance receives its own +copy of events that it hears, so events may be consumed without fear or +worry for depriving other listeners of events they need to hear. + + +EventListener Tutorial +---------------------- + +In all of the following examples, we assume that we have a `QMPClient` +instantiated named ``qmp`` that is already connected. + + +`listener()` context blocks with one name +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The most basic usage is by using the `listener()` context manager to +construct them: + +.. code:: python + + with qmp.listener('STOP') as listener: + await qmp.execute('stop') + await listener.get() + +The listener is active only for the duration of the ‘with’ block. This +instance listens only for ‘STOP’ events. + + +`listener()` context blocks with two or more names +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Multiple events can be selected for by providing any ``Iterable[str]``: + +.. code:: python + + with qmp.listener(('STOP', 'RESUME')) as listener: + await qmp.execute('stop') + event = await listener.get() + assert event['event'] == 'STOP' + + await qmp.execute('cont') + event = await listener.get() + assert event['event'] == 'RESUME' + + +`listener()` context blocks with no names +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +By omitting names entirely, you can listen to ALL events. + +.. code:: python + + with qmp.listener() as listener: + await qmp.execute('stop') + event = await listener.get() + assert event['event'] == 'STOP' + +This isn’t a very good use case for this feature: In a non-trivial +running system, we may not know what event will arrive next. Grabbing +the top of a FIFO queue returning multiple kinds of events may be prone +to error. + + +Using async iterators to retrieve events +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you’d like to simply watch what events happen to arrive, you can use +the listener as an async iterator: + +.. code:: python + + with qmp.listener() as listener: + async for event in listener: + print(f"Event arrived: {event['event']}") + +This is analogous to the following code: + +.. code:: python + + with qmp.listener() as listener: + while True: + event = listener.get() + print(f"Event arrived: {event['event']}") + +This event stream will never end, so these blocks will never terminate. + + +Using asyncio.Task to concurrently retrieve events +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Since a listener’s event stream will never terminate, it is not likely +useful to use that form in a script. For longer-running clients, we can +create event handlers by using `asyncio.Task` to create concurrent +coroutines: + +.. code:: python + + async def print_events(listener): + try: + async for event in listener: + print(f"Event arrived: {event['event']}") + except asyncio.CancelledError: + return + + with qmp.listener() as listener: + task = asyncio.Task(print_events(listener)) + await qmp.execute('stop') + await qmp.execute('cont') + task.cancel() + await task + +However, there is no guarantee that these events will be received by the +time we leave this context block. Once the context block is exited, the +listener will cease to hear any new events, and becomes inert. + +Be mindful of the timing: the above example will *probably*– but does +not *guarantee*– that both STOP/RESUMED events will be printed. The +example below outlines how to use listeners outside of a context block. + + +Using `register_listener()` and `remove_listener()` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To create a listener with a longer lifetime, beyond the scope of a +single block, create a listener and then call `register_listener()`: + +.. code:: python + + class MyClient: + def __init__(self, qmp): + self.qmp = qmp + self.listener = EventListener() + + async def print_events(self): + try: + async for event in self.listener: + print(f"Event arrived: {event['event']}") + except asyncio.CancelledError: + return + + async def run(self): + self.task = asyncio.Task(self.print_events) + self.qmp.register_listener(self.listener) + await qmp.execute('stop') + await qmp.execute('cont') + + async def stop(self): + self.task.cancel() + await self.task + self.qmp.remove_listener(self.listener) + +The listener can be deactivated by using `remove_listener()`. When it is +removed, any possible pending events are cleared and it can be +re-registered at a later time. + + +Using the built-in all events listener +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The `QMPClient` object creates its own default listener named +:py:obj:`~Events.events` that can be used for the same purpose without +having to create your own: + +.. code:: python + + async def print_events(listener): + try: + async for event in listener: + print(f"Event arrived: {event['event']}") + except asyncio.CancelledError: + return + + task = asyncio.Task(print_events(qmp.events)) + + await qmp.execute('stop') + await qmp.execute('cont') + + task.cancel() + await task + + +Using both .get() and async iterators +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The async iterator and `get()` methods pull events from the same FIFO +queue. If you mix the usage of both, be aware: Events are emitted +precisely once per listener. + +If multiple contexts try to pull events from the same listener instance, +events are still emitted only precisely once. + +This restriction can be lifted by creating additional listeners. + + +Creating multiple listeners +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Additional `EventListener` objects can be created at-will. Each one +receives its own copy of events, with separate FIFO event queues. + +.. code:: python + + my_listener = EventListener() + qmp.register_listener(my_listener) + + await qmp.execute('stop') + copy1 = await my_listener.get() + copy2 = await qmp.events.get() + + assert copy1 == copy2 + +In this example, we await an event from both a user-created +`EventListener` and the built-in events listener. Both receive the same +event. + + +Clearing listeners +~~~~~~~~~~~~~~~~~~ + +`EventListener` objects can be cleared, clearing all events seen thus far: + +.. code:: python + + await qmp.execute('stop') + qmp.events.clear() + await qmp.execute('cont') + event = await qmp.events.get() + assert event['event'] == 'RESUME' + +`EventListener` objects are FIFO queues. If events are not consumed, +they will remain in the queue until they are witnessed or discarded via +`clear()`. FIFO queues will be drained automatically upon leaving a +context block, or when calling `remove_listener()`. + + +Accessing listener history +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +`EventListener` objects record their history. Even after being cleared, +you can obtain a record of all events seen so far: + +.. code:: python + + await qmp.execute('stop') + await qmp.execute('cont') + qmp.events.clear() + + assert len(qmp.events.history) == 2 + assert qmp.events.history[0]['event'] == 'STOP' + assert qmp.events.history[1]['event'] == 'RESUME' + +The history is updated immediately and does not require the event to be +witnessed first. + + +Using event filters +~~~~~~~~~~~~~~~~~~~ + +`EventListener` objects can be given complex filtering criteria if names +are not sufficient: + +.. code:: python + + def job1_filter(event) -> bool: + event_data = event.get('data', {}) + event_job_id = event_data.get('id') + return event_job_id == "job1" + + with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: + await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) + async for event in listener: + if event['data']['status'] == 'concluded': + break + +These filters might be most useful when parameterized. `EventListener` +objects expect a function that takes only a single argument (the raw +event, as a `Message`) and returns a bool; True if the event should be +accepted into the stream. You can create a function that adapts this +signature to accept configuration parameters: + +.. code:: python + + def job_filter(job_id: str) -> EventFilter: + def filter(event: Message) -> bool: + return event['data']['id'] == job_id + return filter + + with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: + await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) + async for event in listener: + if event['data']['status'] == 'concluded': + break + + +Activating an existing listener with `listen()` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Listeners with complex, long configurations can also be created manually +and activated temporarily by using `listen()` instead of `listener()`: + +.. code:: python + + listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', + 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', + 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) + + with qmp.listen(listener): + await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) + async for event in listener: + print(event) + if event['event'] == 'BLOCK_JOB_COMPLETED': + break + +Any events that are not witnessed by the time the block is left will be +cleared from the queue; entering the block is an implicit +`register_listener()` and leaving the block is an implicit +`remove_listener()`. + + +Activating multiple existing listeners with `listen()` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +While `listener()` is only capable of creating a single listener, +`listen()` is capable of activating multiple listeners simultaneously: + +.. code:: python + + def job_filter(job_id: str) -> EventFilter: + def filter(event: Message) -> bool: + return event['data']['id'] == job_id + return filter + + jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) + jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) + + with qmp.listen(jobA, jobB): + qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) + qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) + + async for event in jobA.get(): + if event['data']['status'] == 'concluded': + break + async for event in jobB.get(): + if event['data']['status'] == 'concluded': + break + + +Extending the `EventListener` class +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In the case that a more specialized `EventListener` is desired to +provide either more functionality or more compact syntax for specialized +cases, it can be extended. + +One of the key methods to extend or override is +:py:meth:`~EventListener.accept()`. The default implementation checks an +incoming message for: + +1. A qualifying name, if any :py:obj:`~EventListener.names` were + specified at initialization time +2. That :py:obj:`~EventListener.event_filter()` returns True. + +This can be modified however you see fit to change the criteria for +inclusion in the stream. + +For convenience, a ``JobListener`` class could be created that simply +bakes in configuration so it does not need to be repeated: + +.. code:: python + + class JobListener(EventListener): + def __init__(self, job_id: str): + super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', + 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', + 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) + self.job_id = job_id + + def accept(self, event) -> bool: + if not super().accept(event): + return False + if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): + return event['data']['id'] == job_id + return event['data']['device'] == job_id + +From here on out, you can conjure up a custom-purpose listener that +listens only for job-related events for a specific job-id easily: + +.. code:: python + + listener = JobListener('job4') + with qmp.listener(listener): + await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) + async for event in listener: + print(event) + if event['event'] == 'BLOCK_JOB_COMPLETED': + break + + +Experimental Interfaces & Design Issues +--------------------------------------- + +These interfaces are not ones I am sure I will keep or otherwise modify +heavily. + +qmp.listener()’s type signature +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +`listener()` does not return anything, because it was assumed the caller +already had a handle to the listener. However, for +``qmp.listener(EventListener())`` forms, the caller will not have saved +a handle to the listener. + +Because this function can accept *many* listeners, I found it hard to +accurately type in a way where it could be used in both “one” or “many” +forms conveniently and in a statically type-safe manner. + +Ultimately, I removed the return altogether, but perhaps with more time +I can work out a way to re-add it. + + +API Reference +------------- + +""" + +import asyncio +from contextlib import contextmanager +import logging +from typing import ( + AsyncIterator, + Callable, + Iterable, + Iterator, + List, + Optional, + Set, + Tuple, + Union, +) + +from .error import AQMPError +from .message import Message + + +EventNames = Union[str, Iterable[str], None] +EventFilter = Callable[[Message], bool] + + +class ListenerError(AQMPError): + """ + Generic error class for `EventListener`-related problems. + """ + + +class EventListener: + """ + Selectively listens for events with runtime configurable filtering. + + This class is designed to be directly usable for the most common cases, + but it can be extended to provide more rigorous control. + + :param names: + One or more names of events to listen for. + When not provided, listen for ALL events. + :param event_filter: + An optional event filtering function. + When names are also provided, this acts as a secondary filter. + + When ``names`` and ``event_filter`` are both provided, the names + will be filtered first, and then the filter function will be called + second. The event filter function can assume that the format of the + event is a known format. + """ + def __init__( + self, + names: EventNames = None, + event_filter: Optional[EventFilter] = None, + ): + # Queue of 'heard' events yet to be witnessed by a caller. + self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() + + # Intended as a historical record, NOT a processing queue or backlog. + self._history: List[Message] = [] + + #: Primary event filter, based on one or more event names. + self.names: Set[str] = set() + if isinstance(names, str): + self.names.add(names) + elif names is not None: + self.names.update(names) + + #: Optional, secondary event filter. + self.event_filter: Optional[EventFilter] = event_filter + + @property + def history(self) -> Tuple[Message, ...]: + """ + A read-only history of all events seen so far. + + This represents *every* event, including those not yet witnessed + via `get()` or ``async for``. It persists between `clear()` + calls and is immutable. + """ + return tuple(self._history) + + def accept(self, event: Message) -> bool: + """ + Determine if this listener accepts this event. + + This method determines which events will appear in the stream. + The default implementation simply checks the event against the + list of names and the event_filter to decide if this + `EventListener` accepts a given event. It can be + overridden/extended to provide custom listener behavior. + + User code is not expected to need to invoke this method. + + :param event: The event under consideration. + :return: `True`, if this listener accepts this event. + """ + name_ok = (not self.names) or (event['event'] in self.names) + return name_ok and ( + (not self.event_filter) or self.event_filter(event) + ) + + async def put(self, event: Message) -> None: + """ + Conditionally put a new event into the FIFO queue. + + This method is not designed to be invoked from user code, and it + should not need to be overridden. It is a public interface so + that `QMPClient` has an interface by which it can inform + registered listeners of new events. + + The event will be put into the queue if + :py:meth:`~EventListener.accept()` returns `True`. + + :param event: The new event to put into the FIFO queue. + """ + if not self.accept(event): + return + + self._history.append(event) + await self._queue.put(event) + + async def get(self) -> Message: + """ + Wait for the very next event in this stream. + + If one is already available, return that one. + """ + return await self._queue.get() + + def clear(self) -> None: + """ + Clear this listener of all pending events. + + Called when an `EventListener` is being unregistered, this clears the + pending FIFO queue synchronously. It can be also be used to + manually clear any pending events, if desired. + + .. warning:: + Take care when discarding events. Cleared events will be + silently tossed on the floor. All events that were ever + accepted by this listener are visible in `history()`. + """ + while True: + try: + self._queue.get_nowait() + except asyncio.QueueEmpty: + break + + def __aiter__(self) -> AsyncIterator[Message]: + return self + + async def __anext__(self) -> Message: + """ + Enables the `EventListener` to function as an async iterator. + + It may be used like this: + + .. code:: python + + async for event in listener: + print(event) + + These iterators will never terminate of their own accord; you + must provide break conditions or otherwise prepare to run them + in an `asyncio.Task` that can be cancelled. + """ + return await self.get() + + +class Events: + """ + Events is a mix-in class that adds event functionality to the QMP class. + + It's designed specifically as a mix-in for `QMPClient`, and it + relies upon the class it is being mixed into having a 'logger' + property. + """ + def __init__(self) -> None: + self._listeners: List[EventListener] = [] + + #: Default, all-events `EventListener`. + self.events: EventListener = EventListener() + self.register_listener(self.events) + + # Parent class needs to have a logger + self.logger: logging.Logger + + async def _event_dispatch(self, msg: Message) -> None: + """ + Given a new event, propagate it to all of the active listeners. + + :param msg: The event to propagate. + """ + for listener in self._listeners: + await listener.put(msg) + + def register_listener(self, listener: EventListener) -> None: + """ + Register and activate an `EventListener`. + + :param listener: The listener to activate. + :raise ListenerError: If the given listener is already registered. + """ + if listener in self._listeners: + raise ListenerError("Attempted to re-register existing listener") + self.logger.debug("Registering %s.", str(listener)) + self._listeners.append(listener) + + def remove_listener(self, listener: EventListener) -> None: + """ + Unregister and deactivate an `EventListener`. + + The removed listener will have its pending events cleared via + `clear()`. The listener can be re-registered later when + desired. + + :param listener: The listener to deactivate. + :raise ListenerError: If the given listener is not registered. + """ + if listener == self.events: + raise ListenerError("Cannot remove the default listener.") + self.logger.debug("Removing %s.", str(listener)) + listener.clear() + self._listeners.remove(listener) + + @contextmanager + def listen(self, *listeners: EventListener) -> Iterator[None]: + r""" + Context manager: Temporarily listen with an `EventListener`. + + Accepts one or more `EventListener` objects and registers them, + activating them for the duration of the context block. + + `EventListener` objects will have any pending events in their + FIFO queue cleared upon exiting the context block, when they are + deactivated. + + :param \*listeners: One or more EventListeners to activate. + :raise ListenerError: If the given listener(s) are already active. + """ + _added = [] + + try: + for listener in listeners: + self.register_listener(listener) + _added.append(listener) + + yield + + finally: + for listener in _added: + self.remove_listener(listener) + + @contextmanager + def listener( + self, + names: EventNames = (), + event_filter: Optional[EventFilter] = None + ) -> Iterator[EventListener]: + """ + Context manager: Temporarily listen with a new `EventListener`. + + Creates an `EventListener` object and registers it, activating + it for the duration of the context block. + + :param names: + One or more names of events to listen for. + When not provided, listen for ALL events. + :param event_filter: + An optional event filtering function. + When names are also provided, this acts as a secondary filter. + + :return: The newly created and active `EventListener`. + """ + listener = EventListener(names, event_filter) + with self.listen(listener): + yield listener diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py new file mode 100644 index 0000000..f76ccc9 --- /dev/null +++ b/python/qemu/aqmp/message.py @@ -0,0 +1,209 @@ +""" +QMP Message Format + +This module provides the `Message` class, which represents a single QMP +message sent to or from the server. +""" + +import json +from json import JSONDecodeError +from typing import ( + Dict, + Iterator, + Mapping, + MutableMapping, + Optional, + Union, +) + +from .error import ProtocolError + + +class Message(MutableMapping[str, object]): + """ + Represents a single QMP protocol message. + + QMP uses JSON objects as its basic communicative unit; so this + Python object is a :py:obj:`~collections.abc.MutableMapping`. It may + be instantiated from either another mapping (like a `dict`), or from + raw `bytes` that still need to be deserialized. + + Once instantiated, it may be treated like any other MutableMapping:: + + >>> msg = Message(b'{"hello": "world"}') + >>> assert msg['hello'] == 'world' + >>> msg['id'] = 'foobar' + >>> print(msg) + { + "hello": "world", + "id": "foobar" + } + + It can be converted to `bytes`:: + + >>> msg = Message({"hello": "world"}) + >>> print(bytes(msg)) + b'{"hello":"world","id":"foobar"}' + + Or back into a garden-variety `dict`:: + + >>> dict(msg) + {'hello': 'world'} + + + :param value: Initial value, if any. + :param eager: + When `True`, attempt to serialize or deserialize the initial value + immediately, so that conversion exceptions are raised during + the call to ``__init__()``. + """ + # pylint: disable=too-many-ancestors + + def __init__(self, + value: Union[bytes, Mapping[str, object]] = b'{}', *, + eager: bool = True): + self._data: Optional[bytes] = None + self._obj: Optional[Dict[str, object]] = None + + if isinstance(value, bytes): + self._data = value + if eager: + self._obj = self._deserialize(self._data) + else: + self._obj = dict(value) + if eager: + self._data = self._serialize(self._obj) + + # Methods necessary to implement the MutableMapping interface, see: + # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping + + # We get pop, popitem, clear, update, setdefault, __contains__, + # keys, items, values, get, __eq__ and __ne__ for free. + + def __getitem__(self, key: str) -> object: + return self._object[key] + + def __setitem__(self, key: str, value: object) -> None: + self._object[key] = value + self._data = None + + def __delitem__(self, key: str) -> None: + del self._object[key] + self._data = None + + def __iter__(self) -> Iterator[str]: + return iter(self._object) + + def __len__(self) -> int: + return len(self._object) + + # Dunder methods not related to MutableMapping: + + def __repr__(self) -> str: + if self._obj is not None: + return f"Message({self._object!r})" + return f"Message({bytes(self)!r})" + + def __str__(self) -> str: + """Pretty-printed representation of this QMP message.""" + return json.dumps(self._object, indent=2) + + def __bytes__(self) -> bytes: + """bytes representing this QMP message.""" + if self._data is None: + self._data = self._serialize(self._obj or {}) + return self._data + + # Conversion Methods + + @property + def _object(self) -> Dict[str, object]: + """ + A `dict` representing this QMP message. + + Generated on-demand, if required. This property is private + because it returns an object that could be used to invalidate + the internal state of the `Message` object. + """ + if self._obj is None: + self._obj = self._deserialize(self._data or b'{}') + return self._obj + + @classmethod + def _serialize(cls, value: object) -> bytes: + """ + Serialize a JSON object as `bytes`. + + :raise ValueError: When the object cannot be serialized. + :raise TypeError: When the object cannot be serialized. + + :return: `bytes` ready to be sent over the wire. + """ + return json.dumps(value, separators=(',', ':')).encode('utf-8') + + @classmethod + def _deserialize(cls, data: bytes) -> Dict[str, object]: + """ + Deserialize JSON `bytes` into a native Python `dict`. + + :raise DeserializationError: + If JSON deserialization fails for any reason. + :raise UnexpectedTypeError: + If the data does not represent a JSON object. + + :return: A `dict` representing this QMP message. + """ + try: + obj = json.loads(data) + except JSONDecodeError as err: + emsg = "Failed to deserialize QMP message." + raise DeserializationError(emsg, data) from err + if not isinstance(obj, dict): + raise UnexpectedTypeError( + "QMP message is not a JSON object.", + obj + ) + return obj + + +class DeserializationError(ProtocolError): + """ + A QMP message was not understood as JSON. + + When this Exception is raised, ``__cause__`` will be set to the + `json.JSONDecodeError` Exception, which can be interrogated for + further details. + + :param error_message: Human-readable string describing the error. + :param raw: The raw `bytes` that prompted the failure. + """ + def __init__(self, error_message: str, raw: bytes): + super().__init__(error_message) + #: The raw `bytes` that were not understood as JSON. + self.raw: bytes = raw + + def __str__(self) -> str: + return "\n".join([ + super().__str__(), + f" raw bytes were: {str(self.raw)}", + ]) + + +class UnexpectedTypeError(ProtocolError): + """ + A QMP message was JSON, but not a JSON object. + + :param error_message: Human-readable string describing the error. + :param value: The deserialized JSON value that wasn't an object. + """ + def __init__(self, error_message: str, value: object): + super().__init__(error_message) + #: The JSON value that was expected to be an object. + self.value: object = value + + def __str__(self) -> str: + strval = json.dumps(self.value, indent=2) + return "\n".join([ + super().__str__(), + f" json value was: {strval}", + ]) diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py new file mode 100644 index 0000000..24c9412 --- /dev/null +++ b/python/qemu/aqmp/models.py @@ -0,0 +1,133 @@ +""" +QMP Data Models + +This module provides simplistic data classes that represent the few +structures that the QMP spec mandates; they are used to verify incoming +data to make sure it conforms to spec. +""" +# pylint: disable=too-few-public-methods + +from collections import abc +from typing import ( + Any, + Mapping, + Optional, + Sequence, +) + + +class Model: + """ + Abstract data model, representing some QMP object of some kind. + + :param raw: The raw object to be validated. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + self._raw = raw + + def _check_key(self, key: str) -> None: + if key not in self._raw: + raise KeyError(f"'{self._name}' object requires '{key}' member") + + def _check_value(self, key: str, type_: type, typestr: str) -> None: + assert key in self._raw + if not isinstance(self._raw[key], type_): + raise TypeError( + f"'{self._name}' member '{key}' must be a {typestr}" + ) + + def _check_member(self, key: str, type_: type, typestr: str) -> None: + self._check_key(key) + self._check_value(key, type_, typestr) + + @property + def _name(self) -> str: + return type(self).__name__ + + def __repr__(self) -> str: + return f"{self._name}({self._raw!r})" + + +class Greeting(Model): + """ + Defined in qmp-spec.txt, section 2.2, "Server Greeting". + + :param raw: The raw Greeting object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'QMP' member + self.QMP: QMPGreeting # pylint: disable=invalid-name + + self._check_member('QMP', abc.Mapping, "JSON object") + self.QMP = QMPGreeting(self._raw['QMP']) + + +class QMPGreeting(Model): + """ + Defined in qmp-spec.txt, section 2.2, "Server Greeting". + + :param raw: The raw QMPGreeting object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'version' member + self.version: Mapping[str, object] + #: 'capabilities' member + self.capabilities: Sequence[object] + + self._check_member('version', abc.Mapping, "JSON object") + self.version = self._raw['version'] + + self._check_member('capabilities', abc.Sequence, "JSON array") + self.capabilities = self._raw['capabilities'] + + +class ErrorResponse(Model): + """ + Defined in qmp-spec.txt, section 2.4.2, "error". + + :param raw: The raw ErrorResponse object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'error' member + self.error: ErrorInfo + #: 'id' member + self.id: Optional[object] = None # pylint: disable=invalid-name + + self._check_member('error', abc.Mapping, "JSON object") + self.error = ErrorInfo(self._raw['error']) + + if 'id' in raw: + self.id = raw['id'] + + +class ErrorInfo(Model): + """ + Defined in qmp-spec.txt, section 2.4.2, "error". + + :param raw: The raw ErrorInfo object. + :raise KeyError: If any required fields are absent. + :raise TypeError: If any required fields have the wrong type. + """ + def __init__(self, raw: Mapping[str, Any]): + super().__init__(raw) + #: 'class' member, with an underscore to avoid conflicts in Python. + self.class_: str + #: 'desc' member + self.desc: str + + self._check_member('class', str, "string") + self.class_ = self._raw['class'] + + self._check_member('desc', str, "string") + self.desc = self._raw['desc'] diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py new file mode 100644 index 0000000..32e7874 --- /dev/null +++ b/python/qemu/aqmp/protocol.py @@ -0,0 +1,902 @@ +""" +Generic Asynchronous Message-based Protocol Support + +This module provides a generic framework for sending and receiving +messages over an asyncio stream. `AsyncProtocol` is an abstract class +that implements the core mechanisms of a simple send/receive protocol, +and is designed to be extended. + +In this package, it is used as the implementation for the `QMPClient` +class. +""" + +import asyncio +from asyncio import StreamReader, StreamWriter +from enum import Enum +from functools import wraps +import logging +from ssl import SSLContext +from typing import ( + Any, + Awaitable, + Callable, + Generic, + List, + Optional, + Tuple, + TypeVar, + Union, + cast, +) + +from .error import AQMPError +from .util import ( + bottom_half, + create_task, + exception_summary, + flush, + is_closing, + pretty_traceback, + upper_half, + wait_closed, +) + + +T = TypeVar('T') +_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None`` +_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]']) + + +class Runstate(Enum): + """Protocol session runstate.""" + + #: Fully quiesced and disconnected. + IDLE = 0 + #: In the process of connecting or establishing a session. + CONNECTING = 1 + #: Fully connected and active session. + RUNNING = 2 + #: In the process of disconnecting. + #: Runstate may be returned to `IDLE` by calling `disconnect()`. + DISCONNECTING = 3 + + +class ConnectError(AQMPError): + """ + Raised when the initial connection process has failed. + + This Exception always wraps a "root cause" exception that can be + interrogated for additional information. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + def __init__(self, error_message: str, exc: Exception): + super().__init__(error_message) + #: Human-readable error string + self.error_message: str = error_message + #: Wrapped root cause exception + self.exc: Exception = exc + + def __str__(self) -> str: + return f"{self.error_message}: {self.exc!s}" + + +class StateError(AQMPError): + """ + An API command (connect, execute, etc) was issued at an inappropriate time. + + This error is raised when a command like + :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate + time. + + :param error_message: Human-readable string describing the state violation. + :param state: The actual `Runstate` seen at the time of the violation. + :param required: The `Runstate` required to process this command. + """ + def __init__(self, error_message: str, + state: Runstate, required: Runstate): + super().__init__(error_message) + self.error_message = error_message + self.state = state + self.required = required + + +F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name + + +# Don't Panic. +def require(required_state: Runstate) -> Callable[[F], F]: + """ + Decorator: protect a method so it can only be run in a certain `Runstate`. + + :param required_state: The `Runstate` required to invoke this method. + :raise StateError: When the required `Runstate` is not met. + """ + def _decorator(func: F) -> F: + # _decorator is the decorator that is built by calling the + # require() decorator factory; e.g.: + # + # @require(Runstate.IDLE) def foo(): ... + # will replace 'foo' with the result of '_decorator(foo)'. + + @wraps(func) + def _wrapper(proto: 'AsyncProtocol[Any]', + *args: Any, **kwargs: Any) -> Any: + # _wrapper is the function that gets executed prior to the + # decorated method. + + name = type(proto).__name__ + + if proto.runstate != required_state: + if proto.runstate == Runstate.CONNECTING: + emsg = f"{name} is currently connecting." + elif proto.runstate == Runstate.DISCONNECTING: + emsg = (f"{name} is disconnecting." + " Call disconnect() to return to IDLE state.") + elif proto.runstate == Runstate.RUNNING: + emsg = f"{name} is already connected and running." + elif proto.runstate == Runstate.IDLE: + emsg = f"{name} is disconnected and idle." + else: + assert False + raise StateError(emsg, proto.runstate, required_state) + # No StateError, so call the wrapped method. + return func(proto, *args, **kwargs) + + # Return the decorated method; + # Transforming Func to Decorated[Func]. + return cast(F, _wrapper) + + # Return the decorator instance from the decorator factory. Phew! + return _decorator + + +class AsyncProtocol(Generic[T]): + """ + AsyncProtocol implements a generic async message-based protocol. + + This protocol assumes the basic unit of information transfer between + client and server is a "message", the details of which are left up + to the implementation. It assumes the sending and receiving of these + messages is full-duplex and not necessarily correlated; i.e. it + supports asynchronous inbound messages. + + It is designed to be extended by a specific protocol which provides + the implementations for how to read and send messages. These must be + defined in `_do_recv()` and `_do_send()`, respectively. + + Other callbacks have a default implementation, but are intended to be + either extended or overridden: + + - `_establish_session`: + The base implementation starts the reader/writer tasks. + A protocol implementation can override this call, inserting + actions to be taken prior to starting the reader/writer tasks + before the super() call; actions needing to occur afterwards + can be written after the super() call. + - `_on_message`: + Actions to be performed when a message is received. + - `_cb_outbound`: + Logging/Filtering hook for all outbound messages. + - `_cb_inbound`: + Logging/Filtering hook for all inbound messages. + This hook runs *before* `_on_message()`. + + :param name: + Name used for logging messages, if any. By default, messages + will log to 'qemu.aqmp.protocol', but each individual connection + can be given its own logger by giving it a name; messages will + then log to 'qemu.aqmp.protocol.${name}'. + """ + # pylint: disable=too-many-instance-attributes + + #: Logger object for debugging messages from this connection. + logger = logging.getLogger(__name__) + + # Maximum allowable size of read buffer + _limit = (64 * 1024) + + # ------------------------- + # Section: Public interface + # ------------------------- + + def __init__(self, name: Optional[str] = None) -> None: + #: The nickname for this connection, if any. + self.name: Optional[str] = name + if self.name is not None: + self.logger = self.logger.getChild(self.name) + + # stream I/O + self._reader: Optional[StreamReader] = None + self._writer: Optional[StreamWriter] = None + + # Outbound Message queue + self._outgoing: asyncio.Queue[T] + + # Special, long-running tasks: + self._reader_task: Optional[asyncio.Future[None]] = None + self._writer_task: Optional[asyncio.Future[None]] = None + + # Aggregate of the above two tasks, used for Exception management. + self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None + + #: Disconnect task. The disconnect implementation runs in a task + #: so that asynchronous disconnects (initiated by the + #: reader/writer) are allowed to wait for the reader/writers to + #: exit. + self._dc_task: Optional[asyncio.Future[None]] = None + + self._runstate = Runstate.IDLE + self._runstate_changed: Optional[asyncio.Event] = None + + def __repr__(self) -> str: + cls_name = type(self).__name__ + tokens = [] + if self.name is not None: + tokens.append(f"name={self.name!r}") + tokens.append(f"runstate={self.runstate.name}") + return f"<{cls_name} {' '.join(tokens)}>" + + @property # @upper_half + def runstate(self) -> Runstate: + """The current `Runstate` of the connection.""" + return self._runstate + + @upper_half + async def runstate_changed(self) -> Runstate: + """ + Wait for the `runstate` to change, then return that runstate. + """ + await self._runstate_event.wait() + return self.runstate + + @upper_half + @require(Runstate.IDLE) + async def accept(self, address: Union[str, Tuple[str, int]], + ssl: Optional[SSLContext] = None) -> None: + """ + Accept a connection and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + + :param address: + Address to listen to; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: If a connection could not be accepted. + """ + await self._new_session(address, ssl, accept=True) + + @upper_half + @require(Runstate.IDLE) + async def connect(self, address: Union[str, Tuple[str, int]], + ssl: Optional[SSLContext] = None) -> None: + """ + Connect to the server and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + + :param address: + Address to connect to; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: If a connection cannot be made to the server. + """ + await self._new_session(address, ssl) + + @upper_half + async def disconnect(self) -> None: + """ + Disconnect and wait for all tasks to fully stop. + + If there was an exception that caused the reader/writers to + terminate prematurely, it will be raised here. + + :raise Exception: When the reader or writer terminate unexpectedly. + """ + self.logger.debug("disconnect() called.") + self._schedule_disconnect() + await self._wait_disconnect() + + # -------------------------- + # Section: Session machinery + # -------------------------- + + @property + def _runstate_event(self) -> asyncio.Event: + # asyncio.Event() objects should not be created prior to entrance into + # an event loop, so we can ensure we create it in the correct context. + # Create it on-demand *only* at the behest of an 'async def' method. + if not self._runstate_changed: + self._runstate_changed = asyncio.Event() + return self._runstate_changed + + @upper_half + @bottom_half + def _set_state(self, state: Runstate) -> None: + """ + Change the `Runstate` of the protocol connection. + + Signals the `runstate_changed` event. + """ + if state == self._runstate: + return + + self.logger.debug("Transitioning from '%s' to '%s'.", + str(self._runstate), str(state)) + self._runstate = state + self._runstate_event.set() + self._runstate_event.clear() + + @upper_half + async def _new_session(self, + address: Union[str, Tuple[str, int]], + ssl: Optional[SSLContext] = None, + accept: bool = False) -> None: + """ + Establish a new connection and initialize the session. + + Connect or accept a new connection, then begin the protocol + session machinery. If this call fails, `runstate` is guaranteed + to be set back to `IDLE`. + + :param address: + Address to connect to/listen on; + UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + :param accept: Accept a connection instead of connecting when `True`. + + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `AQMPError`. + """ + assert self.runstate == Runstate.IDLE + + try: + phase = "connection" + await self._establish_connection(address, ssl, accept) + + phase = "session" + await self._establish_session() + + except BaseException as err: + emsg = f"Failed to establish {phase}" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + try: + # Reset from CONNECTING back to IDLE. + await self.disconnect() + except: + emsg = "Unexpected bottom half exception" + self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) + raise + + # NB: CancelledError is not a BaseException before Python 3.8 + if isinstance(err, asyncio.CancelledError): + raise + + if isinstance(err, Exception): + raise ConnectError(emsg, err) from err + + # Raise BaseExceptions un-wrapped, they're more important. + raise + + assert self.runstate == Runstate.RUNNING + + @upper_half + async def _establish_connection( + self, + address: Union[str, Tuple[str, int]], + ssl: Optional[SSLContext] = None, + accept: bool = False + ) -> None: + """ + Establish a new connection. + + :param address: + Address to connect to/listen on; + UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + :param accept: Accept a connection instead of connecting when `True`. + """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + + if accept: + await self._do_accept(address, ssl) + else: + await self._do_connect(address, ssl) + + @upper_half + async def _do_accept(self, address: Union[str, Tuple[str, int]], + ssl: Optional[SSLContext] = None) -> None: + """ + Acting as the transport server, accept a single connection. + + :param address: + Address to listen on; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise OSError: For stream-related errors. + """ + self.logger.debug("Awaiting connection on %s ...", address) + connected = asyncio.Event() + server: Optional[asyncio.AbstractServer] = None + + async def _client_connected_cb(reader: asyncio.StreamReader, + writer: asyncio.StreamWriter) -> None: + """Used to accept a single incoming connection, see below.""" + nonlocal server + nonlocal connected + + # A connection has been accepted; stop listening for new ones. + assert server is not None + server.close() + await server.wait_closed() + server = None + + # Register this client as being connected + self._reader, self._writer = (reader, writer) + + # Signal back: We've accepted a client! + connected.set() + + if isinstance(address, tuple): + coro = asyncio.start_server( + _client_connected_cb, + host=address[0], + port=address[1], + ssl=ssl, + backlog=1, + limit=self._limit, + ) + else: + coro = asyncio.start_unix_server( + _client_connected_cb, + path=address, + ssl=ssl, + backlog=1, + limit=self._limit, + ) + + server = await coro # Starts listening + await connected.wait() # Waits for the callback to fire (and finish) + assert server is None + + self.logger.debug("Connection accepted.") + + @upper_half + async def _do_connect(self, address: Union[str, Tuple[str, int]], + ssl: Optional[SSLContext] = None) -> None: + """ + Acting as the transport client, initiate a connection to a server. + + :param address: + Address to connect to; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise OSError: For stream-related errors. + """ + self.logger.debug("Connecting to %s ...", address) + + if isinstance(address, tuple): + connect = asyncio.open_connection( + address[0], + address[1], + ssl=ssl, + limit=self._limit, + ) + else: + connect = asyncio.open_unix_connection( + path=address, + ssl=ssl, + limit=self._limit, + ) + self._reader, self._writer = await connect + + self.logger.debug("Connected.") + + @upper_half + async def _establish_session(self) -> None: + """ + Establish a new session. + + Starts the readers/writer tasks; subclasses may perform their + own negotiations here. The Runstate will be RUNNING upon + successful conclusion. + """ + assert self.runstate == Runstate.CONNECTING + + self._outgoing = asyncio.Queue() + + reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') + writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') + + self._reader_task = create_task(reader_coro) + self._writer_task = create_task(writer_coro) + + self._bh_tasks = asyncio.gather( + self._reader_task, + self._writer_task, + ) + + self._set_state(Runstate.RUNNING) + await asyncio.sleep(0) # Allow runstate_event to process + + @upper_half + @bottom_half + def _schedule_disconnect(self) -> None: + """ + Initiate a disconnect; idempotent. + + This method is used both in the upper-half as a direct + consequence of `disconnect()`, and in the bottom-half in the + case of unhandled exceptions in the reader/writer tasks. + + It can be invoked no matter what the `runstate` is. + """ + if not self._dc_task: + self._set_state(Runstate.DISCONNECTING) + self.logger.debug("Scheduling disconnect.") + self._dc_task = create_task(self._bh_disconnect()) + + @upper_half + async def _wait_disconnect(self) -> None: + """ + Waits for a previously scheduled disconnect to finish. + + This method will gather any bottom half exceptions and re-raise + the one that occurred first; presuming it to be the root cause + of any subsequent Exceptions. It is intended to be used in the + upper half of the call chain. + + :raise Exception: + Arbitrary exception re-raised on behalf of the reader/writer. + """ + assert self.runstate == Runstate.DISCONNECTING + assert self._dc_task + + aws: List[Awaitable[object]] = [self._dc_task] + if self._bh_tasks: + aws.insert(0, self._bh_tasks) + all_defined_tasks = asyncio.gather(*aws) + + # Ensure disconnect is done; Exception (if any) is not raised here: + await asyncio.wait((self._dc_task,)) + + try: + await all_defined_tasks # Raise Exceptions from the bottom half. + finally: + self._cleanup() + self._set_state(Runstate.IDLE) + + @upper_half + def _cleanup(self) -> None: + """ + Fully reset this object to a clean state and return to `IDLE`. + """ + def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]: + # Help to erase a task, ENSURING it is fully quiesced first. + assert (task is None) or task.done() + return None if (task and task.done()) else task + + assert self.runstate == Runstate.DISCONNECTING + self._dc_task = _paranoid_task_erase(self._dc_task) + self._reader_task = _paranoid_task_erase(self._reader_task) + self._writer_task = _paranoid_task_erase(self._writer_task) + self._bh_tasks = _paranoid_task_erase(self._bh_tasks) + + self._reader = None + self._writer = None + + # NB: _runstate_changed cannot be cleared because we still need it to + # send the final runstate changed event ...! + + # ---------------------------- + # Section: Bottom Half methods + # ---------------------------- + + @bottom_half + async def _bh_disconnect(self) -> None: + """ + Disconnect and cancel all outstanding tasks. + + It is designed to be called from its task context, + :py:obj:`~AsyncProtocol._dc_task`. By running in its own task, + it is free to wait on any pending actions that may still need to + occur in either the reader or writer tasks. + """ + assert self.runstate == Runstate.DISCONNECTING + + def _done(task: Optional['asyncio.Future[Any]']) -> bool: + return task is not None and task.done() + + # NB: We can't rely on _bh_tasks being done() here, it may not + # yet have had a chance to run and gather itself. + tasks = tuple(filter(None, (self._writer_task, self._reader_task))) + error_pathway = _done(self._reader_task) or _done(self._writer_task) + + try: + # Try to flush the writer, if possible: + if not error_pathway: + await self._bh_flush_writer() + except BaseException as err: + error_pathway = True + emsg = "Failed to flush the writer" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise + finally: + # Cancel any still-running tasks: + if self._writer_task is not None and not self._writer_task.done(): + self.logger.debug("Cancelling writer task.") + self._writer_task.cancel() + if self._reader_task is not None and not self._reader_task.done(): + self.logger.debug("Cancelling reader task.") + self._reader_task.cancel() + + # Close out the tasks entirely (Won't raise): + if tasks: + self.logger.debug("Waiting for tasks to complete ...") + await asyncio.wait(tasks) + + # Lastly, close the stream itself. (May raise): + await self._bh_close_stream(error_pathway) + self.logger.debug("Disconnected.") + + @bottom_half + async def _bh_flush_writer(self) -> None: + if not self._writer_task: + return + + self.logger.debug("Draining the outbound queue ...") + await self._outgoing.join() + if self._writer is not None: + self.logger.debug("Flushing the StreamWriter ...") + await flush(self._writer) + + @bottom_half + async def _bh_close_stream(self, error_pathway: bool = False) -> None: + # NB: Closing the writer also implcitly closes the reader. + if not self._writer: + return + + if not is_closing(self._writer): + self.logger.debug("Closing StreamWriter.") + self._writer.close() + + self.logger.debug("Waiting for StreamWriter to close ...") + try: + await wait_closed(self._writer) + except Exception: # pylint: disable=broad-except + # It's hard to tell if the Stream is already closed or + # not. Even if one of the tasks has failed, it may have + # failed for a higher-layered protocol reason. The + # stream could still be open and perfectly fine. + # I don't know how to discern its health here. + + if error_pathway: + # We already know that *something* went wrong. Let's + # just trust that the Exception we already have is the + # better one to present to the user, even if we don't + # genuinely *know* the relationship between the two. + self.logger.debug( + "Discarding Exception from wait_closed:\n%s\n", + pretty_traceback(), + ) + else: + # Oops, this is a brand-new error! + raise + finally: + self.logger.debug("StreamWriter closed.") + + @bottom_half + async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: + """ + Run one of the bottom-half methods in a loop forever. + + If the bottom half ever raises any exception, schedule a + disconnect that will terminate the entire loop. + + :param async_fn: The bottom-half method to run in a loop. + :param name: The name of this task, used for logging. + """ + try: + while True: + await async_fn() + except asyncio.CancelledError: + # We have been cancelled by _bh_disconnect, exit gracefully. + self.logger.debug("Task.%s: cancelled.", name) + return + except BaseException as err: + self.logger.error("Task.%s: %s", + name, exception_summary(err)) + self.logger.debug("Task.%s: failure:\n%s\n", + name, pretty_traceback()) + self._schedule_disconnect() + raise + finally: + self.logger.debug("Task.%s: exiting.", name) + + @bottom_half + async def _bh_send_message(self) -> None: + """ + Wait for an outgoing message, then send it. + + Designed to be run in `_bh_loop_forever()`. + """ + msg = await self._outgoing.get() + try: + await self._send(msg) + finally: + self._outgoing.task_done() + + @bottom_half + async def _bh_recv_message(self) -> None: + """ + Wait for an incoming message and call `_on_message` to route it. + + Designed to be run in `_bh_loop_forever()`. + """ + msg = await self._recv() + await self._on_message(msg) + + # -------------------- + # Section: Message I/O + # -------------------- + + @upper_half + @bottom_half + def _cb_outbound(self, msg: T) -> T: + """ + Callback: outbound message hook. + + This is intended for subclasses to be able to add arbitrary + hooks to filter or manipulate outgoing messages. The base + implementation does nothing but log the message without any + manipulation of the message. + + :param msg: raw outbound message + :return: final outbound message + """ + self.logger.debug("--> %s", str(msg)) + return msg + + @upper_half + @bottom_half + def _cb_inbound(self, msg: T) -> T: + """ + Callback: inbound message hook. + + This is intended for subclasses to be able to add arbitrary + hooks to filter or manipulate incoming messages. The base + implementation does nothing but log the message without any + manipulation of the message. + + This method does not "handle" incoming messages; it is a filter. + The actual "endpoint" for incoming messages is `_on_message()`. + + :param msg: raw inbound message + :return: processed inbound message + """ + self.logger.debug("<-- %s", str(msg)) + return msg + + @upper_half + @bottom_half + async def _readline(self) -> bytes: + """ + Wait for a newline from the incoming reader. + + This method is provided as a convenience for upper-layer + protocols, as many are line-based. + + This method *may* return a sequence of bytes without a trailing + newline if EOF occurs, but *some* bytes were received. In this + case, the next call will raise `EOFError`. It is assumed that + the layer 5 protocol will decide if there is anything meaningful + to be done with a partial message. + + :raise OSError: For stream-related errors. + :raise EOFError: + If the reader stream is at EOF and there are no bytes to return. + :return: bytes, including the newline. + """ + assert self._reader is not None + msg_bytes = await self._reader.readline() + + if not msg_bytes: + if self._reader.at_eof(): + raise EOFError + + return msg_bytes + + @upper_half + @bottom_half + async def _do_recv(self) -> T: + """ + Abstract: Read from the stream and return a message. + + Very low-level; intended to only be called by `_recv()`. + """ + raise NotImplementedError + + @upper_half + @bottom_half + async def _recv(self) -> T: + """ + Read an arbitrary protocol message. + + .. warning:: + This method is intended primarily for `_bh_recv_message()` + to use in an asynchronous task loop. Using it outside of + this loop will "steal" messages from the normal routing + mechanism. It is safe to use prior to `_establish_session()`, + but should not be used otherwise. + + This method uses `_do_recv()` to retrieve the raw message, and + then transforms it using `_cb_inbound()`. + + :return: A single (filtered, processed) protocol message. + """ + message = await self._do_recv() + return self._cb_inbound(message) + + @upper_half + @bottom_half + def _do_send(self, msg: T) -> None: + """ + Abstract: Write a message to the stream. + + Very low-level; intended to only be called by `_send()`. + """ + raise NotImplementedError + + @upper_half + @bottom_half + async def _send(self, msg: T) -> None: + """ + Send an arbitrary protocol message. + + This method will transform any outgoing messages according to + `_cb_outbound()`. + + .. warning:: + Like `_recv()`, this method is intended to be called by + the writer task loop that processes outgoing + messages. Calling it directly may circumvent logic + implemented by the caller meant to correlate outgoing and + incoming messages. + + :raise OSError: For problems with the underlying stream. + """ + msg = self._cb_outbound(msg) + self._do_send(msg) + + @bottom_half + async def _on_message(self, msg: T) -> None: + """ + Called to handle the receipt of a new message. + + .. caution:: + This is executed from within the reader loop, so be advised + that waiting on either the reader or writer task will lead + to deadlock. Additionally, any unhandled exceptions will + directly cause the loop to halt, so logic may be best-kept + to a minimum if at all possible. + + :param msg: The incoming message, already logged/filtered. + """ + # Nothing to do in the abstract case. diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/qemu/aqmp/py.typed diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py new file mode 100644 index 0000000..82e9dab --- /dev/null +++ b/python/qemu/aqmp/qmp_client.py @@ -0,0 +1,621 @@ +""" +QMP Protocol Implementation + +This module provides the `QMPClient` class, which can be used to connect +and send commands to a QMP server such as QEMU. The QMP class can be +used to either connect to a listening server, or used to listen and +accept an incoming connection from that server. +""" + +import asyncio +import logging +from typing import ( + Dict, + List, + Mapping, + Optional, + Union, + cast, +) + +from .error import AQMPError, ProtocolError +from .events import Events +from .message import Message +from .models import ErrorResponse, Greeting +from .protocol import AsyncProtocol, Runstate, require +from .util import ( + bottom_half, + exception_summary, + pretty_traceback, + upper_half, +) + + +class _WrappedProtocolError(ProtocolError): + """ + Abstract exception class for Protocol errors that wrap an Exception. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + def __init__(self, error_message: str, exc: Exception): + super().__init__(error_message) + self.exc = exc + + def __str__(self) -> str: + return f"{self.error_message}: {self.exc!s}" + + +class GreetingError(_WrappedProtocolError): + """ + An exception occurred during the Greeting phase. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + + +class NegotiationError(_WrappedProtocolError): + """ + An exception occurred during the Negotiation phase. + + :param error_message: Human-readable string describing the error. + :param exc: The root-cause exception. + """ + + +class ExecuteError(AQMPError): + """ + Exception raised by `QMPClient.execute()` on RPC failure. + + :param error_response: The RPC error response object. + :param sent: The sent RPC message that caused the failure. + :param received: The raw RPC error reply received. + """ + def __init__(self, error_response: ErrorResponse, + sent: Message, received: Message): + super().__init__(error_response.error.desc) + #: The sent `Message` that caused the failure + self.sent: Message = sent + #: The received `Message` that indicated failure + self.received: Message = received + #: The parsed error response + self.error: ErrorResponse = error_response + #: The QMP error class + self.error_class: str = error_response.error.class_ + + +class ExecInterruptedError(AQMPError): + """ + Exception raised by `execute()` (et al) when an RPC is interrupted. + + This error is raised when an `execute()` statement could not be + completed. This can occur because the connection itself was + terminated before a reply was received. + + The true cause of the interruption will be available via `disconnect()`. + """ + + +class _MsgProtocolError(ProtocolError): + """ + Abstract error class for protocol errors that have a `Message` object. + + This Exception class is used for protocol errors where the `Message` + was mechanically understood, but was found to be inappropriate or + malformed. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + def __init__(self, error_message: str, msg: Message): + super().__init__(error_message) + #: The received `Message` that caused the error. + self.msg: Message = msg + + def __str__(self) -> str: + return "\n".join([ + super().__str__(), + f" Message was: {str(self.msg)}\n", + ]) + + +class ServerParseError(_MsgProtocolError): + """ + The Server sent a `Message` indicating parsing failure. + + i.e. A reply has arrived from the server, but it is missing the "ID" + field, indicating a parsing error. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + + +class BadReplyError(_MsgProtocolError): + """ + An execution reply was successfully routed, but not understood. + + If a QMP message is received with an 'id' field to allow it to be + routed, but is otherwise malformed, this exception will be raised. + + A reply message is malformed if it is missing either the 'return' or + 'error' keys, or if the 'error' value has missing keys or members of + the wrong type. + + :param error_message: Human-readable string describing the error. + :param msg: The malformed reply that was received. + :param sent: The message that was sent that prompted the error. + """ + def __init__(self, error_message: str, msg: Message, sent: Message): + super().__init__(error_message, msg) + #: The sent `Message` that caused the failure + self.sent = sent + + +class QMPClient(AsyncProtocol[Message], Events): + """ + Implements a QMP client connection. + + QMP can be used to establish a connection as either the transport + client or server, though this class always acts as the QMP client. + + :param name: Optional nickname for the connection, used for logging. + + Basic script-style usage looks like this:: + + qmp = QMPClient('my_virtual_machine_name') + await qmp.connect(('127.0.0.1', 1234)) + ... + res = await qmp.execute('block-query') + ... + await qmp.disconnect() + + Basic async client-style usage looks like this:: + + class Client: + def __init__(self, name: str): + self.qmp = QMPClient(name) + + async def watch_events(self): + try: + async for event in self.qmp.events: + print(f"Event: {event['event']}") + except asyncio.CancelledError: + return + + async def run(self, address='/tmp/qemu.socket'): + await self.qmp.connect(address) + asyncio.create_task(self.watch_events()) + await self.qmp.runstate_changed.wait() + await self.disconnect() + + See `aqmp.events` for more detail on event handling patterns. + """ + #: Logger object used for debugging messages. + logger = logging.getLogger(__name__) + + # Read buffer limit; large enough to accept query-qmp-schema + _limit = (256 * 1024) + + # Type alias for pending execute() result items + _PendingT = Union[Message, ExecInterruptedError] + + def __init__(self, name: Optional[str] = None) -> None: + super().__init__(name) + Events.__init__(self) + + #: Whether or not to await a greeting after establishing a connection. + self.await_greeting: bool = True + + #: Whether or not to perform capabilities negotiation upon connection. + #: Implies `await_greeting`. + self.negotiate: bool = True + + # Cached Greeting, if one was awaited. + self._greeting: Optional[Greeting] = None + + # Command ID counter + self._execute_id = 0 + + # Incoming RPC reply messages. + self._pending: Dict[ + Union[str, None], + 'asyncio.Queue[QMPClient._PendingT]' + ] = {} + + @upper_half + async def _establish_session(self) -> None: + """ + Initiate the QMP session. + + Wait for the QMP greeting and perform capabilities negotiation. + + :raise GreetingError: When the greeting is not understood. + :raise NegotiationError: If the negotiation fails. + :raise EOFError: When the server unexpectedly hangs up. + :raise OSError: For underlying stream errors. + """ + self._greeting = None + self._pending = {} + + if self.await_greeting or self.negotiate: + self._greeting = await self._get_greeting() + + if self.negotiate: + await self._negotiate() + + # This will start the reader/writers: + await super()._establish_session() + + @upper_half + async def _get_greeting(self) -> Greeting: + """ + :raise GreetingError: When the greeting is not understood. + :raise EOFError: When the server unexpectedly hangs up. + :raise OSError: For underlying stream errors. + + :return: the Greeting object given by the server. + """ + self.logger.debug("Awaiting greeting ...") + + try: + msg = await self._recv() + return Greeting(msg) + except (ProtocolError, KeyError, TypeError) as err: + emsg = "Did not understand Greeting" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise GreetingError(emsg, err) from err + except BaseException as err: + # EOFError, OSError, or something unexpected. + emsg = "Failed to receive Greeting" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise + + @upper_half + async def _negotiate(self) -> None: + """ + Perform QMP capabilities negotiation. + + :raise NegotiationError: When negotiation fails. + :raise EOFError: When the server unexpectedly hangs up. + :raise OSError: For underlying stream errors. + """ + self.logger.debug("Negotiating capabilities ...") + + arguments: Dict[str, List[str]] = {'enable': []} + if self._greeting and 'oob' in self._greeting.QMP.capabilities: + arguments['enable'].append('oob') + msg = self.make_execute_msg('qmp_capabilities', arguments=arguments) + + # It's not safe to use execute() here, because the reader/writers + # aren't running. AsyncProtocol *requires* that a new session + # does not fail after the reader/writers are running! + try: + await self._send(msg) + reply = await self._recv() + assert 'return' in reply + assert 'error' not in reply + except (ProtocolError, AssertionError) as err: + emsg = "Negotiation failed" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise NegotiationError(emsg, err) from err + except BaseException as err: + # EOFError, OSError, or something unexpected. + emsg = "Negotiation failed" + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + raise + + @bottom_half + async def _bh_disconnect(self) -> None: + try: + await super()._bh_disconnect() + finally: + if self._pending: + self.logger.debug("Cancelling pending executions") + keys = self._pending.keys() + for key in keys: + self.logger.debug("Cancelling execution '%s'", key) + self._pending[key].put_nowait( + ExecInterruptedError("Disconnected") + ) + + self.logger.debug("QMP Disconnected.") + + @upper_half + def _cleanup(self) -> None: + super()._cleanup() + assert not self._pending + + @bottom_half + async def _on_message(self, msg: Message) -> None: + """ + Add an incoming message to the appropriate queue/handler. + + :raise ServerParseError: When Message indicates server parse failure. + """ + # Incoming messages are not fully parsed/validated here; + # do only light peeking to know how to route the messages. + + if 'event' in msg: + await self._event_dispatch(msg) + return + + # Below, we assume everything left is an execute/exec-oob response. + + exec_id = cast(Optional[str], msg.get('id')) + + if exec_id in self._pending: + await self._pending[exec_id].put(msg) + return + + # We have a message we can't route back to a caller. + + is_error = 'error' in msg + has_id = 'id' in msg + + if is_error and not has_id: + # This is very likely a server parsing error. + # It doesn't inherently belong to any pending execution. + # Instead of performing clever recovery, just terminate. + # See "NOTE" in qmp-spec.txt, section 2.4.2 + raise ServerParseError( + ("Server sent an error response without an ID, " + "but there are no ID-less executions pending. " + "Assuming this is a server parser failure."), + msg + ) + + # qmp-spec.txt, section 2.4: + # 'Clients should drop all the responses + # that have an unknown "id" field.' + self.logger.log( + logging.ERROR if is_error else logging.WARNING, + "Unknown ID '%s', message dropped.", + exec_id, + ) + self.logger.debug("Unroutable message: %s", str(msg)) + + @upper_half + @bottom_half + async def _do_recv(self) -> Message: + """ + :raise OSError: When a stream error is encountered. + :raise EOFError: When the stream is at EOF. + :raise ProtocolError: + When the Message is not understood. + See also `Message._deserialize`. + + :return: A single QMP `Message`. + """ + msg_bytes = await self._readline() + msg = Message(msg_bytes, eager=True) + return msg + + @upper_half + @bottom_half + def _do_send(self, msg: Message) -> None: + """ + :raise ValueError: JSON serialization failure + :raise TypeError: JSON serialization failure + :raise OSError: When a stream error is encountered. + """ + assert self._writer is not None + self._writer.write(bytes(msg)) + + @upper_half + def _get_exec_id(self) -> str: + exec_id = f"__aqmp#{self._execute_id:05d}" + self._execute_id += 1 + return exec_id + + @upper_half + async def _issue(self, msg: Message) -> Union[None, str]: + """ + Issue a QMP `Message` and do not wait for a reply. + + :param msg: The QMP `Message` to send to the server. + + :return: The ID of the `Message` sent. + """ + msg_id: Optional[str] = None + if 'id' in msg: + assert isinstance(msg['id'], str) + msg_id = msg['id'] + + self._pending[msg_id] = asyncio.Queue(maxsize=1) + await self._outgoing.put(msg) + + return msg_id + + @upper_half + async def _reply(self, msg_id: Union[str, None]) -> Message: + """ + Await a reply to a previously issued QMP message. + + :param msg_id: The ID of the previously issued message. + + :return: The reply from the server. + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + """ + queue = self._pending[msg_id] + result = await queue.get() + + try: + if isinstance(result, ExecInterruptedError): + raise result + return result + finally: + del self._pending[msg_id] + + @upper_half + async def _execute(self, msg: Message, assign_id: bool = True) -> Message: + """ + Send a QMP `Message` to the server and await a reply. + + This method *assumes* you are sending some kind of an execute + statement that *will* receive a reply. + + An execution ID will be assigned if assign_id is `True`. It can be + disabled, but this requires that an ID is manually assigned + instead. For manually assigned IDs, you must not use the string + '__aqmp#' anywhere in the ID. + + :param msg: The QMP `Message` to execute. + :param assign_id: If True, assign a new execution ID. + + :return: Execution reply from the server. + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + """ + if assign_id: + msg['id'] = self._get_exec_id() + elif 'id' in msg: + assert isinstance(msg['id'], str) + assert '__aqmp#' not in msg['id'] + + exec_id = await self._issue(msg) + return await self._reply(exec_id) + + @upper_half + @require(Runstate.RUNNING) + async def _raw( + self, + msg: Union[Message, Mapping[str, object], bytes], + assign_id: bool = True, + ) -> Message: + """ + Issue a raw `Message` to the QMP server and await a reply. + + :param msg: + A Message to send to the server. It may be a `Message`, any + Mapping (including Dict), or raw bytes. + :param assign_id: + Assign an arbitrary execution ID to this message. If + `False`, the existing id must either be absent (and no other + such pending execution may omit an ID) or a string. If it is + a string, it must not start with '__aqmp#' and no other such + pending execution may currently be using that ID. + + :return: Execution reply from the server. + + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + :raise TypeError: + When assign_id is `False`, an ID is given, and it is not a string. + :raise ValueError: + When assign_id is `False`, but the ID is not usable; + Either because it starts with '__aqmp#' or it is already in-use. + """ + # 1. convert generic Mapping or bytes to a QMP Message + # 2. copy Message objects so that we assign an ID only to the copy. + msg = Message(msg) + + exec_id = msg.get('id') + if not assign_id and 'id' in msg: + if not isinstance(exec_id, str): + raise TypeError(f"ID ('{exec_id}') must be a string.") + if exec_id.startswith('__aqmp#'): + raise ValueError( + f"ID ('{exec_id}') must not start with '__aqmp#'." + ) + + if not assign_id and exec_id in self._pending: + raise ValueError( + f"ID '{exec_id}' is in-use and cannot be used." + ) + + return await self._execute(msg, assign_id=assign_id) + + @upper_half + @require(Runstate.RUNNING) + async def execute_msg(self, msg: Message) -> object: + """ + Execute a QMP command and return its value. + + :param msg: The QMP `Message` to execute. + + :return: + The command execution return value from the server. The type of + object returned depends on the command that was issued, + though most in QEMU return a `dict`. + :raise ValueError: + If the QMP `Message` does not have either the 'execute' or + 'exec-oob' fields set. + :raise ExecuteError: When the server returns an error response. + :raise ExecInterruptedError: if the connection was terminated early. + """ + if not ('execute' in msg or 'exec-oob' in msg): + raise ValueError("Requires 'execute' or 'exec-oob' message") + + # Copy the Message so that the ID assigned by _execute() is + # local to this method; allowing the ID to be seen in raised + # Exceptions but without modifying the caller's held copy. + msg = Message(msg) + reply = await self._execute(msg) + + if 'error' in reply: + try: + error_response = ErrorResponse(reply) + except (KeyError, TypeError) as err: + # Error response was malformed. + raise BadReplyError( + "QMP error reply is malformed", reply, msg, + ) from err + + raise ExecuteError(error_response, msg, reply) + + if 'return' not in reply: + raise BadReplyError( + "QMP reply is missing a 'error' or 'return' member", + reply, msg, + ) + + return reply['return'] + + @classmethod + def make_execute_msg(cls, cmd: str, + arguments: Optional[Mapping[str, object]] = None, + oob: bool = False) -> Message: + """ + Create an executable message to be sent by `execute_msg` later. + + :param cmd: QMP command name. + :param arguments: Arguments (if any). Must be JSON-serializable. + :param oob: If `True`, execute "out of band". + + :return: An executable QMP `Message`. + """ + msg = Message({'exec-oob' if oob else 'execute': cmd}) + if arguments is not None: + msg['arguments'] = arguments + return msg + + @upper_half + async def execute(self, cmd: str, + arguments: Optional[Mapping[str, object]] = None, + oob: bool = False) -> object: + """ + Execute a QMP command and return its value. + + :param cmd: QMP command name. + :param arguments: Arguments (if any). Must be JSON-serializable. + :param oob: If `True`, execute "out of band". + + :return: + The command execution return value from the server. The type of + object returned depends on the command that was issued, + though most in QEMU return a `dict`. + :raise ExecuteError: When the server returns an error response. + :raise ExecInterruptedError: if the connection was terminated early. + """ + msg = self.make_execute_msg(cmd, arguments, oob=oob) + return await self.execute_msg(msg) diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py new file mode 100644 index 0000000..eaa5fc7 --- /dev/null +++ b/python/qemu/aqmp/util.py @@ -0,0 +1,217 @@ +""" +Miscellaneous Utilities + +This module provides asyncio utilities and compatibility wrappers for +Python 3.6 to provide some features that otherwise become available in +Python 3.7+. + +Various logging and debugging utilities are also provided, such as +`exception_summary()` and `pretty_traceback()`, used primarily for +adding information into the logging stream. +""" + +import asyncio +import sys +import traceback +from typing import ( + Any, + Coroutine, + Optional, + TypeVar, + cast, +) + + +T = TypeVar('T') + + +# -------------------------- +# Section: Utility Functions +# -------------------------- + + +async def flush(writer: asyncio.StreamWriter) -> None: + """ + Utility function to ensure a StreamWriter is *fully* drained. + + `asyncio.StreamWriter.drain` only promises we will return to below + the "high-water mark". This function ensures we flush the entire + buffer -- by setting the high water mark to 0 and then calling + drain. The flow control limits are restored after the call is + completed. + """ + transport = cast(asyncio.WriteTransport, writer.transport) + + # https://github.com/python/typeshed/issues/5779 + low, high = transport.get_write_buffer_limits() # type: ignore + transport.set_write_buffer_limits(0, 0) + try: + await writer.drain() + finally: + transport.set_write_buffer_limits(high, low) + + +def upper_half(func: T) -> T: + """ + Do-nothing decorator that annotates a method as an "upper-half" method. + + These methods must not call bottom-half functions directly, but can + schedule them to run. + """ + return func + + +def bottom_half(func: T) -> T: + """ + Do-nothing decorator that annotates a method as a "bottom-half" method. + + These methods must take great care to handle their own exceptions whenever + possible. If they go unhandled, they will cause termination of the loop. + + These methods do not, in general, have the ability to directly + report information to a caller’s context and will usually be + collected as a Task result instead. + + They must not call upper-half functions directly. + """ + return func + + +# ------------------------------- +# Section: Compatibility Wrappers +# ------------------------------- + + +def create_task(coro: Coroutine[Any, Any, T], + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> 'asyncio.Future[T]': + """ + Python 3.6-compatible `asyncio.create_task` wrapper. + + :param coro: The coroutine to execute in a task. + :param loop: Optionally, the loop to create the task in. + + :return: An `asyncio.Future` object. + """ + if sys.version_info >= (3, 7): + if loop is not None: + return loop.create_task(coro) + return asyncio.create_task(coro) # pylint: disable=no-member + + # Python 3.6: + return asyncio.ensure_future(coro, loop=loop) + + +def is_closing(writer: asyncio.StreamWriter) -> bool: + """ + Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. + + :param writer: The `asyncio.StreamWriter` object. + :return: `True` if the writer is closing, or closed. + """ + if sys.version_info >= (3, 7): + return writer.is_closing() + + # Python 3.6: + transport = writer.transport + assert isinstance(transport, asyncio.WriteTransport) + return transport.is_closing() + + +async def wait_closed(writer: asyncio.StreamWriter) -> None: + """ + Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. + + :param writer: The `asyncio.StreamWriter` to wait on. + """ + if sys.version_info >= (3, 7): + await writer.wait_closed() + return + + # Python 3.6 + transport = writer.transport + assert isinstance(transport, asyncio.WriteTransport) + + while not transport.is_closing(): + await asyncio.sleep(0) + + # This is an ugly workaround, but it's the best I can come up with. + sock = transport.get_extra_info('socket') + + if sock is None: + # Our transport doesn't have a socket? ... + # Nothing we can reasonably do. + return + + while sock.fileno() != -1: + await asyncio.sleep(0) + + +def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: + """ + Python 3.6-compatible `asyncio.run` wrapper. + + :param coro: A coroutine to execute now. + :return: The return value from the coroutine. + """ + if sys.version_info >= (3, 7): + return asyncio.run(coro, debug=debug) + + # Python 3.6 + loop = asyncio.get_event_loop() + loop.set_debug(debug) + ret = loop.run_until_complete(coro) + loop.close() + + return ret + + +# ---------------------------- +# Section: Logging & Debugging +# ---------------------------- + + +def exception_summary(exc: BaseException) -> str: + """ + Return a summary string of an arbitrary exception. + + It will be of the form "ExceptionType: Error Message", if the error + string is non-empty, and just "ExceptionType" otherwise. + """ + name = type(exc).__qualname__ + smod = type(exc).__module__ + if smod not in ("__main__", "builtins"): + name = smod + '.' + name + + error = str(exc) + if error: + return f"{name}: {error}" + return name + + +def pretty_traceback(prefix: str = " | ") -> str: + """ + Formats the current traceback, indented to provide visual distinction. + + This is useful for printing a traceback within a traceback for + debugging purposes when encapsulating errors to deliver them up the + stack; when those errors are printed, this helps provide a nice + visual grouping to quickly identify the parts of the error that + belong to the inner exception. + + :param prefix: The prefix to append to each line of the traceback. + :return: A string, formatted something like the following:: + + | Traceback (most recent call last): + | File "foobar.py", line 42, in arbitrary_example + | foo.baz() + | ArbitraryError: [Errno 42] Something bad happened! + """ + output = "".join(traceback.format_exception(*sys.exc_info())) + + exc_lines = [] + for line in output.split('\n'): + exc_lines.append(prefix + line) + + # The last line is always empty, omit it + return "\n".join(exc_lines[:-1]) diff --git a/python/setup.cfg b/python/setup.cfg index fdca265..417e937 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -27,6 +27,7 @@ packages = qemu.qmp qemu.machine qemu.utils + qemu.aqmp [options.package_data] * = py.typed @@ -36,18 +37,27 @@ packages = # version, use e.g. "pipenv install --dev pylint==3.0.0". # Subsequently, edit 'Pipfile' to remove e.g. 'pylint = "==3.0.0'. devel = - avocado-framework >= 87.0 + avocado-framework >= 90.0 flake8 >= 3.6.0 fusepy >= 2.0.4 isort >= 5.1.2 mypy >= 0.770 pylint >= 2.8.0 tox >= 3.18.0 + urwid >= 2.1.2 + urwid-readline >= 0.13 + Pygments >= 2.9.0 # Provides qom-fuse functionality fuse = fusepy >= 2.0.4 +# AQMP TUI dependencies +tui = + urwid >= 2.1.2 + urwid-readline >= 0.13 + Pygments >= 2.9.0 + [options.entry_points] console_scripts = qom = qemu.qmp.qom:main @@ -58,6 +68,7 @@ console_scripts = qom-fuse = qemu.qmp.qom_fuse:QOMFuse.entry_point [fuse] qemu-ga-client = qemu.qmp.qemu_ga_client:main qmp-shell = qemu.qmp.qmp_shell:main + aqmp-tui = qemu.aqmp.aqmp_tui:main [tui] [flake8] extend-ignore = E722 # Prefer pylint's bare-except checks to flake8's @@ -73,8 +84,22 @@ namespace_packages = True # fusepy has no type stubs: allow_subclassing_any = True +[mypy-qemu.aqmp.aqmp_tui] +# urwid and urwid_readline have no type stubs: +allow_subclassing_any = True + +# The following missing import directives are because these libraries do not +# provide type stubs. Allow them on an as-needed basis for mypy. [mypy-fuse] -# fusepy has no type stubs: +ignore_missing_imports = True + +[mypy-urwid] +ignore_missing_imports = True + +[mypy-urwid_readline] +ignore_missing_imports = True + +[mypy-pygments] ignore_missing_imports = True [pylint.messages control] @@ -88,6 +113,8 @@ ignore_missing_imports = True # no Warning level messages displayed, use "--disable=all --enable=classes # --disable=W". disable=consider-using-f-string, + too-many-function-args, # mypy handles this with less false positives. + no-member, # mypy also handles this better. [pylint.basic] # Good variable names which should always be accepted, separated by a comma. @@ -100,6 +127,7 @@ good-names=i, fh, # fh = open(...) fd, # fd = os.open(...) c, # for c in string: ... + T, # for TypeVars. See pylint#3401 [pylint.similarities] # Ignore imports when computing similarities. @@ -134,5 +162,16 @@ allowlist_externals = make deps = .[devel] .[fuse] # Workaround to trigger tox venv rebuild + .[tui] # Workaround to trigger tox venv rebuild commands = make check + +# Coverage.py [https://coverage.readthedocs.io/en/latest/] is a tool for +# measuring code coverage of Python programs. It monitors your program, +# noting which parts of the code have been executed, then analyzes the +# source to identify code that could have been executed but was not. + +[coverage:run] +concurrency = multiprocessing +source = qemu/ +parallel = true diff --git a/python/tests/protocol.py b/python/tests/protocol.py new file mode 100644 index 0000000..5cd7938 --- /dev/null +++ b/python/tests/protocol.py @@ -0,0 +1,583 @@ +import asyncio +from contextlib import contextmanager +import os +import socket +from tempfile import TemporaryDirectory + +import avocado + +from qemu.aqmp import ConnectError, Runstate +from qemu.aqmp.protocol import AsyncProtocol, StateError +from qemu.aqmp.util import asyncio_run, create_task + + +class NullProtocol(AsyncProtocol[None]): + """ + NullProtocol is a test mockup of an AsyncProtocol implementation. + + It adds a fake_session instance variable that enables a code path + that bypasses the actual connection logic, but still allows the + reader/writers to start. + + Because the message type is defined as None, an asyncio.Event named + 'trigger_input' is created that prohibits the reader from + incessantly being able to yield None; this event can be poked to + simulate an incoming message. + + For testing symmetry with do_recv, an interface is added to "send" a + Null message. + + For testing purposes, a "simulate_disconnection" method is also + added which allows us to trigger a bottom half disconnect without + injecting any real errors into the reader/writer loops; in essence + it performs exactly half of what disconnect() normally does. + """ + def __init__(self, name=None): + self.fake_session = False + self.trigger_input: asyncio.Event + super().__init__(name) + + async def _establish_session(self): + self.trigger_input = asyncio.Event() + await super()._establish_session() + + async def _do_accept(self, address, ssl=None): + if not self.fake_session: + await super()._do_accept(address, ssl) + + async def _do_connect(self, address, ssl=None): + if not self.fake_session: + await super()._do_connect(address, ssl) + + async def _do_recv(self) -> None: + await self.trigger_input.wait() + self.trigger_input.clear() + + def _do_send(self, msg: None) -> None: + pass + + async def send_msg(self) -> None: + await self._outgoing.put(None) + + async def simulate_disconnect(self) -> None: + """ + Simulates a bottom-half disconnect. + + This method schedules a disconnection but does not wait for it + to complete. This is used to put the loop into the DISCONNECTING + state without fully quiescing it back to IDLE. This is normally + something you cannot coax AsyncProtocol to do on purpose, but it + will be similar to what happens with an unhandled Exception in + the reader/writer. + + Under normal circumstances, the library design requires you to + await on disconnect(), which awaits the disconnect task and + returns bottom half errors as a pre-condition to allowing the + loop to return back to IDLE. + """ + self._schedule_disconnect() + + +class LineProtocol(AsyncProtocol[str]): + def __init__(self, name=None): + super().__init__(name) + self.rx_history = [] + + async def _do_recv(self) -> str: + raw = await self._readline() + msg = raw.decode() + self.rx_history.append(msg) + return msg + + def _do_send(self, msg: str) -> None: + assert self._writer is not None + self._writer.write(msg.encode() + b'\n') + + async def send_msg(self, msg: str) -> None: + await self._outgoing.put(msg) + + +def run_as_task(coro, allow_cancellation=False): + """ + Run a given coroutine as a task. + + Optionally, wrap it in a try..except block that allows this + coroutine to be canceled gracefully. + """ + async def _runner(): + try: + await coro + except asyncio.CancelledError: + if allow_cancellation: + return + raise + return create_task(_runner()) + + +@contextmanager +def jammed_socket(): + """ + Opens up a random unused TCP port on localhost, then jams it. + """ + socks = [] + + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('127.0.0.1', 0)) + sock.listen(1) + address = sock.getsockname() + + socks.append(sock) + + # I don't *fully* understand why, but it takes *two* un-accepted + # connections to start jamming the socket. + for _ in range(2): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(address) + socks.append(sock) + + yield address + + finally: + for sock in socks: + sock.close() + + +class Smoke(avocado.Test): + + def setUp(self): + self.proto = NullProtocol() + + def test__repr__(self): + self.assertEqual( + repr(self.proto), + "<NullProtocol runstate=IDLE>" + ) + + def testRunstate(self): + self.assertEqual( + self.proto.runstate, + Runstate.IDLE + ) + + def testDefaultName(self): + self.assertEqual( + self.proto.name, + None + ) + + def testLogger(self): + self.assertEqual( + self.proto.logger.name, + 'qemu.aqmp.protocol' + ) + + def testName(self): + self.proto = NullProtocol('Steve') + + self.assertEqual( + self.proto.name, + 'Steve' + ) + + self.assertEqual( + self.proto.logger.name, + 'qemu.aqmp.protocol.Steve' + ) + + self.assertEqual( + repr(self.proto), + "<NullProtocol name='Steve' runstate=IDLE>" + ) + + +class TestBase(avocado.Test): + + def setUp(self): + self.proto = NullProtocol(type(self).__name__) + self.assertEqual(self.proto.runstate, Runstate.IDLE) + self.runstate_watcher = None + + def tearDown(self): + self.assertEqual(self.proto.runstate, Runstate.IDLE) + + async def _asyncSetUp(self): + pass + + async def _asyncTearDown(self): + if self.runstate_watcher: + await self.runstate_watcher + + @staticmethod + def async_test(async_test_method): + """ + Decorator; adds SetUp and TearDown to async tests. + """ + async def _wrapper(self, *args, **kwargs): + loop = asyncio.get_event_loop() + loop.set_debug(True) + + await self._asyncSetUp() + await async_test_method(self, *args, **kwargs) + await self._asyncTearDown() + + return _wrapper + + # Definitions + + # The states we expect a "bad" connect/accept attempt to transition through + BAD_CONNECTION_STATES = ( + Runstate.CONNECTING, + Runstate.DISCONNECTING, + Runstate.IDLE, + ) + + # The states we expect a "good" session to transition through + GOOD_CONNECTION_STATES = ( + Runstate.CONNECTING, + Runstate.RUNNING, + Runstate.DISCONNECTING, + Runstate.IDLE, + ) + + # Helpers + + async def _watch_runstates(self, *states): + """ + This launches a task alongside (most) tests below to confirm that + the sequence of runstate changes that occur is exactly as + anticipated. + """ + async def _watcher(): + for state in states: + new_state = await self.proto.runstate_changed() + self.assertEqual( + new_state, + state, + msg=f"Expected state '{state.name}'", + ) + + self.runstate_watcher = create_task(_watcher()) + # Kick the loop and force the task to block on the event. + await asyncio.sleep(0) + + +class State(TestBase): + + @TestBase.async_test + async def testSuperfluousDisconnect(self): + """ + Test calling disconnect() while already disconnected. + """ + await self._watch_runstates( + Runstate.DISCONNECTING, + Runstate.IDLE, + ) + await self.proto.disconnect() + + +class Connect(TestBase): + """ + Tests primarily related to calling Connect(). + """ + async def _bad_connection(self, family: str): + assert family in ('INET', 'UNIX') + + if family == 'INET': + await self.proto.connect(('127.0.0.1', 0)) + elif family == 'UNIX': + await self.proto.connect('/dev/null') + + async def _hanging_connection(self): + with jammed_socket() as addr: + await self.proto.connect(addr) + + async def _bad_connection_test(self, family: str): + await self._watch_runstates(*self.BAD_CONNECTION_STATES) + + with self.assertRaises(ConnectError) as context: + await self._bad_connection(family) + + self.assertIsInstance(context.exception.exc, OSError) + self.assertEqual( + context.exception.error_message, + "Failed to establish connection" + ) + + @TestBase.async_test + async def testBadINET(self): + """ + Test an immediately rejected call to an IP target. + """ + await self._bad_connection_test('INET') + + @TestBase.async_test + async def testBadUNIX(self): + """ + Test an immediately rejected call to a UNIX socket target. + """ + await self._bad_connection_test('UNIX') + + @TestBase.async_test + async def testCancellation(self): + """ + Test what happens when a connection attempt is aborted. + """ + # Note that accept() cannot be cancelled outright, as it isn't a task. + # However, we can wrap it in a task and cancel *that*. + await self._watch_runstates(*self.BAD_CONNECTION_STATES) + task = run_as_task(self._hanging_connection(), allow_cancellation=True) + + state = await self.proto.runstate_changed() + self.assertEqual(state, Runstate.CONNECTING) + + # This is insider baseball, but the connection attempt has + # yielded *just* before the actual connection attempt, so kick + # the loop to make sure it's truly wedged. + await asyncio.sleep(0) + + task.cancel() + await task + + @TestBase.async_test + async def testTimeout(self): + """ + Test what happens when a connection attempt times out. + """ + await self._watch_runstates(*self.BAD_CONNECTION_STATES) + task = run_as_task(self._hanging_connection()) + + # More insider baseball: to improve the speed of this test while + # guaranteeing that the connection even gets a chance to start, + # verify that the connection hangs *first*, then await the + # result of the task with a nearly-zero timeout. + + state = await self.proto.runstate_changed() + self.assertEqual(state, Runstate.CONNECTING) + await asyncio.sleep(0) + + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(task, timeout=0) + + @TestBase.async_test + async def testRequire(self): + """ + Test what happens when a connection attempt is made while CONNECTING. + """ + await self._watch_runstates(*self.BAD_CONNECTION_STATES) + task = run_as_task(self._hanging_connection(), allow_cancellation=True) + + state = await self.proto.runstate_changed() + self.assertEqual(state, Runstate.CONNECTING) + + with self.assertRaises(StateError) as context: + await self._bad_connection('UNIX') + + self.assertEqual( + context.exception.error_message, + "NullProtocol is currently connecting." + ) + self.assertEqual(context.exception.state, Runstate.CONNECTING) + self.assertEqual(context.exception.required, Runstate.IDLE) + + task.cancel() + await task + + @TestBase.async_test + async def testImplicitRunstateInit(self): + """ + Test what happens if we do not wait on the runstate event until + AFTER a connection is made, i.e., connect()/accept() themselves + initialize the runstate event. All of the above tests force the + initialization by waiting on the runstate *first*. + """ + task = run_as_task(self._hanging_connection(), allow_cancellation=True) + + # Kick the loop to coerce the state change + await asyncio.sleep(0) + assert self.proto.runstate == Runstate.CONNECTING + + # We already missed the transition to CONNECTING + await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE) + + task.cancel() + await task + + +class Accept(Connect): + """ + All of the same tests as Connect, but using the accept() interface. + """ + async def _bad_connection(self, family: str): + assert family in ('INET', 'UNIX') + + if family == 'INET': + await self.proto.accept(('example.com', 1)) + elif family == 'UNIX': + await self.proto.accept('/dev/null') + + async def _hanging_connection(self): + with TemporaryDirectory(suffix='.aqmp') as tmpdir: + sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") + await self.proto.accept(sock) + + +class FakeSession(TestBase): + + def setUp(self): + super().setUp() + self.proto.fake_session = True + + async def _asyncSetUp(self): + await super()._asyncSetUp() + await self._watch_runstates(*self.GOOD_CONNECTION_STATES) + + async def _asyncTearDown(self): + await self.proto.disconnect() + await super()._asyncTearDown() + + #### + + @TestBase.async_test + async def testFakeConnect(self): + + """Test the full state lifecycle (via connect) with a no-op session.""" + await self.proto.connect('/not/a/real/path') + self.assertEqual(self.proto.runstate, Runstate.RUNNING) + + @TestBase.async_test + async def testFakeAccept(self): + """Test the full state lifecycle (via accept) with a no-op session.""" + await self.proto.accept('/not/a/real/path') + self.assertEqual(self.proto.runstate, Runstate.RUNNING) + + @TestBase.async_test + async def testFakeRecv(self): + """Test receiving a fake/null message.""" + await self.proto.accept('/not/a/real/path') + + logname = self.proto.logger.name + with self.assertLogs(logname, level='DEBUG') as context: + self.proto.trigger_input.set() + self.proto.trigger_input.clear() + await asyncio.sleep(0) # Kick reader. + + self.assertEqual( + context.output, + [f"DEBUG:{logname}:<-- None"], + ) + + @TestBase.async_test + async def testFakeSend(self): + """Test sending a fake/null message.""" + await self.proto.accept('/not/a/real/path') + + logname = self.proto.logger.name + with self.assertLogs(logname, level='DEBUG') as context: + # Cheat: Send a Null message to nobody. + await self.proto.send_msg() + # Kick writer; awaiting on a queue.put isn't sufficient to yield. + await asyncio.sleep(0) + + self.assertEqual( + context.output, + [f"DEBUG:{logname}:--> None"], + ) + + async def _prod_session_api( + self, + current_state: Runstate, + error_message: str, + accept: bool = True + ): + with self.assertRaises(StateError) as context: + if accept: + await self.proto.accept('/not/a/real/path') + else: + await self.proto.connect('/not/a/real/path') + + self.assertEqual(context.exception.error_message, error_message) + self.assertEqual(context.exception.state, current_state) + self.assertEqual(context.exception.required, Runstate.IDLE) + + @TestBase.async_test + async def testAcceptRequireRunning(self): + """Test that accept() cannot be called when Runstate=RUNNING""" + await self.proto.accept('/not/a/real/path') + + await self._prod_session_api( + Runstate.RUNNING, + "NullProtocol is already connected and running.", + accept=True, + ) + + @TestBase.async_test + async def testConnectRequireRunning(self): + """Test that connect() cannot be called when Runstate=RUNNING""" + await self.proto.accept('/not/a/real/path') + + await self._prod_session_api( + Runstate.RUNNING, + "NullProtocol is already connected and running.", + accept=False, + ) + + @TestBase.async_test + async def testAcceptRequireDisconnecting(self): + """Test that accept() cannot be called when Runstate=DISCONNECTING""" + await self.proto.accept('/not/a/real/path') + + # Cheat: force a disconnect. + await self.proto.simulate_disconnect() + + await self._prod_session_api( + Runstate.DISCONNECTING, + ("NullProtocol is disconnecting." + " Call disconnect() to return to IDLE state."), + accept=True, + ) + + @TestBase.async_test + async def testConnectRequireDisconnecting(self): + """Test that connect() cannot be called when Runstate=DISCONNECTING""" + await self.proto.accept('/not/a/real/path') + + # Cheat: force a disconnect. + await self.proto.simulate_disconnect() + + await self._prod_session_api( + Runstate.DISCONNECTING, + ("NullProtocol is disconnecting." + " Call disconnect() to return to IDLE state."), + accept=False, + ) + + +class SimpleSession(TestBase): + + def setUp(self): + super().setUp() + self.server = LineProtocol(type(self).__name__ + '-server') + + async def _asyncSetUp(self): + await super()._asyncSetUp() + await self._watch_runstates(*self.GOOD_CONNECTION_STATES) + + async def _asyncTearDown(self): + await self.proto.disconnect() + try: + await self.server.disconnect() + except EOFError: + pass + await super()._asyncTearDown() + + @TestBase.async_test + async def testSmoke(self): + with TemporaryDirectory(suffix='.aqmp') as tmpdir: + sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") + server_task = create_task(self.server.accept(sock)) + + # give the server a chance to start listening [...] + await asyncio.sleep(0) + await self.proto.connect(sock) |