qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v6 2/5] qtest: Add scripts/qtest.py


From: Fam Zheng
Subject: [Qemu-devel] [PATCH v6 2/5] qtest: Add scripts/qtest.py
Date: Wed, 28 Jan 2015 10:28:35 +0800

This adds scripts/qtest.py as a python library for qtest protocol.

This is a skeleton with a basic "cmd" method to execute a command,
reading and parsing of qtest output could be added later on demand.

Signed-off-by: Fam Zheng <address@hidden>
Reviewed-by: Max Reitz <address@hidden>
---
 scripts/qtest.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)
 create mode 100644 scripts/qtest.py

diff --git a/scripts/qtest.py b/scripts/qtest.py
new file mode 100644
index 0000000..a971445
--- /dev/null
+++ b/scripts/qtest.py
@@ -0,0 +1,71 @@
+# QEMU qtest library
+#
+# Copyright (C) 2015 Red Hat Inc.
+#
+# Authors:
+#  Fam Zheng <address@hidden>
+#
+# This work is licensed under the terms of the GNU GPL, version 2.  See
+# the COPYING file in the top-level directory.
+#
+# Based on qmp.py.
+#
+
+import errno
+import socket
+
+class QEMUQtestProtocol(object):
+    def __init__(self, address, server=False):
+        """
+        Create a QEMUQtestProtocol object.
+
+        @param address: QEMU address, can be either a unix socket path (string)
+                        or a tuple in the form ( address, port ) for a TCP
+                        connection
+        @param server: server mode, listens on the socket (bool)
+        @raise socket.error on socket connection errors
+        @note No connection is established, this is done by the connect() or
+              accept() methods
+        """
+        self._address = address
+        self._sock = self._get_sock()
+        if server:
+            self._sock.bind(self._address)
+            self._sock.listen(1)
+
+    def _get_sock(self):
+        if isinstance(self._address, tuple):
+            family = socket.AF_INET
+        else:
+            family = socket.AF_UNIX
+        return socket.socket(family, socket.SOCK_STREAM)
+
+    def connect(self):
+        """
+        Connect to the qtest socket.
+
+        @raise socket.error on socket connection errors
+        """
+        self._sock.connect(self._address)
+
+    def accept(self):
+        """
+        Await connection from QEMU.
+
+        @raise socket.error on socket connection errors
+        """
+        self._sock, _ = self._sock.accept()
+
+    def cmd(self, qtest_cmd):
+        """
+        Send a qtest command on the wire.
+
+        @param qtest_cmd: qtest command text to be sent
+        """
+        self._sock.sendall(qtest_cmd + "\n")
+
+    def close(self):
+        self._sock.close()
+
+    def settimeout(self, timeout):
+        self._sock.settimeout(timeout)
-- 
1.9.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]