From aeb6b48a4714895a202e97a35c789ef7edf71665 Mon Sep 17 00:00:00 2001 From: G S Niteesh Babu Date: Tue, 24 Aug 2021 03:37:43 +0530 Subject: python/aqmp-tui: Add AQMP TUI Added AQMP TUI. Implements the follwing basic features: 1) Command transmission/reception. 2) Shows events asynchronously. 3) Shows server status in the bottom status bar. 4) Automatic retries on disconnects and error conditions. Also added type annotations and necessary pylint/mypy configurations. Signed-off-by: G S Niteesh Babu Message-Id: <20210823220746.28295-3-niteesh.gs@gmail.com> Signed-off-by: John Snow --- python/qemu/aqmp/aqmp_tui.py | 620 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 620 insertions(+) create mode 100644 python/qemu/aqmp/aqmp_tui.py (limited to 'python/qemu') diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py new file mode 100644 index 0000000..ac53354 --- /dev/null +++ b/python/qemu/aqmp/aqmp_tui.py @@ -0,0 +1,620 @@ +# Copyright (c) 2021 +# +# Authors: +# Niteesh Babu G S +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. +""" +AQMP TUI + +AQMP TUI is an asynchronous interface built on top the of the AQMP library. +It is the successor of QMP-shell and is bought-in as a replacement for it. + +Example Usage: aqmp-tui +Full Usage: aqmp-tui --help +""" + +import argparse +import asyncio +import json +import logging +from logging import Handler, LogRecord +import signal +from typing import ( + List, + Optional, + Tuple, + Type, + Union, + cast, +) + +import urwid +import urwid_readline + +from ..qmp import QEMUMonitorProtocol, QMPBadPortError +from .error import ProtocolError +from .message import DeserializationError, Message, UnexpectedTypeError +from .protocol import ConnectError, Runstate +from .qmp_client import ExecInterruptedError, QMPClient +from .util import create_task, pretty_traceback + + +# The name of the signal that is used to update the history list +UPDATE_MSG: str = 'UPDATE_MSG' + + +def format_json(msg: str) -> str: + """ + Formats valid/invalid multi-line JSON message into a single-line message. + + Formatting is first tried using the standard json module. If that fails + due to an decoding error then a simple string manipulation is done to + achieve a single line JSON string. + + Converting into single line is more asthetically pleasing when looking + along with error messages. + + Eg: + Input: + [ 1, + true, + 3 ] + The above input is not a valid QMP message and produces the following error + "QMP message is not a JSON object." + When displaying this in TUI in multiline mode we get + + [ 1, + true, + 3 ]: QMP message is not a JSON object. + + whereas in singleline mode we get the following + + [1, true, 3]: QMP message is not a JSON object. + + The single line mode is more asthetically pleasing. + + :param msg: + The message to formatted into single line. + + :return: Formatted singleline message. + """ + try: + msg = json.loads(msg) + return str(json.dumps(msg)) + except json.decoder.JSONDecodeError: + msg = msg.replace('\n', '') + words = msg.split(' ') + words = list(filter(None, words)) + return ' '.join(words) + + +def has_handler_type(logger: logging.Logger, + handler_type: Type[Handler]) -> bool: + """ + The Logger class has no interface to check if a certain type of handler is + installed or not. So we provide an interface to do so. + + :param logger: + Logger object + :param handler_type: + The type of the handler to be checked. + + :return: returns True if handler of type `handler_type`. + """ + for handler in logger.handlers: + if isinstance(handler, handler_type): + return True + return False + + +class App(QMPClient): + """ + Implements the AQMP TUI. + + Initializes the widgets and starts the urwid event loop. + + :param address: + Address of the server to connect to. + :param num_retries: + The number of times to retry before stopping to reconnect. + :param retry_delay: + The delay(sec) before each retry + """ + def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int, + retry_delay: Optional[int]) -> None: + urwid.register_signal(type(self), UPDATE_MSG) + self.window = Window(self) + self.address = address + self.aloop: Optional[asyncio.AbstractEventLoop] = None + self.num_retries = num_retries + self.retry_delay = retry_delay if retry_delay else 2 + self.retry: bool = False + self.exiting: bool = False + super().__init__() + + def add_to_history(self, msg: str, level: Optional[str] = None) -> None: + """ + Appends the msg to the history list. + + :param msg: + The raw message to be appended in string type. + """ + urwid.emit_signal(self, UPDATE_MSG, msg, level) + + def _cb_outbound(self, msg: Message) -> Message: + """ + Callback: outbound message hook. + + Appends the outgoing messages to the history box. + + :param msg: raw outbound message. + :return: final outbound message. + """ + str_msg = str(msg) + + if not has_handler_type(logging.getLogger(), TUILogHandler): + logging.debug('Request: %s', str_msg) + self.add_to_history('<-- ' + str_msg) + return msg + + def _cb_inbound(self, msg: Message) -> Message: + """ + Callback: outbound message hook. + + Appends the incoming messages to the history box. + + :param msg: raw inbound message. + :return: final inbound message. + """ + str_msg = str(msg) + + if not has_handler_type(logging.getLogger(), TUILogHandler): + logging.debug('Request: %s', str_msg) + self.add_to_history('--> ' + str_msg) + return msg + + async def _send_to_server(self, msg: Message) -> None: + """ + This coroutine sends the message to the server. + The message has to be pre-validated. + + :param msg: + Pre-validated message to be to sent to the server. + + :raise Exception: When an unhandled exception is caught. + """ + try: + await self._raw(msg, assign_id='id' not in msg) + except ExecInterruptedError as err: + logging.info('Error server disconnected before reply %s', str(err)) + self.add_to_history('Server disconnected before reply', 'ERROR') + except Exception as err: + logging.error('Exception from _send_to_server: %s', str(err)) + raise err + + def cb_send_to_server(self, raw_msg: str) -> None: + """ + Validates and sends the message to the server. + The raw string message is first converted into a Message object + and is then sent to the server. + + :param raw_msg: + The raw string message to be sent to the server. + + :raise Exception: When an unhandled exception is caught. + """ + try: + msg = Message(bytes(raw_msg, encoding='utf-8')) + create_task(self._send_to_server(msg)) + except (DeserializationError, UnexpectedTypeError) as err: + raw_msg = format_json(raw_msg) + logging.info('Invalid message: %s', err.error_message) + self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR') + + def unhandled_input(self, key: str) -> None: + """ + Handle's keys which haven't been handled by the child widgets. + + :param key: + Unhandled key + """ + if key == 'esc': + self.kill_app() + + def kill_app(self) -> None: + """ + Initiates killing of app. A bridge between asynchronous and synchronous + code. + """ + create_task(self._kill_app()) + + async def _kill_app(self) -> None: + """ + This coroutine initiates the actual disconnect process and calls + urwid.ExitMainLoop() to kill the TUI. + + :raise Exception: When an unhandled exception is caught. + """ + self.exiting = True + await self.disconnect() + logging.debug('Disconnect finished. Exiting app') + raise urwid.ExitMainLoop() + + async def disconnect(self) -> None: + """ + Overrides the disconnect method to handle the errors locally. + """ + try: + await super().disconnect() + except (OSError, EOFError) as err: + logging.info('disconnect: %s', str(err)) + self.retry = True + except ProtocolError as err: + logging.info('disconnect: %s', str(err)) + except Exception as err: + logging.error('disconnect: Unhandled exception %s', str(err)) + raise err + + def _set_status(self, msg: str) -> None: + """ + Sets the message as the status. + + :param msg: + The message to be displayed in the status bar. + """ + self.window.footer.set_text(msg) + + def _get_formatted_address(self) -> str: + """ + Returns a formatted version of the server's address. + + :return: formatted address + """ + if isinstance(self.address, tuple): + host, port = self.address + addr = f'{host}:{port}' + else: + addr = f'{self.address}' + return addr + + async def _initiate_connection(self) -> Optional[ConnectError]: + """ + Tries connecting to a server a number of times with a delay between + each try. If all retries failed then return the error faced during + the last retry. + + :return: Error faced during last retry. + """ + current_retries = 0 + err = None + + # initial try + await self.connect_server() + while self.retry and current_retries < self.num_retries: + logging.info('Connection Failed, retrying in %d', self.retry_delay) + status = f'[Retry #{current_retries} ({self.retry_delay}s)]' + self._set_status(status) + + await asyncio.sleep(self.retry_delay) + + err = await self.connect_server() + current_retries += 1 + # If all retries failed report the last error + if err: + logging.info('All retries failed: %s', err) + return err + return None + + async def manage_connection(self) -> None: + """ + Manage the connection based on the current run state. + + A reconnect is issued when the current state is IDLE and the number + of retries is not exhausted. + A disconnect is issued when the current state is DISCONNECTING. + """ + while not self.exiting: + if self.runstate == Runstate.IDLE: + err = await self._initiate_connection() + # If retry is still true then, we have exhausted all our tries. + if err: + self._set_status(f'[Error: {err.error_message}]') + else: + addr = self._get_formatted_address() + self._set_status(f'[Connected {addr}]') + elif self.runstate == Runstate.DISCONNECTING: + self._set_status('[Disconnected]') + await self.disconnect() + # check if a retry is needed + if self.runstate == Runstate.IDLE: + continue + await self.runstate_changed() + + async def connect_server(self) -> Optional[ConnectError]: + """ + Initiates a connection to the server at address `self.address` + and in case of a failure, sets the status to the respective error. + """ + try: + await self.connect(self.address) + self.retry = False + except ConnectError as err: + logging.info('connect_server: ConnectError %s', str(err)) + self.retry = True + return err + return None + + def run(self, debug: bool = False) -> None: + """ + Starts the long running co-routines and the urwid event loop. + + :param debug: + Enables/Disables asyncio event loop debugging + """ + self.aloop = asyncio.get_event_loop() + self.aloop.set_debug(debug) + + # Gracefully handle SIGTERM and SIGINT signals + cancel_signals = [signal.SIGTERM, signal.SIGINT] + for sig in cancel_signals: + self.aloop.add_signal_handler(sig, self.kill_app) + + event_loop = urwid.AsyncioEventLoop(loop=self.aloop) + main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'), + unhandled_input=self.unhandled_input, + handle_mouse=True, + event_loop=event_loop) + + create_task(self.manage_connection(), self.aloop) + try: + main_loop.run() + except Exception as err: + logging.error('%s\n%s\n', str(err), pretty_traceback()) + raise err + + +class StatusBar(urwid.Text): + """ + A simple statusbar modelled using the Text widget. The status can be + set using the set_text function. All text set is aligned to right. + + :param text: Initial text to be displayed. Default is empty str. + """ + def __init__(self, text: str = ''): + super().__init__(text, align='right') + + +class Editor(urwid_readline.ReadlineEdit): + """ + A simple editor modelled using the urwid_readline.ReadlineEdit widget. + Mimcs GNU readline shortcuts and provides history support. + + The readline shortcuts can be found below: + https://github.com/rr-/urwid_readline#features + + Along with the readline features, this editor also has support for + history. Pressing the 'up'/'down' switches between the prev/next messages + available in the history. + + Currently there is no support to save the history to a file. The history of + previous commands is lost on exit. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + super().__init__(caption='> ', multiline=True) + self.parent = parent + self.history: List[str] = [] + self.last_index: int = -1 + self.show_history: bool = False + + def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]: + """ + Handles the keypress on this widget. + + :param size: + The current size of the widget. + :param key: + The key to be handled. + + :return: Unhandled key if any. + """ + msg = self.get_edit_text() + if key == 'up' and not msg: + # Show the history when 'up arrow' is pressed with no input text. + # NOTE: The show_history logic is necessary because in 'multiline' + # mode (which we use) 'up arrow' is used to move between lines. + if not self.history: + return None + self.show_history = True + last_msg = self.history[self.last_index] + self.set_edit_text(last_msg) + self.edit_pos = len(last_msg) + elif key == 'up' and self.show_history: + self.last_index = max(self.last_index - 1, -len(self.history)) + self.set_edit_text(self.history[self.last_index]) + self.edit_pos = len(self.history[self.last_index]) + elif key == 'down' and self.show_history: + if self.last_index == -1: + self.set_edit_text('') + self.show_history = False + else: + self.last_index += 1 + self.set_edit_text(self.history[self.last_index]) + self.edit_pos = len(self.history[self.last_index]) + elif key == 'meta enter': + # When using multiline, enter inserts a new line into the editor + # send the input to the server on alt + enter + self.parent.cb_send_to_server(msg) + self.history.append(msg) + self.set_edit_text('') + self.last_index = -1 + self.show_history = False + else: + self.show_history = False + self.last_index = -1 + return cast(Optional[str], super().keypress(size, key)) + return None + + +class EditorWidget(urwid.Filler): + """ + Wrapper around the editor widget. + + The Editor is a flow widget and has to wrapped inside a box widget. + This class wraps the Editor inside filler widget. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + super().__init__(Editor(parent), valign='top') + + +class HistoryBox(urwid.ListBox): + """ + This widget is modelled using the ListBox widget, contains the list of + all messages both QMP messages and log messsages to be shown in the TUI. + + The messages are urwid.Text widgets. On every append of a message, the + focus is shifted to the last appended message. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + self.history = urwid.SimpleFocusListWalker([]) + super().__init__(self.history) + + def add_to_history(self, history: str) -> None: + """ + Appends a message to the list and set the focus to the last appended + message. + + :param history: + The history item(message/event) to be appended to the list. + """ + self.history.append(urwid.Text(history)) + self.history.set_focus(len(self.history) - 1) + + def mouse_event(self, size: Tuple[int, int], _event: str, button: float, + _x: int, _y: int, focus: bool) -> None: + # Unfortunately there are no urwid constants that represent the mouse + # events. + if button == 4: # Scroll up event + super().keypress(size, 'up') + elif button == 5: # Scroll down event + super().keypress(size, 'down') + + +class HistoryWindow(urwid.Frame): + """ + This window composes the HistoryBox and EditorWidget in a horizontal split. + By default the first focus is given to the history box. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + self.editor_widget = EditorWidget(parent) + self.editor = urwid.LineBox(self.editor_widget) + self.history = HistoryBox(parent) + self.body = urwid.Pile([('weight', 80, self.history), + ('weight', 20, self.editor)]) + super().__init__(self.body) + urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history) + + def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None: + """ + Appends a message to the history box + + :param msg: + The message to be appended to the history box. + """ + if level: + msg = f'[{level}]: {msg}' + self.history.add_to_history(msg) + + +class Window(urwid.Frame): + """ + This window is the top most widget of the TUI and will contain other + windows. Each child of this widget is responsible for displaying a specific + functionality. + + :param parent: Reference to the TUI object. + """ + def __init__(self, parent: App) -> None: + self.parent = parent + footer = StatusBar() + body = HistoryWindow(parent) + super().__init__(body, footer=footer) + + +class TUILogHandler(Handler): + """ + This handler routes all the log messages to the TUI screen. + It is installed to the root logger to so that the log message from all + libraries begin used is routed to the screen. + + :param tui: Reference to the TUI object. + """ + def __init__(self, tui: App) -> None: + super().__init__() + self.tui = tui + + def emit(self, record: LogRecord) -> None: + """ + Emits a record to the TUI screen. + + Appends the log message to the TUI screen + """ + level = record.levelname + msg = record.getMessage() + self.tui.add_to_history(msg, level) + + +def main() -> None: + """ + Driver of the whole script, parses arguments, initialize the TUI and + the logger. + """ + parser = argparse.ArgumentParser(description='AQMP TUI') + parser.add_argument('qmp_server', help='Address of the QMP server. ' + 'Format ') + parser.add_argument('--num-retries', type=int, default=10, + help='Number of times to reconnect before giving up.') + parser.add_argument('--retry-delay', type=int, + help='Time(s) to wait before next retry. ' + 'Default action is to wait 2s between each retry.') + parser.add_argument('--log-file', help='The Log file name') + parser.add_argument('--log-level', default='WARNING', + help='Log level ') + parser.add_argument('--asyncio-debug', action='store_true', + help='Enable debug mode for asyncio loop. ' + 'Generates lot of output, makes TUI unusable when ' + 'logs are logged in the TUI. ' + 'Use only when logging to a file.') + args = parser.parse_args() + + try: + address = QEMUMonitorProtocol.parse_address(args.qmp_server) + except QMPBadPortError as err: + parser.error(str(err)) + + app = App(address, args.num_retries, args.retry_delay) + + root_logger = logging.getLogger() + root_logger.setLevel(logging.getLevelName(args.log_level)) + + if args.log_file: + root_logger.addHandler(logging.FileHandler(args.log_file)) + else: + root_logger.addHandler(TUILogHandler(app)) + + app.run(args.asyncio_debug) + + +if __name__ == '__main__': + main() -- cgit v1.1