#! /usr/bin/env python
#
# qtest harness for PyUnit
#
# Copyright (C) 2011 Red Hat, Inc.
# Author: Paolo Bonzini
#
# This file is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.

import binascii
import os
import struct
import socket
import unittest


class ClientDisconnected(Exception):
    """Raised when the qtest socket reports end-of-file."""
    pass


class QTestCase(unittest.TestCase):
    """Base class for test cases that drive a QEMU binary over qtest.

    setUpClass spawns the binary named by the QTEST_QEMU_BINARY environment
    variable and accepts its connection on a Unix socket.  The connection,
    the set of currently raised IRQs and the pending reply are all stored
    on the class, so every test method of a subclass shares one QEMU.
    """

    @classmethod
    def supportedArches(cls):
        """Return the architectures this test runs on ('all' matches any)."""
        return ['all']

    @classmethod
    def extraArgs(cls):
        """Return extra command-line arguments to pass to QEMU."""
        return ()

    @classmethod
    def isSupportedArch(cls, arch):
        """Return True if arch is listed by supportedArches() or 'all' is."""
        arches = cls.supportedArches()
        return 'all' in arches or arch in arches

    @classmethod
    def _raiseIRQ(cls, irq):
        """Record that IRQ line irq is raised."""
        cls._irqs.add(irq)

    @classmethod
    def _lowerIRQ(cls, irq):
        """Record that IRQ line irq is lowered."""
        cls._irqs.discard(irq)

    @classmethod
    def setUpClass(cls):
        """Spawn QEMU in qtest mode and wait for it to connect."""
        qemu_binary = os.getenv('QTEST_QEMU_BINARY')
        arch = os.path.basename(qemu_binary).replace('qemu-system-', '')
        if not cls.isSupportedArch(arch):
            # The unittest suite runner skips the tests and tearDownClass of
            # a class whose _classSetupFailed attribute is true.
            cls._classSetupFailed = True
            return

        socket_path = '/tmp/qtest-%d.sock' % os.getpid()
        server_socket = socket.socket(socket.AF_UNIX)
        try:
            server_socket.bind(socket_path)
            try:
                server_socket.listen(1)
                cls.pid = os.spawnl(os.P_NOWAIT, qemu_binary, qemu_binary,
                                    '-qtest', 'unix:%s' % socket_path,
                                    '-qtest-log', '/dev/null',
                                    '-machine', 'accel=qtest',
                                    *cls.extraArgs())
                cls._socket, addr = server_socket.accept()
            finally:
                # The path embeds only our pid, so it must be removed or the
                # next QTestCase subclass in this process cannot bind().
                os.unlink(socket_path)
        finally:
            server_socket.close()

        cls._irqs = set()
        cls._buffer = ''
        cls._reply = None

    @classmethod
    def tearDownClass(cls):
        """Close the qtest connection, then terminate and reap QEMU."""
        cls._socket.close()
        cls._socket = None
        os.kill(cls.pid, 15)    # 15 is SIGTERM
        os.waitpid(cls.pid, 0)

    @classmethod
    def _qtestResponse(cls, buf):
        """Handle one line received from QEMU.

        An 'OK ...' line becomes the pending reply (as a token list); an
        'IRQ raise N' / 'IRQ <other> N' line updates the IRQ set.  Any other
        line, including an empty one, is ignored.
        """
        tokens = buf.split()
        if not tokens:
            return
        kind = tokens[0]
        if kind == 'OK':
            cls._reply = tokens
        elif kind == 'IRQ':
            irq = int(tokens[2])
            if tokens[1] == 'raise':
                cls._raiseIRQ(irq)
            else:
                cls._lowerIRQ(irq)

    @classmethod
    def waitForEvents(cls, blocking=True):
        """Process the lines QEMU has sent.

        With blocking true, wait until at least one complete line has been
        handled; in either mode, then drain whatever else is available
        without waiting and return.

        Raises ClientDisconnected if QEMU closed the connection.
        """
        cls._socket.setblocking(int(blocking))
        while True:
            i = cls._buffer.find('\n')
            if i != -1:
                line, cls._buffer = cls._buffer[:i], cls._buffer[i + 1:]
                cls._qtestResponse(line)
                # One line has been handled: never block from here on.
                cls._socket.setblocking(0)
                continue

            try:
                data = cls._socket.recv(4096)
            except socket.error:
                # Nothing (more) to read right now.
                return
            if not data:
                raise ClientDisconnected
            if not isinstance(data, str):
                # recv() returns bytes on Python 3; the buffer holds text.
                data = data.decode('ascii')
            cls._buffer = cls._buffer + data

    @classmethod
    def _send(cls, str):
        """Send one command line and return the tokens of its 'OK' reply."""
        assert cls._reply is None
        cls._socket.sendall((str + '\n').encode('ascii'))
        while cls._reply is None:
            cls.waitForEvents()
        reply, cls._reply = cls._reply, None
        return reply

    # IRQ access
    def getIRQ(self, irq):
        """Return True if irq is currently raised, without waiting."""
        self.waitForEvents(False)
        return irq in self._irqs

    def waitIRQ(self, irq, level=True):
        """Block until the raised state of irq equals level."""
        self.waitForEvents(False)
        while (irq in self._irqs) != level:
            self.waitForEvents(True)

    # PIO reads
    def inb(self, port):
        """Issue 'inb' on port and return the value as an int."""
        # The reply value carries a two-character prefix before the digits.
        return int(self._send('inb 0x%x' % port)[1][2:], 16)

    def inw(self, port):
        """Issue 'inw' on port and return the value as an int."""
        return int(self._send('inw 0x%x' % port)[1][2:], 16)

    def inl(self, port):
        """Issue 'inl' on port and return the value as an int."""
        return int(self._send('inl 0x%x' % port)[1][2:], 16)

    # Memory reads
    def memread_unpack(self, addr, format):
        """Read struct.calcsize(format) bytes at addr and unpack them.

        Returns the tuple produced by struct.unpack(format, ...).
        """
        size = struct.calcsize(format)
        reply = self._send('read 0x%x 0x%x' % (addr, size))[1]
        raw = binascii.unhexlify(reply[2:])
        return struct.unpack(format, raw)

    def memread(self, addr, size):
        """Read size bytes at addr; return them as a tuple of ints."""
        return self.memread_unpack(addr, '%dB' % size)

    def ldb(self, addr):
        """Read one unsigned byte at addr."""
        return self.memread_unpack(addr, 'B')[0]

    def ldw_le(self, addr):
        """Read a little-endian unsigned 16-bit value at addr."""
        # '<' pins the byte order; a bare 'H' would follow the host's.
        return self.memread_unpack(addr, '<H')[0]

    def ldl_be(self, addr):
        """Read a big-endian unsigned 32-bit value at addr."""
        return self.memread_unpack(addr, '>I')[0]

    # PIO writes
    def outb(self, port, val):
        """Issue 'outb' of val to port."""
        self._send('outb 0x%x 0x%x' % (port, val))

    def outw(self, port, val):
        """Issue 'outw' of val to port."""
        self._send('outw 0x%x 0x%x' % (port, val))

    def outl(self, port, val):
        """Issue 'outl' of val to port."""
        self._send('outl 0x%x 0x%x' % (port, val))

    # Memory writes
    def memwrite_pack(self, addr, format, *data):
        """Pack data with struct.pack(format, ...) and write it at addr."""
        raw = struct.pack(format, *data)
        hexdata = binascii.hexlify(raw).decode('ascii')
        self._send('write 0x%x 0x%x 0x%s' % (addr, len(raw), hexdata))

    def memwrite(self, addr, *data):
        """Write the given byte values (ints) starting at addr."""
        self.memwrite_pack(addr, '%dB' % len(data), *data)

    def stb(self, addr, datum):
        """Write one unsigned byte at addr."""
        self.memwrite_pack(addr, 'B', datum)

    def stw_le(self, addr, datum):
        """Write a little-endian unsigned 16-bit value at addr."""
        # '<' pins the byte order; a bare 'H' would follow the host's.
        self.memwrite_pack(addr, '<H', datum)

    def stl_be(self, addr, datum):
        """Write a big-endian unsigned 32-bit value at addr."""
        self.memwrite_pack(addr, '>I', datum)