diff options
author | John Snow <jsnow@redhat.com> | 2022-01-10 18:28:54 -0500 |
---|---|---|
committer | John Snow <jsnow@redhat.com> | 2022-01-21 16:01:31 -0500 |
commit | 0347c4c4cfed47e54d9dc275ceb28d35b250749f (patch) | |
tree | 530f7362f47b1c371fa488595d09dc8198e26cbe /python/qemu/utils | |
parent | f3efd12930f34b9724e15d8fd2ff56a97b67219d (diff) | |
download | qemu-0347c4c4cfed47e54d9dc275ceb28d35b250749f.zip qemu-0347c4c4cfed47e54d9dc275ceb28d35b250749f.tar.gz qemu-0347c4c4cfed47e54d9dc275ceb28d35b250749f.tar.bz2 |
python: move qmp utilities to python/qemu/utils
In order to upload a QMP package to PyPI, I want to remove any scripts
that I am not 100% confident I want to support upstream, beyond our
castle walls.
Move most of our QMP utilities into the utils package so we can split
them out from the PyPI upload.
Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: Beraldo Leal <bleal@redhat.com>
Diffstat (limited to 'python/qemu/utils')
-rw-r--r-- | python/qemu/utils/qemu_ga_client.py | 323 | ||||
-rw-r--r-- | python/qemu/utils/qom.py | 273 | ||||
-rw-r--r-- | python/qemu/utils/qom_common.py | 175 | ||||
-rw-r--r-- | python/qemu/utils/qom_fuse.py | 207 |
4 files changed, 978 insertions, 0 deletions
diff --git a/python/qemu/utils/qemu_ga_client.py b/python/qemu/utils/qemu_ga_client.py new file mode 100644 index 0000000..15ed430 --- /dev/null +++ b/python/qemu/utils/qemu_ga_client.py @@ -0,0 +1,323 @@ +""" +QEMU Guest Agent Client + +Usage: + +Start QEMU with: + +# qemu [...] -chardev socket,path=/tmp/qga.sock,server=on,wait=off,id=qga0 \ + -device virtio-serial \ + -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0 + +Run the script: + +$ qemu-ga-client --address=/tmp/qga.sock <command> [args...] + +or + +$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock +$ qemu-ga-client <command> [args...] + +For example: + +$ qemu-ga-client cat /etc/resolv.conf +# Generated by NetworkManager +nameserver 10.0.2.3 +$ qemu-ga-client fsfreeze status +thawed +$ qemu-ga-client fsfreeze freeze +2 filesystems frozen + +See also: https://wiki.qemu.org/Features/QAPI/GuestAgent +""" + +# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com> +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. + +import argparse +import asyncio +import base64 +import os +import random +import sys +from typing import ( + Any, + Callable, + Dict, + Optional, + Sequence, +) + +from qemu.aqmp import ConnectError, SocketAddrT +from qemu.aqmp.legacy import QEMUMonitorProtocol + + +# This script has not seen many patches or careful attention in quite +# some time. If you would like to improve it, please review the design +# carefully and add docstrings at that point in time. Until then: + +# pylint: disable=missing-docstring + + +class QemuGuestAgent(QEMUMonitorProtocol): + def __getattr__(self, name: str) -> Callable[..., Any]: + def wrapper(**kwds: object) -> object: + return self.command('guest-' + name.replace('_', '-'), **kwds) + return wrapper + + +class QemuGuestAgentClient: + def __init__(self, address: SocketAddrT): + self.qga = QemuGuestAgent(address) + self.qga.connect(negotiate=False) + + def sync(self, timeout: Optional[float] = 3) -> None: + # Avoid being blocked forever + if not self.ping(timeout): + raise EnvironmentError('Agent seems not alive') + uid = random.randint(0, (1 << 32) - 1) + while True: + ret = self.qga.sync(id=uid) + if isinstance(ret, int) and int(ret) == uid: + break + + def __file_read_all(self, handle: int) -> bytes: + eof = False + data = b'' + while not eof: + ret = self.qga.file_read(handle=handle, count=1024) + _data = base64.b64decode(ret['buf-b64']) + data += _data + eof = ret['eof'] + return data + + def read(self, path: str) -> bytes: + handle = self.qga.file_open(path=path) + try: + data = self.__file_read_all(handle) + finally: + self.qga.file_close(handle=handle) + return data + + def info(self) -> str: + info = self.qga.info() + + msgs = [] + msgs.append('version: ' + info['version']) + msgs.append('supported_commands:') + enabled = [c['name'] for c in info['supported_commands'] + if c['enabled']] + msgs.append('\tenabled: ' + ', '.join(enabled)) + disabled = [c['name'] for c in info['supported_commands'] + if not c['enabled']] + msgs.append('\tdisabled: ' + ', '.join(disabled)) + + return '\n'.join(msgs) + + @classmethod + def __gen_ipv4_netmask(cls, prefixlen: int) -> str: + mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2) + return '.'.join([str(mask >> 24), + str((mask >> 16) & 0xff), + str((mask >> 8) & 0xff), + str(mask & 0xff)]) + + def ifconfig(self) -> str: + nifs = self.qga.network_get_interfaces() + + msgs = [] + for nif in nifs: + msgs.append(nif['name'] + ':') + if 'ip-addresses' in nif: + for ipaddr in nif['ip-addresses']: + if ipaddr['ip-address-type'] == 'ipv4': + addr = ipaddr['ip-address'] + mask = self.__gen_ipv4_netmask(int(ipaddr['prefix'])) + msgs.append(f"\tinet {addr} netmask {mask}") + elif ipaddr['ip-address-type'] == 'ipv6': + addr = ipaddr['ip-address'] + prefix = ipaddr['prefix'] + msgs.append(f"\tinet6 {addr} prefixlen {prefix}") + if nif['hardware-address'] != '00:00:00:00:00:00': + msgs.append("\tether " + nif['hardware-address']) + + return '\n'.join(msgs) + + def ping(self, timeout: Optional[float]) -> bool: + self.qga.settimeout(timeout) + try: + self.qga.ping() + except asyncio.TimeoutError: + return False + return True + + def fsfreeze(self, cmd: str) -> object: + if cmd not in ['status', 'freeze', 'thaw']: + raise Exception('Invalid command: ' + cmd) + # Can be int (freeze, thaw) or GuestFsfreezeStatus (status) + return getattr(self.qga, 'fsfreeze' + '_' + cmd)() + + def fstrim(self, minimum: int) -> Dict[str, object]: + # returns GuestFilesystemTrimResponse + ret = getattr(self.qga, 'fstrim')(minimum=minimum) + assert isinstance(ret, dict) + return ret + + def suspend(self, mode: str) -> None: + if mode not in ['disk', 'ram', 'hybrid']: + raise Exception('Invalid mode: ' + mode) + + try: + getattr(self.qga, 'suspend' + '_' + mode)() + # On error exception will raise + except asyncio.TimeoutError: + # On success command will timed out + return + + def shutdown(self, mode: str = 'powerdown') -> None: + if mode not in ['powerdown', 'halt', 'reboot']: + raise Exception('Invalid mode: ' + mode) + + try: + self.qga.shutdown(mode=mode) + except asyncio.TimeoutError: + pass + + +def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + if len(args) != 1: + print('Invalid argument') + print('Usage: cat <file>') + sys.exit(1) + print(client.read(args[0])) + + +def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + usage = 'Usage: fsfreeze status|freeze|thaw' + if len(args) != 1: + print('Invalid argument') + print(usage) + sys.exit(1) + if args[0] not in ['status', 'freeze', 'thaw']: + print('Invalid command: ' + args[0]) + print(usage) + sys.exit(1) + cmd = args[0] + ret = client.fsfreeze(cmd) + if cmd == 'status': + print(ret) + return + + assert isinstance(ret, int) + verb = 'frozen' if cmd == 'freeze' else 'thawed' + print(f"{ret:d} filesystems {verb}") + + +def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + if len(args) == 0: + minimum = 0 + else: + minimum = int(args[0]) + print(client.fstrim(minimum)) + + +def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + print(client.ifconfig()) + + +def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + print(client.info()) + + +def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + timeout = 3.0 if len(args) == 0 else float(args[0]) + alive = client.ping(timeout) + if not alive: + print("Not responded in %s sec" % args[0]) + sys.exit(1) + + +def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + usage = 'Usage: suspend disk|ram|hybrid' + if len(args) != 1: + print('Less argument') + print(usage) + sys.exit(1) + if args[0] not in ['disk', 'ram', 'hybrid']: + print('Invalid command: ' + args[0]) + print(usage) + sys.exit(1) + client.suspend(args[0]) + + +def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + client.shutdown() + + +_cmd_powerdown = _cmd_shutdown + + +def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + client.shutdown('halt') + + +def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + client.shutdown('reboot') + + +commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m] + + +def send_command(address: str, cmd: str, args: Sequence[str]) -> None: + if not os.path.exists(address): + print(f"'{address}' not found. (Is QEMU running?)") + sys.exit(1) + + if cmd not in commands: + print('Invalid command: ' + cmd) + print('Available commands: ' + ', '.join(commands)) + sys.exit(1) + + try: + client = QemuGuestAgentClient(address) + except ConnectError as err: + print(err) + if isinstance(err.exc, ConnectionError): + print('(Is QEMU running?)') + sys.exit(1) + + if cmd == 'fsfreeze' and args[0] == 'freeze': + client.sync(60) + elif cmd != 'ping': + client.sync() + + globals()['_cmd_' + cmd](client, args) + + +def main() -> None: + address = os.environ.get('QGA_CLIENT_ADDRESS') + + parser = argparse.ArgumentParser() + parser.add_argument('--address', action='store', + default=address, + help='Specify a ip:port pair or a unix socket path') + parser.add_argument('command', choices=commands) + parser.add_argument('args', nargs='*') + + args = parser.parse_args() + if args.address is None: + parser.error('address is not specified') + sys.exit(1) + + send_command(args.address, args.command, args.args) + + +if __name__ == '__main__': + main() diff --git a/python/qemu/utils/qom.py b/python/qemu/utils/qom.py new file mode 100644 index 0000000..bb5d1a7 --- /dev/null +++ b/python/qemu/utils/qom.py @@ -0,0 +1,273 @@ +""" +QEMU Object Model testing tools. + +usage: qom [-h] {set,get,list,tree,fuse} ... + +Query and manipulate QOM data + +optional arguments: + -h, --help show this help message and exit + +QOM commands: + {set,get,list,tree,fuse} + set Set a QOM property value + get Get a QOM property value + list List QOM properties at a given path + tree Show QOM tree from a given path + fuse Mount a QOM tree as a FUSE filesystem +""" +## +# Copyright John Snow 2020, for Red Hat, Inc. +# Copyright IBM, Corp. 2011 +# +# Authors: +# John Snow <jsnow@redhat.com> +# Anthony Liguori <aliguori@amazon.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or later. +# See the COPYING file in the top-level directory. +# +# Based on ./scripts/qmp/qom-[set|get|tree|list] +## + +import argparse + +from qemu.aqmp import ExecuteError + +from .qom_common import QOMCommand + + +try: + from .qom_fuse import QOMFuse +except ModuleNotFoundError as _err: + if _err.name != 'fuse': + raise +else: + assert issubclass(QOMFuse, QOMCommand) + + +class QOMSet(QOMCommand): + """ + QOM Command - Set a property to a given value. + + usage: qom-set [-h] [--socket SOCKET] <path>.<property> <value> + + Set a QOM property value + + positional arguments: + <path>.<property> QOM path and property, separated by a period '.' + <value> new QOM property value + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'set' + help = 'Set a QOM property value' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + cls.add_path_prop_arg(parser) + parser.add_argument( + 'value', + metavar='<value>', + action='store', + help='new QOM property value' + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.path, self.prop = args.path_prop.rsplit('.', 1) + self.value = args.value + + def run(self) -> int: + rsp = self.qmp.command( + 'qom-set', + path=self.path, + property=self.prop, + value=self.value + ) + print(rsp) + return 0 + + +class QOMGet(QOMCommand): + """ + QOM Command - Get a property's current value. + + usage: qom-get [-h] [--socket SOCKET] <path>.<property> + + Get a QOM property value + + positional arguments: + <path>.<property> QOM path and property, separated by a period '.' + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'get' + help = 'Get a QOM property value' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + cls.add_path_prop_arg(parser) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + try: + tmp = args.path_prop.rsplit('.', 1) + except ValueError as err: + raise ValueError('Invalid format for <path>.<property>') from err + self.path = tmp[0] + self.prop = tmp[1] + + def run(self) -> int: + rsp = self.qmp.command( + 'qom-get', + path=self.path, + property=self.prop + ) + if isinstance(rsp, dict): + for key, value in rsp.items(): + print(f"{key}: {value}") + else: + print(rsp) + return 0 + + +class QOMList(QOMCommand): + """ + QOM Command - List the properties at a given path. + + usage: qom-list [-h] [--socket SOCKET] <path> + + List QOM properties at a given path + + positional arguments: + <path> QOM path + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'list' + help = 'List QOM properties at a given path' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument( + 'path', + metavar='<path>', + action='store', + help='QOM path', + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.path = args.path + + def run(self) -> int: + rsp = self.qom_list(self.path) + for item in rsp: + if item.child: + print(f"{item.name}/") + elif item.link: + print(f"@{item.name}/") + else: + print(item.name) + return 0 + + +class QOMTree(QOMCommand): + """ + QOM Command - Show the full tree below a given path. + + usage: qom-tree [-h] [--socket SOCKET] [<path>] + + Show QOM tree from a given path + + positional arguments: + <path> QOM path + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'tree' + help = 'Show QOM tree from a given path' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument( + 'path', + metavar='<path>', + action='store', + help='QOM path', + nargs='?', + default='/' + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.path = args.path + + def _list_node(self, path: str) -> None: + print(path) + items = self.qom_list(path) + for item in items: + if item.child: + continue + try: + rsp = self.qmp.command('qom-get', path=path, + property=item.name) + print(f" {item.name}: {rsp} ({item.type})") + except ExecuteError as err: + print(f" {item.name}: <EXCEPTION: {err!s}> ({item.type})") + print('') + for item in items: + if not item.child: + continue + if path == '/': + path = '' + self._list_node(f"{path}/{item.name}") + + def run(self) -> int: + self._list_node(self.path) + return 0 + + +def main() -> int: + """QOM script main entry point.""" + parser = argparse.ArgumentParser( + description='Query and manipulate QOM data' + ) + subparsers = parser.add_subparsers( + title='QOM commands', + dest='command' + ) + + for command in QOMCommand.__subclasses__(): + command.register(subparsers) + + args = parser.parse_args() + + if args.command is None: + parser.error('Command not specified.') + return 1 + + cmd_class = args.cmd_class + assert isinstance(cmd_class, type(QOMCommand)) + return cmd_class.command_runner(args) diff --git a/python/qemu/utils/qom_common.py b/python/qemu/utils/qom_common.py new file mode 100644 index 0000000..e034a6f --- /dev/null +++ b/python/qemu/utils/qom_common.py @@ -0,0 +1,175 @@ +""" +QOM Command abstractions. +""" +## +# Copyright John Snow 2020, for Red Hat, Inc. +# Copyright IBM, Corp. 2011 +# +# Authors: +# John Snow <jsnow@redhat.com> +# Anthony Liguori <aliguori@amazon.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or later. +# See the COPYING file in the top-level directory. +# +# Based on ./scripts/qmp/qom-[set|get|tree|list] +## + +import argparse +import os +import sys +from typing import ( + Any, + Dict, + List, + Optional, + Type, + TypeVar, +) + +from qemu.aqmp import QMPError +from qemu.aqmp.legacy import QEMUMonitorProtocol + + +class ObjectPropertyInfo: + """ + Represents the return type from e.g. qom-list. + """ + def __init__(self, name: str, type_: str, + description: Optional[str] = None, + default_value: Optional[object] = None): + self.name = name + self.type = type_ + self.description = description + self.default_value = default_value + + @classmethod + def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyInfo': + """ + Build an ObjectPropertyInfo from a Dict with an unknown shape. + """ + assert value.keys() >= {'name', 'type'} + assert value.keys() <= {'name', 'type', 'description', 'default-value'} + return cls(value['name'], value['type'], + value.get('description'), + value.get('default-value')) + + @property + def child(self) -> bool: + """Is this property a child property?""" + return self.type.startswith('child<') + + @property + def link(self) -> bool: + """Is this property a link property?""" + return self.type.startswith('link<') + + +CommandT = TypeVar('CommandT', bound='QOMCommand') + + +class QOMCommand: + """ + Represents a QOM sub-command. + + :param args: Parsed arguments, as returned from parser.parse_args. + """ + name: str + help: str + + def __init__(self, args: argparse.Namespace): + if args.socket is None: + raise QMPError("No QMP socket path or address given") + self.qmp = QEMUMonitorProtocol( + QEMUMonitorProtocol.parse_address(args.socket) + ) + self.qmp.connect() + + @classmethod + def register(cls, subparsers: Any) -> None: + """ + Register this command with the argument parser. + + :param subparsers: argparse subparsers object, from "add_subparsers". + """ + subparser = subparsers.add_parser(cls.name, help=cls.help, + description=cls.help) + cls.configure_parser(subparser) + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + """ + Configure a parser with this command's arguments. + + :param parser: argparse parser or subparser object. + """ + default_path = os.environ.get('QMP_SOCKET') + parser.add_argument( + '--socket', '-s', + dest='socket', + action='store', + help='QMP socket path or address (addr:port).' + ' May also be set via QMP_SOCKET environment variable.', + default=default_path + ) + parser.set_defaults(cmd_class=cls) + + @classmethod + def add_path_prop_arg(cls, parser: argparse.ArgumentParser) -> None: + """ + Add the <path>.<proptery> positional argument to this command. + + :param parser: The parser to add the argument to. + """ + parser.add_argument( + 'path_prop', + metavar='<path>.<property>', + action='store', + help="QOM path and property, separated by a period '.'" + ) + + def run(self) -> int: + """ + Run this command. + + :return: 0 on success, 1 otherwise. + """ + raise NotImplementedError + + def qom_list(self, path: str) -> List[ObjectPropertyInfo]: + """ + :return: a strongly typed list from the 'qom-list' command. + """ + rsp = self.qmp.command('qom-list', path=path) + # qom-list returns List[ObjectPropertyInfo] + assert isinstance(rsp, list) + return [ObjectPropertyInfo.make(x) for x in rsp] + + @classmethod + def command_runner( + cls: Type[CommandT], + args: argparse.Namespace + ) -> int: + """ + Run a fully-parsed subcommand, with error-handling for the CLI. + + :return: The return code from `run()`. + """ + try: + cmd = cls(args) + return cmd.run() + except QMPError as err: + print(f"{type(err).__name__}: {err!s}", file=sys.stderr) + return -1 + + @classmethod + def entry_point(cls) -> int: + """ + Build this command's parser, parse arguments, and run the command. + + :return: `run`'s return code. + """ + parser = argparse.ArgumentParser(description=cls.help) + cls.configure_parser(parser) + args = parser.parse_args() + return cls.command_runner(args) diff --git a/python/qemu/utils/qom_fuse.py b/python/qemu/utils/qom_fuse.py new file mode 100644 index 0000000..653a76b --- /dev/null +++ b/python/qemu/utils/qom_fuse.py @@ -0,0 +1,207 @@ +""" +QEMU Object Model FUSE filesystem tool + +This script offers a simple FUSE filesystem within which the QOM tree +may be browsed, queried and edited using traditional shell tooling. + +This script requires the 'fusepy' python package. + + +usage: qom-fuse [-h] [--socket SOCKET] <mount> + +Mount a QOM tree as a FUSE filesystem + +positional arguments: + <mount> Mount point + +optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. +""" +## +# Copyright IBM, Corp. 2012 +# Copyright (C) 2020 Red Hat, Inc. +# +# Authors: +# Anthony Liguori <aliguori@us.ibm.com> +# Markus Armbruster <armbru@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or later. +# See the COPYING file in the top-level directory. +## + +import argparse +from errno import ENOENT, EPERM +import stat +import sys +from typing import ( + IO, + Dict, + Iterator, + Mapping, + Optional, + Union, +) + +import fuse +from fuse import FUSE, FuseOSError, Operations + +from qemu.aqmp import ExecuteError + +from .qom_common import QOMCommand + + +fuse.fuse_python_api = (0, 2) + + +class QOMFuse(QOMCommand, Operations): + """ + QOMFuse implements both fuse.Operations and QOMCommand. + + Operations implements the FS, and QOMCommand implements the CLI command. + """ + name = 'fuse' + help = 'Mount a QOM tree as a FUSE filesystem' + fuse: FUSE + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument( + 'mount', + metavar='<mount>', + action='store', + help="Mount point", + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.mount = args.mount + self.ino_map: Dict[str, int] = {} + self.ino_count = 1 + + def run(self) -> int: + print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr) + self.fuse = FUSE(self, self.mount, foreground=True) + return 0 + + def get_ino(self, path: str) -> int: + """Get an inode number for a given QOM path.""" + if path in self.ino_map: + return self.ino_map[path] + self.ino_map[path] = self.ino_count + self.ino_count += 1 + return self.ino_map[path] + + def is_object(self, path: str) -> bool: + """Is the given QOM path an object?""" + try: + self.qom_list(path) + return True + except ExecuteError: + return False + + def is_property(self, path: str) -> bool: + """Is the given QOM path a property?""" + path, prop = path.rsplit('/', 1) + if path == '': + path = '/' + try: + for item in self.qom_list(path): + if item.name == prop: + return True + return False + except ExecuteError: + return False + + def is_link(self, path: str) -> bool: + """Is the given QOM path a link?""" + path, prop = path.rsplit('/', 1) + if path == '': + path = '/' + try: + for item in self.qom_list(path): + if item.name == prop and item.link: + return True + return False + except ExecuteError: + return False + + def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes: + if not self.is_property(path): + raise FuseOSError(ENOENT) + + path, prop = path.rsplit('/', 1) + if path == '': + path = '/' + try: + data = str(self.qmp.command('qom-get', path=path, property=prop)) + data += '\n' # make values shell friendly + except ExecuteError as err: + raise FuseOSError(EPERM) from err + + if offset > len(data): + return b'' + + return bytes(data[offset:][:size], encoding='utf-8') + + def readlink(self, path: str) -> Union[bool, str]: + if not self.is_link(path): + return False + path, prop = path.rsplit('/', 1) + prefix = '/'.join(['..'] * (len(path.split('/')) - 1)) + return prefix + str(self.qmp.command('qom-get', path=path, + property=prop)) + + def getattr(self, path: str, + fh: Optional[IO[bytes]] = None) -> Mapping[str, object]: + if self.is_link(path): + value = { + 'st_mode': 0o755 | stat.S_IFLNK, + 'st_ino': self.get_ino(path), + 'st_dev': 0, + 'st_nlink': 2, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 4096, + 'st_atime': 0, + 'st_mtime': 0, + 'st_ctime': 0 + } + elif self.is_object(path): + value = { + 'st_mode': 0o755 | stat.S_IFDIR, + 'st_ino': self.get_ino(path), + 'st_dev': 0, + 'st_nlink': 2, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 4096, + 'st_atime': 0, + 'st_mtime': 0, + 'st_ctime': 0 + } + elif self.is_property(path): + value = { + 'st_mode': 0o644 | stat.S_IFREG, + 'st_ino': self.get_ino(path), + 'st_dev': 0, + 'st_nlink': 1, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 4096, + 'st_atime': 0, + 'st_mtime': 0, + 'st_ctime': 0 + } + else: + raise FuseOSError(ENOENT) + return value + + def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]: + yield '.' + yield '..' + for item in self.qom_list(path): + yield item.name |