1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
"""
QEMU qtest library
qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
subclass of QEMUMachine, respectively.
"""
# Copyright (C) 2015 Red Hat Inc.
#
# Authors:
# Fam Zheng <famz@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
# Based on qmp.py.
#
import os
import socket
from typing import (
List,
Optional,
Sequence,
TextIO,
Tuple,
)
from qemu.qmp import SocketAddrT
from .machine import QEMUMachine
class QEMUQtestProtocol:
"""
QEMUQtestProtocol implements a connection to a qtest socket.
:param address: QEMU address, can be either a unix socket path (string)
or a tuple in the form ( address, port ) for a TCP
connection
:param sock: An existing socket can be provided as an alternative to
an address. One of address or sock must be provided.
:param server: server mode, listens on the socket. Only meaningful
in conjunction with an address and not an existing
socket.
:raise socket.error: on socket connection errors
.. note::
No connection is established by __init__(), this is done
by the connect() or accept() methods.
"""
def __init__(self,
address: Optional[SocketAddrT] = None,
sock: Optional[socket.socket] = None,
server: bool = False):
if address is None and sock is None:
raise ValueError("Either 'address' or 'sock' must be specified")
if address is not None and sock is not None:
raise ValueError(
"Either 'address' or 'sock' must be specified, but not both")
if sock is not None and server:
raise ValueError("server=True is meaningless when passing socket")
self._address = address
self._sock = sock or self._get_sock()
self._sockfile: Optional[TextIO] = None
if server:
assert self._address is not None
self._sock.bind(self._address)
self._sock.listen(1)
def _get_sock(self) -> socket.socket:
assert self._address is not None
if isinstance(self._address, tuple):
family = socket.AF_INET
else:
family = socket.AF_UNIX
return socket.socket(family, socket.SOCK_STREAM)
def connect(self) -> None:
"""
Connect to the qtest socket.
@raise socket.error on socket connection errors
"""
if self._address is not None:
self._sock.connect(self._address)
self._sockfile = self._sock.makefile(mode='r')
def accept(self) -> None:
"""
Await connection from QEMU.
@raise socket.error on socket connection errors
"""
self._sock, _ = self._sock.accept()
self._sockfile = self._sock.makefile(mode='r')
def cmd(self, qtest_cmd: str) -> str:
"""
Send a qtest command on the wire.
@param qtest_cmd: qtest command text to be sent
"""
assert self._sockfile is not None
self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
resp = self._sockfile.readline()
return resp
def close(self) -> None:
"""
Close this socket.
"""
self._sock.close()
if self._sockfile:
self._sockfile.close()
self._sockfile = None
def settimeout(self, timeout: Optional[float]) -> None:
"""Set a timeout, in seconds."""
self._sock.settimeout(timeout)
class QEMUQtestMachine(QEMUMachine):
"""
A QEMU VM, with a qtest socket available.
"""
def __init__(self,
binary: str,
args: Sequence[str] = (),
wrapper: Sequence[str] = (),
name: Optional[str] = None,
base_temp_dir: str = "/var/tmp",
qmp_timer: Optional[float] = None):
# pylint: disable=too-many-arguments
if name is None:
name = "qemu-%d" % os.getpid()
super().__init__(binary, args, wrapper=wrapper, name=name,
base_temp_dir=base_temp_dir,
qmp_timer=qmp_timer)
self._qtest: Optional[QEMUQtestProtocol] = None
self._qtest_sock_pair: Optional[
Tuple[socket.socket, socket.socket]] = None
@property
def _base_args(self) -> List[str]:
args = super()._base_args
assert self._qtest_sock_pair is not None
fd = self._qtest_sock_pair[0].fileno()
args.extend([
'-chardev', f"socket,id=qtest,fd={fd}",
'-qtest', 'chardev:qtest',
'-accel', 'qtest'
])
return args
def _pre_launch(self) -> None:
self._qtest_sock_pair = socket.socketpair()
os.set_inheritable(self._qtest_sock_pair[0].fileno(), True)
super()._pre_launch()
self._qtest = QEMUQtestProtocol(sock=self._qtest_sock_pair[1])
def _post_launch(self) -> None:
assert self._qtest is not None
super()._post_launch()
if self._qtest_sock_pair:
self._qtest_sock_pair[0].close()
self._qtest.connect()
def _post_shutdown(self) -> None:
if self._qtest_sock_pair:
self._qtest_sock_pair[0].close()
self._qtest_sock_pair[1].close()
self._qtest_sock_pair = None
super()._post_shutdown()
def qtest(self, cmd: str) -> str:
"""
Send a qtest command to the guest.
:param cmd: qtest command to send
:return: qtest server response
"""
if self._qtest is None:
raise RuntimeError("qtest socket not available")
return self._qtest.cmd(cmd)
|