diff options
-rw-r--r-- | MAINTAINERS | 9 | ||||
-rw-r--r-- | python/mypy.ini | 4 | ||||
-rw-r--r-- | python/qemu/.isort.cfg | 7 | ||||
-rw-r--r-- | python/qemu/accel.py | 9 | ||||
-rw-r--r-- | python/qemu/console_socket.py | 54 | ||||
-rw-r--r-- | python/qemu/machine.py | 308 | ||||
-rw-r--r-- | python/qemu/qmp.py | 89 | ||||
-rw-r--r-- | python/qemu/qtest.py | 55 | ||||
-rw-r--r-- | tests/qemu-iotests/iotests.py | 2 |
9 files changed, 326 insertions, 211 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index a7f0acf..6a197bd 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -2373,11 +2373,18 @@ S: Maintained F: include/sysemu/cryptodev*.h F: backends/cryptodev*.c +Python library +M: John Snow <jsnow@redhat.com> +M: Cleber Rosa <crosa@redhat.com> +R: Eduardo Habkost <ehabkost@redhat.com> +S: Maintained +F: python/ +T: git https://gitlab.com/jsnow/qemu.git python + Python scripts M: Eduardo Habkost <ehabkost@redhat.com> M: Cleber Rosa <crosa@redhat.com> S: Odd fixes -F: python/qemu/*py F: scripts/*.py F: tests/*.py diff --git a/python/mypy.ini b/python/mypy.ini new file mode 100644 index 0000000..1a581c5 --- /dev/null +++ b/python/mypy.ini @@ -0,0 +1,4 @@ +[mypy] +strict = True +python_version = 3.6 +warn_unused_configs = True diff --git a/python/qemu/.isort.cfg b/python/qemu/.isort.cfg new file mode 100644 index 0000000..6d0fd6c --- /dev/null +++ b/python/qemu/.isort.cfg @@ -0,0 +1,7 @@ +[settings] +force_grid_wrap=4 +force_sort_within_sections=True +include_trailing_comma=True +line_length=72 +lines_after_imports=2 +multi_line_output=3
\ No newline at end of file diff --git a/python/qemu/accel.py b/python/qemu/accel.py index 7fabe62..297933d 100644 --- a/python/qemu/accel.py +++ b/python/qemu/accel.py @@ -17,6 +17,8 @@ accelerators. import logging import os import subprocess +from typing import List, Optional + LOG = logging.getLogger(__name__) @@ -29,7 +31,7 @@ ADDITIONAL_ARCHES = { } -def list_accel(qemu_bin): +def list_accel(qemu_bin: str) -> List[str]: """ List accelerators enabled in the QEMU binary. @@ -49,7 +51,8 @@ def list_accel(qemu_bin): return [acc.strip() for acc in out.splitlines()[1:]] -def kvm_available(target_arch=None, qemu_bin=None): +def kvm_available(target_arch: Optional[str] = None, + qemu_bin: Optional[str] = None) -> bool: """ Check if KVM is available using the following heuristic: - Kernel module is present in the host; @@ -72,7 +75,7 @@ def kvm_available(target_arch=None, qemu_bin=None): return True -def tcg_available(qemu_bin): +def tcg_available(qemu_bin: str) -> bool: """ Check if TCG is available. diff --git a/python/qemu/console_socket.py b/python/qemu/console_socket.py index 70869fb..f060d79 100644 --- a/python/qemu/console_socket.py +++ b/python/qemu/console_socket.py @@ -13,10 +13,11 @@ which can drain a socket and optionally dump the bytes to file. # the COPYING file in the top-level directory. # +from collections import deque import socket import threading -from collections import deque import time +from typing import Deque, Optional class ConsoleSocket(socket.socket): @@ -29,22 +30,22 @@ class ConsoleSocket(socket.socket): Optionally a file path can be passed in and we will also dump the characters to this file for debugging purposes. """ - def __init__(self, address, file=None, drain=False): - self._recv_timeout_sec = 300 + def __init__(self, address: str, file: Optional[str] = None, + drain: bool = False): + self._recv_timeout_sec = 300.0 self._sleep_time = 0.5 - self._buffer = deque() + self._buffer: Deque[int] = deque() socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM) self.connect(address) self._logfile = None if file: - self._logfile = open(file, "w") + self._logfile = open(file, "bw") self._open = True + self._drain_thread = None if drain: self._drain_thread = self._thread_start() - else: - self._drain_thread = None - def _drain_fn(self): + def _drain_fn(self) -> None: """Drains the socket and runs while the socket is open.""" while self._open: try: @@ -55,7 +56,7 @@ class ConsoleSocket(socket.socket): # self._open is set to False. time.sleep(self._sleep_time) - def _thread_start(self): + def _thread_start(self) -> threading.Thread: """Kick off a thread to drain the socket.""" # Configure socket to not block and timeout. # This allows our drain thread to not block @@ -67,7 +68,7 @@ class ConsoleSocket(socket.socket): drain_thread.start() return drain_thread - def close(self): + def close(self) -> None: """Close the base object and wait for the thread to terminate""" if self._open: self._open = False @@ -79,51 +80,42 @@ class ConsoleSocket(socket.socket): self._logfile.close() self._logfile = None - def _drain_socket(self): + def _drain_socket(self) -> None: """process arriving characters into in memory _buffer""" data = socket.socket.recv(self, 1) - # latin1 is needed since there are some chars - # we are receiving that cannot be encoded to utf-8 - # such as 0xe2, 0x80, 0xA6. - string = data.decode("latin1") if self._logfile: - self._logfile.write("{}".format(string)) + self._logfile.write(data) self._logfile.flush() - for c in string: - self._buffer.extend(c) + self._buffer.extend(data) - def recv(self, bufsize=1): + def recv(self, bufsize: int = 1, flags: int = 0) -> bytes: """Return chars from in memory buffer. Maintains the same API as socket.socket.recv. """ if self._drain_thread is None: # Not buffering the socket, pass thru to socket. - return socket.socket.recv(self, bufsize) + return socket.socket.recv(self, bufsize, flags) + assert not flags, "Cannot pass flags to recv() in drained mode" start_time = time.time() while len(self._buffer) < bufsize: time.sleep(self._sleep_time) elapsed_sec = time.time() - start_time if elapsed_sec > self._recv_timeout_sec: raise socket.timeout - chars = ''.join([self._buffer.popleft() for i in range(bufsize)]) - # We choose to use latin1 to remain consistent with - # handle_read() and give back the same data as the user would - # receive if they were reading directly from the - # socket w/o our intervention. - return chars.encode("latin1") + return bytes((self._buffer.popleft() for i in range(bufsize))) - def setblocking(self, value): + def setblocking(self, value: bool) -> None: """When not draining we pass thru to the socket, since when draining we control socket blocking. """ if self._drain_thread is None: socket.socket.setblocking(self, value) - def settimeout(self, seconds): + def settimeout(self, value: Optional[float]) -> None: """When not draining we pass thru to the socket, since when draining we control the timeout. """ - if seconds is not None: - self._recv_timeout_sec = seconds + if value is not None: + self._recv_timeout_sec = value if self._drain_thread is None: - socket.socket.settimeout(self, seconds) + socket.socket.settimeout(self, value) diff --git a/python/qemu/machine.py b/python/qemu/machine.py index 82f3731..6420f01 100644 --- a/python/qemu/machine.py +++ b/python/qemu/machine.py @@ -18,17 +18,29 @@ which provides facilities for managing the lifetime of a QEMU VM. # import errno +from itertools import chain import logging import os -import subprocess import shutil import signal +import socket +import subprocess import tempfile -from typing import Optional, Type from types import TracebackType -from . import console_socket +from typing import ( + Any, + BinaryIO, + Dict, + List, + Optional, + Sequence, + Tuple, + Type, +) + +from . import console_socket, qmp +from .qmp import QMPMessage, QMPReturnValue, SocketAddrT -from . import qmp LOG = logging.getLogger(__name__) @@ -57,7 +69,7 @@ class AbnormalShutdown(QEMUMachineError): class QEMUMachine: """ - A QEMU VM + A QEMU VM. Use this object as a context manager to ensure the QEMU process terminates:: @@ -67,10 +79,17 @@ class QEMUMachine: # vm is guaranteed to be shut down here """ - def __init__(self, binary, args=None, wrapper=None, name=None, - test_dir="/var/tmp", monitor_address=None, - socket_scm_helper=None, sock_dir=None, - drain_console=False, console_log=None): + def __init__(self, + binary: str, + args: Sequence[str] = (), + wrapper: Sequence[str] = (), + name: Optional[str] = None, + test_dir: str = "/var/tmp", + monitor_address: Optional[SocketAddrT] = None, + socket_scm_helper: Optional[str] = None, + sock_dir: Optional[str] = None, + drain_console: bool = False, + console_log: Optional[str] = None): ''' Initialize a QEMUMachine @@ -82,45 +101,30 @@ class QEMUMachine: @param monitor_address: address for QMP monitor @param socket_scm_helper: helper program, required for send_fd_scm() @param sock_dir: where to create socket (overrides test_dir for sock) - @param console_log: (optional) path to console log file @param drain_console: (optional) True to drain console socket to buffer + @param console_log: (optional) path to console log file @note: Qemu process is not started until launch() is used. ''' - if args is None: - args = [] - if wrapper is None: - wrapper = [] - if name is None: - name = "qemu-%d" % os.getpid() - if sock_dir is None: - sock_dir = test_dir - self._name = name - self._monitor_address = monitor_address - self._vm_monitor = None - self._qemu_log_path = None - self._qemu_log_file = None - self._popen = None + # Direct user configuration + self._binary = binary - self._args = list(args) # Force copy args in case we modify them + self._args = list(args) self._wrapper = wrapper - self._events = [] - self._iolog = None - self._socket_scm_helper = socket_scm_helper - self._qmp_set = True # Enable QMP monitor by default. - self._qmp = None - self._qemu_full_args = None + + self._name = name or "qemu-%d" % os.getpid() self._test_dir = test_dir - self._temp_dir = None - self._sock_dir = sock_dir - self._launched = False - self._machine = None - self._console_index = 0 - self._console_set = False - self._console_device_type = None - self._console_address = None - self._console_socket = None - self._remove_files = [] - self._user_killed = False + self._sock_dir = sock_dir or self._test_dir + self._socket_scm_helper = socket_scm_helper + + if monitor_address is not None: + self._monitor_address = monitor_address + self._remove_monitor_sockfile = False + else: + self._monitor_address = os.path.join( + self._sock_dir, f"{self._name}-monitor.sock" + ) + self._remove_monitor_sockfile = True + self._console_log_path = console_log if self._console_log_path: # In order to log the console, buffering needs to be enabled. @@ -128,7 +132,29 @@ class QEMUMachine: else: self._drain_console = drain_console - def __enter__(self): + # Runstate + self._qemu_log_path: Optional[str] = None + self._qemu_log_file: Optional[BinaryIO] = None + self._popen: Optional['subprocess.Popen[bytes]'] = None + self._events: List[QMPMessage] = [] + self._iolog: Optional[str] = None + self._qmp_set = True # Enable QMP monitor by default. + self._qmp_connection: Optional[qmp.QEMUMonitorProtocol] = None + self._qemu_full_args: Tuple[str, ...] = () + self._temp_dir: Optional[str] = None + self._launched = False + self._machine: Optional[str] = None + self._console_index = 0 + self._console_set = False + self._console_device_type: Optional[str] = None + self._console_address = os.path.join( + self._sock_dir, f"{self._name}-console.sock" + ) + self._console_socket: Optional[socket.socket] = None + self._remove_files: List[str] = [] + self._user_killed = False + + def __enter__(self) -> 'QEMUMachine': return self def __exit__(self, @@ -137,14 +163,15 @@ class QEMUMachine: exc_tb: Optional[TracebackType]) -> None: self.shutdown() - def add_monitor_null(self): + def add_monitor_null(self) -> None: """ This can be used to add an unused monitor instance. """ self._args.append('-monitor') self._args.append('null') - def add_fd(self, fd, fdset, opaque, opts=''): + def add_fd(self, fd: int, fdset: int, + opaque: str, opts: str = '') -> 'QEMUMachine': """ Pass a file descriptor to the VM """ @@ -163,7 +190,8 @@ class QEMUMachine: self._args.append(','.join(options)) return self - def send_fd_scm(self, fd=None, file_path=None): + def send_fd_scm(self, fd: Optional[int] = None, + file_path: Optional[str] = None) -> int: """ Send an fd or file_path to socket_scm_helper. @@ -207,7 +235,7 @@ class QEMUMachine: return proc.returncode @staticmethod - def _remove_if_exists(path): + def _remove_if_exists(path: str) -> None: """ Remove file object at path if it exists """ @@ -218,46 +246,52 @@ class QEMUMachine: return raise - def is_running(self): + def is_running(self) -> bool: """Returns true if the VM is running.""" return self._popen is not None and self._popen.poll() is None - def exitcode(self): + @property + def _subp(self) -> 'subprocess.Popen[bytes]': + if self._popen is None: + raise QEMUMachineError('Subprocess pipe not present') + return self._popen + + def exitcode(self) -> Optional[int]: """Returns the exit code if possible, or None.""" if self._popen is None: return None return self._popen.poll() - def get_pid(self): + def get_pid(self) -> Optional[int]: """Returns the PID of the running process, or None.""" if not self.is_running(): return None - return self._popen.pid + return self._subp.pid - def _load_io_log(self): + def _load_io_log(self) -> None: if self._qemu_log_path is not None: with open(self._qemu_log_path, "r") as iolog: self._iolog = iolog.read() - def _base_args(self): + @property + def _base_args(self) -> List[str]: args = ['-display', 'none', '-vga', 'none'] + if self._qmp_set: if isinstance(self._monitor_address, tuple): - moncdev = "socket,id=mon,host=%s,port=%s" % ( - self._monitor_address[0], - self._monitor_address[1]) + moncdev = "socket,id=mon,host={},port={}".format( + *self._monitor_address + ) else: - moncdev = 'socket,id=mon,path=%s' % self._vm_monitor + moncdev = f"socket,id=mon,path={self._monitor_address}" args.extend(['-chardev', moncdev, '-mon', 'chardev=mon,mode=control']) + if self._machine is not None: args.extend(['-machine', self._machine]) for _ in range(self._console_index): args.extend(['-serial', 'null']) if self._console_set: - self._console_address = os.path.join(self._sock_dir, - self._name + "-console.sock") - self._remove_files.append(self._console_address) chardev = ('socket,id=console,path=%s,server,nowait' % self._console_address) args.extend(['-chardev', chardev]) @@ -268,26 +302,29 @@ class QEMUMachine: args.extend(['-device', device]) return args - def _pre_launch(self): + def _pre_launch(self) -> None: self._temp_dir = tempfile.mkdtemp(dir=self._test_dir) self._qemu_log_path = os.path.join(self._temp_dir, self._name + ".log") self._qemu_log_file = open(self._qemu_log_path, 'wb') + if self._console_set: + self._remove_files.append(self._console_address) + if self._qmp_set: - if self._monitor_address is not None: - self._vm_monitor = self._monitor_address - else: - self._vm_monitor = os.path.join(self._sock_dir, - self._name + "-monitor.sock") - self._remove_files.append(self._vm_monitor) - self._qmp = qmp.QEMUMonitorProtocol(self._vm_monitor, server=True, - nickname=self._name) - - def _post_launch(self): - if self._qmp: + if self._remove_monitor_sockfile: + assert isinstance(self._monitor_address, str) + self._remove_files.append(self._monitor_address) + self._qmp_connection = qmp.QEMUMonitorProtocol( + self._monitor_address, + server=True, + nickname=self._name + ) + + def _post_launch(self) -> None: + if self._qmp_connection: self._qmp.accept() - def _post_shutdown(self): + def _post_shutdown(self) -> None: """ Called to cleanup the VM instance after the process has exited. May also be called after a failed launch. @@ -295,9 +332,9 @@ class QEMUMachine: # Comprehensive reset for the failed launch case: self._early_cleanup() - if self._qmp: + if self._qmp_connection: self._qmp.close() - self._qmp = None + self._qmp_connection = None self._load_io_log() @@ -327,7 +364,7 @@ class QEMUMachine: self._user_killed = False self._launched = False - def launch(self): + def launch(self) -> None: """ Launch the VM and make sure we cleanup and expose the command line/output in case of exception @@ -337,7 +374,7 @@ class QEMUMachine: raise QEMUMachineError('VM already launched') self._iolog = None - self._qemu_full_args = None + self._qemu_full_args = () try: self._launch() self._launched = True @@ -351,14 +388,18 @@ class QEMUMachine: LOG.debug('Output: %r', self._iolog) raise - def _launch(self): + def _launch(self) -> None: """ Launch the VM and establish a QMP connection """ devnull = open(os.path.devnull, 'rb') self._pre_launch() - self._qemu_full_args = (self._wrapper + [self._binary] + - self._base_args() + self._args) + self._qemu_full_args = tuple( + chain(self._wrapper, + [self._binary], + self._base_args, + self._args) + ) LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args)) self._popen = subprocess.Popen(self._qemu_full_args, stdin=devnull, @@ -390,8 +431,8 @@ class QEMUMachine: waiting for the QEMU process to terminate. """ self._early_cleanup() - self._popen.kill() - self._popen.wait(timeout=60) + self._subp.kill() + self._subp.wait(timeout=60) def _soft_shutdown(self, timeout: Optional[int], has_quit: bool = False) -> None: @@ -409,13 +450,13 @@ class QEMUMachine: """ self._early_cleanup() - if self._qmp is not None: + if self._qmp_connection: if not has_quit: # Might raise ConnectionReset self._qmp.cmd('quit') # May raise subprocess.TimeoutExpired - self._popen.wait(timeout=timeout) + self._subp.wait(timeout=timeout) def _do_shutdown(self, timeout: Optional[int], has_quit: bool = False) -> None: @@ -466,7 +507,7 @@ class QEMUMachine: finally: self._post_shutdown() - def kill(self): + def kill(self) -> None: """ Terminate the VM forcefully, wait for it to exit, and perform cleanup. """ @@ -481,7 +522,7 @@ class QEMUMachine: """ self.shutdown(has_quit=True, timeout=timeout) - def set_qmp_monitor(self, enabled=True): + def set_qmp_monitor(self, enabled: bool = True) -> None: """ Set the QMP monitor. @@ -490,39 +531,45 @@ class QEMUMachine: line. Default is True. @note: call this function before launch(). """ - if enabled: - self._qmp_set = True - else: - self._qmp_set = False - self._qmp = None + self._qmp_set = enabled - def qmp(self, cmd, conv_keys=True, **args): - """ - Invoke a QMP command and return the response dict - """ + @property + def _qmp(self) -> qmp.QEMUMonitorProtocol: + if self._qmp_connection is None: + raise QEMUMachineError("Attempt to access QMP with no connection") + return self._qmp_connection + + @classmethod + def _qmp_args(cls, _conv_keys: bool = True, **args: Any) -> Dict[str, Any]: qmp_args = dict() for key, value in args.items(): - if conv_keys: + if _conv_keys: qmp_args[key.replace('_', '-')] = value else: qmp_args[key] = value + return qmp_args + def qmp(self, cmd: str, + conv_keys: bool = True, + **args: Any) -> QMPMessage: + """ + Invoke a QMP command and return the response dict + """ + qmp_args = self._qmp_args(conv_keys, **args) return self._qmp.cmd(cmd, args=qmp_args) - def command(self, cmd, conv_keys=True, **args): + def command(self, cmd: str, + conv_keys: bool = True, + **args: Any) -> QMPReturnValue: """ Invoke a QMP command. On success return the response dict. On failure raise an exception. """ - reply = self.qmp(cmd, conv_keys, **args) - if reply is None: - raise qmp.QMPError("Monitor is closed") - if "error" in reply: - raise qmp.QMPResponseError(reply) - return reply["return"] + qmp_args = self._qmp_args(conv_keys, **args) + return self._qmp.command(cmd, **qmp_args) - def get_qmp_event(self, wait=False): + def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]: """ Poll for one queued QMP events and return it """ @@ -530,7 +577,7 @@ class QEMUMachine: return self._events.pop(0) return self._qmp.pull_event(wait=wait) - def get_qmp_events(self, wait=False): + def get_qmp_events(self, wait: bool = False) -> List[QMPMessage]: """ Poll for queued QMP events and return a list of dicts """ @@ -541,7 +588,7 @@ class QEMUMachine: return events @staticmethod - def event_match(event, match=None): + def event_match(event: Any, match: Optional[Any]) -> bool: """ Check if an event matches optional match criteria. @@ -571,9 +618,11 @@ class QEMUMachine: return True except TypeError: # either match or event wasn't iterable (not a dict) - return match == event + return bool(match == event) - def event_wait(self, name, timeout=60.0, match=None): + def event_wait(self, name: str, + timeout: float = 60.0, + match: Optional[QMPMessage] = None) -> Optional[QMPMessage]: """ event_wait waits for and returns a named event from QMP with a timeout. @@ -583,22 +632,33 @@ class QEMUMachine: """ return self.events_wait([(name, match)], timeout) - def events_wait(self, events, timeout=60.0): + def events_wait(self, + events: Sequence[Tuple[str, Any]], + timeout: float = 60.0) -> Optional[QMPMessage]: """ - events_wait waits for and returns a named event - from QMP with a timeout. + events_wait waits for and returns a single named event from QMP. + In the case of multiple qualifying events, this function returns the + first one. - events: a sequence of (name, match_criteria) tuples. - The match criteria are optional and may be None. - See event_match for details. - timeout: QEMUMonitorProtocol.pull_event timeout parameter. + :param events: A sequence of (name, match_criteria) tuples. + The match criteria are optional and may be None. + See event_match for details. + :param timeout: Optional timeout, in seconds. + See QEMUMonitorProtocol.pull_event. + + :raise QMPTimeoutError: If timeout was non-zero and no matching events + were found. + :return: A QMP event matching the filter criteria. + If timeout was 0 and no event matched, None. """ - def _match(event): + def _match(event: QMPMessage) -> bool: for name, match in events: if event['event'] == name and self.event_match(event, match): return True return False + event: Optional[QMPMessage] + # Search cached events for event in self._events: if _match(event): @@ -608,26 +668,30 @@ class QEMUMachine: # Poll for new events while True: event = self._qmp.pull_event(wait=timeout) + if event is None: + # NB: None is only returned when timeout is false-ish. + # Timeouts raise QMPTimeoutError instead! + break if _match(event): return event self._events.append(event) return None - def get_log(self): + def get_log(self) -> Optional[str]: """ After self.shutdown or failed qemu execution, this returns the output of the qemu process. """ return self._iolog - def add_args(self, *args): + def add_args(self, *args: str) -> None: """ Adds to the list of extra arguments to be given to the QEMU binary """ self._args.extend(args) - def set_machine(self, machine_type): + def set_machine(self, machine_type: str) -> None: """ Sets the machine type @@ -636,7 +700,9 @@ class QEMUMachine: """ self._machine = machine_type - def set_console(self, device_type=None, console_index=0): + def set_console(self, + device_type: Optional[str] = None, + console_index: int = 0) -> None: """ Sets the device type for a console device @@ -667,7 +733,7 @@ class QEMUMachine: self._console_index = console_index @property - def console_socket(self): + def console_socket(self) -> socket.socket: """ Returns a socket connected to the console """ diff --git a/python/qemu/qmp.py b/python/qemu/qmp.py index 7935dab..2cd4d43 100644 --- a/python/qemu/qmp.py +++ b/python/qemu/qmp.py @@ -7,21 +7,22 @@ # This work is licensed under the terms of the GNU GPL, version 2. See # the COPYING file in the top-level directory. -import json import errno -import socket +import json import logging +import socket +from types import TracebackType from typing import ( Any, - cast, Dict, + List, Optional, TextIO, - Type, Tuple, + Type, Union, + cast, ) -from types import TracebackType # QMPMessage is a QMP Message of any kind. @@ -90,7 +91,9 @@ class QEMUMonitorProtocol: #: Logger object for debugging messages logger = logging.getLogger('QMP') - def __init__(self, address, server=False, nickname=None): + def __init__(self, address: SocketAddrT, + server: bool = False, + nickname: Optional[str] = None): """ Create a QEMUMonitorProtocol class. @@ -102,7 +105,7 @@ class QEMUMonitorProtocol: @note No connection is established, this is done by the connect() or accept() methods """ - self.__events = [] + self.__events: List[QMPMessage] = [] self.__address = address self.__sock = self.__get_sock() self.__sockfile: Optional[TextIO] = None @@ -114,14 +117,14 @@ class QEMUMonitorProtocol: self.__sock.bind(self.__address) self.__sock.listen(1) - def __get_sock(self): + def __get_sock(self) -> socket.socket: if isinstance(self.__address, tuple): family = socket.AF_INET else: family = socket.AF_UNIX return socket.socket(family, socket.SOCK_STREAM) - def __negotiate_capabilities(self): + def __negotiate_capabilities(self) -> QMPMessage: greeting = self.__json_read() if greeting is None or "QMP" not in greeting: raise QMPConnectError @@ -131,7 +134,7 @@ class QEMUMonitorProtocol: return greeting raise QMPCapabilitiesError - def __json_read(self, only_event=False): + def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]: assert self.__sockfile is not None while True: data = self.__sockfile.readline() @@ -148,7 +151,7 @@ class QEMUMonitorProtocol: continue return resp - def __get_events(self, wait=False): + def __get_events(self, wait: Union[bool, float] = False) -> None: """ Check for new events in the stream and cache them in __events. @@ -161,15 +164,19 @@ class QEMUMonitorProtocol: retrieved or if some other error occurred. """ + # Current timeout and blocking status + current_timeout = self.__sock.gettimeout() + # Check for new events regardless and pull them into the cache: - self.__sock.setblocking(False) + self.__sock.settimeout(0) # i.e. setblocking(False) try: self.__json_read() except OSError as err: - if err.errno == errno.EAGAIN: - # No data available - pass - self.__sock.setblocking(True) + # EAGAIN: No data available; not critical + if err.errno != errno.EAGAIN: + raise + finally: + self.__sock.settimeout(current_timeout) # Wait for new events, if needed. # if wait is 0.0, this means "no wait" and is also implicitly false. @@ -178,15 +185,18 @@ class QEMUMonitorProtocol: self.__sock.settimeout(wait) try: ret = self.__json_read(only_event=True) - except socket.timeout: - raise QMPTimeoutError("Timeout waiting for event") - except: - raise QMPConnectError("Error while reading from socket") + except socket.timeout as err: + raise QMPTimeoutError("Timeout waiting for event") from err + except Exception as err: + msg = "Error while reading from socket" + raise QMPConnectError(msg) from err + finally: + self.__sock.settimeout(current_timeout) + if ret is None: raise QMPConnectError("Error while reading from socket") - self.__sock.settimeout(None) - def __enter__(self): + def __enter__(self) -> 'QEMUMonitorProtocol': # Implement context manager enter function. return self @@ -199,7 +209,7 @@ class QEMUMonitorProtocol: # Implement context manager exit function. self.close() - def connect(self, negotiate=True): + def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: """ Connect to the QMP Monitor and perform capabilities negotiation. @@ -214,7 +224,7 @@ class QEMUMonitorProtocol: return self.__negotiate_capabilities() return None - def accept(self, timeout=15.0): + def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: """ Await connection from QMP Monitor and perform capabilities negotiation. @@ -250,7 +260,9 @@ class QEMUMonitorProtocol: self.logger.debug("<<< %s", resp) return resp - def cmd(self, name, args=None, cmd_id=None): + def cmd(self, name: str, + args: Optional[Dict[str, Any]] = None, + cmd_id: Optional[Any] = None) -> QMPMessage: """ Build a QMP command and send it to the QMP Monitor. @@ -258,14 +270,14 @@ class QEMUMonitorProtocol: @param args: command arguments (dict) @param cmd_id: command id (dict, list, string or int) """ - qmp_cmd = {'execute': name} + qmp_cmd: QMPMessage = {'execute': name} if args: qmp_cmd['arguments'] = args if cmd_id: qmp_cmd['id'] = cmd_id return self.cmd_obj(qmp_cmd) - def command(self, cmd, **kwds): + def command(self, cmd: str, **kwds: Any) -> QMPReturnValue: """ Build and send a QMP command to the monitor, report errors if any """ @@ -278,7 +290,8 @@ class QEMUMonitorProtocol: ) return cast(QMPReturnValue, ret['return']) - def pull_event(self, wait=False): + def pull_event(self, + wait: Union[bool, float] = False) -> Optional[QMPMessage]: """ Pulls a single event. @@ -298,7 +311,7 @@ class QEMUMonitorProtocol: return self.__events.pop(0) return None - def get_events(self, wait=False): + def get_events(self, wait: bool = False) -> List[QMPMessage]: """ Get a list of available QMP events. @@ -315,13 +328,13 @@ class QEMUMonitorProtocol: self.__get_events(wait) return self.__events - def clear_events(self): + def clear_events(self) -> None: """ Clear current list of pending events. """ self.__events = [] - def close(self): + def close(self) -> None: """ Close the socket and socket file. """ @@ -330,16 +343,22 @@ class QEMUMonitorProtocol: if self.__sockfile: self.__sockfile.close() - def settimeout(self, timeout): + def settimeout(self, timeout: Optional[float]) -> None: """ Set the socket timeout. - @param timeout (float): timeout in seconds, or None. + @param timeout (float): timeout in seconds (non-zero), or None. @note This is a wrap around socket.settimeout + + @raise ValueError: if timeout was set to 0. """ + if timeout == 0: + msg = "timeout cannot be 0; this engages non-blocking mode." + msg += " Use 'None' instead to disable timeouts." + raise ValueError(msg) self.__sock.settimeout(timeout) - def get_sock_fd(self): + def get_sock_fd(self) -> int: """ Get the socket file descriptor. @@ -347,7 +366,7 @@ class QEMUMonitorProtocol: """ return self.__sock.fileno() - def is_scm_available(self): + def is_scm_available(self) -> bool: """ Check if the socket allows for SCM_RIGHTS. diff --git a/python/qemu/qtest.py b/python/qemu/qtest.py index 888c8bd..39a0cf6 100644 --- a/python/qemu/qtest.py +++ b/python/qemu/qtest.py @@ -17,11 +17,17 @@ subclass of QEMUMachine, respectively. # Based on qmp.py. # -import socket import os -from typing import Optional, TextIO +import socket +from typing import ( + List, + Optional, + Sequence, + TextIO, +) from .machine import QEMUMachine +from .qmp import SocketAddrT class QEMUQtestProtocol: @@ -38,7 +44,8 @@ class QEMUQtestProtocol: No conection is estabalished by __init__(), this is done by the connect() or accept() methods. """ - def __init__(self, address, server=False): + def __init__(self, address: SocketAddrT, + server: bool = False): self._address = address self._sock = self._get_sock() self._sockfile: Optional[TextIO] = None @@ -46,14 +53,14 @@ class QEMUQtestProtocol: self._sock.bind(self._address) self._sock.listen(1) - def _get_sock(self): + def _get_sock(self) -> socket.socket: if isinstance(self._address, tuple): family = socket.AF_INET else: family = socket.AF_UNIX return socket.socket(family, socket.SOCK_STREAM) - def connect(self): + def connect(self) -> None: """ Connect to the qtest socket. @@ -62,7 +69,7 @@ class QEMUQtestProtocol: self._sock.connect(self._address) self._sockfile = self._sock.makefile(mode='r') - def accept(self): + def accept(self) -> None: """ Await connection from QEMU. @@ -71,7 +78,7 @@ class QEMUQtestProtocol: self._sock, _ = self._sock.accept() self._sockfile = self._sock.makefile(mode='r') - def cmd(self, qtest_cmd): + def cmd(self, qtest_cmd: str) -> str: """ Send a qtest command on the wire. @@ -82,14 +89,16 @@ class QEMUQtestProtocol: resp = self._sockfile.readline() return resp - def close(self): - """Close this socket.""" + def close(self) -> None: + """ + Close this socket. + """ self._sock.close() if self._sockfile: self._sockfile.close() self._sockfile = None - def settimeout(self, timeout): + def settimeout(self, timeout: Optional[float]) -> None: """Set a timeout, in seconds.""" self._sock.settimeout(timeout) @@ -99,8 +108,13 @@ class QEMUQtestMachine(QEMUMachine): A QEMU VM, with a qtest socket available. """ - def __init__(self, binary, args=None, name=None, test_dir="/var/tmp", - socket_scm_helper=None, sock_dir=None): + def __init__(self, + binary: str, + args: Sequence[str] = (), + name: Optional[str] = None, + test_dir: str = "/var/tmp", + socket_scm_helper: Optional[str] = None, + sock_dir: Optional[str] = None): if name is None: name = "qemu-%d" % os.getpid() if sock_dir is None: @@ -108,16 +122,19 @@ class QEMUQtestMachine(QEMUMachine): super().__init__(binary, args, name=name, test_dir=test_dir, socket_scm_helper=socket_scm_helper, sock_dir=sock_dir) - self._qtest = None + self._qtest: Optional[QEMUQtestProtocol] = None self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock") - def _base_args(self): - args = super()._base_args() - args.extend(['-qtest', 'unix:path=' + self._qtest_path, - '-accel', 'qtest']) + @property + def _base_args(self) -> List[str]: + args = super()._base_args + args.extend([ + '-qtest', f"unix:path={self._qtest_path}", + '-accel', 'qtest' + ]) return args - def _pre_launch(self): + def _pre_launch(self) -> None: super()._pre_launch() self._qtest = QEMUQtestProtocol(self._qtest_path, server=True) @@ -126,7 +143,7 @@ class QEMUQtestMachine(QEMUMachine): super()._post_launch() self._qtest.accept() - def _post_shutdown(self): + def _post_shutdown(self) -> None: super()._post_shutdown() self._remove_if_exists(self._qtest_path) diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index f212cec..63d2ace 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -605,7 +605,7 @@ class VM(qtest.QEMUQtestMachine): def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: cmd = 'human-monitor-command' - kwargs = {'command-line': command_line} + kwargs: Dict[str, Any] = {'command-line': command_line} if use_log: return self.qmp_log(cmd, **kwargs) else: |