aboutsummaryrefslogtreecommitdiff
path: root/python/qemu/aqmp/legacy.py
blob: f86cb298049a2a64c8fb57ceaf4be481405bbd53 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
Sync QMP Wrapper

This class pretends to be qemu.qmp.QEMUMonitorProtocol.
"""

#
# Copyright (C) 2009-2022 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#  John Snow <jsnow@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#

import asyncio
from typing import (
    Any,
    Awaitable,
    Dict,
    List,
    Optional,
    TypeVar,
    Union,
)

import qemu.qmp

from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient


# (Temporarily) Re-export QMPBadPortError
QMPBadPortError = qemu.qmp.QMPBadPortError

#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]

# QMPMessage can be outgoing commands or incoming events/returns.
# QMPReturnValue is usually a dict/json object, but due to QAPI's
# 'returns-whitelist', it can actually be anything.
#
# {'return': {}} is a QMPMessage,
# {} is the QMPReturnValue.


# pylint: disable=missing-docstring


class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):

        # pylint: disable=super-init-not-called
        self._aqmp = QMPClient(nickname)
        self._aloop = asyncio.get_event_loop()
        self._address = address
        self._timeout: Optional[float] = None

        if server:
            self._sync(self._aqmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        if self._aqmp.greeting is not None:
            # pylint: disable=protected-access
            return self._aqmp.greeting._asdict()
        return None

    # __enter__ and __exit__ need no changes
    # parse_address needs no changes

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        self._aqmp.await_greeting = negotiate
        self._aqmp.negotiate = negotiate

        self._sync(
            self._aqmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        self._aqmp.await_greeting = True
        self._aqmp.negotiate = True

        self._sync(self._aqmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._aqmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    # Default impl of cmd() delegates to cmd_obj

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        return self._sync(
            self._aqmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._aqmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout = None
        if wait and isinstance(wait, float):
            timeout = wait

        return dict(
            self._sync(
                self._aqmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        events = [dict(x) for x in self._aqmp.events.clear()]
        if events:
            return events

        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        self._aqmp.events.clear()

    def close(self) -> None:
        self._sync(
            self._aqmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        self._aqmp.send_fd_scm(fd)

    def __del__(self) -> None:
        if self._aqmp.runstate == Runstate.IDLE:
            return

        if not self._aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )