qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[RFC PATCH 8/9] Python: delete qemu.qmp


From: John Snow
Subject: [RFC PATCH 8/9] Python: delete qemu.qmp
Date: Thu, 16 Dec 2021 21:29:38 -0500

qemu.qmp has been forked out into its own repository and can be
installed independently, so it should no longer be included here in this
tree.

Signed-off-by: John Snow <jsnow@redhat.com>

---

Note: without re-locking Pipfile.lock, the CI venv won't actually have
that dependency installed. The next patch adds it for the sake of the CI
system passing, but a non-RFC version of this will want to pin qemu.qmp
to 0.0.1 (which doesn't exist yet!) and then re-lock Pipfile.lock, and
squash the following commit into this one.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/qmp/__init__.py   |  59 ---
 python/qemu/qmp/aqmp_tui.py   | 652 ------------------------
 python/qemu/qmp/error.py      |  50 --
 python/qemu/qmp/events.py     | 717 --------------------------
 python/qemu/qmp/legacy.py     | 319 ------------
 python/qemu/qmp/message.py    | 209 --------
 python/qemu/qmp/models.py     | 146 ------
 python/qemu/qmp/protocol.py   | 922 ----------------------------------
 python/qemu/qmp/py.typed      |   0
 python/qemu/qmp/qmp_client.py | 655 ------------------------
 python/qemu/qmp/qmp_shell.py  | 537 --------------------
 python/qemu/qmp/util.py       | 217 --------
 python/qemu/utils/qom_fuse.py |   1 -
 python/setup.cfg              |  27 +-
 python/tests/protocol.py      | 583 ---------------------
 15 files changed, 2 insertions(+), 5092 deletions(-)
 delete mode 100644 python/qemu/qmp/__init__.py
 delete mode 100644 python/qemu/qmp/aqmp_tui.py
 delete mode 100644 python/qemu/qmp/error.py
 delete mode 100644 python/qemu/qmp/events.py
 delete mode 100644 python/qemu/qmp/legacy.py
 delete mode 100644 python/qemu/qmp/message.py
 delete mode 100644 python/qemu/qmp/models.py
 delete mode 100644 python/qemu/qmp/protocol.py
 delete mode 100644 python/qemu/qmp/py.typed
 delete mode 100644 python/qemu/qmp/qmp_client.py
 delete mode 100644 python/qemu/qmp/qmp_shell.py
 delete mode 100644 python/qemu/qmp/util.py
 delete mode 100644 python/tests/protocol.py

diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py
deleted file mode 100644
index a19868b13d..0000000000
--- a/python/qemu/qmp/__init__.py
+++ /dev/null
@@ -1,59 +0,0 @@
-"""
-QEMU Monitor Protocol (QMP) development library & tooling.
-
-This package provides a fairly low-level class for communicating
-asynchronously with QMP protocol servers, as implemented by QEMU, the
-QEMU Guest Agent, and the QEMU Storage Daemon.
-
-`QMPClient` provides the main functionality of this package. All errors
-raised by this library derive from `QMPError`, see `qmp.error` for
-additional detail. See `qmp.events` for an in-depth tutorial on
-managing QMP events.
-"""
-
-# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc.
-#
-# Authors:
-#  John Snow <jsnow@redhat.com>
-#
-# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.
-#
-# This work is licensed under the terms of the GNU GPL, version 2.  See
-# the COPYING file in the top-level directory.
-
-import logging
-
-from .error import QMPError
-from .events import EventListener
-from .message import Message
-from .protocol import (
-    ConnectError,
-    Runstate,
-    SocketAddrT,
-    StateError,
-)
-from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
-
-
-# Suppress logging unless an application engages it.
-logging.getLogger('qemu.qmp').addHandler(logging.NullHandler())
-
-
-# The order of these fields impact the Sphinx documentation order.
-__all__ = (
-    # Classes, most to least important
-    'QMPClient',
-    'Message',
-    'EventListener',
-    'Runstate',
-
-    # Exceptions, most generic to most explicit
-    'QMPError',
-    'StateError',
-    'ConnectError',
-    'ExecuteError',
-    'ExecInterruptedError',
-
-    # Type aliases
-    'SocketAddrT',
-)
diff --git a/python/qemu/qmp/aqmp_tui.py b/python/qemu/qmp/aqmp_tui.py
deleted file mode 100644
index 184a3e4690..0000000000
--- a/python/qemu/qmp/aqmp_tui.py
+++ /dev/null
@@ -1,652 +0,0 @@
-# Copyright (c) 2021
-#
-# Authors:
-#  Niteesh Babu G S <niteesh.gs@gmail.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2 or
-# later.  See the COPYING file in the top-level directory.
-"""
-AQMP TUI
-
-AQMP TUI is an asynchronous interface built on top the of the AQMP library.
-It is the successor of QMP-shell and is bought-in as a replacement for it.
-
-Example Usage: aqmp-tui <SOCKET | TCP IP:PORT>
-Full Usage: aqmp-tui --help
-"""
-
-import argparse
-import asyncio
-import json
-import logging
-from logging import Handler, LogRecord
-import signal
-from typing import (
-    List,
-    Optional,
-    Tuple,
-    Type,
-    Union,
-    cast,
-)
-
-from pygments import lexers
-from pygments import token as Token
-import urwid
-import urwid_readline
-
-from .error import ProtocolError
-from .legacy import QEMUMonitorProtocol, QMPBadPortError
-from .message import DeserializationError, Message, UnexpectedTypeError
-from .protocol import ConnectError, Runstate
-from .qmp_client import ExecInterruptedError, QMPClient
-from .util import create_task, pretty_traceback
-
-
-# The name of the signal that is used to update the history list
-UPDATE_MSG: str = 'UPDATE_MSG'
-
-
-palette = [
-    (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),
-    (Token.Text, '', '', '', '', 'g7'),
-    (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),
-    (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),
-    (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),
-    (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),
-    ('DEBUG', '', '', '', '#ddf', 'g7'),
-    ('INFO', '', '', '', 'g100', 'g7'),
-    ('WARNING', '', '', '', '#ff6', 'g7'),
-    ('ERROR', '', '', '', '#a00', 'g7'),
-    ('CRITICAL', '', '', '', '#a00', 'g7'),
-    ('background', '', 'black', '', '', 'g7'),
-]
-
-
-def format_json(msg: str) -> str:
-    """
-    Formats valid/invalid multi-line JSON message into a single-line message.
-
-    Formatting is first tried using the standard json module. If that fails
-    due to an decoding error then a simple string manipulation is done to
-    achieve a single line JSON string.
-
-    Converting into single line is more asthetically pleasing when looking
-    along with error messages.
-
-    Eg:
-    Input:
-          [ 1,
-            true,
-            3 ]
-    The above input is not a valid QMP message and produces the following error
-    "QMP message is not a JSON object."
-    When displaying this in TUI in multiline mode we get
-
-        [ 1,
-          true,
-          3 ]: QMP message is not a JSON object.
-
-    whereas in singleline mode we get the following
-
-        [1, true, 3]: QMP message is not a JSON object.
-
-    The single line mode is more asthetically pleasing.
-
-    :param msg:
-        The message to formatted into single line.
-
-    :return: Formatted singleline message.
-    """
-    try:
-        msg = json.loads(msg)
-        return str(json.dumps(msg))
-    except json.decoder.JSONDecodeError:
-        msg = msg.replace('\n', '')
-        words = msg.split(' ')
-        words = list(filter(None, words))
-        return ' '.join(words)
-
-
-def has_handler_type(logger: logging.Logger,
-                     handler_type: Type[Handler]) -> bool:
-    """
-    The Logger class has no interface to check if a certain type of handler is
-    installed or not. So we provide an interface to do so.
-
-    :param logger:
-        Logger object
-    :param handler_type:
-        The type of the handler to be checked.
-
-    :return: returns True if handler of type `handler_type`.
-    """
-    for handler in logger.handlers:
-        if isinstance(handler, handler_type):
-            return True
-    return False
-
-
-class App(QMPClient):
-    """
-    Implements the AQMP TUI.
-
-    Initializes the widgets and starts the urwid event loop.
-
-    :param address:
-        Address of the server to connect to.
-    :param num_retries:
-        The number of times to retry before stopping to reconnect.
-    :param retry_delay:
-        The delay(sec) before each retry
-    """
-    def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
-                 retry_delay: Optional[int]) -> None:
-        urwid.register_signal(type(self), UPDATE_MSG)
-        self.window = Window(self)
-        self.address = address
-        self.aloop: Optional[asyncio.AbstractEventLoop] = None
-        self.num_retries = num_retries
-        self.retry_delay = retry_delay if retry_delay else 2
-        self.retry: bool = False
-        self.exiting: bool = False
-        super().__init__()
-
-    def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
-        """
-        Appends the msg to the history list.
-
-        :param msg:
-            The raw message to be appended in string type.
-        """
-        urwid.emit_signal(self, UPDATE_MSG, msg, level)
-
-    def _cb_outbound(self, msg: Message) -> Message:
-        """
-        Callback: outbound message hook.
-
-        Appends the outgoing messages to the history box.
-
-        :param msg: raw outbound message.
-        :return: final outbound message.
-        """
-        str_msg = str(msg)
-
-        if not has_handler_type(logging.getLogger(), TUILogHandler):
-            logging.debug('Request: %s', str_msg)
-        self.add_to_history('<-- ' + str_msg)
-        return msg
-
-    def _cb_inbound(self, msg: Message) -> Message:
-        """
-        Callback: outbound message hook.
-
-        Appends the incoming messages to the history box.
-
-        :param msg: raw inbound message.
-        :return: final inbound message.
-        """
-        str_msg = str(msg)
-
-        if not has_handler_type(logging.getLogger(), TUILogHandler):
-            logging.debug('Request: %s', str_msg)
-        self.add_to_history('--> ' + str_msg)
-        return msg
-
-    async def _send_to_server(self, msg: Message) -> None:
-        """
-        This coroutine sends the message to the server.
-        The message has to be pre-validated.
-
-        :param msg:
-            Pre-validated message to be to sent to the server.
-
-        :raise Exception: When an unhandled exception is caught.
-        """
-        try:
-            await self._raw(msg, assign_id='id' not in msg)
-        except ExecInterruptedError as err:
-            logging.info('Error server disconnected before reply %s', str(err))
-            self.add_to_history('Server disconnected before reply', 'ERROR')
-        except Exception as err:
-            logging.error('Exception from _send_to_server: %s', str(err))
-            raise err
-
-    def cb_send_to_server(self, raw_msg: str) -> None:
-        """
-        Validates and sends the message to the server.
-        The raw string message is first converted into a Message object
-        and is then sent to the server.
-
-        :param raw_msg:
-            The raw string message to be sent to the server.
-
-        :raise Exception: When an unhandled exception is caught.
-        """
-        try:
-            msg = Message(bytes(raw_msg, encoding='utf-8'))
-            create_task(self._send_to_server(msg))
-        except (DeserializationError, UnexpectedTypeError) as err:
-            raw_msg = format_json(raw_msg)
-            logging.info('Invalid message: %s', err.error_message)
-            self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
-
-    def unhandled_input(self, key: str) -> None:
-        """
-        Handle's keys which haven't been handled by the child widgets.
-
-        :param key:
-            Unhandled key
-        """
-        if key == 'esc':
-            self.kill_app()
-
-    def kill_app(self) -> None:
-        """
-        Initiates killing of app. A bridge between asynchronous and synchronous
-        code.
-        """
-        create_task(self._kill_app())
-
-    async def _kill_app(self) -> None:
-        """
-        This coroutine initiates the actual disconnect process and calls
-        urwid.ExitMainLoop() to kill the TUI.
-
-        :raise Exception: When an unhandled exception is caught.
-        """
-        self.exiting = True
-        await self.disconnect()
-        logging.debug('Disconnect finished. Exiting app')
-        raise urwid.ExitMainLoop()
-
-    async def disconnect(self) -> None:
-        """
-        Overrides the disconnect method to handle the errors locally.
-        """
-        try:
-            await super().disconnect()
-        except (OSError, EOFError) as err:
-            logging.info('disconnect: %s', str(err))
-            self.retry = True
-        except ProtocolError as err:
-            logging.info('disconnect: %s', str(err))
-        except Exception as err:
-            logging.error('disconnect: Unhandled exception %s', str(err))
-            raise err
-
-    def _set_status(self, msg: str) -> None:
-        """
-        Sets the message as the status.
-
-        :param msg:
-            The message to be displayed in the status bar.
-        """
-        self.window.footer.set_text(msg)
-
-    def _get_formatted_address(self) -> str:
-        """
-        Returns a formatted version of the server's address.
-
-        :return: formatted address
-        """
-        if isinstance(self.address, tuple):
-            host, port = self.address
-            addr = f'{host}:{port}'
-        else:
-            addr = f'{self.address}'
-        return addr
-
-    async def _initiate_connection(self) -> Optional[ConnectError]:
-        """
-        Tries connecting to a server a number of times with a delay between
-        each try. If all retries failed then return the error faced during
-        the last retry.
-
-        :return: Error faced during last retry.
-        """
-        current_retries = 0
-        err = None
-
-        # initial try
-        await self.connect_server()
-        while self.retry and current_retries < self.num_retries:
-            logging.info('Connection Failed, retrying in %d', self.retry_delay)
-            status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
-            self._set_status(status)
-
-            await asyncio.sleep(self.retry_delay)
-
-            err = await self.connect_server()
-            current_retries += 1
-        # If all retries failed report the last error
-        if err:
-            logging.info('All retries failed: %s', err)
-            return err
-        return None
-
-    async def manage_connection(self) -> None:
-        """
-        Manage the connection based on the current run state.
-
-        A reconnect is issued when the current state is IDLE and the number
-        of retries is not exhausted.
-        A disconnect is issued when the current state is DISCONNECTING.
-        """
-        while not self.exiting:
-            if self.runstate == Runstate.IDLE:
-                err = await self._initiate_connection()
-                # If retry is still true then, we have exhausted all our tries.
-                if err:
-                    self._set_status(f'[Error: {err.error_message}]')
-                else:
-                    addr = self._get_formatted_address()
-                    self._set_status(f'[Connected {addr}]')
-            elif self.runstate == Runstate.DISCONNECTING:
-                self._set_status('[Disconnected]')
-                await self.disconnect()
-                # check if a retry is needed
-                if self.runstate == Runstate.IDLE:
-                    continue
-            await self.runstate_changed()
-
-    async def connect_server(self) -> Optional[ConnectError]:
-        """
-        Initiates a connection to the server at address `self.address`
-        and in case of a failure, sets the status to the respective error.
-        """
-        try:
-            await self.connect(self.address)
-            self.retry = False
-        except ConnectError as err:
-            logging.info('connect_server: ConnectError %s', str(err))
-            self.retry = True
-            return err
-        return None
-
-    def run(self, debug: bool = False) -> None:
-        """
-        Starts the long running co-routines and the urwid event loop.
-
-        :param debug:
-            Enables/Disables asyncio event loop debugging
-        """
-        screen = urwid.raw_display.Screen()
-        screen.set_terminal_properties(256)
-
-        self.aloop = asyncio.get_event_loop()
-        self.aloop.set_debug(debug)
-
-        # Gracefully handle SIGTERM and SIGINT signals
-        cancel_signals = [signal.SIGTERM, signal.SIGINT]
-        for sig in cancel_signals:
-            self.aloop.add_signal_handler(sig, self.kill_app)
-
-        event_loop = urwid.AsyncioEventLoop(loop=self.aloop)
-        main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),
-                                   unhandled_input=self.unhandled_input,
-                                   screen=screen,
-                                   palette=palette,
-                                   handle_mouse=True,
-                                   event_loop=event_loop)
-
-        create_task(self.manage_connection(), self.aloop)
-        try:
-            main_loop.run()
-        except Exception as err:
-            logging.error('%s\n%s\n', str(err), pretty_traceback())
-            raise err
-
-
-class StatusBar(urwid.Text):
-    """
-    A simple statusbar modelled using the Text widget. The status can be
-    set using the set_text function. All text set is aligned to right.
-
-    :param text: Initial text to be displayed. Default is empty str.
-    """
-    def __init__(self, text: str = ''):
-        super().__init__(text, align='right')
-
-
-class Editor(urwid_readline.ReadlineEdit):
-    """
-    A simple editor modelled using the urwid_readline.ReadlineEdit widget.
-    Mimcs GNU readline shortcuts and provides history support.
-
-    The readline shortcuts can be found below:
-    https://github.com/rr-/urwid_readline#features
-
-    Along with the readline features, this editor also has support for
-    history. Pressing the 'up'/'down' switches between the prev/next messages
-    available in the history.
-
-    Currently there is no support to save the history to a file. The history of
-    previous commands is lost on exit.
-
-    :param parent: Reference to the TUI object.
-    """
-    def __init__(self, parent: App) -> None:
-        super().__init__(caption='> ', multiline=True)
-        self.parent = parent
-        self.history: List[str] = []
-        self.last_index: int = -1
-        self.show_history: bool = False
-
-    def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:
-        """
-        Handles the keypress on this widget.
-
-        :param size:
-            The current size of the widget.
-        :param key:
-            The key to be handled.
-
-        :return: Unhandled key if any.
-        """
-        msg = self.get_edit_text()
-        if key == 'up' and not msg:
-            # Show the history when 'up arrow' is pressed with no input text.
-            # NOTE: The show_history logic is necessary because in 'multiline'
-            # mode (which we use) 'up arrow' is used to move between lines.
-            if not self.history:
-                return None
-            self.show_history = True
-            last_msg = self.history[self.last_index]
-            self.set_edit_text(last_msg)
-            self.edit_pos = len(last_msg)
-        elif key == 'up' and self.show_history:
-            self.last_index = max(self.last_index - 1, -len(self.history))
-            self.set_edit_text(self.history[self.last_index])
-            self.edit_pos = len(self.history[self.last_index])
-        elif key == 'down' and self.show_history:
-            if self.last_index == -1:
-                self.set_edit_text('')
-                self.show_history = False
-            else:
-                self.last_index += 1
-                self.set_edit_text(self.history[self.last_index])
-                self.edit_pos = len(self.history[self.last_index])
-        elif key == 'meta enter':
-            # When using multiline, enter inserts a new line into the editor
-            # send the input to the server on alt + enter
-            self.parent.cb_send_to_server(msg)
-            self.history.append(msg)
-            self.set_edit_text('')
-            self.last_index = -1
-            self.show_history = False
-        else:
-            self.show_history = False
-            self.last_index = -1
-            return cast(Optional[str], super().keypress(size, key))
-        return None
-
-
-class EditorWidget(urwid.Filler):
-    """
-    Wrapper around the editor widget.
-
-    The Editor is a flow widget and has to wrapped inside a box widget.
-    This class wraps the Editor inside filler widget.
-
-    :param parent: Reference to the TUI object.
-    """
-    def __init__(self, parent: App) -> None:
-        super().__init__(Editor(parent), valign='top')
-
-
-class HistoryBox(urwid.ListBox):
-    """
-    This widget is modelled using the ListBox widget, contains the list of
-    all messages both QMP messages and log messsages to be shown in the TUI.
-
-    The messages are urwid.Text widgets. On every append of a message, the
-    focus is shifted to the last appended message.
-
-    :param parent: Reference to the TUI object.
-    """
-    def __init__(self, parent: App) -> None:
-        self.parent = parent
-        self.history = urwid.SimpleFocusListWalker([])
-        super().__init__(self.history)
-
-    def add_to_history(self,
-                       history: Union[str, List[Tuple[str, str]]]) -> None:
-        """
-        Appends a message to the list and set the focus to the last appended
-        message.
-
-        :param history:
-            The history item(message/event) to be appended to the list.
-        """
-        self.history.append(urwid.Text(history))
-        self.history.set_focus(len(self.history) - 1)
-
-    def mouse_event(self, size: Tuple[int, int], _event: str, button: float,
-                    _x: int, _y: int, focus: bool) -> None:
-        # Unfortunately there are no urwid constants that represent the mouse
-        # events.
-        if button == 4:  # Scroll up event
-            super().keypress(size, 'up')
-        elif button == 5:  # Scroll down event
-            super().keypress(size, 'down')
-
-
-class HistoryWindow(urwid.Frame):
-    """
-    This window composes the HistoryBox and EditorWidget in a horizontal split.
-    By default the first focus is given to the history box.
-
-    :param parent: Reference to the TUI object.
-    """
-    def __init__(self, parent: App) -> None:
-        self.parent = parent
-        self.editor_widget = EditorWidget(parent)
-        self.editor = urwid.LineBox(self.editor_widget)
-        self.history = HistoryBox(parent)
-        self.body = urwid.Pile([('weight', 80, self.history),
-                                ('weight', 20, self.editor)])
-        super().__init__(self.body)
-        urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)
-
-    def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:
-        """
-        Appends a message to the history box
-
-        :param msg:
-            The message to be appended to the history box.
-        :param level:
-            The log level of the message, if it is a log message.
-        """
-        formatted = []
-        if level:
-            msg = f'[{level}]: {msg}'
-            formatted.append((level, msg))
-        else:
-            lexer = lexers.JsonLexer()  # pylint: disable=no-member
-            for token in lexer.get_tokens(msg):
-                formatted.append(token)
-        self.history.add_to_history(formatted)
-
-
-class Window(urwid.Frame):
-    """
-    This window is the top most widget of the TUI and will contain other
-    windows. Each child of this widget is responsible for displaying a specific
-    functionality.
-
-    :param parent: Reference to the TUI object.
-    """
-    def __init__(self, parent: App) -> None:
-        self.parent = parent
-        footer = StatusBar()
-        body = HistoryWindow(parent)
-        super().__init__(body, footer=footer)
-
-
-class TUILogHandler(Handler):
-    """
-    This handler routes all the log messages to the TUI screen.
-    It is installed to the root logger to so that the log message from all
-    libraries begin used is routed to the screen.
-
-    :param tui: Reference to the TUI object.
-    """
-    def __init__(self, tui: App) -> None:
-        super().__init__()
-        self.tui = tui
-
-    def emit(self, record: LogRecord) -> None:
-        """
-        Emits a record to the TUI screen.
-
-        Appends the log message to the TUI screen
-        """
-        level = record.levelname
-        msg = record.getMessage()
-        self.tui.add_to_history(msg, level)
-
-
-def main() -> None:
-    """
-    Driver of the whole script, parses arguments, initialize the TUI and
-    the logger.
-    """
-    parser = argparse.ArgumentParser(description='AQMP TUI')
-    parser.add_argument('qmp_server', help='Address of the QMP server. '
-                        'Format <UNIX socket path | TCP addr:port>')
-    parser.add_argument('--num-retries', type=int, default=10,
-                        help='Number of times to reconnect before giving up.')
-    parser.add_argument('--retry-delay', type=int,
-                        help='Time(s) to wait before next retry. '
-                        'Default action is to wait 2s between each retry.')
-    parser.add_argument('--log-file', help='The Log file name')
-    parser.add_argument('--log-level', default='WARNING',
-                        help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
-    parser.add_argument('--asyncio-debug', action='store_true',
-                        help='Enable debug mode for asyncio loop. '
-                        'Generates lot of output, makes TUI unusable when '
-                        'logs are logged in the TUI. '
-                        'Use only when logging to a file.')
-    args = parser.parse_args()
-
-    try:
-        address = QEMUMonitorProtocol.parse_address(args.qmp_server)
-    except QMPBadPortError as err:
-        parser.error(str(err))
-
-    app = App(address, args.num_retries, args.retry_delay)
-
-    root_logger = logging.getLogger()
-    root_logger.setLevel(logging.getLevelName(args.log_level))
-
-    if args.log_file:
-        root_logger.addHandler(logging.FileHandler(args.log_file))
-    else:
-        root_logger.addHandler(TUILogHandler(app))
-
-    app.run(args.asyncio_debug)
-
-
-if __name__ == '__main__':
-    main()
diff --git a/python/qemu/qmp/error.py b/python/qemu/qmp/error.py
deleted file mode 100644
index 24ba4d5054..0000000000
--- a/python/qemu/qmp/error.py
+++ /dev/null
@@ -1,50 +0,0 @@
-"""
-QMP Error Classes
-
-This package seeks to provide semantic error classes that are intended
-to be used directly by clients when they would like to handle particular
-semantic failures (e.g. "failed to connect") without needing to know the
-enumeration of possible reasons for that failure.
-
-QMPError serves as the ancestor for all exceptions raised by this
-package, and is suitable for use in handling semantic errors from this
-library. In most cases, individual public methods will attempt to catch
-and re-encapsulate various exceptions to provide a semantic
-error-handling interface.
-
-.. admonition:: QMP Exception Hierarchy Reference
-
- |   `Exception`
- |    +-- `QMPError`
- |         +-- `ConnectError`
- |         +-- `StateError`
- |         +-- `ExecInterruptedError`
- |         +-- `ExecuteError`
- |         +-- `ListenerError`
- |         +-- `ProtocolError`
- |              +-- `DeserializationError`
- |              +-- `UnexpectedTypeError`
- |              +-- `ServerParseError`
- |              +-- `BadReplyError`
- |              +-- `GreetingError`
- |              +-- `NegotiationError`
-"""
-
-
-class QMPError(Exception):
-    """Abstract error class for all errors originating from this package."""
-
-
-class ProtocolError(QMPError):
-    """
-    Abstract error class for protocol failures.
-
-    Semantically, these errors are generally the fault of either the
-    protocol server or as a result of a bug in this library.
-
-    :param error_message: Human-readable string describing the error.
-    """
-    def __init__(self, error_message: str):
-        super().__init__(error_message)
-        #: Human-readable error message, without any prefix.
-        self.error_message: str = error_message
diff --git a/python/qemu/qmp/events.py b/python/qemu/qmp/events.py
deleted file mode 100644
index 6199776cc6..0000000000
--- a/python/qemu/qmp/events.py
+++ /dev/null
@@ -1,717 +0,0 @@
-"""
-QMP Events and EventListeners
-
-Asynchronous QMP uses `EventListener` objects to listen for events. An
-`EventListener` is a FIFO event queue that can be pre-filtered to listen
-for only specific events. Each `EventListener` instance receives its own
-copy of events that it hears, so events may be consumed without fear or
-worry for depriving other listeners of events they need to hear.
-
-
-EventListener Tutorial
-----------------------
-
-In all of the following examples, we assume that we have a `QMPClient`
-instantiated named ``qmp`` that is already connected.
-
-
-`listener()` context blocks with one name
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The most basic usage is by using the `listener()` context manager to
-construct them:
-
-.. code:: python
-
-   with qmp.listener('STOP') as listener:
-       await qmp.execute('stop')
-       await listener.get()
-
-The listener is active only for the duration of the ‘with’ block. This
-instance listens only for ‘STOP’ events.
-
-
-`listener()` context blocks with two or more names
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Multiple events can be selected for by providing any ``Iterable[str]``:
-
-.. code:: python
-
-   with qmp.listener(('STOP', 'RESUME')) as listener:
-       await qmp.execute('stop')
-       event = await listener.get()
-       assert event['event'] == 'STOP'
-
-       await qmp.execute('cont')
-       event = await listener.get()
-       assert event['event'] == 'RESUME'
-
-
-`listener()` context blocks with no names
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-By omitting names entirely, you can listen to ALL events.
-
-.. code:: python
-
-   with qmp.listener() as listener:
-       await qmp.execute('stop')
-       event = await listener.get()
-       assert event['event'] == 'STOP'
-
-This isn’t a very good use case for this feature: In a non-trivial
-running system, we may not know what event will arrive next. Grabbing
-the top of a FIFO queue returning multiple kinds of events may be prone
-to error.
-
-
-Using async iterators to retrieve events
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-If you’d like to simply watch what events happen to arrive, you can use
-the listener as an async iterator:
-
-.. code:: python
-
-   with qmp.listener() as listener:
-       async for event in listener:
-           print(f"Event arrived: {event['event']}")
-
-This is analogous to the following code:
-
-.. code:: python
-
-   with qmp.listener() as listener:
-       while True:
-           event = listener.get()
-           print(f"Event arrived: {event['event']}")
-
-This event stream will never end, so these blocks will never terminate.
-
-
-Using asyncio.Task to concurrently retrieve events
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Since a listener’s event stream will never terminate, it is not likely
-useful to use that form in a script. For longer-running clients, we can
-create event handlers by using `asyncio.Task` to create concurrent
-coroutines:
-
-.. code:: python
-
-   async def print_events(listener):
-       try:
-           async for event in listener:
-               print(f"Event arrived: {event['event']}")
-       except asyncio.CancelledError:
-           return
-
-   with qmp.listener() as listener:
-       task = asyncio.Task(print_events(listener))
-       await qmp.execute('stop')
-       await qmp.execute('cont')
-       task.cancel()
-       await task
-
-However, there is no guarantee that these events will be received by the
-time we leave this context block. Once the context block is exited, the
-listener will cease to hear any new events, and becomes inert.
-
-Be mindful of the timing: the above example will *probably*– but does
-not *guarantee*– that both STOP/RESUMED events will be printed. The
-example below outlines how to use listeners outside of a context block.
-
-
-Using `register_listener()` and `remove_listener()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-To create a listener with a longer lifetime, beyond the scope of a
-single block, create a listener and then call `register_listener()`:
-
-.. code:: python
-
-   class MyClient:
-       def __init__(self, qmp):
-           self.qmp = qmp
-           self.listener = EventListener()
-
-       async def print_events(self):
-           try:
-               async for event in self.listener:
-                   print(f"Event arrived: {event['event']}")
-           except asyncio.CancelledError:
-               return
-
-       async def run(self):
-           self.task = asyncio.Task(self.print_events)
-           self.qmp.register_listener(self.listener)
-           await qmp.execute('stop')
-           await qmp.execute('cont')
-
-       async def stop(self):
-           self.task.cancel()
-           await self.task
-           self.qmp.remove_listener(self.listener)
-
-The listener can be deactivated by using `remove_listener()`. When it is
-removed, any possible pending events are cleared and it can be
-re-registered at a later time.
-
-
-Using the built-in all events listener
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The `QMPClient` object creates its own default listener named
-:py:obj:`~Events.events` that can be used for the same purpose without
-having to create your own:
-
-.. code:: python
-
-   async def print_events(listener):
-       try:
-           async for event in listener:
-               print(f"Event arrived: {event['event']}")
-       except asyncio.CancelledError:
-           return
-
-   task = asyncio.Task(print_events(qmp.events))
-
-   await qmp.execute('stop')
-   await qmp.execute('cont')
-
-   task.cancel()
-   await task
-
-
-Using both .get() and async iterators
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The async iterator and `get()` methods pull events from the same FIFO
-queue. If you mix the usage of both, be aware: Events are emitted
-precisely once per listener.
-
-If multiple contexts try to pull events from the same listener instance,
-events are still emitted only precisely once.
-
-This restriction can be lifted by creating additional listeners.
-
-
-Creating multiple listeners
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Additional `EventListener` objects can be created at-will. Each one
-receives its own copy of events, with separate FIFO event queues.
-
-.. code:: python
-
-   my_listener = EventListener()
-   qmp.register_listener(my_listener)
-
-   await qmp.execute('stop')
-   copy1 = await my_listener.get()
-   copy2 = await qmp.events.get()
-
-   assert copy1 == copy2
-
-In this example, we await an event from both a user-created
-`EventListener` and the built-in events listener. Both receive the same
-event.
-
-
-Clearing listeners
-~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects can be cleared, clearing all events seen thus far:
-
-.. code:: python
-
-   await qmp.execute('stop')
-   qmp.events.clear()
-   await qmp.execute('cont')
-   event = await qmp.events.get()
-   assert event['event'] == 'RESUME'
-
-`EventListener` objects are FIFO queues. If events are not consumed,
-they will remain in the queue until they are witnessed or discarded via
-`clear()`. FIFO queues will be drained automatically upon leaving a
-context block, or when calling `remove_listener()`.
-
-
-Accessing listener history
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects record their history. Even after being cleared,
-you can obtain a record of all events seen so far:
-
-.. code:: python
-
-   await qmp.execute('stop')
-   await qmp.execute('cont')
-   qmp.events.clear()
-
-   assert len(qmp.events.history) == 2
-   assert qmp.events.history[0]['event'] == 'STOP'
-   assert qmp.events.history[1]['event'] == 'RESUME'
-
-The history is updated immediately and does not require the event to be
-witnessed first.
-
-
-Using event filters
-~~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects can be given complex filtering criteria if names
-are not sufficient:
-
-.. code:: python
-
-   def job1_filter(event) -> bool:
-       event_data = event.get('data', {})
-       event_job_id = event_data.get('id')
-       return event_job_id == "job1"
-
-   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
-       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
-       async for event in listener:
-           if event['data']['status'] == 'concluded':
-               break
-
-These filters might be most useful when parameterized. `EventListener`
-objects expect a function that takes only a single argument (the raw
-event, as a `Message`) and returns a bool; True if the event should be
-accepted into the stream. You can create a function that adapts this
-signature to accept configuration parameters:
-
-.. code:: python
-
-   def job_filter(job_id: str) -> EventFilter:
-       def filter(event: Message) -> bool:
-           return event['data']['id'] == job_id
-       return filter
-
-   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
-       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
-       async for event in listener:
-           if event['data']['status'] == 'concluded':
-               break
-
-
-Activating an existing listener with `listen()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Listeners with complex, long configurations can also be created manually
-and activated temporarily by using `listen()` instead of `listener()`:
-
-.. code:: python
-
-   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
-                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
-                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
-
-   with qmp.listen(listener):
-       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
-       async for event in listener:
-           print(event)
-           if event['event'] == 'BLOCK_JOB_COMPLETED':
-               break
-
-Any events that are not witnessed by the time the block is left will be
-cleared from the queue; entering the block is an implicit
-`register_listener()` and leaving the block is an implicit
-`remove_listener()`.
-
-
-Activating multiple existing listeners with `listen()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-While `listener()` is only capable of creating a single listener,
-`listen()` is capable of activating multiple listeners simultaneously:
-
-.. code:: python
-
-   def job_filter(job_id: str) -> EventFilter:
-       def filter(event: Message) -> bool:
-           return event['data']['id'] == job_id
-       return filter
-
-   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
-   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
-
-   with qmp.listen(jobA, jobB):
-       qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
-       qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
-
-       async for event in jobA.get():
-           if event['data']['status'] == 'concluded':
-               break
-       async for event in jobB.get():
-           if event['data']['status'] == 'concluded':
-               break
-
-
-Extending the `EventListener` class
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-In the case that a more specialized `EventListener` is desired to
-provide either more functionality or more compact syntax for specialized
-cases, it can be extended.
-
-One of the key methods to extend or override is
-:py:meth:`~EventListener.accept()`. The default implementation checks an
-incoming message for:
-
-1. A qualifying name, if any :py:obj:`~EventListener.names` were
-   specified at initialization time
-2. That :py:obj:`~EventListener.event_filter()` returns True.
-
-This can be modified however you see fit to change the criteria for
-inclusion in the stream.
-
-For convenience, a ``JobListener`` class could be created that simply
-bakes in configuration so it does not need to be repeated:
-
-.. code:: python
-
-   class JobListener(EventListener):
-       def __init__(self, job_id: str):
-           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
-                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
-                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
-           self.job_id = job_id
-
-       def accept(self, event) -> bool:
-           if not super().accept(event):
-               return False
-           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
-               return event['data']['id'] == job_id
-           return event['data']['device'] == job_id
-
-From here on out, you can conjure up a custom-purpose listener that
-listens only for job-related events for a specific job-id easily:
-
-.. code:: python
-
-   listener = JobListener('job4')
-   with qmp.listener(listener):
-       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
-       async for event in listener:
-           print(event)
-           if event['event'] == 'BLOCK_JOB_COMPLETED':
-               break
-
-
-Experimental Interfaces & Design Issues
----------------------------------------
-
-These interfaces are not ones I am sure I will keep or otherwise modify
-heavily.
-
-qmp.listener()’s type signature
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-`listener()` does not return anything, because it was assumed the caller
-already had a handle to the listener. However, for
-``qmp.listener(EventListener())`` forms, the caller will not have saved
-a handle to the listener.
-
-Because this function can accept *many* listeners, I found it hard to
-accurately type in a way where it could be used in both “one” or “many”
-forms conveniently and in a statically type-safe manner.
-
-Ultimately, I removed the return altogether, but perhaps with more time
-I can work out a way to re-add it.
-
-
-API Reference
--------------
-
-"""
-
-import asyncio
-from contextlib import contextmanager
-import logging
-from typing import (
-    AsyncIterator,
-    Callable,
-    Iterable,
-    Iterator,
-    List,
-    Optional,
-    Set,
-    Tuple,
-    Union,
-)
-
-from .error import QMPError
-from .message import Message
-
-
-EventNames = Union[str, Iterable[str], None]
-EventFilter = Callable[[Message], bool]
-
-
-class ListenerError(QMPError):
-    """
-    Generic error class for `EventListener`-related problems.
-    """
-
-
-class EventListener:
-    """
-    Selectively listens for events with runtime configurable filtering.
-
-    This class is designed to be directly usable for the most common cases,
-    but it can be extended to provide more rigorous control.
-
-    :param names:
-        One or more names of events to listen for.
-        When not provided, listen for ALL events.
-    :param event_filter:
-        An optional event filtering function.
-        When names are also provided, this acts as a secondary filter.
-
-    When ``names`` and ``event_filter`` are both provided, the names
-    will be filtered first, and then the filter function will be called
-    second. The event filter function can assume that the format of the
-    event is a known format.
-    """
-    def __init__(
-        self,
-        names: EventNames = None,
-        event_filter: Optional[EventFilter] = None,
-    ):
-        # Queue of 'heard' events yet to be witnessed by a caller.
-        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
-
-        # Intended as a historical record, NOT a processing queue or backlog.
-        self._history: List[Message] = []
-
-        #: Primary event filter, based on one or more event names.
-        self.names: Set[str] = set()
-        if isinstance(names, str):
-            self.names.add(names)
-        elif names is not None:
-            self.names.update(names)
-
-        #: Optional, secondary event filter.
-        self.event_filter: Optional[EventFilter] = event_filter
-
-    @property
-    def history(self) -> Tuple[Message, ...]:
-        """
-        A read-only history of all events seen so far.
-
-        This represents *every* event, including those not yet witnessed
-        via `get()` or ``async for``. It persists between `clear()`
-        calls and is immutable.
-        """
-        return tuple(self._history)
-
-    def accept(self, event: Message) -> bool:
-        """
-        Determine if this listener accepts this event.
-
-        This method determines which events will appear in the stream.
-        The default implementation simply checks the event against the
-        list of names and the event_filter to decide if this
-        `EventListener` accepts a given event. It can be
-        overridden/extended to provide custom listener behavior.
-
-        User code is not expected to need to invoke this method.
-
-        :param event: The event under consideration.
-        :return: `True`, if this listener accepts this event.
-        """
-        name_ok = (not self.names) or (event['event'] in self.names)
-        return name_ok and (
-            (not self.event_filter) or self.event_filter(event)
-        )
-
-    async def put(self, event: Message) -> None:
-        """
-        Conditionally put a new event into the FIFO queue.
-
-        This method is not designed to be invoked from user code, and it
-        should not need to be overridden. It is a public interface so
-        that `QMPClient` has an interface by which it can inform
-        registered listeners of new events.
-
-        The event will be put into the queue if
-        :py:meth:`~EventListener.accept()` returns `True`.
-
-        :param event: The new event to put into the FIFO queue.
-        """
-        if not self.accept(event):
-            return
-
-        self._history.append(event)
-        await self._queue.put(event)
-
-    async def get(self) -> Message:
-        """
-        Wait for the very next event in this stream.
-
-        If one is already available, return that one.
-        """
-        return await self._queue.get()
-
-    def empty(self) -> bool:
-        """
-        Return `True` if there are no pending events.
-        """
-        return self._queue.empty()
-
-    def clear(self) -> List[Message]:
-        """
-        Clear this listener of all pending events.
-
-        Called when an `EventListener` is being unregistered, this clears the
-        pending FIFO queue synchronously. It can be also be used to
-        manually clear any pending events, if desired.
-
-        :return: The cleared events, if any.
-
-        .. warning::
-            Take care when discarding events. Cleared events will be
-            silently tossed on the floor. All events that were ever
-            accepted by this listener are visible in `history()`.
-        """
-        events = []
-        while True:
-            try:
-                events.append(self._queue.get_nowait())
-            except asyncio.QueueEmpty:
-                break
-
-        return events
-
-    def __aiter__(self) -> AsyncIterator[Message]:
-        return self
-
-    async def __anext__(self) -> Message:
-        """
-        Enables the `EventListener` to function as an async iterator.
-
-        It may be used like this:
-
-        .. code:: python
-
-            async for event in listener:
-                print(event)
-
-        These iterators will never terminate of their own accord; you
-        must provide break conditions or otherwise prepare to run them
-        in an `asyncio.Task` that can be cancelled.
-        """
-        return await self.get()
-
-
-class Events:
-    """
-    Events is a mix-in class that adds event functionality to the QMP class.
-
-    It's designed specifically as a mix-in for `QMPClient`, and it
-    relies upon the class it is being mixed into having a 'logger'
-    property.
-    """
-    def __init__(self) -> None:
-        self._listeners: List[EventListener] = []
-
-        #: Default, all-events `EventListener`.
-        self.events: EventListener = EventListener()
-        self.register_listener(self.events)
-
-        # Parent class needs to have a logger
-        self.logger: logging.Logger
-
-    async def _event_dispatch(self, msg: Message) -> None:
-        """
-        Given a new event, propagate it to all of the active listeners.
-
-        :param msg: The event to propagate.
-        """
-        for listener in self._listeners:
-            await listener.put(msg)
-
-    def register_listener(self, listener: EventListener) -> None:
-        """
-        Register and activate an `EventListener`.
-
-        :param listener: The listener to activate.
-        :raise ListenerError: If the given listener is already registered.
-        """
-        if listener in self._listeners:
-            raise ListenerError("Attempted to re-register existing listener")
-        self.logger.debug("Registering %s.", str(listener))
-        self._listeners.append(listener)
-
-    def remove_listener(self, listener: EventListener) -> None:
-        """
-        Unregister and deactivate an `EventListener`.
-
-        The removed listener will have its pending events cleared via
-        `clear()`. The listener can be re-registered later when
-        desired.
-
-        :param listener: The listener to deactivate.
-        :raise ListenerError: If the given listener is not registered.
-        """
-        if listener == self.events:
-            raise ListenerError("Cannot remove the default listener.")
-        self.logger.debug("Removing %s.", str(listener))
-        listener.clear()
-        self._listeners.remove(listener)
-
-    @contextmanager
-    def listen(self, *listeners: EventListener) -> Iterator[None]:
-        r"""
-        Context manager: Temporarily listen with an `EventListener`.
-
-        Accepts one or more `EventListener` objects and registers them,
-        activating them for the duration of the context block.
-
-        `EventListener` objects will have any pending events in their
-        FIFO queue cleared upon exiting the context block, when they are
-        deactivated.
-
-        :param \*listeners: One or more EventListeners to activate.
-        :raise ListenerError: If the given listener(s) are already active.
-        """
-        _added = []
-
-        try:
-            for listener in listeners:
-                self.register_listener(listener)
-                _added.append(listener)
-
-            yield
-
-        finally:
-            for listener in _added:
-                self.remove_listener(listener)
-
-    @contextmanager
-    def listener(
-        self,
-        names: EventNames = (),
-        event_filter: Optional[EventFilter] = None
-    ) -> Iterator[EventListener]:
-        """
-        Context manager: Temporarily listen with a new `EventListener`.
-
-        Creates an `EventListener` object and registers it, activating
-        it for the duration of the context block.
-
-        :param names:
-            One or more names of events to listen for.
-            When not provided, listen for ALL events.
-        :param event_filter:
-            An optional event filtering function.
-            When names are also provided, this acts as a secondary filter.
-
-        :return: The newly created and active `EventListener`.
-        """
-        listener = EventListener(names, event_filter)
-        with self.listen(listener):
-            yield listener
diff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py
deleted file mode 100644
index 6c250cd46a..0000000000
--- a/python/qemu/qmp/legacy.py
+++ /dev/null
@@ -1,319 +0,0 @@
-"""
-(Legacy) Sync QMP Wrapper
-
-This module provides the `QEMUMonitorProtocol` class, which is a
-synchronous wrapper around `QMPClient`.
-
-Its design closely resembles that of the original QEMUMonitorProtocol
-class, originally written by Luiz Capitulino.
-"""
-
-# Copyright (C) 2009, 2010, 2021 Red Hat Inc.
-#
-# Authors:
-#  Luiz Capitulino <lcapitulino@redhat.com>
-#  John Snow <jsnow@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2.
-# See the COPYING file in the top-level directory.
-
-
-import asyncio
-from types import TracebackType
-from typing import (
-    Any,
-    Awaitable,
-    Dict,
-    List,
-    Optional,
-    Type,
-    TypeVar,
-    Union,
-)
-
-from .error import QMPError
-from .protocol import Runstate, SocketAddrT
-from .qmp_client import QMPClient
-
-
-#: QMPMessage is an entire QMP message of any kind.
-QMPMessage = Dict[str, Any]
-
-#: QMPReturnValue is the 'return' value of a command.
-QMPReturnValue = object
-
-#: QMPObject is any object in a QMP message.
-QMPObject = Dict[str, object]
-
-# QMPMessage can be outgoing commands or incoming events/returns.
-# QMPReturnValue is usually a dict/json object, but due to QAPI's
-# 'returns-whitelist', it can actually be anything.
-#
-# {'return': {}} is a QMPMessage,
-# {} is the QMPReturnValue.
-
-
-class QMPBadPortError(QMPError):
-    """
-    Unable to parse socket address: Port was non-numerical.
-    """
-
-
-class QEMUMonitorProtocol:
-    """
-    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
-    and then allow to handle commands and events.
-
-    :param address:  QEMU address, can be either a unix socket path (string)
-                     or a tuple in the form ( address, port ) for a TCP
-                     connection
-    :param server:   Deprecated, ignored. (See 'accept')
-    :param nickname: Optional nickname used for logging.
-
-    ..note::
-        No connection is established during `__init__`, this is done by
-        the `connect()` or `accept()` methods.
-    """
-
-    def __init__(self, address: SocketAddrT,
-                 server: bool = False,  # pylint: disable=unused-argument
-                 nickname: Optional[str] = None):
-
-        # pylint: disable=super-init-not-called
-        self._aqmp = QMPClient(nickname)
-        self._aloop = asyncio.get_event_loop()
-        self._address = address
-        self._timeout: Optional[float] = None
-
-    _T = TypeVar('_T')
-
-    def _sync(
-            self, future: Awaitable[_T], timeout: Optional[float] = None
-    ) -> _T:
-        return self._aloop.run_until_complete(
-            asyncio.wait_for(future, timeout=timeout)
-        )
-
-    def _get_greeting(self) -> Optional[QMPMessage]:
-        if self._aqmp.greeting is not None:
-            # pylint: disable=protected-access
-            return self._aqmp.greeting._asdict()
-        return None
-
-    def __enter__(self: _T) -> _T:
-        # Implement context manager enter function.
-        return self
-
-    def __exit__(self,
-                 # pylint: disable=duplicate-code
-                 # see https://github.com/PyCQA/pylint/issues/3619
-                 exc_type: Optional[Type[BaseException]],
-                 exc_val: Optional[BaseException],
-                 exc_tb: Optional[TracebackType]) -> None:
-        # Implement context manager exit function.
-        self.close()
-
-    @classmethod
-    def parse_address(cls, address: str) -> SocketAddrT:
-        """
-        Parse a string into a QMP address.
-
-        Figure out if the argument is in the port:host form.
-        If it's not, it's probably a file path.
-        """
-        components = address.split(':')
-        if len(components) == 2:
-            try:
-                port = int(components[1])
-            except ValueError:
-                msg = f"Bad port: '{components[1]}' in '{address}'."
-                raise QMPBadPortError(msg) from None
-            return (components[0], port)
-
-        # Treat as filepath.
-        return address
-
-    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
-        """
-        Connect to the QMP Monitor and perform capabilities negotiation.
-
-        :return: QMP greeting dict, or None if negotiate is false
-        :raise ConnectError: on connection errors
-        """
-        self._aqmp.await_greeting = negotiate
-        self._aqmp.negotiate = negotiate
-
-        self._sync(
-            self._aqmp.connect(self._address)
-        )
-        return self._get_greeting()
-
-    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
-        """
-        Await connection from QMP Monitor and perform capabilities negotiation.
-
-        :param timeout:
-            timeout in seconds (nonnegative float number, or None).
-            If None, there is no timeout, and this may block forever.
-
-        :return: QMP greeting dict
-        :raise ConnectError: on connection errors
-        """
-        self._aqmp.await_greeting = True
-        self._aqmp.negotiate = True
-
-        self._sync(
-            self._aqmp.accept(self._address),
-            timeout
-        )
-
-        ret = self._get_greeting()
-        assert ret is not None
-        return ret
-
-    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
-        """
-        Send a QMP command to the QMP Monitor.
-
-        :param qmp_cmd: QMP command to be sent as a Python dict
-        :return: QMP response as a Python dict
-        """
-        return dict(
-            self._sync(
-                # pylint: disable=protected-access
-
-                # _raw() isn't a public API, because turning off
-                # automatic ID assignment is discouraged. For
-                # compatibility with iotests *only*, do it anyway.
-                self._aqmp._raw(qmp_cmd, assign_id=False),
-                self._timeout
-            )
-        )
-
-    def cmd(self, name: str,
-            args: Optional[Dict[str, object]] = None,
-            cmd_id: Optional[object] = None) -> QMPMessage:
-        """
-        Build a QMP command and send it to the QMP Monitor.
-
-        :param name: command name (string)
-        :param args: command arguments (dict)
-        :param cmd_id: command id (dict, list, string or int)
-        """
-        qmp_cmd: QMPMessage = {'execute': name}
-        if args:
-            qmp_cmd['arguments'] = args
-        if cmd_id:
-            qmp_cmd['id'] = cmd_id
-        return self.cmd_obj(qmp_cmd)
-
-    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
-        """
-        Build and send a QMP command to the monitor, report errors if any
-        """
-        return self._sync(
-            self._aqmp.execute(cmd, kwds),
-            self._timeout
-        )
-
-    def pull_event(self,
-                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
-        """
-        Pulls a single event.
-
-        :param wait:
-            If False or 0, do not wait. Return None if no events ready.
-            If True, wait forever until the next event.
-            Otherwise, wait for the specified number of seconds.
-
-        :raise asyncio.TimeoutError:
-            When a timeout is requested and the timeout period elapses.
-
-        :return: The first available QMP event, or None.
-        """
-        if not wait:
-            # wait is False/0: "do not wait, do not except."
-            if self._aqmp.events.empty():
-                return None
-
-        # If wait is 'True', wait forever. If wait is False/0, the events
-        # queue must not be empty; but it still needs some real amount
-        # of time to complete.
-        timeout = None
-        if wait and isinstance(wait, float):
-            timeout = wait
-
-        return dict(
-            self._sync(
-                self._aqmp.events.get(),
-                timeout
-            )
-        )
-
-    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
-        """
-        Get a list of QMP events and clear all pending events.
-
-        :param wait:
-            If False or 0, do not wait. Return None if no events ready.
-            If True, wait until we have at least one event.
-            Otherwise, wait for up to the specified number of seconds for at
-            least one event.
-
-        :raise asyncio.TimeoutError:
-            When a timeout is requested and the timeout period elapses.
-
-        :return: A list of QMP events.
-        """
-        events = [dict(x) for x in self._aqmp.events.clear()]
-        if events:
-            return events
-
-        event = self.pull_event(wait)
-        return [event] if event is not None else []
-
-    def clear_events(self) -> None:
-        """Clear current list of pending events."""
-        self._aqmp.events.clear()
-
-    def close(self) -> None:
-        """Close the connection."""
-        self._sync(
-            self._aqmp.disconnect()
-        )
-
-    def settimeout(self, timeout: Optional[float]) -> None:
-        """
-        Set the timeout for QMP RPC execution.
-
-        This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
-        The `accept`, `pull_event` and `get_event` methods have their
-        own configurable timeouts.
-
-        :param timeout:
-            timeout in seconds, or None.
-            None will wait indefinitely.
-        """
-        self._timeout = timeout
-
-    def send_fd_scm(self, fd: int) -> None:
-        """
-        Send a file descriptor to the remote via SCM_RIGHTS.
-        """
-        self._aqmp.send_fd_scm(fd)
-
-    def __del__(self) -> None:
-        if self._aqmp.runstate == Runstate.IDLE:
-            return
-
-        if not self._aloop.is_running():
-            self.close()
-        else:
-            # Garbage collection ran while the event loop was running.
-            # Nothing we can do about it now, but if we don't raise our
-            # own error, the user will be treated to a lot of traceback
-            # they might not understand.
-            raise QMPError(
-                "QEMUMonitorProtocol.close()"
-                " was not called before object was garbage collected"
-            )
diff --git a/python/qemu/qmp/message.py b/python/qemu/qmp/message.py
deleted file mode 100644
index f76ccc9074..0000000000
--- a/python/qemu/qmp/message.py
+++ /dev/null
@@ -1,209 +0,0 @@
-"""
-QMP Message Format
-
-This module provides the `Message` class, which represents a single QMP
-message sent to or from the server.
-"""
-
-import json
-from json import JSONDecodeError
-from typing import (
-    Dict,
-    Iterator,
-    Mapping,
-    MutableMapping,
-    Optional,
-    Union,
-)
-
-from .error import ProtocolError
-
-
-class Message(MutableMapping[str, object]):
-    """
-    Represents a single QMP protocol message.
-
-    QMP uses JSON objects as its basic communicative unit; so this
-    Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
-    be instantiated from either another mapping (like a `dict`), or from
-    raw `bytes` that still need to be deserialized.
-
-    Once instantiated, it may be treated like any other MutableMapping::
-
-        >>> msg = Message(b'{"hello": "world"}')
-        >>> assert msg['hello'] == 'world'
-        >>> msg['id'] = 'foobar'
-        >>> print(msg)
-        {
-          "hello": "world",
-          "id": "foobar"
-        }
-
-    It can be converted to `bytes`::
-
-        >>> msg = Message({"hello": "world"})
-        >>> print(bytes(msg))
-        b'{"hello":"world","id":"foobar"}'
-
-    Or back into a garden-variety `dict`::
-
-       >>> dict(msg)
-       {'hello': 'world'}
-
-
-    :param value: Initial value, if any.
-    :param eager:
-        When `True`, attempt to serialize or deserialize the initial value
-        immediately, so that conversion exceptions are raised during
-        the call to ``__init__()``.
-    """
-    # pylint: disable=too-many-ancestors
-
-    def __init__(self,
-                 value: Union[bytes, Mapping[str, object]] = b'{}', *,
-                 eager: bool = True):
-        self._data: Optional[bytes] = None
-        self._obj: Optional[Dict[str, object]] = None
-
-        if isinstance(value, bytes):
-            self._data = value
-            if eager:
-                self._obj = self._deserialize(self._data)
-        else:
-            self._obj = dict(value)
-            if eager:
-                self._data = self._serialize(self._obj)
-
-    # Methods necessary to implement the MutableMapping interface, see:
-    # 
https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
-
-    # We get pop, popitem, clear, update, setdefault, __contains__,
-    # keys, items, values, get, __eq__ and __ne__ for free.
-
-    def __getitem__(self, key: str) -> object:
-        return self._object[key]
-
-    def __setitem__(self, key: str, value: object) -> None:
-        self._object[key] = value
-        self._data = None
-
-    def __delitem__(self, key: str) -> None:
-        del self._object[key]
-        self._data = None
-
-    def __iter__(self) -> Iterator[str]:
-        return iter(self._object)
-
-    def __len__(self) -> int:
-        return len(self._object)
-
-    # Dunder methods not related to MutableMapping:
-
-    def __repr__(self) -> str:
-        if self._obj is not None:
-            return f"Message({self._object!r})"
-        return f"Message({bytes(self)!r})"
-
-    def __str__(self) -> str:
-        """Pretty-printed representation of this QMP message."""
-        return json.dumps(self._object, indent=2)
-
-    def __bytes__(self) -> bytes:
-        """bytes representing this QMP message."""
-        if self._data is None:
-            self._data = self._serialize(self._obj or {})
-        return self._data
-
-    # Conversion Methods
-
-    @property
-    def _object(self) -> Dict[str, object]:
-        """
-        A `dict` representing this QMP message.
-
-        Generated on-demand, if required. This property is private
-        because it returns an object that could be used to invalidate
-        the internal state of the `Message` object.
-        """
-        if self._obj is None:
-            self._obj = self._deserialize(self._data or b'{}')
-        return self._obj
-
-    @classmethod
-    def _serialize(cls, value: object) -> bytes:
-        """
-        Serialize a JSON object as `bytes`.
-
-        :raise ValueError: When the object cannot be serialized.
-        :raise TypeError: When the object cannot be serialized.
-
-        :return: `bytes` ready to be sent over the wire.
-        """
-        return json.dumps(value, separators=(',', ':')).encode('utf-8')
-
-    @classmethod
-    def _deserialize(cls, data: bytes) -> Dict[str, object]:
-        """
-        Deserialize JSON `bytes` into a native Python `dict`.
-
-        :raise DeserializationError:
-            If JSON deserialization fails for any reason.
-        :raise UnexpectedTypeError:
-            If the data does not represent a JSON object.
-
-        :return: A `dict` representing this QMP message.
-        """
-        try:
-            obj = json.loads(data)
-        except JSONDecodeError as err:
-            emsg = "Failed to deserialize QMP message."
-            raise DeserializationError(emsg, data) from err
-        if not isinstance(obj, dict):
-            raise UnexpectedTypeError(
-                "QMP message is not a JSON object.",
-                obj
-            )
-        return obj
-
-
-class DeserializationError(ProtocolError):
-    """
-    A QMP message was not understood as JSON.
-
-    When this Exception is raised, ``__cause__`` will be set to the
-    `json.JSONDecodeError` Exception, which can be interrogated for
-    further details.
-
-    :param error_message: Human-readable string describing the error.
-    :param raw: The raw `bytes` that prompted the failure.
-    """
-    def __init__(self, error_message: str, raw: bytes):
-        super().__init__(error_message)
-        #: The raw `bytes` that were not understood as JSON.
-        self.raw: bytes = raw
-
-    def __str__(self) -> str:
-        return "\n".join([
-            super().__str__(),
-            f"  raw bytes were: {str(self.raw)}",
-        ])
-
-
-class UnexpectedTypeError(ProtocolError):
-    """
-    A QMP message was JSON, but not a JSON object.
-
-    :param error_message: Human-readable string describing the error.
-    :param value: The deserialized JSON value that wasn't an object.
-    """
-    def __init__(self, error_message: str, value: object):
-        super().__init__(error_message)
-        #: The JSON value that was expected to be an object.
-        self.value: object = value
-
-    def __str__(self) -> str:
-        strval = json.dumps(self.value, indent=2)
-        return "\n".join([
-            super().__str__(),
-            f"  json value was: {strval}",
-        ])
diff --git a/python/qemu/qmp/models.py b/python/qemu/qmp/models.py
deleted file mode 100644
index de87f87804..0000000000
--- a/python/qemu/qmp/models.py
+++ /dev/null
@@ -1,146 +0,0 @@
-"""
-QMP Data Models
-
-This module provides simplistic data classes that represent the few
-structures that the QMP spec mandates; they are used to verify incoming
-data to make sure it conforms to spec.
-"""
-# pylint: disable=too-few-public-methods
-
-from collections import abc
-import copy
-from typing import (
-    Any,
-    Dict,
-    Mapping,
-    Optional,
-    Sequence,
-)
-
-
-class Model:
-    """
-    Abstract data model, representing some QMP object of some kind.
-
-    :param raw: The raw object to be validated.
-    :raise KeyError: If any required fields are absent.
-    :raise TypeError: If any required fields have the wrong type.
-    """
-    def __init__(self, raw: Mapping[str, Any]):
-        self._raw = raw
-
-    def _check_key(self, key: str) -> None:
-        if key not in self._raw:
-            raise KeyError(f"'{self._name}' object requires '{key}' member")
-
-    def _check_value(self, key: str, type_: type, typestr: str) -> None:
-        assert key in self._raw
-        if not isinstance(self._raw[key], type_):
-            raise TypeError(
-                f"'{self._name}' member '{key}' must be a {typestr}"
-            )
-
-    def _check_member(self, key: str, type_: type, typestr: str) -> None:
-        self._check_key(key)
-        self._check_value(key, type_, typestr)
-
-    @property
-    def _name(self) -> str:
-        return type(self).__name__
-
-    def __repr__(self) -> str:
-        return f"{self._name}({self._raw!r})"
-
-
-class Greeting(Model):
-    """
-    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
-
-    :param raw: The raw Greeting object.
-    :raise KeyError: If any required fields are absent.
-    :raise TypeError: If any required fields have the wrong type.
-    """
-    def __init__(self, raw: Mapping[str, Any]):
-        super().__init__(raw)
-        #: 'QMP' member
-        self.QMP: QMPGreeting  # pylint: disable=invalid-name
-
-        self._check_member('QMP', abc.Mapping, "JSON object")
-        self.QMP = QMPGreeting(self._raw['QMP'])
-
-    def _asdict(self) -> Dict[str, object]:
-        """
-        For compatibility with the iotests sync QMP wrapper.
-
-        The legacy QMP interface needs Greetings as a garden-variety Dict.
-
-        This interface is private in the hopes that it will be able to
-        be dropped again in the near-future. Caller beware!
-        """
-        return dict(copy.deepcopy(self._raw))
-
-
-class QMPGreeting(Model):
-    """
-    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
-
-    :param raw: The raw QMPGreeting object.
-    :raise KeyError: If any required fields are absent.
-    :raise TypeError: If any required fields have the wrong type.
-    """
-    def __init__(self, raw: Mapping[str, Any]):
-        super().__init__(raw)
-        #: 'version' member
-        self.version: Mapping[str, object]
-        #: 'capabilities' member
-        self.capabilities: Sequence[object]
-
-        self._check_member('version', abc.Mapping, "JSON object")
-        self.version = self._raw['version']
-
-        self._check_member('capabilities', abc.Sequence, "JSON array")
-        self.capabilities = self._raw['capabilities']
-
-
-class ErrorResponse(Model):
-    """
-    Defined in qmp-spec.txt, section 2.4.2, "error".
-
-    :param raw: The raw ErrorResponse object.
-    :raise KeyError: If any required fields are absent.
-    :raise TypeError: If any required fields have the wrong type.
-    """
-    def __init__(self, raw: Mapping[str, Any]):
-        super().__init__(raw)
-        #: 'error' member
-        self.error: ErrorInfo
-        #: 'id' member
-        self.id: Optional[object] = None  # pylint: disable=invalid-name
-
-        self._check_member('error', abc.Mapping, "JSON object")
-        self.error = ErrorInfo(self._raw['error'])
-
-        if 'id' in raw:
-            self.id = raw['id']
-
-
-class ErrorInfo(Model):
-    """
-    Defined in qmp-spec.txt, section 2.4.2, "error".
-
-    :param raw: The raw ErrorInfo object.
-    :raise KeyError: If any required fields are absent.
-    :raise TypeError: If any required fields have the wrong type.
-    """
-    def __init__(self, raw: Mapping[str, Any]):
-        super().__init__(raw)
-        #: 'class' member, with an underscore to avoid conflicts in Python.
-        self.class_: str
-        #: 'desc' member
-        self.desc: str
-
-        self._check_member('class', str, "string")
-        self.class_ = self._raw['class']
-
-        self._check_member('desc', str, "string")
-        self.desc = self._raw['desc']
diff --git a/python/qemu/qmp/protocol.py b/python/qemu/qmp/protocol.py
deleted file mode 100644
index 3bae528d59..0000000000
--- a/python/qemu/qmp/protocol.py
+++ /dev/null
@@ -1,922 +0,0 @@
-"""
-Generic Asynchronous Message-based Protocol Support
-
-This module provides a generic framework for sending and receiving
-messages over an asyncio stream. `AsyncProtocol` is an abstract class
-that implements the core mechanisms of a simple send/receive protocol,
-and is designed to be extended.
-
-In this package, it is used as the implementation for the `QMPClient`
-class.
-"""
-
-import asyncio
-from asyncio import StreamReader, StreamWriter
-from enum import Enum
-from functools import wraps
-import logging
-from ssl import SSLContext
-from typing import (
-    Any,
-    Awaitable,
-    Callable,
-    Generic,
-    List,
-    Optional,
-    Tuple,
-    TypeVar,
-    Union,
-    cast,
-)
-
-from .error import QMPError
-from .util import (
-    bottom_half,
-    create_task,
-    exception_summary,
-    flush,
-    is_closing,
-    pretty_traceback,
-    upper_half,
-    wait_closed,
-)
-
-
-T = TypeVar('T')
-_U = TypeVar('_U')
-_TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
-
-InternetAddrT = Tuple[str, int]
-UnixAddrT = str
-SocketAddrT = Union[UnixAddrT, InternetAddrT]
-
-
-class Runstate(Enum):
-    """Protocol session runstate."""
-
-    #: Fully quiesced and disconnected.
-    IDLE = 0
-    #: In the process of connecting or establishing a session.
-    CONNECTING = 1
-    #: Fully connected and active session.
-    RUNNING = 2
-    #: In the process of disconnecting.
-    #: Runstate may be returned to `IDLE` by calling `disconnect()`.
-    DISCONNECTING = 3
-
-
-class ConnectError(QMPError):
-    """
-    Raised when the initial connection process has failed.
-
-    This Exception always wraps a "root cause" exception that can be
-    interrogated for additional information.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-    def __init__(self, error_message: str, exc: Exception):
-        super().__init__(error_message)
-        #: Human-readable error string
-        self.error_message: str = error_message
-        #: Wrapped root cause exception
-        self.exc: Exception = exc
-
-    def __str__(self) -> str:
-        cause = str(self.exc)
-        if not cause:
-            # If there's no error string, use the exception name.
-            cause = exception_summary(self.exc)
-        return f"{self.error_message}: {cause}"
-
-
-class StateError(QMPError):
-    """
-    An API command (connect, execute, etc) was issued at an inappropriate time.
-
-    This error is raised when a command like
-    :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
-    time.
-
-    :param error_message: Human-readable string describing the state violation.
-    :param state: The actual `Runstate` seen at the time of the violation.
-    :param required: The `Runstate` required to process this command.
-    """
-    def __init__(self, error_message: str,
-                 state: Runstate, required: Runstate):
-        super().__init__(error_message)
-        self.error_message = error_message
-        self.state = state
-        self.required = required
-
-
-F = TypeVar('F', bound=Callable[..., Any])  # pylint: disable=invalid-name
-
-
-# Don't Panic.
-def require(required_state: Runstate) -> Callable[[F], F]:
-    """
-    Decorator: protect a method so it can only be run in a certain `Runstate`.
-
-    :param required_state: The `Runstate` required to invoke this method.
-    :raise StateError: When the required `Runstate` is not met.
-    """
-    def _decorator(func: F) -> F:
-        # _decorator is the decorator that is built by calling the
-        # require() decorator factory; e.g.:
-        #
-        # @require(Runstate.IDLE) def foo(): ...
-        # will replace 'foo' with the result of '_decorator(foo)'.
-
-        @wraps(func)
-        def _wrapper(proto: 'AsyncProtocol[Any]',
-                     *args: Any, **kwargs: Any) -> Any:
-            # _wrapper is the function that gets executed prior to the
-            # decorated method.
-
-            name = type(proto).__name__
-
-            if proto.runstate != required_state:
-                if proto.runstate == Runstate.CONNECTING:
-                    emsg = f"{name} is currently connecting."
-                elif proto.runstate == Runstate.DISCONNECTING:
-                    emsg = (f"{name} is disconnecting."
-                            " Call disconnect() to return to IDLE state.")
-                elif proto.runstate == Runstate.RUNNING:
-                    emsg = f"{name} is already connected and running."
-                elif proto.runstate == Runstate.IDLE:
-                    emsg = f"{name} is disconnected and idle."
-                else:
-                    assert False
-                raise StateError(emsg, proto.runstate, required_state)
-            # No StateError, so call the wrapped method.
-            return func(proto, *args, **kwargs)
-
-        # Return the decorated method;
-        # Transforming Func to Decorated[Func].
-        return cast(F, _wrapper)
-
-    # Return the decorator instance from the decorator factory. Phew!
-    return _decorator
-
-
-class AsyncProtocol(Generic[T]):
-    """
-    AsyncProtocol implements a generic async message-based protocol.
-
-    This protocol assumes the basic unit of information transfer between
-    client and server is a "message", the details of which are left up
-    to the implementation. It assumes the sending and receiving of these
-    messages is full-duplex and not necessarily correlated; i.e. it
-    supports asynchronous inbound messages.
-
-    It is designed to be extended by a specific protocol which provides
-    the implementations for how to read and send messages. These must be
-    defined in `_do_recv()` and `_do_send()`, respectively.
-
-    Other callbacks have a default implementation, but are intended to be
-    either extended or overridden:
-
-     - `_establish_session`:
-         The base implementation starts the reader/writer tasks.
-         A protocol implementation can override this call, inserting
-         actions to be taken prior to starting the reader/writer tasks
-         before the super() call; actions needing to occur afterwards
-         can be written after the super() call.
-     - `_on_message`:
-         Actions to be performed when a message is received.
-     - `_cb_outbound`:
-         Logging/Filtering hook for all outbound messages.
-     - `_cb_inbound`:
-         Logging/Filtering hook for all inbound messages.
-         This hook runs *before* `_on_message()`.
-
-    :param name:
-        Name used for logging messages, if any. By default, messages
-        will log to 'qemu.qmp.protocol', but each individual connection
-        can be given its own logger by giving it a name; messages will
-        then log to 'qemu.qmp.protocol.${name}'.
-    """
-    # pylint: disable=too-many-instance-attributes
-
-    #: Logger object for debugging messages from this connection.
-    logger = logging.getLogger(__name__)
-
-    # Maximum allowable size of read buffer
-    _limit = (64 * 1024)
-
-    # -------------------------
-    # Section: Public interface
-    # -------------------------
-
-    def __init__(self, name: Optional[str] = None) -> None:
-        #: The nickname for this connection, if any.
-        self.name: Optional[str] = name
-        if self.name is not None:
-            self.logger = self.logger.getChild(self.name)
-
-        # stream I/O
-        self._reader: Optional[StreamReader] = None
-        self._writer: Optional[StreamWriter] = None
-
-        # Outbound Message queue
-        self._outgoing: asyncio.Queue[T]
-
-        # Special, long-running tasks:
-        self._reader_task: Optional[asyncio.Future[None]] = None
-        self._writer_task: Optional[asyncio.Future[None]] = None
-
-        # Aggregate of the above two tasks, used for Exception management.
-        self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
-
-        #: Disconnect task. The disconnect implementation runs in a task
-        #: so that asynchronous disconnects (initiated by the
-        #: reader/writer) are allowed to wait for the reader/writers to
-        #: exit.
-        self._dc_task: Optional[asyncio.Future[None]] = None
-
-        self._runstate = Runstate.IDLE
-        self._runstate_changed: Optional[asyncio.Event] = None
-
-    def __repr__(self) -> str:
-        cls_name = type(self).__name__
-        tokens = []
-        if self.name is not None:
-            tokens.append(f"name={self.name!r}")
-        tokens.append(f"runstate={self.runstate.name}")
-        return f"<{cls_name} {' '.join(tokens)}>"
-
-    @property  # @upper_half
-    def runstate(self) -> Runstate:
-        """The current `Runstate` of the connection."""
-        return self._runstate
-
-    @upper_half
-    async def runstate_changed(self) -> Runstate:
-        """
-        Wait for the `runstate` to change, then return that runstate.
-        """
-        await self._runstate_event.wait()
-        return self.runstate
-
-    @upper_half
-    @require(Runstate.IDLE)
-    async def accept(self, address: SocketAddrT,
-                     ssl: Optional[SSLContext] = None) -> None:
-        """
-        Accept a connection and begin processing message queues.
-
-        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
-
-        :param address:
-            Address to listen to; UNIX socket path or TCP address/port.
-        :param ssl: SSL context to use, if any.
-
-        :raise StateError: When the `Runstate` is not `IDLE`.
-        :raise ConnectError: If a connection could not be accepted.
-        """
-        await self._new_session(address, ssl, accept=True)
-
-    @upper_half
-    @require(Runstate.IDLE)
-    async def connect(self, address: SocketAddrT,
-                      ssl: Optional[SSLContext] = None) -> None:
-        """
-        Connect to the server and begin processing message queues.
-
-        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
-
-        :param address:
-            Address to connect to; UNIX socket path or TCP address/port.
-        :param ssl: SSL context to use, if any.
-
-        :raise StateError: When the `Runstate` is not `IDLE`.
-        :raise ConnectError: If a connection cannot be made to the server.
-        """
-        await self._new_session(address, ssl)
-
-    @upper_half
-    async def disconnect(self) -> None:
-        """
-        Disconnect and wait for all tasks to fully stop.
-
-        If there was an exception that caused the reader/writers to
-        terminate prematurely, it will be raised here.
-
-        :raise Exception: When the reader or writer terminate unexpectedly.
-        """
-        self.logger.debug("disconnect() called.")
-        self._schedule_disconnect()
-        await self._wait_disconnect()
-
-    # --------------------------
-    # Section: Session machinery
-    # --------------------------
-
-    @property
-    def _runstate_event(self) -> asyncio.Event:
-        # asyncio.Event() objects should not be created prior to entrance into
-        # an event loop, so we can ensure we create it in the correct context.
-        # Create it on-demand *only* at the behest of an 'async def' method.
-        if not self._runstate_changed:
-            self._runstate_changed = asyncio.Event()
-        return self._runstate_changed
-
-    @upper_half
-    @bottom_half
-    def _set_state(self, state: Runstate) -> None:
-        """
-        Change the `Runstate` of the protocol connection.
-
-        Signals the `runstate_changed` event.
-        """
-        if state == self._runstate:
-            return
-
-        self.logger.debug("Transitioning from '%s' to '%s'.",
-                          str(self._runstate), str(state))
-        self._runstate = state
-        self._runstate_event.set()
-        self._runstate_event.clear()
-
-    @upper_half
-    async def _new_session(self,
-                           address: SocketAddrT,
-                           ssl: Optional[SSLContext] = None,
-                           accept: bool = False) -> None:
-        """
-        Establish a new connection and initialize the session.
-
-        Connect or accept a new connection, then begin the protocol
-        session machinery. If this call fails, `runstate` is guaranteed
-        to be set back to `IDLE`.
-
-        :param address:
-            Address to connect to/listen on;
-            UNIX socket path or TCP address/port.
-        :param ssl: SSL context to use, if any.
-        :param accept: Accept a connection instead of connecting when `True`.
-
-        :raise ConnectError:
-            When a connection or session cannot be established.
-
-            This exception will wrap a more concrete one. In most cases,
-            the wrapped exception will be `OSError` or `EOFError`. If a
-            protocol-level failure occurs while establishing a new
-            session, the wrapped error may also be an `QMPError`.
-        """
-        assert self.runstate == Runstate.IDLE
-
-        try:
-            phase = "connection"
-            await self._establish_connection(address, ssl, accept)
-
-            phase = "session"
-            await self._establish_session()
-
-        except BaseException as err:
-            emsg = f"Failed to establish {phase}"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            try:
-                # Reset from CONNECTING back to IDLE.
-                await self.disconnect()
-            except:
-                emsg = "Unexpected bottom half exception"
-                self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
-                raise
-
-            # NB: CancelledError is not a BaseException before Python 3.8
-            if isinstance(err, asyncio.CancelledError):
-                raise
-
-            if isinstance(err, Exception):
-                raise ConnectError(emsg, err) from err
-
-            # Raise BaseExceptions un-wrapped, they're more important.
-            raise
-
-        assert self.runstate == Runstate.RUNNING
-
-    @upper_half
-    async def _establish_connection(
-            self,
-            address: SocketAddrT,
-            ssl: Optional[SSLContext] = None,
-            accept: bool = False
-    ) -> None:
-        """
-        Establish a new connection.
-
-        :param address:
-            Address to connect to/listen on;
-            UNIX socket path or TCP address/port.
-        :param ssl: SSL context to use, if any.
-        :param accept: Accept a connection instead of connecting when `True`.
-        """
-        assert self.runstate == Runstate.IDLE
-        self._set_state(Runstate.CONNECTING)
-
-        # Allow runstate watchers to witness 'CONNECTING' state; some
-        # failures in the streaming layer are synchronous and will not
-        # otherwise yield.
-        await asyncio.sleep(0)
-
-        if accept:
-            await self._do_accept(address, ssl)
-        else:
-            await self._do_connect(address, ssl)
-
-    @upper_half
-    async def _do_accept(self, address: SocketAddrT,
-                         ssl: Optional[SSLContext] = None) -> None:
-        """
-        Acting as the transport server, accept a single connection.
-
-        :param address:
-            Address to listen on; UNIX socket path or TCP address/port.
-        :param ssl: SSL context to use, if any.
-
-        :raise OSError: For stream-related errors.
-        """
-        self.logger.debug("Awaiting connection on %s ...", address)
-        connected = asyncio.Event()
-        server: Optional[asyncio.AbstractServer] = None
-
-        async def _client_connected_cb(reader: asyncio.StreamReader,
-                                       writer: asyncio.StreamWriter) -> None:
-            """Used to accept a single incoming connection, see below."""
-            nonlocal server
-            nonlocal connected
-
-            # A connection has been accepted; stop listening for new ones.
-            assert server is not None
-            server.close()
-            await server.wait_closed()
-            server = None
-
-            # Register this client as being connected
-            self._reader, self._writer = (reader, writer)
-
-            # Signal back: We've accepted a client!
-            connected.set()
-
-        if isinstance(address, tuple):
-            coro = asyncio.start_server(
-                _client_connected_cb,
-                host=address[0],
-                port=address[1],
-                ssl=ssl,
-                backlog=1,
-                limit=self._limit,
-            )
-        else:
-            coro = asyncio.start_unix_server(
-                _client_connected_cb,
-                path=address,
-                ssl=ssl,
-                backlog=1,
-                limit=self._limit,
-            )
-
-        server = await coro     # Starts listening
-        await connected.wait()  # Waits for the callback to fire (and finish)
-        assert server is None
-
-        self.logger.debug("Connection accepted.")
-
-    @upper_half
-    async def _do_connect(self, address: SocketAddrT,
-                          ssl: Optional[SSLContext] = None) -> None:
-        """
-        Acting as the transport client, initiate a connection to a server.
-
-        :param address:
-            Address to connect to; UNIX socket path or TCP address/port.
-        :param ssl: SSL context to use, if any.
-
-        :raise OSError: For stream-related errors.
-        """
-        self.logger.debug("Connecting to %s ...", address)
-
-        if isinstance(address, tuple):
-            connect = asyncio.open_connection(
-                address[0],
-                address[1],
-                ssl=ssl,
-                limit=self._limit,
-            )
-        else:
-            connect = asyncio.open_unix_connection(
-                path=address,
-                ssl=ssl,
-                limit=self._limit,
-            )
-        self._reader, self._writer = await connect
-
-        self.logger.debug("Connected.")
-
-    @upper_half
-    async def _establish_session(self) -> None:
-        """
-        Establish a new session.
-
-        Starts the readers/writer tasks; subclasses may perform their
-        own negotiations here. The Runstate will be RUNNING upon
-        successful conclusion.
-        """
-        assert self.runstate == Runstate.CONNECTING
-
-        self._outgoing = asyncio.Queue()
-
-        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
-        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
-
-        self._reader_task = create_task(reader_coro)
-        self._writer_task = create_task(writer_coro)
-
-        self._bh_tasks = asyncio.gather(
-            self._reader_task,
-            self._writer_task,
-        )
-
-        self._set_state(Runstate.RUNNING)
-        await asyncio.sleep(0)  # Allow runstate_event to process
-
-    @upper_half
-    @bottom_half
-    def _schedule_disconnect(self) -> None:
-        """
-        Initiate a disconnect; idempotent.
-
-        This method is used both in the upper-half as a direct
-        consequence of `disconnect()`, and in the bottom-half in the
-        case of unhandled exceptions in the reader/writer tasks.
-
-        It can be invoked no matter what the `runstate` is.
-        """
-        if not self._dc_task:
-            self._set_state(Runstate.DISCONNECTING)
-            self.logger.debug("Scheduling disconnect.")
-            self._dc_task = create_task(self._bh_disconnect())
-
-    @upper_half
-    async def _wait_disconnect(self) -> None:
-        """
-        Waits for a previously scheduled disconnect to finish.
-
-        This method will gather any bottom half exceptions and re-raise
-        the one that occurred first; presuming it to be the root cause
-        of any subsequent Exceptions. It is intended to be used in the
-        upper half of the call chain.
-
-        :raise Exception:
-            Arbitrary exception re-raised on behalf of the reader/writer.
-        """
-        assert self.runstate == Runstate.DISCONNECTING
-        assert self._dc_task
-
-        aws: List[Awaitable[object]] = [self._dc_task]
-        if self._bh_tasks:
-            aws.insert(0, self._bh_tasks)
-        all_defined_tasks = asyncio.gather(*aws)
-
-        # Ensure disconnect is done; Exception (if any) is not raised here:
-        await asyncio.wait((self._dc_task,))
-
-        try:
-            await all_defined_tasks  # Raise Exceptions from the bottom half.
-        finally:
-            self._cleanup()
-            self._set_state(Runstate.IDLE)
-
-    @upper_half
-    def _cleanup(self) -> None:
-        """
-        Fully reset this object to a clean state and return to `IDLE`.
-        """
-        def _paranoid_task_erase(task: Optional['asyncio.Future[_U]']
-                                 ) -> Optional['asyncio.Future[_U]']:
-            # Help to erase a task, ENSURING it is fully quiesced first.
-            assert (task is None) or task.done()
-            return None if (task and task.done()) else task
-
-        assert self.runstate == Runstate.DISCONNECTING
-        self._dc_task = _paranoid_task_erase(self._dc_task)
-        self._reader_task = _paranoid_task_erase(self._reader_task)
-        self._writer_task = _paranoid_task_erase(self._writer_task)
-        self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
-
-        self._reader = None
-        self._writer = None
-
-        # NB: _runstate_changed cannot be cleared because we still need it to
-        # send the final runstate changed event ...!
-
-    # ----------------------------
-    # Section: Bottom Half methods
-    # ----------------------------
-
-    @bottom_half
-    async def _bh_disconnect(self) -> None:
-        """
-        Disconnect and cancel all outstanding tasks.
-
-        It is designed to be called from its task context,
-        :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
-        it is free to wait on any pending actions that may still need to
-        occur in either the reader or writer tasks.
-        """
-        assert self.runstate == Runstate.DISCONNECTING
-
-        def _done(task: Optional['asyncio.Future[Any]']) -> bool:
-            return task is not None and task.done()
-
-        # Are we already in an error pathway? If either of the tasks are
-        # already done, or if we have no tasks but a reader/writer; we
-        # must be.
-        #
-        # NB: We can't use _bh_tasks to check for premature task
-        # completion, because it may not yet have had a chance to run
-        # and gather itself.
-        tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
-        error_pathway = _done(self._reader_task) or _done(self._writer_task)
-        if not tasks:
-            error_pathway |= bool(self._reader) or bool(self._writer)
-
-        try:
-            # Try to flush the writer, if possible.
-            # This *may* cause an error and force us over into the error path.
-            if not error_pathway:
-                await self._bh_flush_writer()
-        except BaseException as err:
-            error_pathway = True
-            emsg = "Failed to flush the writer"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise
-        finally:
-            # Cancel any still-running tasks (Won't raise):
-            if self._writer_task is not None and not self._writer_task.done():
-                self.logger.debug("Cancelling writer task.")
-                self._writer_task.cancel()
-            if self._reader_task is not None and not self._reader_task.done():
-                self.logger.debug("Cancelling reader task.")
-                self._reader_task.cancel()
-
-            # Close out the tasks entirely (Won't raise):
-            if tasks:
-                self.logger.debug("Waiting for tasks to complete ...")
-                await asyncio.wait(tasks)
-
-            # Lastly, close the stream itself. (*May raise*!):
-            await self._bh_close_stream(error_pathway)
-            self.logger.debug("Disconnected.")
-
-    @bottom_half
-    async def _bh_flush_writer(self) -> None:
-        if not self._writer_task:
-            return
-
-        self.logger.debug("Draining the outbound queue ...")
-        await self._outgoing.join()
-        if self._writer is not None:
-            self.logger.debug("Flushing the StreamWriter ...")
-            await flush(self._writer)
-
-    @bottom_half
-    async def _bh_close_stream(self, error_pathway: bool = False) -> None:
-        # NB: Closing the writer also implcitly closes the reader.
-        if not self._writer:
-            return
-
-        if not is_closing(self._writer):
-            self.logger.debug("Closing StreamWriter.")
-            self._writer.close()
-
-        self.logger.debug("Waiting for StreamWriter to close ...")
-        try:
-            await wait_closed(self._writer)
-        except Exception:  # pylint: disable=broad-except
-            # It's hard to tell if the Stream is already closed or
-            # not. Even if one of the tasks has failed, it may have
-            # failed for a higher-layered protocol reason. The
-            # stream could still be open and perfectly fine.
-            # I don't know how to discern its health here.
-
-            if error_pathway:
-                # We already know that *something* went wrong. Let's
-                # just trust that the Exception we already have is the
-                # better one to present to the user, even if we don't
-                # genuinely *know* the relationship between the two.
-                self.logger.debug(
-                    "Discarding Exception from wait_closed:\n%s\n",
-                    pretty_traceback(),
-                )
-            else:
-                # Oops, this is a brand-new error!
-                raise
-        finally:
-            self.logger.debug("StreamWriter closed.")
-
-    @bottom_half
-    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
-        """
-        Run one of the bottom-half methods in a loop forever.
-
-        If the bottom half ever raises any exception, schedule a
-        disconnect that will terminate the entire loop.
-
-        :param async_fn: The bottom-half method to run in a loop.
-        :param name: The name of this task, used for logging.
-        """
-        try:
-            while True:
-                await async_fn()
-        except asyncio.CancelledError:
-            # We have been cancelled by _bh_disconnect, exit gracefully.
-            self.logger.debug("Task.%s: cancelled.", name)
-            return
-        except BaseException as err:
-            self.logger.log(
-                logging.INFO if isinstance(err, EOFError) else logging.ERROR,
-                "Task.%s: %s",
-                name, exception_summary(err)
-            )
-            self.logger.debug("Task.%s: failure:\n%s\n",
-                              name, pretty_traceback())
-            self._schedule_disconnect()
-            raise
-        finally:
-            self.logger.debug("Task.%s: exiting.", name)
-
-    @bottom_half
-    async def _bh_send_message(self) -> None:
-        """
-        Wait for an outgoing message, then send it.
-
-        Designed to be run in `_bh_loop_forever()`.
-        """
-        msg = await self._outgoing.get()
-        try:
-            await self._send(msg)
-        finally:
-            self._outgoing.task_done()
-
-    @bottom_half
-    async def _bh_recv_message(self) -> None:
-        """
-        Wait for an incoming message and call `_on_message` to route it.
-
-        Designed to be run in `_bh_loop_forever()`.
-        """
-        msg = await self._recv()
-        await self._on_message(msg)
-
-    # --------------------
-    # Section: Message I/O
-    # --------------------
-
-    @upper_half
-    @bottom_half
-    def _cb_outbound(self, msg: T) -> T:
-        """
-        Callback: outbound message hook.
-
-        This is intended for subclasses to be able to add arbitrary
-        hooks to filter or manipulate outgoing messages. The base
-        implementation does nothing but log the message without any
-        manipulation of the message.
-
-        :param msg: raw outbound message
-        :return: final outbound message
-        """
-        self.logger.debug("--> %s", str(msg))
-        return msg
-
-    @upper_half
-    @bottom_half
-    def _cb_inbound(self, msg: T) -> T:
-        """
-        Callback: inbound message hook.
-
-        This is intended for subclasses to be able to add arbitrary
-        hooks to filter or manipulate incoming messages. The base
-        implementation does nothing but log the message without any
-        manipulation of the message.
-
-        This method does not "handle" incoming messages; it is a filter.
-        The actual "endpoint" for incoming messages is `_on_message()`.
-
-        :param msg: raw inbound message
-        :return: processed inbound message
-        """
-        self.logger.debug("<-- %s", str(msg))
-        return msg
-
-    @upper_half
-    @bottom_half
-    async def _readline(self) -> bytes:
-        """
-        Wait for a newline from the incoming reader.
-
-        This method is provided as a convenience for upper-layer
-        protocols, as many are line-based.
-
-        This method *may* return a sequence of bytes without a trailing
-        newline if EOF occurs, but *some* bytes were received. In this
-        case, the next call will raise `EOFError`. It is assumed that
-        the layer 5 protocol will decide if there is anything meaningful
-        to be done with a partial message.
-
-        :raise OSError: For stream-related errors.
-        :raise EOFError:
-            If the reader stream is at EOF and there are no bytes to return.
-        :return: bytes, including the newline.
-        """
-        assert self._reader is not None
-        msg_bytes = await self._reader.readline()
-
-        if not msg_bytes:
-            if self._reader.at_eof():
-                raise EOFError
-
-        return msg_bytes
-
-    @upper_half
-    @bottom_half
-    async def _do_recv(self) -> T:
-        """
-        Abstract: Read from the stream and return a message.
-
-        Very low-level; intended to only be called by `_recv()`.
-        """
-        raise NotImplementedError
-
-    @upper_half
-    @bottom_half
-    async def _recv(self) -> T:
-        """
-        Read an arbitrary protocol message.
-
-        .. warning::
-            This method is intended primarily for `_bh_recv_message()`
-            to use in an asynchronous task loop. Using it outside of
-            this loop will "steal" messages from the normal routing
-            mechanism. It is safe to use prior to `_establish_session()`,
-            but should not be used otherwise.
-
-        This method uses `_do_recv()` to retrieve the raw message, and
-        then transforms it using `_cb_inbound()`.
-
-        :return: A single (filtered, processed) protocol message.
-        """
-        message = await self._do_recv()
-        return self._cb_inbound(message)
-
-    @upper_half
-    @bottom_half
-    def _do_send(self, msg: T) -> None:
-        """
-        Abstract: Write a message to the stream.
-
-        Very low-level; intended to only be called by `_send()`.
-        """
-        raise NotImplementedError
-
-    @upper_half
-    @bottom_half
-    async def _send(self, msg: T) -> None:
-        """
-        Send an arbitrary protocol message.
-
-        This method will transform any outgoing messages according to
-        `_cb_outbound()`.
-
-        .. warning::
-            Like `_recv()`, this method is intended to be called by
-            the writer task loop that processes outgoing
-            messages. Calling it directly may circumvent logic
-            implemented by the caller meant to correlate outgoing and
-            incoming messages.
-
-        :raise OSError: For problems with the underlying stream.
-        """
-        msg = self._cb_outbound(msg)
-        self._do_send(msg)
-
-    @bottom_half
-    async def _on_message(self, msg: T) -> None:
-        """
-        Called to handle the receipt of a new message.
-
-        .. caution::
-            This is executed from within the reader loop, so be advised
-            that waiting on either the reader or writer task will lead
-            to deadlock. Additionally, any unhandled exceptions will
-            directly cause the loop to halt, so logic may be best-kept
-            to a minimum if at all possible.
-
-        :param msg: The incoming message, already logged/filtered.
-        """
-        # Nothing to do in the abstract case.
diff --git a/python/qemu/qmp/py.typed b/python/qemu/qmp/py.typed
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py
deleted file mode 100644
index 8ea9e45115..0000000000
--- a/python/qemu/qmp/qmp_client.py
+++ /dev/null
@@ -1,655 +0,0 @@
-"""
-QMP Protocol Implementation
-
-This module provides the `QMPClient` class, which can be used to connect
-and send commands to a QMP server such as QEMU. The QMP class can be
-used to either connect to a listening server, or used to listen and
-accept an incoming connection from that server.
-"""
-
-import asyncio
-import logging
-import socket
-import struct
-from typing import (
-    Dict,
-    List,
-    Mapping,
-    Optional,
-    Union,
-    cast,
-)
-
-from .error import ProtocolError, QMPError
-from .events import Events
-from .message import Message
-from .models import ErrorResponse, Greeting
-from .protocol import AsyncProtocol, Runstate, require
-from .util import (
-    bottom_half,
-    exception_summary,
-    pretty_traceback,
-    upper_half,
-)
-
-
-class _WrappedProtocolError(ProtocolError):
-    """
-    Abstract exception class for Protocol errors that wrap an Exception.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-    def __init__(self, error_message: str, exc: Exception):
-        super().__init__(error_message)
-        self.exc = exc
-
-    def __str__(self) -> str:
-        return f"{self.error_message}: {self.exc!s}"
-
-
-class GreetingError(_WrappedProtocolError):
-    """
-    An exception occurred during the Greeting phase.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-
-
-class NegotiationError(_WrappedProtocolError):
-    """
-    An exception occurred during the Negotiation phase.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-
-
-class ExecuteError(QMPError):
-    """
-    Exception raised by `QMPClient.execute()` on RPC failure.
-
-    :param error_response: The RPC error response object.
-    :param sent: The sent RPC message that caused the failure.
-    :param received: The raw RPC error reply received.
-    """
-    def __init__(self, error_response: ErrorResponse,
-                 sent: Message, received: Message):
-        super().__init__(error_response.error.desc)
-        #: The sent `Message` that caused the failure
-        self.sent: Message = sent
-        #: The received `Message` that indicated failure
-        self.received: Message = received
-        #: The parsed error response
-        self.error: ErrorResponse = error_response
-        #: The QMP error class
-        self.error_class: str = error_response.error.class_
-
-
-class ExecInterruptedError(QMPError):
-    """
-    Exception raised by `execute()` (et al) when an RPC is interrupted.
-
-    This error is raised when an `execute()` statement could not be
-    completed.  This can occur because the connection itself was
-    terminated before a reply was received.
-
-    The true cause of the interruption will be available via `disconnect()`.
-    """
-
-
-class _MsgProtocolError(ProtocolError):
-    """
-    Abstract error class for protocol errors that have a `Message` object.
-
-    This Exception class is used for protocol errors where the `Message`
-    was mechanically understood, but was found to be inappropriate or
-    malformed.
-
-    :param error_message: Human-readable string describing the error.
-    :param msg: The QMP `Message` that caused the error.
-    """
-    def __init__(self, error_message: str, msg: Message):
-        super().__init__(error_message)
-        #: The received `Message` that caused the error.
-        self.msg: Message = msg
-
-    def __str__(self) -> str:
-        return "\n".join([
-            super().__str__(),
-            f"  Message was: {str(self.msg)}\n",
-        ])
-
-
-class ServerParseError(_MsgProtocolError):
-    """
-    The Server sent a `Message` indicating parsing failure.
-
-    i.e. A reply has arrived from the server, but it is missing the "ID"
-    field, indicating a parsing error.
-
-    :param error_message: Human-readable string describing the error.
-    :param msg: The QMP `Message` that caused the error.
-    """
-
-
-class BadReplyError(_MsgProtocolError):
-    """
-    An execution reply was successfully routed, but not understood.
-
-    If a QMP message is received with an 'id' field to allow it to be
-    routed, but is otherwise malformed, this exception will be raised.
-
-    A reply message is malformed if it is missing either the 'return' or
-    'error' keys, or if the 'error' value has missing keys or members of
-    the wrong type.
-
-    :param error_message: Human-readable string describing the error.
-    :param msg: The malformed reply that was received.
-    :param sent: The message that was sent that prompted the error.
-    """
-    def __init__(self, error_message: str, msg: Message, sent: Message):
-        super().__init__(error_message, msg)
-        #: The sent `Message` that caused the failure
-        self.sent = sent
-
-
-class QMPClient(AsyncProtocol[Message], Events):
-    """
-    Implements a QMP client connection.
-
-    QMP can be used to establish a connection as either the transport
-    client or server, though this class always acts as the QMP client.
-
-    :param name: Optional nickname for the connection, used for logging.
-
-    Basic script-style usage looks like this::
-
-      qmp = QMPClient('my_virtual_machine_name')
-      await qmp.connect(('127.0.0.1', 1234))
-      ...
-      res = await qmp.execute('block-query')
-      ...
-      await qmp.disconnect()
-
-    Basic async client-style usage looks like this::
-
-      class Client:
-          def __init__(self, name: str):
-              self.qmp = QMPClient(name)
-
-          async def watch_events(self):
-              try:
-                  async for event in self.qmp.events:
-                      print(f"Event: {event['event']}")
-              except asyncio.CancelledError:
-                  return
-
-          async def run(self, address='/tmp/qemu.socket'):
-              await self.qmp.connect(address)
-              asyncio.create_task(self.watch_events())
-              await self.qmp.runstate_changed.wait()
-              await self.disconnect()
-
-    See `qmp.events` for more detail on event handling patterns.
-    """
-    #: Logger object used for debugging messages.
-    logger = logging.getLogger(__name__)
-
-    # Read buffer limit; large enough to accept query-qmp-schema
-    _limit = (256 * 1024)
-
-    # Type alias for pending execute() result items
-    _PendingT = Union[Message, ExecInterruptedError]
-
-    def __init__(self, name: Optional[str] = None) -> None:
-        super().__init__(name)
-        Events.__init__(self)
-
-        #: Whether or not to await a greeting after establishing a connection.
-        self.await_greeting: bool = True
-
-        #: Whether or not to perform capabilities negotiation upon connection.
-        #: Implies `await_greeting`.
-        self.negotiate: bool = True
-
-        # Cached Greeting, if one was awaited.
-        self._greeting: Optional[Greeting] = None
-
-        # Command ID counter
-        self._execute_id = 0
-
-        # Incoming RPC reply messages.
-        self._pending: Dict[
-            Union[str, None],
-            'asyncio.Queue[QMPClient._PendingT]'
-        ] = {}
-
-    @property
-    def greeting(self) -> Optional[Greeting]:
-        """The `Greeting` from the QMP server, if any."""
-        return self._greeting
-
-    @upper_half
-    async def _establish_session(self) -> None:
-        """
-        Initiate the QMP session.
-
-        Wait for the QMP greeting and perform capabilities negotiation.
-
-        :raise GreetingError: When the greeting is not understood.
-        :raise NegotiationError: If the negotiation fails.
-        :raise EOFError: When the server unexpectedly hangs up.
-        :raise OSError: For underlying stream errors.
-        """
-        self._greeting = None
-        self._pending = {}
-
-        if self.await_greeting or self.negotiate:
-            self._greeting = await self._get_greeting()
-
-        if self.negotiate:
-            await self._negotiate()
-
-        # This will start the reader/writers:
-        await super()._establish_session()
-
-    @upper_half
-    async def _get_greeting(self) -> Greeting:
-        """
-        :raise GreetingError: When the greeting is not understood.
-        :raise EOFError: When the server unexpectedly hangs up.
-        :raise OSError: For underlying stream errors.
-
-        :return: the Greeting object given by the server.
-        """
-        self.logger.debug("Awaiting greeting ...")
-
-        try:
-            msg = await self._recv()
-            return Greeting(msg)
-        except (ProtocolError, KeyError, TypeError) as err:
-            emsg = "Did not understand Greeting"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise GreetingError(emsg, err) from err
-        except BaseException as err:
-            # EOFError, OSError, or something unexpected.
-            emsg = "Failed to receive Greeting"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise
-
-    @upper_half
-    async def _negotiate(self) -> None:
-        """
-        Perform QMP capabilities negotiation.
-
-        :raise NegotiationError: When negotiation fails.
-        :raise EOFError: When the server unexpectedly hangs up.
-        :raise OSError: For underlying stream errors.
-        """
-        self.logger.debug("Negotiating capabilities ...")
-
-        arguments: Dict[str, List[str]] = {'enable': []}
-        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
-            arguments['enable'].append('oob')
-        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
-
-        # It's not safe to use execute() here, because the reader/writers
-        # aren't running. AsyncProtocol *requires* that a new session
-        # does not fail after the reader/writers are running!
-        try:
-            await self._send(msg)
-            reply = await self._recv()
-            assert 'return' in reply
-            assert 'error' not in reply
-        except (ProtocolError, AssertionError) as err:
-            emsg = "Negotiation failed"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise NegotiationError(emsg, err) from err
-        except BaseException as err:
-            # EOFError, OSError, or something unexpected.
-            emsg = "Negotiation failed"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise
-
-    @bottom_half
-    async def _bh_disconnect(self) -> None:
-        try:
-            await super()._bh_disconnect()
-        finally:
-            if self._pending:
-                self.logger.debug("Cancelling pending executions")
-            keys = self._pending.keys()
-            for key in keys:
-                self.logger.debug("Cancelling execution '%s'", key)
-                self._pending[key].put_nowait(
-                    ExecInterruptedError("Disconnected")
-                )
-
-            self.logger.debug("QMP Disconnected.")
-
-    @upper_half
-    def _cleanup(self) -> None:
-        super()._cleanup()
-        assert not self._pending
-
-    @bottom_half
-    async def _on_message(self, msg: Message) -> None:
-        """
-        Add an incoming message to the appropriate queue/handler.
-
-        :raise ServerParseError: When Message indicates server parse failure.
-        """
-        # Incoming messages are not fully parsed/validated here;
-        # do only light peeking to know how to route the messages.
-
-        if 'event' in msg:
-            await self._event_dispatch(msg)
-            return
-
-        # Below, we assume everything left is an execute/exec-oob response.
-
-        exec_id = cast(Optional[str], msg.get('id'))
-
-        if exec_id in self._pending:
-            await self._pending[exec_id].put(msg)
-            return
-
-        # We have a message we can't route back to a caller.
-
-        is_error = 'error' in msg
-        has_id = 'id' in msg
-
-        if is_error and not has_id:
-            # This is very likely a server parsing error.
-            # It doesn't inherently belong to any pending execution.
-            # Instead of performing clever recovery, just terminate.
-            # See "NOTE" in qmp-spec.txt, section 2.4.2
-            raise ServerParseError(
-                ("Server sent an error response without an ID, "
-                 "but there are no ID-less executions pending. "
-                 "Assuming this is a server parser failure."),
-                msg
-            )
-
-        # qmp-spec.txt, section 2.4:
-        # 'Clients should drop all the responses
-        # that have an unknown "id" field.'
-        self.logger.log(
-            logging.ERROR if is_error else logging.WARNING,
-            "Unknown ID '%s', message dropped.",
-            exec_id,
-        )
-        self.logger.debug("Unroutable message: %s", str(msg))
-
-    @upper_half
-    @bottom_half
-    async def _do_recv(self) -> Message:
-        """
-        :raise OSError: When a stream error is encountered.
-        :raise EOFError: When the stream is at EOF.
-        :raise ProtocolError:
-            When the Message is not understood.
-            See also `Message._deserialize`.
-
-        :return: A single QMP `Message`.
-        """
-        msg_bytes = await self._readline()
-        msg = Message(msg_bytes, eager=True)
-        return msg
-
-    @upper_half
-    @bottom_half
-    def _do_send(self, msg: Message) -> None:
-        """
-        :raise ValueError: JSON serialization failure
-        :raise TypeError: JSON serialization failure
-        :raise OSError: When a stream error is encountered.
-        """
-        assert self._writer is not None
-        self._writer.write(bytes(msg))
-
-    @upper_half
-    def _get_exec_id(self) -> str:
-        exec_id = f"__qmp#{self._execute_id:05d}"
-        self._execute_id += 1
-        return exec_id
-
-    @upper_half
-    async def _issue(self, msg: Message) -> Union[None, str]:
-        """
-        Issue a QMP `Message` and do not wait for a reply.
-
-        :param msg: The QMP `Message` to send to the server.
-
-        :return: The ID of the `Message` sent.
-        """
-        msg_id: Optional[str] = None
-        if 'id' in msg:
-            assert isinstance(msg['id'], str)
-            msg_id = msg['id']
-
-        self._pending[msg_id] = asyncio.Queue(maxsize=1)
-        try:
-            await self._outgoing.put(msg)
-        except:
-            del self._pending[msg_id]
-            raise
-
-        return msg_id
-
-    @upper_half
-    async def _reply(self, msg_id: Union[str, None]) -> Message:
-        """
-        Await a reply to a previously issued QMP message.
-
-        :param msg_id: The ID of the previously issued message.
-
-        :return: The reply from the server.
-        :raise ExecInterruptedError:
-            When the reply could not be retrieved because the connection
-            was lost, or some other problem.
-        """
-        queue = self._pending[msg_id]
-
-        try:
-            result = await queue.get()
-            if isinstance(result, ExecInterruptedError):
-                raise result
-            return result
-        finally:
-            del self._pending[msg_id]
-
-    @upper_half
-    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
-        """
-        Send a QMP `Message` to the server and await a reply.
-
-        This method *assumes* you are sending some kind of an execute
-        statement that *will* receive a reply.
-
-        An execution ID will be assigned if assign_id is `True`. It can be
-        disabled, but this requires that an ID is manually assigned
-        instead. For manually assigned IDs, you must not use the string
-        '__qmp#' anywhere in the ID.
-
-        :param msg: The QMP `Message` to execute.
-        :param assign_id: If True, assign a new execution ID.
-
-        :return: Execution reply from the server.
-        :raise ExecInterruptedError:
-            When the reply could not be retrieved because the connection
-            was lost, or some other problem.
-        """
-        if assign_id:
-            msg['id'] = self._get_exec_id()
-        elif 'id' in msg:
-            assert isinstance(msg['id'], str)
-            assert '__qmp#' not in msg['id']
-
-        exec_id = await self._issue(msg)
-        return await self._reply(exec_id)
-
-    @upper_half
-    @require(Runstate.RUNNING)
-    async def _raw(
-            self,
-            msg: Union[Message, Mapping[str, object], bytes],
-            assign_id: bool = True,
-    ) -> Message:
-        """
-        Issue a raw `Message` to the QMP server and await a reply.
-
-        :param msg:
-            A Message to send to the server. It may be a `Message`, any
-            Mapping (including Dict), or raw bytes.
-        :param assign_id:
-            Assign an arbitrary execution ID to this message. If
-            `False`, the existing id must either be absent (and no other
-            such pending execution may omit an ID) or a string. If it is
-            a string, it must not start with '__qmp#' and no other such
-            pending execution may currently be using that ID.
-
-        :return: Execution reply from the server.
-
-        :raise ExecInterruptedError:
-            When the reply could not be retrieved because the connection
-            was lost, or some other problem.
-        :raise TypeError:
-            When assign_id is `False`, an ID is given, and it is not a string.
-        :raise ValueError:
-            When assign_id is `False`, but the ID is not usable;
-            Either because it starts with '__qmp#' or it is already in-use.
-        """
-        # 1. convert generic Mapping or bytes to a QMP Message
-        # 2. copy Message objects so that we assign an ID only to the copy.
-        msg = Message(msg)
-
-        exec_id = msg.get('id')
-        if not assign_id and 'id' in msg:
-            if not isinstance(exec_id, str):
-                raise TypeError(f"ID ('{exec_id}') must be a string.")
-            if exec_id.startswith('__qmp#'):
-                raise ValueError(
-                    f"ID ('{exec_id}') must not start with '__qmp#'."
-                )
-
-        if not assign_id and exec_id in self._pending:
-            raise ValueError(
-                f"ID '{exec_id}' is in-use and cannot be used."
-            )
-
-        return await self._execute(msg, assign_id=assign_id)
-
-    @upper_half
-    @require(Runstate.RUNNING)
-    async def execute_msg(self, msg: Message) -> object:
-        """
-        Execute a QMP command and return its value.
-
-        :param msg: The QMP `Message` to execute.
-
-        :return:
-            The command execution return value from the server. The type of
-            object returned depends on the command that was issued,
-            though most in QEMU return a `dict`.
-        :raise ValueError:
-            If the QMP `Message` does not have either the 'execute' or
-            'exec-oob' fields set.
-        :raise ExecuteError: When the server returns an error response.
-        :raise ExecInterruptedError: if the connection was terminated early.
-        """
-        if not ('execute' in msg or 'exec-oob' in msg):
-            raise ValueError("Requires 'execute' or 'exec-oob' message")
-
-        # Copy the Message so that the ID assigned by _execute() is
-        # local to this method; allowing the ID to be seen in raised
-        # Exceptions but without modifying the caller's held copy.
-        msg = Message(msg)
-        reply = await self._execute(msg)
-
-        if 'error' in reply:
-            try:
-                error_response = ErrorResponse(reply)
-            except (KeyError, TypeError) as err:
-                # Error response was malformed.
-                raise BadReplyError(
-                    "QMP error reply is malformed", reply, msg,
-                ) from err
-
-            raise ExecuteError(error_response, msg, reply)
-
-        if 'return' not in reply:
-            raise BadReplyError(
-                "QMP reply is missing a 'error' or 'return' member",
-                reply, msg,
-            )
-
-        return reply['return']
-
-    @classmethod
-    def make_execute_msg(cls, cmd: str,
-                         arguments: Optional[Mapping[str, object]] = None,
-                         oob: bool = False) -> Message:
-        """
-        Create an executable message to be sent by `execute_msg` later.
-
-        :param cmd: QMP command name.
-        :param arguments: Arguments (if any). Must be JSON-serializable.
-        :param oob: If `True`, execute "out of band".
-
-        :return: An executable QMP `Message`.
-        """
-        msg = Message({'exec-oob' if oob else 'execute': cmd})
-        if arguments is not None:
-            msg['arguments'] = arguments
-        return msg
-
-    @upper_half
-    async def execute(self, cmd: str,
-                      arguments: Optional[Mapping[str, object]] = None,
-                      oob: bool = False) -> object:
-        """
-        Execute a QMP command and return its value.
-
-        :param cmd: QMP command name.
-        :param arguments: Arguments (if any). Must be JSON-serializable.
-        :param oob: If `True`, execute "out of band".
-
-        :return:
-            The command execution return value from the server. The type of
-            object returned depends on the command that was issued,
-            though most in QEMU return a `dict`.
-        :raise ExecuteError: When the server returns an error response.
-        :raise ExecInterruptedError: if the connection was terminated early.
-        """
-        msg = self.make_execute_msg(cmd, arguments, oob=oob)
-        return await self.execute_msg(msg)
-
-    @upper_half
-    @require(Runstate.RUNNING)
-    def send_fd_scm(self, fd: int) -> None:
-        """
-        Send a file descriptor to the remote via SCM_RIGHTS.
-        """
-        assert self._writer is not None
-        sock = self._writer.transport.get_extra_info('socket')
-
-        if sock.family != socket.AF_UNIX:
-            raise QMPError("Sending file descriptors requires a UNIX socket.")
-
-        if not hasattr(sock, 'sendmsg'):
-            # We need to void the warranty sticker.
-            # Access to sendmsg is scheduled for removal in Python 3.11.
-            # Find the real backing socket to use it anyway.
-            sock = sock._sock  # pylint: disable=protected-access
-
-        sock.sendmsg(
-            [b' '],
-            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
-        )
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
deleted file mode 100644
index 571110f4f8..0000000000
--- a/python/qemu/qmp/qmp_shell.py
+++ /dev/null
@@ -1,537 +0,0 @@
-#
-# Copyright (C) 2009, 2010 Red Hat Inc.
-#
-# Authors:
-#  Luiz Capitulino <lcapitulino@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2.  See
-# the COPYING file in the top-level directory.
-#
-
-"""
-Low-level QEMU shell on top of QMP.
-
-usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
-
-positional arguments:
-  qmp_server            < UNIX socket path | TCP address:port >
-
-optional arguments:
-  -h, --help            show this help message and exit
-  -H, --hmp             Use HMP interface
-  -N, --skip-negotiation
-                        Skip negotiate (for qemu-ga)
-  -v, --verbose         Verbose (echo commands sent and received)
-  -p, --pretty          Pretty-print JSON
-
-
-Start QEMU with:
-
-# qemu [...] -qmp unix:./qmp-sock,server
-
-Run the shell:
-
-$ qmp-shell ./qmp-sock
-
-Commands have the following format:
-
-   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
-
-For example:
-
-(QEMU) device_add driver=e1000 id=net1
-{'return': {}}
-(QEMU)
-
-key=value pairs also support Python or JSON object literal subset notations,
-without spaces. Dictionaries/objects {} are supported as are arrays [].
-
-   example-command arg-name1={'key':'value','obj'={'prop':"value"}}
-
-Both JSON and Python formatting should work, including both styles of
-string literal quotes. Both paradigms of literal values should work,
-including null/true/false for JSON and None/True/False for Python.
-
-
-Transactions have the following multi-line format:
-
-   transaction(
-   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
-   ...
-   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
-   )
-
-One line transactions are also supported:
-
-   transaction( action-name1 ... )
-
-For example:
-
-    (QEMU) transaction(
-    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
-    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
-    TRANS> )
-    {"return": {}}
-    (QEMU)
-
-Use the -v and -p options to activate the verbose and pretty-print options,
-which will echo back the properly formatted JSON-compliant QMP that is being
-sent to QEMU, which is useful for debugging and documentation generation.
-"""
-
-import argparse
-import ast
-import json
-import logging
-import os
-import re
-import readline
-import sys
-from typing import (
-    Iterator,
-    List,
-    NoReturn,
-    Optional,
-    Sequence,
-)
-
-from qemu.qmp import ConnectError, QMPError, SocketAddrT
-from qemu.qmp.legacy import (
-    QEMUMonitorProtocol,
-    QMPBadPortError,
-    QMPMessage,
-    QMPObject,
-)
-
-
-LOG = logging.getLogger(__name__)
-
-
-class QMPCompleter:
-    """
-    QMPCompleter provides a readline library tab-complete behavior.
-    """
-    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
-    # but pylint as of today does not know that List[str] is simply 'list'.
-    def __init__(self) -> None:
-        self._matches: List[str] = []
-
-    def append(self, value: str) -> None:
-        """Append a new valid completion to the list of possibilities."""
-        return self._matches.append(value)
-
-    def complete(self, text: str, state: int) -> Optional[str]:
-        """readline.set_completer() callback implementation."""
-        for cmd in self._matches:
-            if cmd.startswith(text):
-                if state == 0:
-                    return cmd
-                state -= 1
-        return None
-
-
-class QMPShellError(QMPError):
-    """
-    QMP Shell Base error class.
-    """
-
-
-class FuzzyJSON(ast.NodeTransformer):
-    """
-    This extension of ast.NodeTransformer filters literal "true/false/null"
-    values in a Python AST and replaces them by proper "True/False/None" values
-    that Python can properly evaluate.
-    """
-
-    @classmethod
-    def visit_Name(cls,  # pylint: disable=invalid-name
-                   node: ast.Name) -> ast.AST:
-        """
-        Transform Name nodes with certain values into Constant (keyword) nodes.
-        """
-        if node.id == 'true':
-            return ast.Constant(value=True)
-        if node.id == 'false':
-            return ast.Constant(value=False)
-        if node.id == 'null':
-            return ast.Constant(value=None)
-        return node
-
-
-class QMPShell(QEMUMonitorProtocol):
-    """
-    QMPShell provides a basic readline-based QMP shell.
-
-    :param address: Address of the QMP server.
-    :param pretty: Pretty-print QMP messages.
-    :param verbose: Echo outgoing QMP messages to console.
-    """
-    def __init__(self, address: SocketAddrT,
-                 pretty: bool = False, verbose: bool = False):
-        super().__init__(address)
-        self._greeting: Optional[QMPMessage] = None
-        self._completer = QMPCompleter()
-        self._transmode = False
-        self._actions: List[QMPMessage] = []
-        self._histfile = os.path.join(os.path.expanduser('~'),
-                                      '.qmp-shell_history')
-        self.pretty = pretty
-        self.verbose = verbose
-
-    def close(self) -> None:
-        # Hook into context manager of parent to save shell history.
-        self._save_history()
-        super().close()
-
-    def _fill_completion(self) -> None:
-        cmds = self.cmd('query-commands')
-        if 'error' in cmds:
-            return
-        for cmd in cmds['return']:
-            self._completer.append(cmd['name'])
-
-    def _completer_setup(self) -> None:
-        self._completer = QMPCompleter()
-        self._fill_completion()
-        readline.set_history_length(1024)
-        readline.set_completer(self._completer.complete)
-        readline.parse_and_bind("tab: complete")
-        # NB: default delimiters conflict with some command names
-        # (eg. query-), clearing everything as it doesn't seem to matter
-        readline.set_completer_delims('')
-        try:
-            readline.read_history_file(self._histfile)
-        except FileNotFoundError:
-            pass
-        except IOError as err:
-            msg = f"Failed to read history '{self._histfile}': {err!s}"
-            LOG.warning(msg)
-
-    def _save_history(self) -> None:
-        try:
-            readline.write_history_file(self._histfile)
-        except IOError as err:
-            msg = f"Failed to save history file '{self._histfile}': {err!s}"
-            LOG.warning(msg)
-
-    @classmethod
-    def _parse_value(cls, val: str) -> object:
-        try:
-            return int(val)
-        except ValueError:
-            pass
-
-        if val.lower() == 'true':
-            return True
-        if val.lower() == 'false':
-            return False
-        if val.startswith(('{', '[')):
-            # Try first as pure JSON:
-            try:
-                return json.loads(val)
-            except ValueError:
-                pass
-            # Try once again as FuzzyJSON:
-            try:
-                tree = ast.parse(val, mode='eval')
-                transformed = FuzzyJSON().visit(tree)
-                return ast.literal_eval(transformed)
-            except (SyntaxError, ValueError):
-                pass
-        return val
-
-    def _cli_expr(self,
-                  tokens: Sequence[str],
-                  parent: QMPObject) -> None:
-        for arg in tokens:
-            (key, sep, val) = arg.partition('=')
-            if sep != '=':
-                raise QMPShellError(
-                    f"Expected a key=value pair, got '{arg!s}'"
-                )
-
-            value = self._parse_value(val)
-            optpath = key.split('.')
-            curpath = []
-            for path in optpath[:-1]:
-                curpath.append(path)
-                obj = parent.get(path, {})
-                if not isinstance(obj, dict):
-                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
-                    raise QMPShellError(msg.format('.'.join(curpath)))
-                parent[path] = obj
-                parent = obj
-            if optpath[-1] in parent:
-                if isinstance(parent[optpath[-1]], dict):
-                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
-                    raise QMPShellError(msg.format('.'.join(curpath)))
-                raise QMPShellError(f'Cannot set "{key}" multiple times')
-            parent[optpath[-1]] = value
-
-    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
-        """
-        Build a QMP input object from a user provided command-line in the
-        following format:
-
-            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
-        """
-        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
-        cmdargs = re.findall(argument_regex, cmdline)
-        qmpcmd: QMPMessage
-
-        # Transactional CLI entry:
-        if cmdargs and cmdargs[0] == 'transaction(':
-            self._transmode = True
-            self._actions = []
-            cmdargs.pop(0)
-
-        # Transactional CLI exit:
-        if cmdargs and cmdargs[0] == ')' and self._transmode:
-            self._transmode = False
-            if len(cmdargs) > 1:
-                msg = 'Unexpected input after close of Transaction sub-shell'
-                raise QMPShellError(msg)
-            qmpcmd = {
-                'execute': 'transaction',
-                'arguments': {'actions': self._actions}
-            }
-            return qmpcmd
-
-        # No args, or no args remaining
-        if not cmdargs:
-            return None
-
-        if self._transmode:
-            # Parse and cache this Transactional Action
-            finalize = False
-            action = {'type': cmdargs[0], 'data': {}}
-            if cmdargs[-1] == ')':
-                cmdargs.pop(-1)
-                finalize = True
-            self._cli_expr(cmdargs[1:], action['data'])
-            self._actions.append(action)
-            return self._build_cmd(')') if finalize else None
-
-        # Standard command: parse and return it to be executed.
-        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
-        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
-        return qmpcmd
-
-    def _print(self, qmp_message: object) -> None:
-        jsobj = json.dumps(qmp_message,
-                           indent=4 if self.pretty else None,
-                           sort_keys=self.pretty)
-        print(str(jsobj))
-
-    def _execute_cmd(self, cmdline: str) -> bool:
-        try:
-            qmpcmd = self._build_cmd(cmdline)
-        except QMPShellError as err:
-            print(
-                f"Error while parsing command line: {err!s}\n"
-                "command format: <command-name> "
-                "[arg-name1=arg1] ... [arg-nameN=argN",
-                file=sys.stderr
-            )
-            return True
-        # For transaction mode, we may have just cached the action:
-        if qmpcmd is None:
-            return True
-        if self.verbose:
-            self._print(qmpcmd)
-        resp = self.cmd_obj(qmpcmd)
-        if resp is None:
-            print('Disconnected')
-            return False
-        self._print(resp)
-        return True
-
-    def connect(self, negotiate: bool = True) -> None:
-        self._greeting = super().connect(negotiate)
-        self._completer_setup()
-
-    def show_banner(self,
-                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
-        """
-        Print to stdio a greeting, and the QEMU version if available.
-        """
-        print(msg)
-        if not self._greeting:
-            print('Connected')
-            return
-        version = self._greeting['QMP']['version']['qemu']
-        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
-
-    @property
-    def prompt(self) -> str:
-        """
-        Return the current shell prompt, including a trailing space.
-        """
-        if self._transmode:
-            return 'TRANS> '
-        return '(QEMU) '
-
-    def read_exec_command(self) -> bool:
-        """
-        Read and execute a command.
-
-        @return True if execution was ok, return False if disconnected.
-        """
-        try:
-            cmdline = input(self.prompt)
-        except EOFError:
-            print()
-            return False
-
-        if cmdline == '':
-            for event in self.get_events():
-                print(event)
-            return True
-
-        return self._execute_cmd(cmdline)
-
-    def repl(self) -> Iterator[None]:
-        """
-        Return an iterator that implements the REPL.
-        """
-        self.show_banner()
-        while self.read_exec_command():
-            yield
-        self.close()
-
-
-class HMPShell(QMPShell):
-    """
-    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
-
-    :param address: Address of the QMP server.
-    :param pretty: Pretty-print QMP messages.
-    :param verbose: Echo outgoing QMP messages to console.
-    """
-    def __init__(self, address: SocketAddrT,
-                 pretty: bool = False, verbose: bool = False):
-        super().__init__(address, pretty, verbose)
-        self._cpu_index = 0
-
-    def _cmd_completion(self) -> None:
-        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
-            if cmd and cmd[0] != '[' and cmd[0] != '\t':
-                name = cmd.split()[0]  # drop help text
-                if name == 'info':
-                    continue
-                if name.find('|') != -1:
-                    # Command in the form 'foobar|f' or 'f|foobar', take the
-                    # full name
-                    opt = name.split('|')
-                    if len(opt[0]) == 1:
-                        name = opt[1]
-                    else:
-                        name = opt[0]
-                self._completer.append(name)
-                self._completer.append('help ' + name)  # help completion
-
-    def _info_completion(self) -> None:
-        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
-            if cmd:
-                self._completer.append('info ' + cmd.split()[1])
-
-    def _other_completion(self) -> None:
-        # special cases
-        self._completer.append('help info')
-
-    def _fill_completion(self) -> None:
-        self._cmd_completion()
-        self._info_completion()
-        self._other_completion()
-
-    def _cmd_passthrough(self, cmdline: str,
-                         cpu_index: int = 0) -> QMPMessage:
-        return self.cmd_obj({
-            'execute': 'human-monitor-command',
-            'arguments': {
-                'command-line': cmdline,
-                'cpu-index': cpu_index
-            }
-        })
-
-    def _execute_cmd(self, cmdline: str) -> bool:
-        if cmdline.split()[0] == "cpu":
-            # trap the cpu command, it requires special setting
-            try:
-                idx = int(cmdline.split()[1])
-                if 'return' not in self._cmd_passthrough('info version', idx):
-                    print('bad CPU index')
-                    return True
-                self._cpu_index = idx
-            except ValueError:
-                print('cpu command takes an integer argument')
-                return True
-        resp = self._cmd_passthrough(cmdline, self._cpu_index)
-        if resp is None:
-            print('Disconnected')
-            return False
-        assert 'return' in resp or 'error' in resp
-        if 'return' in resp:
-            # Success
-            if len(resp['return']) > 0:
-                print(resp['return'], end=' ')
-        else:
-            # Error
-            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
-        return True
-
-    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
-        QMPShell.show_banner(self, msg)
-
-
-def die(msg: str) -> NoReturn:
-    """Write an error to stderr, then exit with a return code of 1."""
-    sys.stderr.write('ERROR: %s\n' % msg)
-    sys.exit(1)
-
-
-def main() -> None:
-    """
-    qmp-shell entry point: parse command line arguments and start the REPL.
-    """
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-H', '--hmp', action='store_true',
-                        help='Use HMP interface')
-    parser.add_argument('-N', '--skip-negotiation', action='store_true',
-                        help='Skip negotiate (for qemu-ga)')
-    parser.add_argument('-v', '--verbose', action='store_true',
-                        help='Verbose (echo commands sent and received)')
-    parser.add_argument('-p', '--pretty', action='store_true',
-                        help='Pretty-print JSON')
-
-    default_server = os.environ.get('QMP_SOCKET')
-    parser.add_argument('qmp_server', action='store',
-                        default=default_server,
-                        help='< UNIX socket path | TCP address:port >')
-
-    args = parser.parse_args()
-    if args.qmp_server is None:
-        parser.error("QMP socket or TCP address must be specified")
-
-    shell_class = HMPShell if args.hmp else QMPShell
-
-    try:
-        address = shell_class.parse_address(args.qmp_server)
-    except QMPBadPortError:
-        parser.error(f"Bad port number: {args.qmp_server}")
-        return  # pycharm doesn't know error() is noreturn
-
-    with shell_class(address, args.pretty, args.verbose) as qemu:
-        try:
-            qemu.connect(negotiate=not args.skip_negotiation)
-        except ConnectError as err:
-            if isinstance(err.exc, OSError):
-                die(f"Couldn't connect to {args.qmp_server}: {err!s}")
-            die(str(err))
-
-        for _ in qemu.repl():
-            pass
-
-
-if __name__ == '__main__':
-    main()
diff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py
deleted file mode 100644
index eaa5fc7d5f..0000000000
--- a/python/qemu/qmp/util.py
+++ /dev/null
@@ -1,217 +0,0 @@
-"""
-Miscellaneous Utilities
-
-This module provides asyncio utilities and compatibility wrappers for
-Python 3.6 to provide some features that otherwise become available in
-Python 3.7+.
-
-Various logging and debugging utilities are also provided, such as
-`exception_summary()` and `pretty_traceback()`, used primarily for
-adding information into the logging stream.
-"""
-
-import asyncio
-import sys
-import traceback
-from typing import (
-    Any,
-    Coroutine,
-    Optional,
-    TypeVar,
-    cast,
-)
-
-
-T = TypeVar('T')
-
-
-# --------------------------
-# Section: Utility Functions
-# --------------------------
-
-
-async def flush(writer: asyncio.StreamWriter) -> None:
-    """
-    Utility function to ensure a StreamWriter is *fully* drained.
-
-    `asyncio.StreamWriter.drain` only promises we will return to below
-    the "high-water mark". This function ensures we flush the entire
-    buffer -- by setting the high water mark to 0 and then calling
-    drain. The flow control limits are restored after the call is
-    completed.
-    """
-    transport = cast(asyncio.WriteTransport, writer.transport)
-
-    # https://github.com/python/typeshed/issues/5779
-    low, high = transport.get_write_buffer_limits()  # type: ignore
-    transport.set_write_buffer_limits(0, 0)
-    try:
-        await writer.drain()
-    finally:
-        transport.set_write_buffer_limits(high, low)
-
-
-def upper_half(func: T) -> T:
-    """
-    Do-nothing decorator that annotates a method as an "upper-half" method.
-
-    These methods must not call bottom-half functions directly, but can
-    schedule them to run.
-    """
-    return func
-
-
-def bottom_half(func: T) -> T:
-    """
-    Do-nothing decorator that annotates a method as a "bottom-half" method.
-
-    These methods must take great care to handle their own exceptions whenever
-    possible. If they go unhandled, they will cause termination of the loop.
-
-    These methods do not, in general, have the ability to directly
-    report information to a caller’s context and will usually be
-    collected as a Task result instead.
-
-    They must not call upper-half functions directly.
-    """
-    return func
-
-
-# -------------------------------
-# Section: Compatibility Wrappers
-# -------------------------------
-
-
-def create_task(coro: Coroutine[Any, Any, T],
-                loop: Optional[asyncio.AbstractEventLoop] = None
-                ) -> 'asyncio.Future[T]':
-    """
-    Python 3.6-compatible `asyncio.create_task` wrapper.
-
-    :param coro: The coroutine to execute in a task.
-    :param loop: Optionally, the loop to create the task in.
-
-    :return: An `asyncio.Future` object.
-    """
-    if sys.version_info >= (3, 7):
-        if loop is not None:
-            return loop.create_task(coro)
-        return asyncio.create_task(coro)  # pylint: disable=no-member
-
-    # Python 3.6:
-    return asyncio.ensure_future(coro, loop=loop)
-
-
-def is_closing(writer: asyncio.StreamWriter) -> bool:
-    """
-    Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
-
-    :param writer: The `asyncio.StreamWriter` object.
-    :return: `True` if the writer is closing, or closed.
-    """
-    if sys.version_info >= (3, 7):
-        return writer.is_closing()
-
-    # Python 3.6:
-    transport = writer.transport
-    assert isinstance(transport, asyncio.WriteTransport)
-    return transport.is_closing()
-
-
-async def wait_closed(writer: asyncio.StreamWriter) -> None:
-    """
-    Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
-
-    :param writer: The `asyncio.StreamWriter` to wait on.
-    """
-    if sys.version_info >= (3, 7):
-        await writer.wait_closed()
-        return
-
-    # Python 3.6
-    transport = writer.transport
-    assert isinstance(transport, asyncio.WriteTransport)
-
-    while not transport.is_closing():
-        await asyncio.sleep(0)
-
-    # This is an ugly workaround, but it's the best I can come up with.
-    sock = transport.get_extra_info('socket')
-
-    if sock is None:
-        # Our transport doesn't have a socket? ...
-        # Nothing we can reasonably do.
-        return
-
-    while sock.fileno() != -1:
-        await asyncio.sleep(0)
-
-
-def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
-    """
-    Python 3.6-compatible `asyncio.run` wrapper.
-
-    :param coro: A coroutine to execute now.
-    :return: The return value from the coroutine.
-    """
-    if sys.version_info >= (3, 7):
-        return asyncio.run(coro, debug=debug)
-
-    # Python 3.6
-    loop = asyncio.get_event_loop()
-    loop.set_debug(debug)
-    ret = loop.run_until_complete(coro)
-    loop.close()
-
-    return ret
-
-
-# ----------------------------
-# Section: Logging & Debugging
-# ----------------------------
-
-
-def exception_summary(exc: BaseException) -> str:
-    """
-    Return a summary string of an arbitrary exception.
-
-    It will be of the form "ExceptionType: Error Message", if the error
-    string is non-empty, and just "ExceptionType" otherwise.
-    """
-    name = type(exc).__qualname__
-    smod = type(exc).__module__
-    if smod not in ("__main__", "builtins"):
-        name = smod + '.' + name
-
-    error = str(exc)
-    if error:
-        return f"{name}: {error}"
-    return name
-
-
-def pretty_traceback(prefix: str = "  | ") -> str:
-    """
-    Formats the current traceback, indented to provide visual distinction.
-
-    This is useful for printing a traceback within a traceback for
-    debugging purposes when encapsulating errors to deliver them up the
-    stack; when those errors are printed, this helps provide a nice
-    visual grouping to quickly identify the parts of the error that
-    belong to the inner exception.
-
-    :param prefix: The prefix to append to each line of the traceback.
-    :return: A string, formatted something like the following::
-
-      | Traceback (most recent call last):
-      |   File "foobar.py", line 42, in arbitrary_example
-      |     foo.baz()
-      | ArbitraryError: [Errno 42] Something bad happened!
-    """
-    output = "".join(traceback.format_exception(*sys.exc_info()))
-
-    exc_lines = []
-    for line in output.split('\n'):
-        exc_lines.append(prefix + line)
-
-    # The last line is always empty, omit it
-    return "\n".join(exc_lines[:-1])
diff --git a/python/qemu/utils/qom_fuse.py b/python/qemu/utils/qom_fuse.py
index 8dcd59fcde..4877e4b78f 100644
--- a/python/qemu/utils/qom_fuse.py
+++ b/python/qemu/utils/qom_fuse.py
@@ -47,7 +47,6 @@
 
 import fuse
 from fuse import FUSE, FuseOSError, Operations
-
 from qemu.qmp import ExecuteError
 
 from .qom_common import QOMCommand
diff --git a/python/setup.cfg b/python/setup.cfg
index 7907994c5a..64ec4e7c14 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -22,9 +22,10 @@ classifiers =
     Typing :: Typed
 
 [options]
+install_requires =
+    qemu.qmp
 python_requires = >= 3.6
 packages =
-    qemu.qmp
     qemu.machine
     qemu.utils
 
@@ -43,20 +44,11 @@ devel =
     mypy >= 0.780
     pylint >= 2.8.0
     tox >= 3.18.0
-    urwid >= 2.1.2
-    urwid-readline >= 0.13
-    Pygments >= 2.9.0
 
 # Provides qom-fuse functionality
 fuse =
     fusepy >= 2.0.4
 
-# AQMP TUI dependencies
-tui =
-    urwid >= 2.1.2
-    urwid-readline >= 0.13
-    Pygments >= 2.9.0
-
 [options.entry_points]
 console_scripts =
     qom = qemu.utils.qom:main
@@ -66,8 +58,6 @@ console_scripts =
     qom-tree = qemu.utils.qom:QOMTree.entry_point
     qom-fuse = qemu.utils.qom_fuse:QOMFuse.entry_point [fuse]
     qemu-ga-client = qemu.utils.qemu_ga_client:main
-    qmp-shell = qemu.qmp.qmp_shell:main
-    aqmp-tui = qemu.qmp.aqmp_tui:main [tui]
 
 [flake8]
 extend-ignore = E722  # Prefer pylint's bare-except checks to flake8's
@@ -83,24 +73,11 @@ namespace_packages = True
 # fusepy has no type stubs:
 allow_subclassing_any = True
 
-[mypy-qemu.qmp.aqmp_tui]
-# urwid and urwid_readline have no type stubs:
-allow_subclassing_any = True
-
 # The following missing import directives are because these libraries do not
 # provide type stubs. Allow them on an as-needed basis for mypy.
 [mypy-fuse]
 ignore_missing_imports = True
 
-[mypy-urwid]
-ignore_missing_imports = True
-
-[mypy-urwid_readline]
-ignore_missing_imports = True
-
-[mypy-pygments]
-ignore_missing_imports = True
-
 [pylint.messages control]
 # Disable the message, report, category or checker with the given id(s). You
 # can either give multiple identifiers separated by comma (,) or put this
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
deleted file mode 100644
index 8864e66a94..0000000000
--- a/python/tests/protocol.py
+++ /dev/null
@@ -1,583 +0,0 @@
-import asyncio
-from contextlib import contextmanager
-import os
-import socket
-from tempfile import TemporaryDirectory
-
-import avocado
-
-from qemu.qmp import ConnectError, Runstate
-from qemu.qmp.protocol import AsyncProtocol, StateError
-from qemu.qmp.util import asyncio_run, create_task
-
-
-class NullProtocol(AsyncProtocol[None]):
-    """
-    NullProtocol is a test mockup of an AsyncProtocol implementation.
-
-    It adds a fake_session instance variable that enables a code path
-    that bypasses the actual connection logic, but still allows the
-    reader/writers to start.
-
-    Because the message type is defined as None, an asyncio.Event named
-    'trigger_input' is created that prohibits the reader from
-    incessantly being able to yield None; this event can be poked to
-    simulate an incoming message.
-
-    For testing symmetry with do_recv, an interface is added to "send" a
-    Null message.
-
-    For testing purposes, a "simulate_disconnection" method is also
-    added which allows us to trigger a bottom half disconnect without
-    injecting any real errors into the reader/writer loops; in essence
-    it performs exactly half of what disconnect() normally does.
-    """
-    def __init__(self, name=None):
-        self.fake_session = False
-        self.trigger_input: asyncio.Event
-        super().__init__(name)
-
-    async def _establish_session(self):
-        self.trigger_input = asyncio.Event()
-        await super()._establish_session()
-
-    async def _do_accept(self, address, ssl=None):
-        if not self.fake_session:
-            await super()._do_accept(address, ssl)
-
-    async def _do_connect(self, address, ssl=None):
-        if not self.fake_session:
-            await super()._do_connect(address, ssl)
-
-    async def _do_recv(self) -> None:
-        await self.trigger_input.wait()
-        self.trigger_input.clear()
-
-    def _do_send(self, msg: None) -> None:
-        pass
-
-    async def send_msg(self) -> None:
-        await self._outgoing.put(None)
-
-    async def simulate_disconnect(self) -> None:
-        """
-        Simulates a bottom-half disconnect.
-
-        This method schedules a disconnection but does not wait for it
-        to complete. This is used to put the loop into the DISCONNECTING
-        state without fully quiescing it back to IDLE. This is normally
-        something you cannot coax AsyncProtocol to do on purpose, but it
-        will be similar to what happens with an unhandled Exception in
-        the reader/writer.
-
-        Under normal circumstances, the library design requires you to
-        await on disconnect(), which awaits the disconnect task and
-        returns bottom half errors as a pre-condition to allowing the
-        loop to return back to IDLE.
-        """
-        self._schedule_disconnect()
-
-
-class LineProtocol(AsyncProtocol[str]):
-    def __init__(self, name=None):
-        super().__init__(name)
-        self.rx_history = []
-
-    async def _do_recv(self) -> str:
-        raw = await self._readline()
-        msg = raw.decode()
-        self.rx_history.append(msg)
-        return msg
-
-    def _do_send(self, msg: str) -> None:
-        assert self._writer is not None
-        self._writer.write(msg.encode() + b'\n')
-
-    async def send_msg(self, msg: str) -> None:
-        await self._outgoing.put(msg)
-
-
-def run_as_task(coro, allow_cancellation=False):
-    """
-    Run a given coroutine as a task.
-
-    Optionally, wrap it in a try..except block that allows this
-    coroutine to be canceled gracefully.
-    """
-    async def _runner():
-        try:
-            await coro
-        except asyncio.CancelledError:
-            if allow_cancellation:
-                return
-            raise
-    return create_task(_runner())
-
-
-@contextmanager
-def jammed_socket():
-    """
-    Opens up a random unused TCP port on localhost, then jams it.
-    """
-    socks = []
-
-    try:
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        sock.bind(('127.0.0.1', 0))
-        sock.listen(1)
-        address = sock.getsockname()
-
-        socks.append(sock)
-
-        # I don't *fully* understand why, but it takes *two* un-accepted
-        # connections to start jamming the socket.
-        for _ in range(2):
-            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            sock.connect(address)
-            socks.append(sock)
-
-        yield address
-
-    finally:
-        for sock in socks:
-            sock.close()
-
-
-class Smoke(avocado.Test):
-
-    def setUp(self):
-        self.proto = NullProtocol()
-
-    def test__repr__(self):
-        self.assertEqual(
-            repr(self.proto),
-            "<NullProtocol runstate=IDLE>"
-        )
-
-    def testRunstate(self):
-        self.assertEqual(
-            self.proto.runstate,
-            Runstate.IDLE
-        )
-
-    def testDefaultName(self):
-        self.assertEqual(
-            self.proto.name,
-            None
-        )
-
-    def testLogger(self):
-        self.assertEqual(
-            self.proto.logger.name,
-            'qemu.qmp.protocol'
-        )
-
-    def testName(self):
-        self.proto = NullProtocol('Steve')
-
-        self.assertEqual(
-            self.proto.name,
-            'Steve'
-        )
-
-        self.assertEqual(
-            self.proto.logger.name,
-            'qemu.qmp.protocol.Steve'
-        )
-
-        self.assertEqual(
-            repr(self.proto),
-            "<NullProtocol name='Steve' runstate=IDLE>"
-        )
-
-
-class TestBase(avocado.Test):
-
-    def setUp(self):
-        self.proto = NullProtocol(type(self).__name__)
-        self.assertEqual(self.proto.runstate, Runstate.IDLE)
-        self.runstate_watcher = None
-
-    def tearDown(self):
-        self.assertEqual(self.proto.runstate, Runstate.IDLE)
-
-    async def _asyncSetUp(self):
-        pass
-
-    async def _asyncTearDown(self):
-        if self.runstate_watcher:
-            await self.runstate_watcher
-
-    @staticmethod
-    def async_test(async_test_method):
-        """
-        Decorator; adds SetUp and TearDown to async tests.
-        """
-        async def _wrapper(self, *args, **kwargs):
-            loop = asyncio.get_event_loop()
-            loop.set_debug(True)
-
-            await self._asyncSetUp()
-            await async_test_method(self, *args, **kwargs)
-            await self._asyncTearDown()
-
-        return _wrapper
-
-    # Definitions
-
-    # The states we expect a "bad" connect/accept attempt to transition through
-    BAD_CONNECTION_STATES = (
-        Runstate.CONNECTING,
-        Runstate.DISCONNECTING,
-        Runstate.IDLE,
-    )
-
-    # The states we expect a "good" session to transition through
-    GOOD_CONNECTION_STATES = (
-        Runstate.CONNECTING,
-        Runstate.RUNNING,
-        Runstate.DISCONNECTING,
-        Runstate.IDLE,
-    )
-
-    # Helpers
-
-    async def _watch_runstates(self, *states):
-        """
-        This launches a task alongside (most) tests below to confirm that
-        the sequence of runstate changes that occur is exactly as
-        anticipated.
-        """
-        async def _watcher():
-            for state in states:
-                new_state = await self.proto.runstate_changed()
-                self.assertEqual(
-                    new_state,
-                    state,
-                    msg=f"Expected state '{state.name}'",
-                )
-
-        self.runstate_watcher = create_task(_watcher())
-        # Kick the loop and force the task to block on the event.
-        await asyncio.sleep(0)
-
-
-class State(TestBase):
-
-    @TestBase.async_test
-    async def testSuperfluousDisconnect(self):
-        """
-        Test calling disconnect() while already disconnected.
-        """
-        await self._watch_runstates(
-            Runstate.DISCONNECTING,
-            Runstate.IDLE,
-        )
-        await self.proto.disconnect()
-
-
-class Connect(TestBase):
-    """
-    Tests primarily related to calling Connect().
-    """
-    async def _bad_connection(self, family: str):
-        assert family in ('INET', 'UNIX')
-
-        if family == 'INET':
-            await self.proto.connect(('127.0.0.1', 0))
-        elif family == 'UNIX':
-            await self.proto.connect('/dev/null')
-
-    async def _hanging_connection(self):
-        with jammed_socket() as addr:
-            await self.proto.connect(addr)
-
-    async def _bad_connection_test(self, family: str):
-        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
-
-        with self.assertRaises(ConnectError) as context:
-            await self._bad_connection(family)
-
-        self.assertIsInstance(context.exception.exc, OSError)
-        self.assertEqual(
-            context.exception.error_message,
-            "Failed to establish connection"
-        )
-
-    @TestBase.async_test
-    async def testBadINET(self):
-        """
-        Test an immediately rejected call to an IP target.
-        """
-        await self._bad_connection_test('INET')
-
-    @TestBase.async_test
-    async def testBadUNIX(self):
-        """
-        Test an immediately rejected call to a UNIX socket target.
-        """
-        await self._bad_connection_test('UNIX')
-
-    @TestBase.async_test
-    async def testCancellation(self):
-        """
-        Test what happens when a connection attempt is aborted.
-        """
-        # Note that accept() cannot be cancelled outright, as it isn't a task.
-        # However, we can wrap it in a task and cancel *that*.
-        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
-        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
-
-        state = await self.proto.runstate_changed()
-        self.assertEqual(state, Runstate.CONNECTING)
-
-        # This is insider baseball, but the connection attempt has
-        # yielded *just* before the actual connection attempt, so kick
-        # the loop to make sure it's truly wedged.
-        await asyncio.sleep(0)
-
-        task.cancel()
-        await task
-
-    @TestBase.async_test
-    async def testTimeout(self):
-        """
-        Test what happens when a connection attempt times out.
-        """
-        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
-        task = run_as_task(self._hanging_connection())
-
-        # More insider baseball: to improve the speed of this test while
-        # guaranteeing that the connection even gets a chance to start,
-        # verify that the connection hangs *first*, then await the
-        # result of the task with a nearly-zero timeout.
-
-        state = await self.proto.runstate_changed()
-        self.assertEqual(state, Runstate.CONNECTING)
-        await asyncio.sleep(0)
-
-        with self.assertRaises(asyncio.TimeoutError):
-            await asyncio.wait_for(task, timeout=0)
-
-    @TestBase.async_test
-    async def testRequire(self):
-        """
-        Test what happens when a connection attempt is made while CONNECTING.
-        """
-        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
-        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
-
-        state = await self.proto.runstate_changed()
-        self.assertEqual(state, Runstate.CONNECTING)
-
-        with self.assertRaises(StateError) as context:
-            await self._bad_connection('UNIX')
-
-        self.assertEqual(
-            context.exception.error_message,
-            "NullProtocol is currently connecting."
-        )
-        self.assertEqual(context.exception.state, Runstate.CONNECTING)
-        self.assertEqual(context.exception.required, Runstate.IDLE)
-
-        task.cancel()
-        await task
-
-    @TestBase.async_test
-    async def testImplicitRunstateInit(self):
-        """
-        Test what happens if we do not wait on the runstate event until
-        AFTER a connection is made, i.e., connect()/accept() themselves
-        initialize the runstate event. All of the above tests force the
-        initialization by waiting on the runstate *first*.
-        """
-        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
-
-        # Kick the loop to coerce the state change
-        await asyncio.sleep(0)
-        assert self.proto.runstate == Runstate.CONNECTING
-
-        # We already missed the transition to CONNECTING
-        await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
-
-        task.cancel()
-        await task
-
-
-class Accept(Connect):
-    """
-    All of the same tests as Connect, but using the accept() interface.
-    """
-    async def _bad_connection(self, family: str):
-        assert family in ('INET', 'UNIX')
-
-        if family == 'INET':
-            await self.proto.accept(('example.com', 1))
-        elif family == 'UNIX':
-            await self.proto.accept('/dev/null')
-
-    async def _hanging_connection(self):
-        with TemporaryDirectory(suffix='.qmp') as tmpdir:
-            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
-            await self.proto.accept(sock)
-
-
-class FakeSession(TestBase):
-
-    def setUp(self):
-        super().setUp()
-        self.proto.fake_session = True
-
-    async def _asyncSetUp(self):
-        await super()._asyncSetUp()
-        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
-
-    async def _asyncTearDown(self):
-        await self.proto.disconnect()
-        await super()._asyncTearDown()
-
-    ####
-
-    @TestBase.async_test
-    async def testFakeConnect(self):
-
-        """Test the full state lifecycle (via connect) with a no-op session."""
-        await self.proto.connect('/not/a/real/path')
-        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
-
-    @TestBase.async_test
-    async def testFakeAccept(self):
-        """Test the full state lifecycle (via accept) with a no-op session."""
-        await self.proto.accept('/not/a/real/path')
-        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
-
-    @TestBase.async_test
-    async def testFakeRecv(self):
-        """Test receiving a fake/null message."""
-        await self.proto.accept('/not/a/real/path')
-
-        logname = self.proto.logger.name
-        with self.assertLogs(logname, level='DEBUG') as context:
-            self.proto.trigger_input.set()
-            self.proto.trigger_input.clear()
-            await asyncio.sleep(0)  # Kick reader.
-
-        self.assertEqual(
-            context.output,
-            [f"DEBUG:{logname}:<-- None"],
-        )
-
-    @TestBase.async_test
-    async def testFakeSend(self):
-        """Test sending a fake/null message."""
-        await self.proto.accept('/not/a/real/path')
-
-        logname = self.proto.logger.name
-        with self.assertLogs(logname, level='DEBUG') as context:
-            # Cheat: Send a Null message to nobody.
-            await self.proto.send_msg()
-            # Kick writer; awaiting on a queue.put isn't sufficient to yield.
-            await asyncio.sleep(0)
-
-        self.assertEqual(
-            context.output,
-            [f"DEBUG:{logname}:--> None"],
-        )
-
-    async def _prod_session_api(
-            self,
-            current_state: Runstate,
-            error_message: str,
-            accept: bool = True
-    ):
-        with self.assertRaises(StateError) as context:
-            if accept:
-                await self.proto.accept('/not/a/real/path')
-            else:
-                await self.proto.connect('/not/a/real/path')
-
-        self.assertEqual(context.exception.error_message, error_message)
-        self.assertEqual(context.exception.state, current_state)
-        self.assertEqual(context.exception.required, Runstate.IDLE)
-
-    @TestBase.async_test
-    async def testAcceptRequireRunning(self):
-        """Test that accept() cannot be called when Runstate=RUNNING"""
-        await self.proto.accept('/not/a/real/path')
-
-        await self._prod_session_api(
-            Runstate.RUNNING,
-            "NullProtocol is already connected and running.",
-            accept=True,
-        )
-
-    @TestBase.async_test
-    async def testConnectRequireRunning(self):
-        """Test that connect() cannot be called when Runstate=RUNNING"""
-        await self.proto.accept('/not/a/real/path')
-
-        await self._prod_session_api(
-            Runstate.RUNNING,
-            "NullProtocol is already connected and running.",
-            accept=False,
-        )
-
-    @TestBase.async_test
-    async def testAcceptRequireDisconnecting(self):
-        """Test that accept() cannot be called when Runstate=DISCONNECTING"""
-        await self.proto.accept('/not/a/real/path')
-
-        # Cheat: force a disconnect.
-        await self.proto.simulate_disconnect()
-
-        await self._prod_session_api(
-            Runstate.DISCONNECTING,
-            ("NullProtocol is disconnecting."
-             " Call disconnect() to return to IDLE state."),
-            accept=True,
-        )
-
-    @TestBase.async_test
-    async def testConnectRequireDisconnecting(self):
-        """Test that connect() cannot be called when Runstate=DISCONNECTING"""
-        await self.proto.accept('/not/a/real/path')
-
-        # Cheat: force a disconnect.
-        await self.proto.simulate_disconnect()
-
-        await self._prod_session_api(
-            Runstate.DISCONNECTING,
-            ("NullProtocol is disconnecting."
-             " Call disconnect() to return to IDLE state."),
-            accept=False,
-        )
-
-
-class SimpleSession(TestBase):
-
-    def setUp(self):
-        super().setUp()
-        self.server = LineProtocol(type(self).__name__ + '-server')
-
-    async def _asyncSetUp(self):
-        await super()._asyncSetUp()
-        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
-
-    async def _asyncTearDown(self):
-        await self.proto.disconnect()
-        try:
-            await self.server.disconnect()
-        except EOFError:
-            pass
-        await super()._asyncTearDown()
-
-    @TestBase.async_test
-    async def testSmoke(self):
-        with TemporaryDirectory(suffix='.qmp') as tmpdir:
-            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
-            server_task = create_task(self.server.accept(sock))
-
-            # give the server a chance to start listening [...]
-            await asyncio.sleep(0)
-            await self.proto.connect(sock)
-- 
2.31.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]