qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [RFC v3 08/19] vfio-user: define socket receive functions


From: Alex Williamson
Subject: Re: [RFC v3 08/19] vfio-user: define socket receive functions
Date: Fri, 19 Nov 2021 15:42:26 -0700

On Mon,  8 Nov 2021 16:46:36 -0800
John Johnson <john.g.johnson@oracle.com> wrote:

> Add infrastructure needed to receive incoming messages
> 
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
>  hw/vfio/pci.h           |   2 +-
>  hw/vfio/user-protocol.h |  62 +++++++++
>  hw/vfio/user.h          |   9 +-
>  hw/vfio/pci.c           |  12 +-
>  hw/vfio/user.c          | 326 
> ++++++++++++++++++++++++++++++++++++++++++++++++
>  MAINTAINERS             |   1 +
>  6 files changed, 409 insertions(+), 3 deletions(-)
>  create mode 100644 hw/vfio/user-protocol.h
> 
> diff --git a/hw/vfio/pci.h b/hw/vfio/pci.h
> index 08ac647..ec9f345 100644
> --- a/hw/vfio/pci.h
> +++ b/hw/vfio/pci.h
> @@ -193,7 +193,7 @@ OBJECT_DECLARE_SIMPLE_TYPE(VFIOUserPCIDevice, 
> VFIO_USER_PCI)
>  struct VFIOUserPCIDevice {
>      VFIOPCIDevice device;
>      char *sock_name;
> -    bool secure_dma; /* disable shared mem for DMA */

Don't introduce it into the series to start with, confusing to review.

> +    bool send_queued;   /* all sends are queued */
>  };
>  
>  /* Use uin32_t for vendor & device so PCI_ANY_ID expands and cannot match hw 
> */
> diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> new file mode 100644
> index 0000000..27062cb
> --- /dev/null
> +++ b/hw/vfio/user-protocol.h
> @@ -0,0 +1,62 @@
> +#ifndef VFIO_USER_PROTOCOL_H
> +#define VFIO_USER_PROTOCOL_H
> +
> +/*
> + * vfio protocol over a UNIX socket.
> + *
> + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + * Each message has a standard header that describes the command
> + * being sent, which is almost always a VFIO ioctl().
> + *
> + * The header may be followed by command-specific data, such as the
> + * region and offset info for read and write commands.
> + */
> +
> +typedef struct {
> +    uint16_t id;
> +    uint16_t command;
> +    uint32_t size;
> +    uint32_t flags;
> +    uint32_t error_reply;
> +} VFIOUserHdr;
> +

A comment referencing the doc would probably be a good idea about here.

> +/* VFIOUserHdr commands */
> +enum vfio_user_command {
> +    VFIO_USER_VERSION                   = 1,
> +    VFIO_USER_DMA_MAP                   = 2,
> +    VFIO_USER_DMA_UNMAP                 = 3,
> +    VFIO_USER_DEVICE_GET_INFO           = 4,
> +    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
> +    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
> +    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
> +    VFIO_USER_DEVICE_SET_IRQS           = 8,
> +    VFIO_USER_REGION_READ               = 9,
> +    VFIO_USER_REGION_WRITE              = 10,
> +    VFIO_USER_DMA_READ                  = 11,
> +    VFIO_USER_DMA_WRITE                 = 12,
> +    VFIO_USER_DEVICE_RESET              = 13,
> +    VFIO_USER_DIRTY_PAGES               = 14,
> +    VFIO_USER_MAX,
> +};
> +
> +/* VFIOUserHdr flags */
> +#define VFIO_USER_REQUEST       0x0
> +#define VFIO_USER_REPLY         0x1
> +#define VFIO_USER_TYPE          0xF
> +
> +#define VFIO_USER_NO_REPLY      0x10
> +#define VFIO_USER_ERROR         0x20
> +
> +
> +#define VFIO_USER_DEF_MAX_FDS   8
> +#define VFIO_USER_MAX_MAX_FDS   16
> +
> +#define VFIO_USER_DEF_MAX_XFER  (1024 * 1024)
> +#define VFIO_USER_MAX_MAX_XFER  (64 * 1024 * 1024)

These are essentially magic numbers, some discussion of how these
limits are derived would be useful for future contributors, but also
only DEV_MAX_XFER is used in this patch and it's confusing why the
macro isn't used directly.  Most of the logic surrounding these is
added in the next patch, so it doesn't really make sense to add them
here.  Thanks,

Alex

> +
> +
> +#endif /* VFIO_USER_PROTOCOL_H */
> diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> index 301ef6a..bd3717f 100644
> --- a/hw/vfio/user.h
> +++ b/hw/vfio/user.h
> @@ -11,6 +11,8 @@
>   *
>   */
>  
> +#include "user-protocol.h"
> +
>  typedef struct {
>      int send_fds;
>      int recv_fds;
> @@ -27,6 +29,7 @@ enum msg_type {
>  
>  typedef struct VFIOUserMsg {
>      QTAILQ_ENTRY(VFIOUserMsg) next;
> +    VFIOUserHdr *hdr;
>      VFIOUserFDs *fds;
>      uint32_t rsize;
>      uint32_t id;
> @@ -70,9 +73,13 @@ typedef struct VFIOProxy {
>  } VFIOProxy;
>  
>  /* VFIOProxy flags */
> -#define VFIO_PROXY_CLIENT       0x1
> +#define VFIO_PROXY_CLIENT        0x1
> +#define VFIO_PROXY_FORCE_QUEUED  0x4
>  
>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>  void vfio_user_disconnect(VFIOProxy *proxy);
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *reqarg);
>  
>  #endif /* VFIO_USER_H */
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index ebfabb1..db45179 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3448,6 +3448,11 @@ struct VFIOValidOps vfio_pci_valid_ops = {
>   * vfio-user routines.
>   */
>  
> +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> +{
> +
> +}
> +
>  /*
>   * Emulated devices don't use host hot reset
>   */
> @@ -3501,6 +3506,11 @@ static void vfio_user_pci_realize(PCIDevice *pdev, 
> Error **errp)
>          return;
>      }
>      vbasedev->proxy = proxy;
> +    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
> +
> +    if (udev->send_queued) {
> +        proxy->flags |= VFIO_PROXY_FORCE_QUEUED;
> +    }
>  
>      vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
>      vbasedev->dev = DEVICE(vdev);
> @@ -3524,7 +3534,7 @@ static void vfio_user_instance_finalize(Object *obj)
>  
>  static Property vfio_user_pci_dev_properties[] = {
>      DEFINE_PROP_STRING("socket", VFIOUserPCIDevice, sock_name),
> -    DEFINE_PROP_BOOL("secure-dma", VFIOUserPCIDevice, secure_dma, false),
> +    DEFINE_PROP_BOOL("x-send-queued", VFIOUserPCIDevice, send_queued, false),
>      DEFINE_PROP_END_OF_LIST(),
>  };
>  
> diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> index 92d4e03..f662ae0 100644
> --- a/hw/vfio/user.c
> +++ b/hw/vfio/user.c
> @@ -25,9 +25,27 @@
>  #include "sysemu/iothread.h"
>  #include "user.h"
>  
> +static uint64_t max_xfer_size = VFIO_USER_DEF_MAX_XFER;
>  static IOThread *vfio_user_iothread;
> +
>  static void vfio_user_shutdown(VFIOProxy *proxy);
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds);
> +static VFIOUserFDs *vfio_user_getfds(int numfds);
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
> +
> +static void vfio_user_recv(void *opaque);
> +static int vfio_user_recv_one(VFIOProxy *proxy);
> +static void vfio_user_cb(void *opaque);
>  
> +static void vfio_user_request(void *opaque);
> +
> +
> +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> +{
> +    hdr->flags |= VFIO_USER_ERROR;
> +    hdr->error_reply = err;
> +}
>  
>  /*
>   * Functions called by main, CPU, or iothread threads
> @@ -39,11 +57,259 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
>      qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
>  }
>  
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds)
> +{
> +    VFIOUserMsg *msg;
> +
> +    msg = QTAILQ_FIRST(&proxy->free);
> +    if (msg != NULL) {
> +        QTAILQ_REMOVE(&proxy->free, msg, next);
> +    } else {
> +        msg = g_malloc0(sizeof(*msg));
> +        qemu_cond_init(&msg->cv);
> +    }
> +
> +    msg->hdr = hdr;
> +    msg->fds = fds;
> +    return msg;
> +}
> +
> +/*
> + * Recycle a message list entry to the free list.
> + */
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> +{
> +    if (msg->type == VFIO_MSG_NONE) {
> +        error_printf("vfio_user_recycle - freeing free msg\n");
> +        return;
> +    }
> +
> +    /* free msg buffer if no one is waiting to consume the reply */
> +    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
> +        g_free(msg->hdr);
> +    }
> +
> +    msg->type = VFIO_MSG_NONE;
> +    msg->hdr = NULL;
> +    msg->fds = NULL;
> +    msg->complete = false;
> +    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> +}
> +
> +static VFIOUserFDs *vfio_user_getfds(int numfds)
> +{
> +    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> +
> +    fds->fds = (int *)((char *)fds + sizeof(*fds));
> +
> +    return fds;
> +}
>  
>  /*
>   * Functions only called by iothread
>   */
>  
> +static void vfio_user_recv(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +
> +
> +    QEMU_LOCK_GUARD(&proxy->lock);
> +
> +    if (proxy->state == VFIO_PROXY_CONNECTED) {
> +        while (vfio_user_recv_one(proxy) == 0) {
> +            ;
> +        }
> +    }
> +}
> +
> +/*
> + * Receive and process one incoming message.
> + *
> + * For replies, find matching outgoing request and wake any waiters.
> + * For requests, queue in incoming list and run request BH.
> + */
> +static int vfio_user_recv_one(VFIOProxy *proxy)
> +{
> +    VFIOUserMsg *msg = NULL;
> +    g_autofree int *fdp = NULL;
> +    VFIOUserFDs *reqfds;
> +    VFIOUserHdr hdr;
> +    struct iovec iov = {
> +        .iov_base = &hdr,
> +        .iov_len = sizeof(hdr),
> +    };
> +    bool isreply = false;
> +    int i, ret;
> +    size_t msgleft, numfds = 0;
> +    char *data = NULL;
> +    char *buf = NULL;
> +    Error *local_err = NULL;
> +
> +    /*
> +     * Read header
> +     */
> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> +                                 &local_err);
> +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +        return ret;
> +    }
> +    if (ret <= 0) {
> +        /* read error or other side closed connection */
> +        if (ret == 0) {
> +            error_setg(&local_err, "vfio_user_recv server closed socket");
> +        } else {
> +            error_prepend(&local_err, "vfio_user_recv");
> +        }
> +        goto fatal;
> +    }
> +    if (ret < sizeof(msg)) {
> +        error_setg(&local_err, "vfio_user_recv short read of header");
> +        goto fatal;
> +    }
> +
> +    /*
> +     * Validate header
> +     */
> +    if (hdr.size < sizeof(VFIOUserHdr)) {
> +        error_setg(&local_err, "vfio_user_recv bad header size");
> +        goto fatal;
> +    }
> +    switch (hdr.flags & VFIO_USER_TYPE) {
> +    case VFIO_USER_REQUEST:
> +        isreply = false;
> +        break;
> +    case VFIO_USER_REPLY:
> +        isreply = true;
> +        break;
> +    default:
> +        error_setg(&local_err, "vfio_user_recv unknown message type");
> +        goto fatal;
> +    }
> +
> +    /*
> +     * For replies, find the matching pending request.
> +     * For requests, reap incoming FDs.
> +     */
> +    if (isreply) {
> +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> +            if (hdr.id == msg->id) {
> +                break;
> +            }
> +        }
> +        if (msg == NULL) {
> +            error_setg(&local_err, "vfio_user_recv unexpected reply");
> +            goto err;
> +        }
> +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> +
> +        /*
> +         * Process any received FDs
> +         */
> +        if (numfds != 0) {
> +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> +                error_setg(&local_err, "vfio_user_recv unexpected FDs");
> +                goto err;
> +            }
> +            msg->fds->recv_fds = numfds;
> +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> +        }
> +    } else {
> +        if (numfds != 0) {
> +            reqfds = vfio_user_getfds(numfds);
> +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> +        } else {
> +            reqfds = NULL;
> +        }
> +    }
> +
> +    /*
> +     * Put the whole message into a single buffer.
> +     */
> +    if (isreply) {
> +        if (hdr.size > msg->rsize) {
> +            error_setg(&local_err,
> +                       "vfio_user_recv reply larger than recv buffer");
> +            goto err;
> +        }
> +        *msg->hdr = hdr;
> +        data = (char *)msg->hdr + sizeof(hdr);
> +    } else {
> +        if (hdr.size > max_xfer_size) {
> +            error_setg(&local_err, "vfio_user_recv request larger than max");
> +            goto err;
> +        }
> +        buf = g_malloc0(hdr.size);
> +        memcpy(buf, &hdr, sizeof(hdr));
> +        data = buf + sizeof(hdr);
> +        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> +        msg->type = VFIO_MSG_REQ;
> +    }
> +
> +    msgleft = hdr.size - sizeof(hdr);
> +    if (msgleft != 0) {
> +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> +        /* error or would block */
> +        if (ret < 0) {
> +            goto fatal;
> +        }
> +        if (ret != msgleft) {
> +            error_setg(&local_err, "vfio_user_recv short read of msg body");
> +            goto fatal;
> +        }
> +    }
> +
> +    /*
> +     * Replies signal a waiter, if none just check for errors
> +     * and free the message buffer.
> +     *
> +     * Requests get queued for the BH.
> +     */
> +    if (isreply) {
> +        msg->complete = true;
> +        if (msg->type == VFIO_MSG_WAIT) {
> +            qemu_cond_signal(&msg->cv);
> +        } else {
> +            if (hdr.flags & VFIO_USER_ERROR) {
> +                error_printf("vfio_user_rcv error reply on async request ");
> +                error_printf("command %x error %s\n", hdr.command,
> +                             strerror(hdr.error_reply));
> +            }
> +            /* youngest nowait msg has been ack'd */
> +            if (proxy->last_nowait == msg) {
> +                proxy->last_nowait = NULL;
> +            }
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    } else {
> +        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> +        qemu_bh_schedule(proxy->req_bh);
> +    }
> +    return 0;
> +
> +    /*
> +     * fatal means the other side closed or we don't trust the stream
> +     * err means this message is corrupt
> +     */
> +fatal:
> +    vfio_user_shutdown(proxy);
> +    proxy->state = VFIO_PROXY_ERROR;
> +
> +err:
> +    for (i = 0; i < numfds; i++) {
> +        close(fdp[i]);
> +    }
> +    if (isreply && msg != NULL) {
> +        /* force an error to keep sending thread from hanging */
> +        vfio_user_set_error(msg->hdr, EINVAL);
> +        msg->complete = true;
> +        qemu_cond_signal(&msg->cv);
> +    }
> +    error_report_err(local_err);
> +    return -1;
> +}
> +
>  static void vfio_user_cb(void *opaque)
>  {
>      VFIOProxy *proxy = opaque;
> @@ -59,6 +325,51 @@ static void vfio_user_cb(void *opaque)
>   * Functions called by main or CPU threads
>   */
>  
> +/*
> + * Process incoming requests.
> + *
> + * The bus-specific callback has the form:
> + *    request(opaque, msg)
> + * where 'opaque' was specified in vfio_user_set_handler
> + * and 'msg' is the inbound message.
> + *
> + * The callback is responsible for disposing of the message buffer,
> + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> + * both of which free their message buffer when the reply is sent.
> + *
> + * If the callback uses a new buffer, it needs to free the old one.
> + */
> +static void vfio_user_request(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +    VFIOUserMsgQ new, free;
> +    VFIOUserMsg *msg;
> +
> +    /* reap all incoming */
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        new = proxy->incoming;
> +        QTAILQ_INIT(&proxy->incoming);
> +    }
> +    QTAILQ_INIT(&free);
> +
> +    /* process list */
> +    while (!QTAILQ_EMPTY(&new)) {
> +        msg = QTAILQ_FIRST(&new);
> +        QTAILQ_REMOVE(&new, msg, next);
> +        proxy->request(proxy->req_arg, msg);
> +        QTAILQ_INSERT_HEAD(&free, msg, next);
> +    }
> +
> +    /* free list */
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        while (!QTAILQ_EMPTY(&free)) {
> +            msg = QTAILQ_FIRST(&free);
> +            QTAILQ_REMOVE(&free, msg, next);
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    }
> +}
> +
>  static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
>      QLIST_HEAD_INITIALIZER(vfio_user_sockets);
>  
> @@ -97,6 +408,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, 
> Error **errp)
>      }
>  
>      proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> +    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
>  
>      QTAILQ_INIT(&proxy->outgoing);
>      QTAILQ_INIT(&proxy->incoming);
> @@ -107,6 +419,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, 
> Error **errp)
>      return proxy;
>  }
>  
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *req_arg)
> +{
> +    VFIOProxy *proxy = vbasedev->proxy;
> +
> +    proxy->request = handler;
> +    proxy->req_arg = req_arg;
> +    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> +                                   vfio_user_recv, NULL, proxy);
> +}
> +
>  void vfio_user_disconnect(VFIOProxy *proxy)
>  {
>      VFIOUserMsg *r1, *r2;
> @@ -122,6 +446,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
>      }
>      object_unref(OBJECT(proxy->ioc));
>      proxy->ioc = NULL;
> +    qemu_bh_delete(proxy->req_bh);
> +    proxy->req_bh = NULL;
>  
>      proxy->state = VFIO_PROXY_CLOSING;
>      QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> diff --git a/MAINTAINERS b/MAINTAINERS
> index f429bab..52d37dd 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -1888,6 +1888,7 @@ S: Supported
>  F: docs/devel/vfio-user.rst
>  F: hw/vfio/user.c
>  F: hw/vfio/user.h
> +F: hw/vfio/user-protocol.h
>  
>  vhost
>  M: Michael S. Tsirkin <mst@redhat.com>




reply via email to

[Prev in Thread] Current Thread [Next in Thread]