aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFam Zheng <famz@redhat.com>2015-01-30 10:49:43 +0800
committerStefan Hajnoczi <stefanha@redhat.com>2015-02-16 15:07:18 +0000
commita628daa42db50a3fc1203dd81bba5a2879b76656 (patch)
treeeee3d12c1860bf98ab8b071da52b118326e99ab4
parenta91f9584565901635295b08f98d5f3048981c2f5 (diff)
downloadqemu-a628daa42db50a3fc1203dd81bba5a2879b76656.zip
qemu-a628daa42db50a3fc1203dd81bba5a2879b76656.tar.gz
qemu-a628daa42db50a3fc1203dd81bba5a2879b76656.tar.bz2
qtest: Add scripts/qtest.py
This adds scripts/qtest.py as a python library for qtest protocol. This is a skeleton with a basic "cmd" method to execute a command, reading and parsing of qtest output could be added later on demand. Signed-off-by: Fam Zheng <famz@redhat.com> Reviewed-by: Max Reitz <mreitz@redhat.com> Message-id: 1422586186-9925-3-git-send-email-famz@redhat.com Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
-rw-r--r--scripts/qtest.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/scripts/qtest.py b/scripts/qtest.py
new file mode 100644
index 0000000..a971445
--- /dev/null
+++ b/scripts/qtest.py
@@ -0,0 +1,71 @@
+# QEMU qtest library
+#
+# Copyright (C) 2015 Red Hat Inc.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+# Based on qmp.py.
+#
+
+import errno
+import socket
+
+class QEMUQtestProtocol(object):
+ def __init__(self, address, server=False):
+ """
+ Create a QEMUQtestProtocol object.
+
+ @param address: QEMU address, can be either a unix socket path (string)
+ or a tuple in the form ( address, port ) for a TCP
+ connection
+ @param server: server mode, listens on the socket (bool)
+ @raise socket.error on socket connection errors
+ @note No connection is established, this is done by the connect() or
+ accept() methods
+ """
+ self._address = address
+ self._sock = self._get_sock()
+ if server:
+ self._sock.bind(self._address)
+ self._sock.listen(1)
+
+ def _get_sock(self):
+ if isinstance(self._address, tuple):
+ family = socket.AF_INET
+ else:
+ family = socket.AF_UNIX
+ return socket.socket(family, socket.SOCK_STREAM)
+
+ def connect(self):
+ """
+ Connect to the qtest socket.
+
+ @raise socket.error on socket connection errors
+ """
+ self._sock.connect(self._address)
+
+ def accept(self):
+ """
+ Await connection from QEMU.
+
+ @raise socket.error on socket connection errors
+ """
+ self._sock, _ = self._sock.accept()
+
+ def cmd(self, qtest_cmd):
+ """
+ Send a qtest command on the wire.
+
+ @param qtest_cmd: qtest command text to be sent
+ """
+ self._sock.sendall(qtest_cmd + "\n")
+
+ def close(self):
+ self._sock.close()
+
+ def settimeout(self, timeout):
+ self._sock.settimeout(timeout)