""" QEMU Monitor Protocol Python class """ # Copyright (C) 2009, 2010 Red Hat Inc. # # Authors: # Luiz Capitulino # # This work is licensed under the terms of the GNU GPL, version 2. See # the COPYING file in the top-level directory. import json import errno import socket import logging from typing import ( Optional, TextIO, Type, ) from types import TracebackType class QMPError(Exception): """ QMP base exception """ class QMPConnectError(QMPError): """ QMP connection exception """ class QMPCapabilitiesError(QMPError): """ QMP negotiate capabilities exception """ class QMPTimeoutError(QMPError): """ QMP timeout exception """ class QEMUMonitorProtocol: """ Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then allow to handle commands and events. """ #: Logger object for debugging messages logger = logging.getLogger('QMP') def __init__(self, address, server=False, nickname=None): """ Create a QEMUMonitorProtocol class. @param address: QEMU address, can be either a unix socket path (string) or a tuple in the form ( address, port ) for a TCP connection @param server: server mode listens on the socket (bool) @raise OSError on socket connection errors @note No connection is established, this is done by the connect() or accept() methods """ self.__events = [] self.__address = address self.__sock = self.__get_sock() self.__sockfile: Optional[TextIO] = None self._nickname = nickname if self._nickname: self.logger = logging.getLogger('QMP').getChild(self._nickname) if server: self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.__sock.bind(self.__address) self.__sock.listen(1) def __get_sock(self): if isinstance(self.__address, tuple): family = socket.AF_INET else: family = socket.AF_UNIX return socket.socket(family, socket.SOCK_STREAM) def __negotiate_capabilities(self): greeting = self.__json_read() if greeting is None or "QMP" not in greeting: raise QMPConnectError # Greeting seems ok, negotiate capabilities resp = self.cmd('qmp_capabilities') if resp and "return" in resp: return greeting raise QMPCapabilitiesError def __json_read(self, only_event=False): while True: data = self.__sockfile.readline() if not data: return None resp = json.loads(data) if 'event' in resp: self.logger.debug("<<< %s", resp) self.__events.append(resp) if not only_event: continue return resp def __get_events(self, wait=False): """ Check for new events in the stream and cache them in __events. @param wait (bool): block until an event is available. @param wait (float): If wait is a float, treat it as a timeout value. @raise QMPTimeoutError: If a timeout float is provided and the timeout period elapses. @raise QMPConnectError: If wait is True but no events could be retrieved or if some other error occurred. """ # Check for new events regardless and pull them into the cache: self.__sock.setblocking(False) try: self.__json_read() except OSError as err: if err.errno == errno.EAGAIN: # No data available pass self.__sock.setblocking(True) # Wait for new events, if needed. # if wait is 0.0, this means "no wait" and is also implicitly false. if not self.__events and wait: if isinstance(wait, float): self.__sock.settimeout(wait) try: ret = self.__json_read(only_event=True) except socket.timeout: raise QMPTimeoutError("Timeout waiting for event") except: raise QMPConnectError("Error while reading from socket") if ret is None: raise QMPConnectError("Error while reading from socket") self.__sock.settimeout(None) def __enter__(self): # Implement context manager enter function. return self def __exit__(self, # pylint: disable=duplicate-code # see https://github.com/PyCQA/pylint/issues/3619 exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]) -> None: # Implement context manager exit function. self.close() def connect(self, negotiate=True): """ Connect to the QMP Monitor and perform capabilities negotiation. @return QMP greeting dict, or None if negotiate is false @raise OSError on socket connection errors @raise QMPConnectError if the greeting is not received @raise QMPCapabilitiesError if fails to negotiate capabilities """ self.__sock.connect(self.__address) self.__sockfile = self.__sock.makefile(mode='r') if negotiate: return self.__negotiate_capabilities() return None def accept(self, timeout=15.0): """ Await connection from QMP Monitor and perform capabilities negotiation. @param timeout: timeout in seconds (nonnegative float number, or None). The value passed will set the behavior of the underneath QMP socket as described in [1]. Default value is set to 15.0. @return QMP greeting dict @raise OSError on socket connection errors @raise QMPConnectError if the greeting is not received @raise QMPCapabilitiesError if fails to negotiate capabilities [1] https://docs.python.org/3/library/socket.html#socket.socket.settimeout """ self.__sock.settimeout(timeout) self.__sock, _ = self.__sock.accept() self.__sockfile = self.__sock.makefile(mode='r') return self.__negotiate_capabilities() def cmd_obj(self, qmp_cmd): """ Send a QMP command to the QMP Monitor. @param qmp_cmd: QMP command to be sent as a Python dict @return QMP response as a Python dict or None if the connection has been closed """ self.logger.debug(">>> %s", qmp_cmd) try: self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8')) except OSError as err: if err.errno == errno.EPIPE: return None raise err resp = self.__json_read() self.logger.debug("<<< %s", resp) return resp def cmd(self, name, args=None, cmd_id=None): """ Build a QMP command and send it to the QMP Monitor. @param name: command name (string) @param args: command arguments (dict) @param cmd_id: command id (dict, list, string or int) """ qmp_cmd = {'execute': name} if args: qmp_cmd['arguments'] = args if cmd_id: qmp_cmd['id'] = cmd_id return self.cmd_obj(qmp_cmd) def command(self, cmd, **kwds): """ Build and send a QMP command to the monitor, report errors if any """ ret = self.cmd(cmd, kwds) if "error" in ret: raise Exception(ret['error']['desc']) return ret['return'] def pull_event(self, wait=False): """ Pulls a single event. @param wait (bool): block until an event is available. @param wait (float): If wait is a float, treat it as a timeout value. @raise QMPTimeoutError: If a timeout float is provided and the timeout period elapses. @raise QMPConnectError: If wait is True but no events could be retrieved or if some other error occurred. @return The first available QMP event, or None. """ self.__get_events(wait) if self.__events: return self.__events.pop(0) return None def get_events(self, wait=False): """ Get a list of available QMP events. @param wait (bool): block until an event is available. @param wait (float): If wait is a float, treat it as a timeout value. @raise QMPTimeoutError: If a timeout float is provided and the timeout period elapses. @raise QMPConnectError: If wait is True but no events could be retrieved or if some other error occurred. @return The list of available QMP events. """ self.__get_events(wait) return self.__events def clear_events(self): """ Clear current list of pending events. """ self.__events = [] def close(self): """ Close the socket and socket file. """ if self.__sock: self.__sock.close() if self.__sockfile: self.__sockfile.close() def settimeout(self, timeout): """ Set the socket timeout. @param timeout (float): timeout in seconds, or None. @note This is a wrap around socket.settimeout """ self.__sock.settimeout(timeout) def get_sock_fd(self): """ Get the socket file descriptor. @return The file descriptor number. """ return self.__sock.fileno() def is_scm_available(self): """ Check if the socket allows for SCM_RIGHTS. @return True if SCM_RIGHTS is available, otherwise False. """ return self.__sock.family == socket.AF_UNIX