From fd9c3a6219b0470c356c8486188052d353846806 Mon Sep 17 00:00:00 2001 From: John Snow Date: Mon, 10 Jan 2022 18:28:55 -0500 Subject: python: move qmp-shell under the AQMP package Signed-off-by: John Snow Reviewed-by: Vladimir Sementsov-Ogievskiy Reviewed-by: Beraldo Leal --- python/qemu/aqmp/qmp_shell.py | 537 ++++++++++++++++++++++++++++++++++++++++++ python/qemu/qmp/qmp_shell.py | 537 ------------------------------------------ 2 files changed, 537 insertions(+), 537 deletions(-) create mode 100644 python/qemu/aqmp/qmp_shell.py delete mode 100644 python/qemu/qmp/qmp_shell.py (limited to 'python/qemu') diff --git a/python/qemu/aqmp/qmp_shell.py b/python/qemu/aqmp/qmp_shell.py new file mode 100644 index 0000000..d11bf54 --- /dev/null +++ b/python/qemu/aqmp/qmp_shell.py @@ -0,0 +1,537 @@ +# +# Copyright (C) 2009, 2010 Red Hat Inc. +# +# Authors: +# Luiz Capitulino +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. +# + +""" +Low-level QEMU shell on top of QMP. + +usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server + +positional arguments: + qmp_server < UNIX socket path | TCP address:port > + +optional arguments: + -h, --help show this help message and exit + -H, --hmp Use HMP interface + -N, --skip-negotiation + Skip negotiate (for qemu-ga) + -v, --verbose Verbose (echo commands sent and received) + -p, --pretty Pretty-print JSON + + +Start QEMU with: + +# qemu [...] -qmp unix:./qmp-sock,server + +Run the shell: + +$ qmp-shell ./qmp-sock + +Commands have the following format: + + < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] + +For example: + +(QEMU) device_add driver=e1000 id=net1 +{'return': {}} +(QEMU) + +key=value pairs also support Python or JSON object literal subset notations, +without spaces. Dictionaries/objects {} are supported as are arrays []. + + example-command arg-name1={'key':'value','obj'={'prop':"value"}} + +Both JSON and Python formatting should work, including both styles of +string literal quotes. Both paradigms of literal values should work, +including null/true/false for JSON and None/True/False for Python. + + +Transactions have the following multi-line format: + + transaction( + action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] + ... + action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] + ) + +One line transactions are also supported: + + transaction( action-name1 ... ) + +For example: + + (QEMU) transaction( + TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 + TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 + TRANS> ) + {"return": {}} + (QEMU) + +Use the -v and -p options to activate the verbose and pretty-print options, +which will echo back the properly formatted JSON-compliant QMP that is being +sent to QEMU, which is useful for debugging and documentation generation. +""" + +import argparse +import ast +import json +import logging +import os +import re +import readline +import sys +from typing import ( + Iterator, + List, + NoReturn, + Optional, + Sequence, +) + +from qemu.aqmp import ConnectError, QMPError, SocketAddrT +from qemu.aqmp.legacy import ( + QEMUMonitorProtocol, + QMPBadPortError, + QMPMessage, + QMPObject, +) + + +LOG = logging.getLogger(__name__) + + +class QMPCompleter: + """ + QMPCompleter provides a readline library tab-complete behavior. + """ + # NB: Python 3.9+ will probably allow us to subclass list[str] directly, + # but pylint as of today does not know that List[str] is simply 'list'. + def __init__(self) -> None: + self._matches: List[str] = [] + + def append(self, value: str) -> None: + """Append a new valid completion to the list of possibilities.""" + return self._matches.append(value) + + def complete(self, text: str, state: int) -> Optional[str]: + """readline.set_completer() callback implementation.""" + for cmd in self._matches: + if cmd.startswith(text): + if state == 0: + return cmd + state -= 1 + return None + + +class QMPShellError(QMPError): + """ + QMP Shell Base error class. + """ + + +class FuzzyJSON(ast.NodeTransformer): + """ + This extension of ast.NodeTransformer filters literal "true/false/null" + values in a Python AST and replaces them by proper "True/False/None" values + that Python can properly evaluate. + """ + + @classmethod + def visit_Name(cls, # pylint: disable=invalid-name + node: ast.Name) -> ast.AST: + """ + Transform Name nodes with certain values into Constant (keyword) nodes. + """ + if node.id == 'true': + return ast.Constant(value=True) + if node.id == 'false': + return ast.Constant(value=False) + if node.id == 'null': + return ast.Constant(value=None) + return node + + +class QMPShell(QEMUMonitorProtocol): + """ + QMPShell provides a basic readline-based QMP shell. + + :param address: Address of the QMP server. + :param pretty: Pretty-print QMP messages. + :param verbose: Echo outgoing QMP messages to console. + """ + def __init__(self, address: SocketAddrT, + pretty: bool = False, verbose: bool = False): + super().__init__(address) + self._greeting: Optional[QMPMessage] = None + self._completer = QMPCompleter() + self._transmode = False + self._actions: List[QMPMessage] = [] + self._histfile = os.path.join(os.path.expanduser('~'), + '.qmp-shell_history') + self.pretty = pretty + self.verbose = verbose + + def close(self) -> None: + # Hook into context manager of parent to save shell history. + self._save_history() + super().close() + + def _fill_completion(self) -> None: + cmds = self.cmd('query-commands') + if 'error' in cmds: + return + for cmd in cmds['return']: + self._completer.append(cmd['name']) + + def _completer_setup(self) -> None: + self._completer = QMPCompleter() + self._fill_completion() + readline.set_history_length(1024) + readline.set_completer(self._completer.complete) + readline.parse_and_bind("tab: complete") + # NB: default delimiters conflict with some command names + # (eg. query-), clearing everything as it doesn't seem to matter + readline.set_completer_delims('') + try: + readline.read_history_file(self._histfile) + except FileNotFoundError: + pass + except IOError as err: + msg = f"Failed to read history '{self._histfile}': {err!s}" + LOG.warning(msg) + + def _save_history(self) -> None: + try: + readline.write_history_file(self._histfile) + except IOError as err: + msg = f"Failed to save history file '{self._histfile}': {err!s}" + LOG.warning(msg) + + @classmethod + def _parse_value(cls, val: str) -> object: + try: + return int(val) + except ValueError: + pass + + if val.lower() == 'true': + return True + if val.lower() == 'false': + return False + if val.startswith(('{', '[')): + # Try first as pure JSON: + try: + return json.loads(val) + except ValueError: + pass + # Try once again as FuzzyJSON: + try: + tree = ast.parse(val, mode='eval') + transformed = FuzzyJSON().visit(tree) + return ast.literal_eval(transformed) + except (SyntaxError, ValueError): + pass + return val + + def _cli_expr(self, + tokens: Sequence[str], + parent: QMPObject) -> None: + for arg in tokens: + (key, sep, val) = arg.partition('=') + if sep != '=': + raise QMPShellError( + f"Expected a key=value pair, got '{arg!s}'" + ) + + value = self._parse_value(val) + optpath = key.split('.') + curpath = [] + for path in optpath[:-1]: + curpath.append(path) + obj = parent.get(path, {}) + if not isinstance(obj, dict): + msg = 'Cannot use "{:s}" as both leaf and non-leaf key' + raise QMPShellError(msg.format('.'.join(curpath))) + parent[path] = obj + parent = obj + if optpath[-1] in parent: + if isinstance(parent[optpath[-1]], dict): + msg = 'Cannot use "{:s}" as both leaf and non-leaf key' + raise QMPShellError(msg.format('.'.join(curpath))) + raise QMPShellError(f'Cannot set "{key}" multiple times') + parent[optpath[-1]] = value + + def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: + """ + Build a QMP input object from a user provided command-line in the + following format: + + < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] + """ + argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' + cmdargs = re.findall(argument_regex, cmdline) + qmpcmd: QMPMessage + + # Transactional CLI entry: + if cmdargs and cmdargs[0] == 'transaction(': + self._transmode = True + self._actions = [] + cmdargs.pop(0) + + # Transactional CLI exit: + if cmdargs and cmdargs[0] == ')' and self._transmode: + self._transmode = False + if len(cmdargs) > 1: + msg = 'Unexpected input after close of Transaction sub-shell' + raise QMPShellError(msg) + qmpcmd = { + 'execute': 'transaction', + 'arguments': {'actions': self._actions} + } + return qmpcmd + + # No args, or no args remaining + if not cmdargs: + return None + + if self._transmode: + # Parse and cache this Transactional Action + finalize = False + action = {'type': cmdargs[0], 'data': {}} + if cmdargs[-1] == ')': + cmdargs.pop(-1) + finalize = True + self._cli_expr(cmdargs[1:], action['data']) + self._actions.append(action) + return self._build_cmd(')') if finalize else None + + # Standard command: parse and return it to be executed. + qmpcmd = {'execute': cmdargs[0], 'arguments': {}} + self._cli_expr(cmdargs[1:], qmpcmd['arguments']) + return qmpcmd + + def _print(self, qmp_message: object) -> None: + jsobj = json.dumps(qmp_message, + indent=4 if self.pretty else None, + sort_keys=self.pretty) + print(str(jsobj)) + + def _execute_cmd(self, cmdline: str) -> bool: + try: + qmpcmd = self._build_cmd(cmdline) + except QMPShellError as err: + print( + f"Error while parsing command line: {err!s}\n" + "command format: " + "[arg-name1=arg1] ... [arg-nameN=argN", + file=sys.stderr + ) + return True + # For transaction mode, we may have just cached the action: + if qmpcmd is None: + return True + if self.verbose: + self._print(qmpcmd) + resp = self.cmd_obj(qmpcmd) + if resp is None: + print('Disconnected') + return False + self._print(resp) + return True + + def connect(self, negotiate: bool = True) -> None: + self._greeting = super().connect(negotiate) + self._completer_setup() + + def show_banner(self, + msg: str = 'Welcome to the QMP low-level shell!') -> None: + """ + Print to stdio a greeting, and the QEMU version if available. + """ + print(msg) + if not self._greeting: + print('Connected') + return + version = self._greeting['QMP']['version']['qemu'] + print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) + + @property + def prompt(self) -> str: + """ + Return the current shell prompt, including a trailing space. + """ + if self._transmode: + return 'TRANS> ' + return '(QEMU) ' + + def read_exec_command(self) -> bool: + """ + Read and execute a command. + + @return True if execution was ok, return False if disconnected. + """ + try: + cmdline = input(self.prompt) + except EOFError: + print() + return False + + if cmdline == '': + for event in self.get_events(): + print(event) + return True + + return self._execute_cmd(cmdline) + + def repl(self) -> Iterator[None]: + """ + Return an iterator that implements the REPL. + """ + self.show_banner() + while self.read_exec_command(): + yield + self.close() + + +class HMPShell(QMPShell): + """ + HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. + + :param address: Address of the QMP server. + :param pretty: Pretty-print QMP messages. + :param verbose: Echo outgoing QMP messages to console. + """ + def __init__(self, address: SocketAddrT, + pretty: bool = False, verbose: bool = False): + super().__init__(address, pretty, verbose) + self._cpu_index = 0 + + def _cmd_completion(self) -> None: + for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): + if cmd and cmd[0] != '[' and cmd[0] != '\t': + name = cmd.split()[0] # drop help text + if name == 'info': + continue + if name.find('|') != -1: + # Command in the form 'foobar|f' or 'f|foobar', take the + # full name + opt = name.split('|') + if len(opt[0]) == 1: + name = opt[1] + else: + name = opt[0] + self._completer.append(name) + self._completer.append('help ' + name) # help completion + + def _info_completion(self) -> None: + for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): + if cmd: + self._completer.append('info ' + cmd.split()[1]) + + def _other_completion(self) -> None: + # special cases + self._completer.append('help info') + + def _fill_completion(self) -> None: + self._cmd_completion() + self._info_completion() + self._other_completion() + + def _cmd_passthrough(self, cmdline: str, + cpu_index: int = 0) -> QMPMessage: + return self.cmd_obj({ + 'execute': 'human-monitor-command', + 'arguments': { + 'command-line': cmdline, + 'cpu-index': cpu_index + } + }) + + def _execute_cmd(self, cmdline: str) -> bool: + if cmdline.split()[0] == "cpu": + # trap the cpu command, it requires special setting + try: + idx = int(cmdline.split()[1]) + if 'return' not in self._cmd_passthrough('info version', idx): + print('bad CPU index') + return True + self._cpu_index = idx + except ValueError: + print('cpu command takes an integer argument') + return True + resp = self._cmd_passthrough(cmdline, self._cpu_index) + if resp is None: + print('Disconnected') + return False + assert 'return' in resp or 'error' in resp + if 'return' in resp: + # Success + if len(resp['return']) > 0: + print(resp['return'], end=' ') + else: + # Error + print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) + return True + + def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: + QMPShell.show_banner(self, msg) + + +def die(msg: str) -> NoReturn: + """Write an error to stderr, then exit with a return code of 1.""" + sys.stderr.write('ERROR: %s\n' % msg) + sys.exit(1) + + +def main() -> None: + """ + qmp-shell entry point: parse command line arguments and start the REPL. + """ + parser = argparse.ArgumentParser() + parser.add_argument('-H', '--hmp', action='store_true', + help='Use HMP interface') + parser.add_argument('-N', '--skip-negotiation', action='store_true', + help='Skip negotiate (for qemu-ga)') + parser.add_argument('-v', '--verbose', action='store_true', + help='Verbose (echo commands sent and received)') + parser.add_argument('-p', '--pretty', action='store_true', + help='Pretty-print JSON') + + default_server = os.environ.get('QMP_SOCKET') + parser.add_argument('qmp_server', action='store', + default=default_server, + help='< UNIX socket path | TCP address:port >') + + args = parser.parse_args() + if args.qmp_server is None: + parser.error("QMP socket or TCP address must be specified") + + shell_class = HMPShell if args.hmp else QMPShell + + try: + address = shell_class.parse_address(args.qmp_server) + except QMPBadPortError: + parser.error(f"Bad port number: {args.qmp_server}") + return # pycharm doesn't know error() is noreturn + + with shell_class(address, args.pretty, args.verbose) as qemu: + try: + qemu.connect(negotiate=not args.skip_negotiation) + except ConnectError as err: + if isinstance(err.exc, OSError): + die(f"Couldn't connect to {args.qmp_server}: {err!s}") + die(str(err)) + + for _ in qemu.repl(): + pass + + +if __name__ == '__main__': + main() diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py deleted file mode 100644 index d11bf54..0000000 --- a/python/qemu/qmp/qmp_shell.py +++ /dev/null @@ -1,537 +0,0 @@ -# -# Copyright (C) 2009, 2010 Red Hat Inc. -# -# Authors: -# Luiz Capitulino -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. -# - -""" -Low-level QEMU shell on top of QMP. - -usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server - -positional arguments: - qmp_server < UNIX socket path | TCP address:port > - -optional arguments: - -h, --help show this help message and exit - -H, --hmp Use HMP interface - -N, --skip-negotiation - Skip negotiate (for qemu-ga) - -v, --verbose Verbose (echo commands sent and received) - -p, --pretty Pretty-print JSON - - -Start QEMU with: - -# qemu [...] -qmp unix:./qmp-sock,server - -Run the shell: - -$ qmp-shell ./qmp-sock - -Commands have the following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - -For example: - -(QEMU) device_add driver=e1000 id=net1 -{'return': {}} -(QEMU) - -key=value pairs also support Python or JSON object literal subset notations, -without spaces. Dictionaries/objects {} are supported as are arrays []. - - example-command arg-name1={'key':'value','obj'={'prop':"value"}} - -Both JSON and Python formatting should work, including both styles of -string literal quotes. Both paradigms of literal values should work, -including null/true/false for JSON and None/True/False for Python. - - -Transactions have the following multi-line format: - - transaction( - action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ... - action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ) - -One line transactions are also supported: - - transaction( action-name1 ... ) - -For example: - - (QEMU) transaction( - TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 - TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 - TRANS> ) - {"return": {}} - (QEMU) - -Use the -v and -p options to activate the verbose and pretty-print options, -which will echo back the properly formatted JSON-compliant QMP that is being -sent to QEMU, which is useful for debugging and documentation generation. -""" - -import argparse -import ast -import json -import logging -import os -import re -import readline -import sys -from typing import ( - Iterator, - List, - NoReturn, - Optional, - Sequence, -) - -from qemu.aqmp import ConnectError, QMPError, SocketAddrT -from qemu.aqmp.legacy import ( - QEMUMonitorProtocol, - QMPBadPortError, - QMPMessage, - QMPObject, -) - - -LOG = logging.getLogger(__name__) - - -class QMPCompleter: - """ - QMPCompleter provides a readline library tab-complete behavior. - """ - # NB: Python 3.9+ will probably allow us to subclass list[str] directly, - # but pylint as of today does not know that List[str] is simply 'list'. - def __init__(self) -> None: - self._matches: List[str] = [] - - def append(self, value: str) -> None: - """Append a new valid completion to the list of possibilities.""" - return self._matches.append(value) - - def complete(self, text: str, state: int) -> Optional[str]: - """readline.set_completer() callback implementation.""" - for cmd in self._matches: - if cmd.startswith(text): - if state == 0: - return cmd - state -= 1 - return None - - -class QMPShellError(QMPError): - """ - QMP Shell Base error class. - """ - - -class FuzzyJSON(ast.NodeTransformer): - """ - This extension of ast.NodeTransformer filters literal "true/false/null" - values in a Python AST and replaces them by proper "True/False/None" values - that Python can properly evaluate. - """ - - @classmethod - def visit_Name(cls, # pylint: disable=invalid-name - node: ast.Name) -> ast.AST: - """ - Transform Name nodes with certain values into Constant (keyword) nodes. - """ - if node.id == 'true': - return ast.Constant(value=True) - if node.id == 'false': - return ast.Constant(value=False) - if node.id == 'null': - return ast.Constant(value=None) - return node - - -class QMPShell(QEMUMonitorProtocol): - """ - QMPShell provides a basic readline-based QMP shell. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: SocketAddrT, - pretty: bool = False, verbose: bool = False): - super().__init__(address) - self._greeting: Optional[QMPMessage] = None - self._completer = QMPCompleter() - self._transmode = False - self._actions: List[QMPMessage] = [] - self._histfile = os.path.join(os.path.expanduser('~'), - '.qmp-shell_history') - self.pretty = pretty - self.verbose = verbose - - def close(self) -> None: - # Hook into context manager of parent to save shell history. - self._save_history() - super().close() - - def _fill_completion(self) -> None: - cmds = self.cmd('query-commands') - if 'error' in cmds: - return - for cmd in cmds['return']: - self._completer.append(cmd['name']) - - def _completer_setup(self) -> None: - self._completer = QMPCompleter() - self._fill_completion() - readline.set_history_length(1024) - readline.set_completer(self._completer.complete) - readline.parse_and_bind("tab: complete") - # NB: default delimiters conflict with some command names - # (eg. query-), clearing everything as it doesn't seem to matter - readline.set_completer_delims('') - try: - readline.read_history_file(self._histfile) - except FileNotFoundError: - pass - except IOError as err: - msg = f"Failed to read history '{self._histfile}': {err!s}" - LOG.warning(msg) - - def _save_history(self) -> None: - try: - readline.write_history_file(self._histfile) - except IOError as err: - msg = f"Failed to save history file '{self._histfile}': {err!s}" - LOG.warning(msg) - - @classmethod - def _parse_value(cls, val: str) -> object: - try: - return int(val) - except ValueError: - pass - - if val.lower() == 'true': - return True - if val.lower() == 'false': - return False - if val.startswith(('{', '[')): - # Try first as pure JSON: - try: - return json.loads(val) - except ValueError: - pass - # Try once again as FuzzyJSON: - try: - tree = ast.parse(val, mode='eval') - transformed = FuzzyJSON().visit(tree) - return ast.literal_eval(transformed) - except (SyntaxError, ValueError): - pass - return val - - def _cli_expr(self, - tokens: Sequence[str], - parent: QMPObject) -> None: - for arg in tokens: - (key, sep, val) = arg.partition('=') - if sep != '=': - raise QMPShellError( - f"Expected a key=value pair, got '{arg!s}'" - ) - - value = self._parse_value(val) - optpath = key.split('.') - curpath = [] - for path in optpath[:-1]: - curpath.append(path) - obj = parent.get(path, {}) - if not isinstance(obj, dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - parent[path] = obj - parent = obj - if optpath[-1] in parent: - if isinstance(parent[optpath[-1]], dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - raise QMPShellError(f'Cannot set "{key}" multiple times') - parent[optpath[-1]] = value - - def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: - """ - Build a QMP input object from a user provided command-line in the - following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - """ - argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' - cmdargs = re.findall(argument_regex, cmdline) - qmpcmd: QMPMessage - - # Transactional CLI entry: - if cmdargs and cmdargs[0] == 'transaction(': - self._transmode = True - self._actions = [] - cmdargs.pop(0) - - # Transactional CLI exit: - if cmdargs and cmdargs[0] == ')' and self._transmode: - self._transmode = False - if len(cmdargs) > 1: - msg = 'Unexpected input after close of Transaction sub-shell' - raise QMPShellError(msg) - qmpcmd = { - 'execute': 'transaction', - 'arguments': {'actions': self._actions} - } - return qmpcmd - - # No args, or no args remaining - if not cmdargs: - return None - - if self._transmode: - # Parse and cache this Transactional Action - finalize = False - action = {'type': cmdargs[0], 'data': {}} - if cmdargs[-1] == ')': - cmdargs.pop(-1) - finalize = True - self._cli_expr(cmdargs[1:], action['data']) - self._actions.append(action) - return self._build_cmd(')') if finalize else None - - # Standard command: parse and return it to be executed. - qmpcmd = {'execute': cmdargs[0], 'arguments': {}} - self._cli_expr(cmdargs[1:], qmpcmd['arguments']) - return qmpcmd - - def _print(self, qmp_message: object) -> None: - jsobj = json.dumps(qmp_message, - indent=4 if self.pretty else None, - sort_keys=self.pretty) - print(str(jsobj)) - - def _execute_cmd(self, cmdline: str) -> bool: - try: - qmpcmd = self._build_cmd(cmdline) - except QMPShellError as err: - print( - f"Error while parsing command line: {err!s}\n" - "command format: " - "[arg-name1=arg1] ... [arg-nameN=argN", - file=sys.stderr - ) - return True - # For transaction mode, we may have just cached the action: - if qmpcmd is None: - return True - if self.verbose: - self._print(qmpcmd) - resp = self.cmd_obj(qmpcmd) - if resp is None: - print('Disconnected') - return False - self._print(resp) - return True - - def connect(self, negotiate: bool = True) -> None: - self._greeting = super().connect(negotiate) - self._completer_setup() - - def show_banner(self, - msg: str = 'Welcome to the QMP low-level shell!') -> None: - """ - Print to stdio a greeting, and the QEMU version if available. - """ - print(msg) - if not self._greeting: - print('Connected') - return - version = self._greeting['QMP']['version']['qemu'] - print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) - - @property - def prompt(self) -> str: - """ - Return the current shell prompt, including a trailing space. - """ - if self._transmode: - return 'TRANS> ' - return '(QEMU) ' - - def read_exec_command(self) -> bool: - """ - Read and execute a command. - - @return True if execution was ok, return False if disconnected. - """ - try: - cmdline = input(self.prompt) - except EOFError: - print() - return False - - if cmdline == '': - for event in self.get_events(): - print(event) - return True - - return self._execute_cmd(cmdline) - - def repl(self) -> Iterator[None]: - """ - Return an iterator that implements the REPL. - """ - self.show_banner() - while self.read_exec_command(): - yield - self.close() - - -class HMPShell(QMPShell): - """ - HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: SocketAddrT, - pretty: bool = False, verbose: bool = False): - super().__init__(address, pretty, verbose) - self._cpu_index = 0 - - def _cmd_completion(self) -> None: - for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): - if cmd and cmd[0] != '[' and cmd[0] != '\t': - name = cmd.split()[0] # drop help text - if name == 'info': - continue - if name.find('|') != -1: - # Command in the form 'foobar|f' or 'f|foobar', take the - # full name - opt = name.split('|') - if len(opt[0]) == 1: - name = opt[1] - else: - name = opt[0] - self._completer.append(name) - self._completer.append('help ' + name) # help completion - - def _info_completion(self) -> None: - for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): - if cmd: - self._completer.append('info ' + cmd.split()[1]) - - def _other_completion(self) -> None: - # special cases - self._completer.append('help info') - - def _fill_completion(self) -> None: - self._cmd_completion() - self._info_completion() - self._other_completion() - - def _cmd_passthrough(self, cmdline: str, - cpu_index: int = 0) -> QMPMessage: - return self.cmd_obj({ - 'execute': 'human-monitor-command', - 'arguments': { - 'command-line': cmdline, - 'cpu-index': cpu_index - } - }) - - def _execute_cmd(self, cmdline: str) -> bool: - if cmdline.split()[0] == "cpu": - # trap the cpu command, it requires special setting - try: - idx = int(cmdline.split()[1]) - if 'return' not in self._cmd_passthrough('info version', idx): - print('bad CPU index') - return True - self._cpu_index = idx - except ValueError: - print('cpu command takes an integer argument') - return True - resp = self._cmd_passthrough(cmdline, self._cpu_index) - if resp is None: - print('Disconnected') - return False - assert 'return' in resp or 'error' in resp - if 'return' in resp: - # Success - if len(resp['return']) > 0: - print(resp['return'], end=' ') - else: - # Error - print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) - return True - - def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: - QMPShell.show_banner(self, msg) - - -def die(msg: str) -> NoReturn: - """Write an error to stderr, then exit with a return code of 1.""" - sys.stderr.write('ERROR: %s\n' % msg) - sys.exit(1) - - -def main() -> None: - """ - qmp-shell entry point: parse command line arguments and start the REPL. - """ - parser = argparse.ArgumentParser() - parser.add_argument('-H', '--hmp', action='store_true', - help='Use HMP interface') - parser.add_argument('-N', '--skip-negotiation', action='store_true', - help='Skip negotiate (for qemu-ga)') - parser.add_argument('-v', '--verbose', action='store_true', - help='Verbose (echo commands sent and received)') - parser.add_argument('-p', '--pretty', action='store_true', - help='Pretty-print JSON') - - default_server = os.environ.get('QMP_SOCKET') - parser.add_argument('qmp_server', action='store', - default=default_server, - help='< UNIX socket path | TCP address:port >') - - args = parser.parse_args() - if args.qmp_server is None: - parser.error("QMP socket or TCP address must be specified") - - shell_class = HMPShell if args.hmp else QMPShell - - try: - address = shell_class.parse_address(args.qmp_server) - except QMPBadPortError: - parser.error(f"Bad port number: {args.qmp_server}") - return # pycharm doesn't know error() is noreturn - - with shell_class(address, args.pretty, args.verbose) as qemu: - try: - qemu.connect(negotiate=not args.skip_negotiation) - except ConnectError as err: - if isinstance(err.exc, OSError): - die(f"Couldn't connect to {args.qmp_server}: {err!s}") - die(str(err)) - - for _ in qemu.repl(): - pass - - -if __name__ == '__main__': - main() -- cgit v1.1