[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH 16/20] python/aqmp: Add message routing to QMP protocol
From: |
John Snow |
Subject: |
[PATCH 16/20] python/aqmp: Add message routing to QMP protocol |
Date: |
Thu, 1 Jul 2021 00:13:09 -0400 |
Add the ability to handle and route messages in qmp_protocol.py. The
interface for actually sending anything still isn't added until next
commit.
Signed-off-by: John Snow <jsnow@redhat.com>
---
python/qemu/aqmp/qmp_protocol.py | 98 +++++++++++++++++++++++++++++++-
1 file changed, 96 insertions(+), 2 deletions(-)
diff --git a/python/qemu/aqmp/qmp_protocol.py b/python/qemu/aqmp/qmp_protocol.py
index 5872bfc017..04c8a8cb54 100644
--- a/python/qemu/aqmp/qmp_protocol.py
+++ b/python/qemu/aqmp/qmp_protocol.py
@@ -7,15 +7,18 @@
incoming connection from that server.
"""
+# The import workarounds here are fixed in the next commit.
+import asyncio # pylint: disable=unused-import # noqa
import logging
from typing import (
Dict,
List,
Mapping,
Optional,
+ Union,
)
-from .error import ProtocolError
+from .error import AQMPError, ProtocolError
from .events import Events
from .message import Message
from .models import Greeting
@@ -56,6 +59,53 @@ class NegotiationError(_WrappedProtocolError):
"""
+class ExecInterruptedError(AQMPError):
+ """
+ Exception raised when an RPC is interrupted.
+
+ This error is raised when an execute() statement could not be
+ completed. This can occur because the connection itself was
+ terminated before a reply was received.
+
+ The true cause of the interruption will be available via `disconnect()`.
+ """
+
+
+class _MsgProtocolError(ProtocolError):
+ """
+ Abstract error class for protocol errors that have a `Message` object.
+
+ This Exception class is used for protocol errors where the `Message`
+ was mechanically understood, but was found to be inappropriate or
+ malformed.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+ def __init__(self, error_message: str, msg: Message):
+ super().__init__(error_message)
+ #: The received `Message` that caused the error.
+ self.msg: Message = msg
+
+ def __str__(self) -> str:
+ return "\n".join([
+ super().__str__(),
+ f" Message was: {str(self.msg)}\n",
+ ])
+
+
+class ServerParseError(_MsgProtocolError):
+ """
+ The Server sent a `Message` indicating parsing failure.
+
+ i.e. A reply has arrived from the server, but it is missing the "ID"
+ field, indicating a parsing error.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+
+
class QMP(AsyncProtocol[Message], Events):
"""
Implements a QMP client connection.
@@ -98,6 +148,9 @@ async def run(self, address='/tmp/qemu.socket'):
#: Logger object used for debugging messages.
logger = logging.getLogger(__name__)
+ # Type alias for pending execute() result items
+ _PendingT = Union[Message, ExecInterruptedError]
+
def __init__(self, name: Optional[str] = None) -> None:
super().__init__(name)
Events.__init__(self)
@@ -112,6 +165,9 @@ def __init__(self, name: Optional[str] = None) -> None:
# Cached Greeting, if one was awaited.
self._greeting: Optional[Greeting] = None
+ # Incoming RPC reply messages
+ self._pending: Dict[str, 'asyncio.Queue[QMP._PendingT]'] = {}
+
@upper_half
async def _begin_new_session(self) -> None:
"""
@@ -191,10 +247,27 @@ async def _negotiate(self) -> None:
self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
raise
+ @bottom_half
+ async def _bh_disconnect(self, force: bool = False) -> None:
+ await super()._bh_disconnect(force)
+
+ if self._pending:
+ self.logger.debug("Cancelling pending executions")
+ keys = self._pending.keys()
+ for key in keys:
+ self.logger.debug("Cancelling execution '%s'", key)
+ self._pending[key].put_nowait(
+ ExecInterruptedError("Disconnected")
+ )
+
+ self.logger.debug("QMP Disconnected.")
+
@bottom_half
async def _on_message(self, msg: Message) -> None:
"""
Add an incoming message to the appropriate queue/handler.
+
+ :raise ServerParseError: When Message has no 'event' nor 'id' member
"""
# Incoming messages are not fully parsed/validated here;
# do only light peeking to know how to route the messages.
@@ -204,7 +277,27 @@ async def _on_message(self, msg: Message) -> None:
return
# Below, we assume everything left is an execute/exec-oob response.
- # ... Which we'll implement in the next commit!
+
+ if 'id' not in msg:
+ # This is (very likely) a server parsing error.
+ # It doesn't inherently belong to any pending execution.
+ # Instead of performing clever recovery, just terminate.
+ # See "NOTE" in qmp-spec.txt, section 2.4.2
+ raise ServerParseError("Server sent a message without an ID,"
+ " indicating parse failure.", msg)
+
+ assert 'id' in msg
+ exec_id = str(msg['id'])
+
+ if exec_id not in self._pending:
+ # qmp-spec.txt, section 2.4:
+ # 'Clients should drop all the responses
+ # that have an unknown "id" field.'
+ self.logger.warning("Unknown ID '%s', message dropped.", exec_id)
+ self.logger.debug("Unroutable message: %s", str(msg))
+ return
+
+ await self._pending[exec_id].put(msg)
@upper_half
@bottom_half
@@ -237,6 +330,7 @@ def _do_send(self, msg: Message) -> None:
def _cleanup(self) -> None:
super()._cleanup()
self._greeting = None
+ assert not self._pending
@classmethod
def make_execute_msg(cls, cmd: str,
--
2.31.1
- [PATCH 07/20] python/aqmp: add runstate state machine to AsyncProtocol, (continued)
- [PATCH 07/20] python/aqmp: add runstate state machine to AsyncProtocol, John Snow, 2021/07/01
- [PATCH 08/20] python/aqmp: add logging to AsyncProtocol, John Snow, 2021/07/01
- [PATCH 06/20] python/aqmp: add generic async message-based protocol support, John Snow, 2021/07/01
- [PATCH 09/20] python/aqmp: add AsyncProtocol.accept() method, John Snow, 2021/07/01
- [PATCH 11/20] python/aqmp: add AsyncProtocol._readline() method, John Snow, 2021/07/01
- [PATCH 10/20] python/aqmp: add _cb_inbound and _cb_inbound logging hooks, John Snow, 2021/07/01
- [PATCH 13/20] python/aqmp: add well-known QMP object models, John Snow, 2021/07/01
- [PATCH 12/20] python/aqmp: add QMP Message format, John Snow, 2021/07/01
- [PATCH 16/20] python/aqmp: Add message routing to QMP protocol,
John Snow <=
- [PATCH 17/20] python/aqmp: add execute() interfaces, John Snow, 2021/07/01
- [PATCH 15/20] python/aqmp: add QMP protocol support, John Snow, 2021/07/01
- [PATCH 14/20] python/aqmp: add QMP event support, John Snow, 2021/07/01
- [PATCH 18/20] python/aqmp: add _raw() execution interface, John Snow, 2021/07/01
- [PATCH 19/20] python/aqmp: add asyncio_run compatibility wrapper, John Snow, 2021/07/01
- [PATCH 20/20] python/aqmp: add scary message, John Snow, 2021/07/01
- Re: [PATCH 00/20] python: introduce Asynchronous QMP package, Stefan Hajnoczi, 2021/07/05