[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
RE: [RFC v4 08/21] vfio-user: define socket receive functions
From: |
Thanos Makatos |
Subject: |
RE: [RFC v4 08/21] vfio-user: define socket receive functions |
Date: |
Thu, 3 Feb 2022 21:53:43 +0000 |
> -----Original Message-----
> From: Qemu-devel <qemu-devel-
> bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of John
> Johnson
> Sent: 12 January 2022 00:44
> To: qemu-devel@nongnu.org
> Subject: [RFC v4 08/21] vfio-user: define socket receive functions
>
> Add infrastructure needed to receive incoming messages
>
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
> hw/vfio/user-protocol.h | 54 ++++++++
> hw/vfio/user.h | 6 +
> hw/vfio/pci.c | 6 +
> hw/vfio/user.c | 327
> ++++++++++++++++++++++++++++++++++++++++++++++++
> MAINTAINERS | 1 +
> 5 files changed, 394 insertions(+)
> create mode 100644 hw/vfio/user-protocol.h
>
> diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> new file mode 100644
> index 0000000..d23877c
> --- /dev/null
> +++ b/hw/vfio/user-protocol.h
> @@ -0,0 +1,54 @@
> +#ifndef VFIO_USER_PROTOCOL_H
> +#define VFIO_USER_PROTOCOL_H
> +
> +/*
> + * vfio protocol over a UNIX socket.
> + *
> + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + *
> + * Each message has a standard header that describes the command
> + * being sent, which is almost always a VFIO ioctl().
> + *
> + * The header may be followed by command-specific data, such as the
> + * region and offset info for read and write commands.
> + */
> +
> +typedef struct {
> + uint16_t id;
> + uint16_t command;
> + uint32_t size;
> + uint32_t flags;
> + uint32_t error_reply;
> +} VFIOUserHdr;
> +
> +/* VFIOUserHdr commands */
> +enum vfio_user_command {
> + VFIO_USER_VERSION = 1,
> + VFIO_USER_DMA_MAP = 2,
> + VFIO_USER_DMA_UNMAP = 3,
> + VFIO_USER_DEVICE_GET_INFO = 4,
> + VFIO_USER_DEVICE_GET_REGION_INFO = 5,
> + VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6,
> + VFIO_USER_DEVICE_GET_IRQ_INFO = 7,
> + VFIO_USER_DEVICE_SET_IRQS = 8,
> + VFIO_USER_REGION_READ = 9,
> + VFIO_USER_REGION_WRITE = 10,
> + VFIO_USER_DMA_READ = 11,
> + VFIO_USER_DMA_WRITE = 12,
> + VFIO_USER_DEVICE_RESET = 13,
> + VFIO_USER_DIRTY_PAGES = 14,
> + VFIO_USER_MAX,
> +};
> +
> +/* VFIOUserHdr flags */
> +#define VFIO_USER_REQUEST 0x0
> +#define VFIO_USER_REPLY 0x1
> +#define VFIO_USER_TYPE 0xF
> +
> +#define VFIO_USER_NO_REPLY 0x10
> +#define VFIO_USER_ERROR 0x20
> +
> +#endif /* VFIO_USER_PROTOCOL_H */
> diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> index da92862..72eefa7 100644
> --- a/hw/vfio/user.h
> +++ b/hw/vfio/user.h
> @@ -11,6 +11,8 @@
> *
> */
>
> +#include "user-protocol.h"
> +
> typedef struct {
> int send_fds;
> int recv_fds;
> @@ -27,6 +29,7 @@ enum msg_type {
>
> typedef struct VFIOUserMsg {
> QTAILQ_ENTRY(VFIOUserMsg) next;
> + VFIOUserHdr *hdr;
> VFIOUserFDs *fds;
> uint32_t rsize;
> uint32_t id;
> @@ -74,5 +77,8 @@ typedef struct VFIOProxy {
>
> VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
> void vfio_user_disconnect(VFIOProxy *proxy);
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> + void (*handler)(void *opaque, VFIOUserMsg *msg),
> + void *reqarg);
>
> #endif /* VFIO_USER_H */
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index 9fd7c07..0de915d 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3386,6 +3386,11 @@ type_init(register_vfio_pci_dev_type)
> * vfio-user routines.
> */
>
> +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> +{
> +
> +}
> +
> /*
> * Emulated devices don't use host hot reset
> */
> @@ -3432,6 +3437,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev,
> Error **errp)
> return;
> }
> vbasedev->proxy = proxy;
> + vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
>
> vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
> vbasedev->dev = DEVICE(vdev);
> diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> index c843f90..e1dfd5d 100644
> --- a/hw/vfio/user.c
> +++ b/hw/vfio/user.c
> @@ -25,10 +25,26 @@
> #include "sysemu/iothread.h"
> #include "user.h"
>
> +static uint64_t max_xfer_size;
> static IOThread *vfio_user_iothread;
>
> static void vfio_user_shutdown(VFIOProxy *proxy);
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> + VFIOUserFDs *fds);
> +static VFIOUserFDs *vfio_user_getfds(int numfds);
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
>
> +static void vfio_user_recv(void *opaque);
> +static int vfio_user_recv_one(VFIOProxy *proxy);
> +static void vfio_user_cb(void *opaque);
> +
> +static void vfio_user_request(void *opaque);
> +
> +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> +{
> + hdr->flags |= VFIO_USER_ERROR;
> + hdr->error_reply = err;
> +}
>
> /*
> * Functions called by main, CPU, or iothread threads
> @@ -40,10 +56,261 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
> qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
> }
>
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> + VFIOUserFDs *fds)
> +{
> + VFIOUserMsg *msg;
> +
> + msg = QTAILQ_FIRST(&proxy->free);
> + if (msg != NULL) {
> + QTAILQ_REMOVE(&proxy->free, msg, next);
> + } else {
> + msg = g_malloc0(sizeof(*msg));
> + qemu_cond_init(&msg->cv);
> + }
> +
> + msg->hdr = hdr;
> + msg->fds = fds;
> + return msg;
> +}
> +
> +/*
> + * Recycle a message list entry to the free list.
> + */
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> +{
> + if (msg->type == VFIO_MSG_NONE) {
> + error_printf("vfio_user_recycle - freeing free msg\n");
> + return;
> + }
> +
> + /* free msg buffer if no one is waiting to consume the reply */
> + if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
> + g_free(msg->hdr);
> + if (msg->fds != NULL) {
> + g_free(msg->fds);
> + }
> + }
> +
> + msg->type = VFIO_MSG_NONE;
> + msg->hdr = NULL;
> + msg->fds = NULL;
> + msg->complete = false;
> + QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> +}
> +
> +static VFIOUserFDs *vfio_user_getfds(int numfds)
> +{
> + VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> +
> + fds->fds = (int *)((char *)fds + sizeof(*fds));
> +
> + return fds;
> +}
> +
> /*
> * Functions only called by iothread
> */
>
> +static void vfio_user_recv(void *opaque)
> +{
> + VFIOProxy *proxy = opaque;
> +
> + QEMU_LOCK_GUARD(&proxy->lock);
> +
> + if (proxy->state == VFIO_PROXY_CONNECTED) {
> + while (vfio_user_recv_one(proxy) == 0) {
> + ;
> + }
> + }
> +}
> +
> +/*
> + * Receive and process one incoming message.
> + *
> + * For replies, find matching outgoing request and wake any waiters.
> + * For requests, queue in incoming list and run request BH.
> + */
> +static int vfio_user_recv_one(VFIOProxy *proxy)
> +{
> + VFIOUserMsg *msg = NULL;
> + g_autofree int *fdp = NULL;
> + VFIOUserFDs *reqfds;
> + VFIOUserHdr hdr;
> + struct iovec iov = {
> + .iov_base = &hdr,
> + .iov_len = sizeof(hdr),
> + };
> + bool isreply = false;
> + int i, ret;
> + size_t msgleft, numfds = 0;
> + char *data = NULL;
> + char *buf = NULL;
> + Error *local_err = NULL;
> +
> + /*
> + * Read header
> + */
> + ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> + &local_err);
> + if (ret == QIO_CHANNEL_ERR_BLOCK) {
> + return ret;
> + }
> + if (ret <= 0) {
> + /* read error or other side closed connection */
> + if (ret == 0) {
> + error_setg(&local_err, "vfio_user_recv server closed socket");
> + } else {
> + error_prepend(&local_err, "vfio_user_recv");
> + }
> + goto fatal;
> + }
> + if (ret < sizeof(msg)) {
> + error_setg(&local_err, "vfio_user_recv short read of header");
> + goto fatal;
> + }
> +
> + /*
> + * Validate header
> + */
> + if (hdr.size < sizeof(VFIOUserHdr)) {
> + error_setg(&local_err, "vfio_user_recv bad header size");
> + goto fatal;
> + }
> + switch (hdr.flags & VFIO_USER_TYPE) {
> + case VFIO_USER_REQUEST:
> + isreply = false;
> + break;
> + case VFIO_USER_REPLY:
> + isreply = true;
> + break;
> + default:
> + error_setg(&local_err, "vfio_user_recv unknown message type");
> + goto fatal;
> + }
> +
> + /*
> + * For replies, find the matching pending request.
> + * For requests, reap incoming FDs.
> + */
> + if (isreply) {
> + QTAILQ_FOREACH(msg, &proxy->pending, next) {
> + if (hdr.id == msg->id) {
> + break;
> + }
> + }
> + if (msg == NULL) {
> + error_setg(&local_err, "vfio_user_recv unexpected reply");
> + goto err;
> + }
> + QTAILQ_REMOVE(&proxy->pending, msg, next);
> +
> + /*
> + * Process any received FDs
> + */
> + if (numfds != 0) {
> + if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> + error_setg(&local_err, "vfio_user_recv unexpected FDs");
> + goto err;
> + }
> + msg->fds->recv_fds = numfds;
> + memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> + }
> + } else {
> + if (numfds != 0) {
> + reqfds = vfio_user_getfds(numfds);
> + memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> + } else {
> + reqfds = NULL;
> + }
> + }
> +
> + /*
> + * Put the whole message into a single buffer.
> + */
> + if (isreply) {
> + if (hdr.size > msg->rsize) {
> + error_setg(&local_err,
> + "vfio_user_recv reply larger than recv buffer");
> + goto err;
> + }
> + *msg->hdr = hdr;
> + data = (char *)msg->hdr + sizeof(hdr);
> + } else {
> + if (hdr.size > max_xfer_size) {
> + error_setg(&local_err, "vfio_user_recv request larger than max");
> + goto err;
> + }
> + buf = g_malloc0(hdr.size);
> + memcpy(buf, &hdr, sizeof(hdr));
> + data = buf + sizeof(hdr);
> + msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> + msg->type = VFIO_MSG_REQ;
> + }
> +
> + msgleft = hdr.size - sizeof(hdr);
> + while (msgleft > 0) {
> + ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> +
> + /* error or would block */
> + if (ret < 0) {
> + goto fatal;
> + }
IIUC, qio_channel_read() ends up calling qio_channel_socket_readv(), which can
return QIO_CHANNEL_ERR_BLOCK (-2) without setting local_err. In that case the
"if (ret < 0)" branch above is taken with local_err still NULL, and that causes
a segfault when error_report_err(local_err) is called before returning from
this function.
> +
> + msgleft -= ret;
> + data += ret;
> + }
> +
> + /*
> + * Replies signal a waiter, if none just check for errors
> + * and free the message buffer.
> + *
> + * Requests get queued for the BH.
> + */
> + if (isreply) {
> + msg->complete = true;
> + if (msg->type == VFIO_MSG_WAIT) {
> + qemu_cond_signal(&msg->cv);
> + } else {
> + if (hdr.flags & VFIO_USER_ERROR) {
> + error_printf("vfio_user_rcv error reply on async request ");
> + error_printf("command %x error %s\n", hdr.command,
> + strerror(hdr.error_reply));
> + }
> + /* youngest nowait msg has been ack'd */
> + if (proxy->last_nowait == msg) {
> + proxy->last_nowait = NULL;
> + }
> + vfio_user_recycle(proxy, msg);
> + }
> + } else {
> + QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> + qemu_bh_schedule(proxy->req_bh);
> + }
> + return 0;
> +
> + /*
> + * fatal means the other side closed or we don't trust the stream
> + * err means this message is corrupt
> + */
> +fatal:
> + vfio_user_shutdown(proxy);
> + proxy->state = VFIO_PROXY_ERROR;
> +
> +err:
> + for (i = 0; i < numfds; i++) {
> + close(fdp[i]);
> + }
> + if (isreply && msg != NULL) {
> + /* force an error to keep sending thread from hanging */
> + vfio_user_set_error(msg->hdr, EINVAL);
> + msg->complete = true;
> + qemu_cond_signal(&msg->cv);
> + }
> + error_report_err(local_err);
> + return -1;
> +}
> +
> static void vfio_user_cb(void *opaque)
> {
> VFIOProxy *proxy = opaque;
> @@ -59,6 +326,51 @@ static void vfio_user_cb(void *opaque)
> * Functions called by main or CPU threads
> */
>
> +/*
> + * Process incoming requests.
> + *
> + * The bus-specific callback has the form:
> + * request(opaque, msg)
> + * where 'opaque' was specified in vfio_user_set_handler
> + * and 'msg' is the inbound message.
> + *
> + * The callback is responsible for disposing of the message buffer,
> + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> + * both of which free their message buffer when the reply is sent.
> + *
> + * If the callback uses a new buffer, it needs to free the old one.
> + */
> +static void vfio_user_request(void *opaque)
> +{
> + VFIOProxy *proxy = opaque;
> + VFIOUserMsgQ new, free;
> + VFIOUserMsg *msg, *m1;
> +
> + /* reap all incoming */
> + QTAILQ_INIT(&new);
> + WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> + QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
> + QTAILQ_REMOVE(&proxy->pending, msg, next);
> + QTAILQ_INSERT_TAIL(&new, msg, next);
> + }
> + }
> +
> + /* process list */
> + QTAILQ_INIT(&free);
> + QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
> + QTAILQ_REMOVE(&new, msg, next);
> + proxy->request(proxy->req_arg, msg);
> + QTAILQ_INSERT_HEAD(&free, msg, next);
> + }
> +
> + /* free list */
> + WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> + QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
> + vfio_user_recycle(proxy, msg);
> + }
> + }
> +}
> +
> static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
> QLIST_HEAD_INITIALIZER(vfio_user_sockets);
>
> @@ -97,6 +409,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> *addr, Error **errp)
> }
>
> proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> + proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
>
> QTAILQ_INIT(&proxy->outgoing);
> QTAILQ_INIT(&proxy->incoming);
> @@ -107,6 +420,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> *addr, Error **errp)
> return proxy;
> }
>
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> + void (*handler)(void *opaque, VFIOUserMsg *msg),
> + void *req_arg)
> +{
> + VFIOProxy *proxy = vbasedev->proxy;
> +
> + proxy->request = handler;
> + proxy->req_arg = req_arg;
> + qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> + vfio_user_recv, NULL, proxy);
> +}
> +
> void vfio_user_disconnect(VFIOProxy *proxy)
> {
> VFIOUserMsg *r1, *r2;
> @@ -122,6 +447,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
> }
> object_unref(OBJECT(proxy->ioc));
> proxy->ioc = NULL;
> + qemu_bh_delete(proxy->req_bh);
> + proxy->req_bh = NULL;
>
> proxy->state = VFIO_PROXY_CLOSING;
> QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> diff --git a/MAINTAINERS b/MAINTAINERS
> index cfaccbf..bc0ba88 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -1909,6 +1909,7 @@ S: Supported
> F: docs/devel/vfio-user.rst
> F: hw/vfio/user.c
> F: hw/vfio/user.h
> +F: hw/vfio/user-protocol.h
>
> vhost
> M: Michael S. Tsirkin <mst@redhat.com>
> --
> 1.8.3.1
>
- RE: [RFC v4 08/21] vfio-user: define socket receive functions,
Thanos Makatos <=