RE: [RFC v4 08/21] vfio-user: define socket receive functions


From: Thanos Makatos
Subject: RE: [RFC v4 08/21] vfio-user: define socket receive functions
Date: Fri, 4 Feb 2022 12:42:02 +0000

> -----Original Message-----
> From: Qemu-devel <qemu-devel-bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of Thanos Makatos
> Sent: 03 February 2022 21:54
> To: John Johnson <john.g.johnson@oracle.com>; qemu-devel@nongnu.org
> Subject: RE: [RFC v4 08/21] vfio-user: define socket receive functions
> 
> 
> 
> > -----Original Message-----
> > From: Qemu-devel <qemu-devel-bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of John Johnson
> > Sent: 12 January 2022 00:44
> > To: qemu-devel@nongnu.org
> > Subject: [RFC v4 08/21] vfio-user: define socket receive functions
> >
> > Add infrastructure needed to receive incoming messages
> >
> > Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> > ---
> >  hw/vfio/user-protocol.h |  54 ++++++++
> >  hw/vfio/user.h          |   6 +
> >  hw/vfio/pci.c           |   6 +
> >  hw/vfio/user.c          | 327 ++++++++++++++++++++++++++++++++++++++++++++++++
> >  MAINTAINERS             |   1 +
> >  5 files changed, 394 insertions(+)
> >  create mode 100644 hw/vfio/user-protocol.h
> >
> > diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> > new file mode 100644
> > index 0000000..d23877c
> > --- /dev/null
> > +++ b/hw/vfio/user-protocol.h
> > @@ -0,0 +1,54 @@
> > +#ifndef VFIO_USER_PROTOCOL_H
> > +#define VFIO_USER_PROTOCOL_H
> > +
> > +/*
> > + * vfio protocol over a UNIX socket.
> > + *
> > + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2.  See
> > + * the COPYING file in the top-level directory.
> > + *
> > + * Each message has a standard header that describes the command
> > + * being sent, which is almost always a VFIO ioctl().
> > + *
> > + * The header may be followed by command-specific data, such as the
> > + * region and offset info for read and write commands.
> > + */
> > +
> > +typedef struct {
> > +    uint16_t id;
> > +    uint16_t command;
> > +    uint32_t size;
> > +    uint32_t flags;
> > +    uint32_t error_reply;
> > +} VFIOUserHdr;
> > +
> > +/* VFIOUserHdr commands */
> > +enum vfio_user_command {
> > +    VFIO_USER_VERSION                   = 1,
> > +    VFIO_USER_DMA_MAP                   = 2,
> > +    VFIO_USER_DMA_UNMAP                 = 3,
> > +    VFIO_USER_DEVICE_GET_INFO           = 4,
> > +    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
> > +    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
> > +    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
> > +    VFIO_USER_DEVICE_SET_IRQS           = 8,
> > +    VFIO_USER_REGION_READ               = 9,
> > +    VFIO_USER_REGION_WRITE              = 10,
> > +    VFIO_USER_DMA_READ                  = 11,
> > +    VFIO_USER_DMA_WRITE                 = 12,
> > +    VFIO_USER_DEVICE_RESET              = 13,
> > +    VFIO_USER_DIRTY_PAGES               = 14,
> > +    VFIO_USER_MAX,
> > +};
> > +
> > +/* VFIOUserHdr flags */
> > +#define VFIO_USER_REQUEST       0x0
> > +#define VFIO_USER_REPLY         0x1
> > +#define VFIO_USER_TYPE          0xF
> > +
> > +#define VFIO_USER_NO_REPLY      0x10
> > +#define VFIO_USER_ERROR         0x20
> > +
> > +#endif /* VFIO_USER_PROTOCOL_H */
> > diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> > index da92862..72eefa7 100644
> > --- a/hw/vfio/user.h
> > +++ b/hw/vfio/user.h
> > @@ -11,6 +11,8 @@
> >   *
> >   */
> >
> > +#include "user-protocol.h"
> > +
> >  typedef struct {
> >      int send_fds;
> >      int recv_fds;
> > @@ -27,6 +29,7 @@ enum msg_type {
> >
> >  typedef struct VFIOUserMsg {
> >      QTAILQ_ENTRY(VFIOUserMsg) next;
> > +    VFIOUserHdr *hdr;
> >      VFIOUserFDs *fds;
> >      uint32_t rsize;
> >      uint32_t id;
> > @@ -74,5 +77,8 @@ typedef struct VFIOProxy {
> >
> >  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
> >  void vfio_user_disconnect(VFIOProxy *proxy);
> > +void vfio_user_set_handler(VFIODevice *vbasedev,
> > +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> > +                           void *reqarg);
> >
> >  #endif /* VFIO_USER_H */
> > diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> > index 9fd7c07..0de915d 100644
> > --- a/hw/vfio/pci.c
> > +++ b/hw/vfio/pci.c
> > @@ -3386,6 +3386,11 @@ type_init(register_vfio_pci_dev_type)
> >   * vfio-user routines.
> >   */
> >
> > +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> > +{
> > +
> > +}
> > +
> >  /*
> >   * Emulated devices don't use host hot reset
> >   */
> > @@ -3432,6 +3437,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
> >          return;
> >      }
> >      vbasedev->proxy = proxy;
> > +    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
> >
> >      vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
> >      vbasedev->dev = DEVICE(vdev);
> > diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> > index c843f90..e1dfd5d 100644
> > --- a/hw/vfio/user.c
> > +++ b/hw/vfio/user.c
> > @@ -25,10 +25,26 @@
> >  #include "sysemu/iothread.h"
> >  #include "user.h"
> >
> > +static uint64_t max_xfer_size;
> >  static IOThread *vfio_user_iothread;
> >
> >  static void vfio_user_shutdown(VFIOProxy *proxy);
> > +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> > +                                     VFIOUserFDs *fds);
> > +static VFIOUserFDs *vfio_user_getfds(int numfds);
> > +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
> >
> > +static void vfio_user_recv(void *opaque);
> > +static int vfio_user_recv_one(VFIOProxy *proxy);
> > +static void vfio_user_cb(void *opaque);
> > +
> > +static void vfio_user_request(void *opaque);
> > +
> > +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> > +{
> > +    hdr->flags |= VFIO_USER_ERROR;
> > +    hdr->error_reply = err;
> > +}
> >
> >  /*
> >   * Functions called by main, CPU, or iothread threads
> > @@ -40,10 +56,261 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
> >      qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
> >  }
> >
> > +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> > +                                     VFIOUserFDs *fds)
> > +{
> > +    VFIOUserMsg *msg;
> > +
> > +    msg = QTAILQ_FIRST(&proxy->free);
> > +    if (msg != NULL) {
> > +        QTAILQ_REMOVE(&proxy->free, msg, next);
> > +    } else {
> > +        msg = g_malloc0(sizeof(*msg));
> > +        qemu_cond_init(&msg->cv);
> > +    }
> > +
> > +    msg->hdr = hdr;
> > +    msg->fds = fds;
> > +    return msg;
> > +}
> > +
> > +/*
> > + * Recycle a message list entry to the free list.
> > + */
> > +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> > +{
> > +    if (msg->type == VFIO_MSG_NONE) {
> > +        error_printf("vfio_user_recycle - freeing free msg\n");
> > +        return;
> > +    }
> > +
> > +    /* free msg buffer if no one is waiting to consume the reply */
> > +    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
> > +        g_free(msg->hdr);
> > +        if (msg->fds != NULL) {
> > +            g_free(msg->fds);
> > +        }
> > +    }
> > +
> > +    msg->type = VFIO_MSG_NONE;
> > +    msg->hdr = NULL;
> > +    msg->fds = NULL;
> > +    msg->complete = false;
> > +    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> > +}
> > +
> > +static VFIOUserFDs *vfio_user_getfds(int numfds)
> > +{
> > +    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> > +
> > +    fds->fds = (int *)((char *)fds + sizeof(*fds));
> > +
> > +    return fds;
> > +}
> > +
> >  /*
> >   * Functions only called by iothread
> >   */
> >
> > +static void vfio_user_recv(void *opaque)
> > +{
> > +    VFIOProxy *proxy = opaque;
> > +
> > +    QEMU_LOCK_GUARD(&proxy->lock);
> > +
> > +    if (proxy->state == VFIO_PROXY_CONNECTED) {
> > +        while (vfio_user_recv_one(proxy) == 0) {
> > +            ;
> > +        }
> > +    }
> > +}
> > +
> > +/*
> > + * Receive and process one incoming message.
> > + *
> > + * For replies, find matching outgoing request and wake any waiters.
> > + * For requests, queue in incoming list and run request BH.
> > + */
> > +static int vfio_user_recv_one(VFIOProxy *proxy)
> > +{
> > +    VFIOUserMsg *msg = NULL;
> > +    g_autofree int *fdp = NULL;
> > +    VFIOUserFDs *reqfds;
> > +    VFIOUserHdr hdr;
> > +    struct iovec iov = {
> > +        .iov_base = &hdr,
> > +        .iov_len = sizeof(hdr),
> > +    };
> > +    bool isreply = false;
> > +    int i, ret;
> > +    size_t msgleft, numfds = 0;
> > +    char *data = NULL;
> > +    char *buf = NULL;
> > +    Error *local_err = NULL;
> > +
> > +    /*
> > +     * Read header
> > +     */
> > +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> > +                                 &local_err);
> > +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> > +        return ret;
> > +    }
> > +    if (ret <= 0) {
> > +        /* read error or other side closed connection */
> > +        if (ret == 0) {
> > +            error_setg(&local_err, "vfio_user_recv server closed socket");
> > +        } else {
> > +            error_prepend(&local_err, "vfio_user_recv");
> > +        }
> > +        goto fatal;
> > +    }
> > +    if (ret < sizeof(hdr)) {
> > +        error_setg(&local_err, "vfio_user_recv short read of header");
> > +        goto fatal;
> > +    }
> > +
> > +    /*
> > +     * Validate header
> > +     */
> > +    if (hdr.size < sizeof(VFIOUserHdr)) {
> > +        error_setg(&local_err, "vfio_user_recv bad header size");
> > +        goto fatal;
> > +    }
> > +    switch (hdr.flags & VFIO_USER_TYPE) {
> > +    case VFIO_USER_REQUEST:
> > +        isreply = false;
> > +        break;
> > +    case VFIO_USER_REPLY:
> > +        isreply = true;
> > +        break;
> > +    default:
> > +        error_setg(&local_err, "vfio_user_recv unknown message type");
> > +        goto fatal;
> > +    }
> > +
> > +    /*
> > +     * For replies, find the matching pending request.
> > +     * For requests, reap incoming FDs.
> > +     */
> > +    if (isreply) {
> > +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> > +            if (hdr.id == msg->id) {
> > +                break;
> > +            }
> > +        }
> > +        if (msg == NULL) {
> > +            error_setg(&local_err, "vfio_user_recv unexpected reply");
> > +            goto err;
> > +        }
> > +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> > +
> > +        /*
> > +         * Process any received FDs
> > +         */
> > +        if (numfds != 0) {
> > +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> > +                error_setg(&local_err, "vfio_user_recv unexpected FDs");
> > +                goto err;
> > +            }
> > +            msg->fds->recv_fds = numfds;
> > +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> > +        }
> > +    } else {
> > +        if (numfds != 0) {
> > +            reqfds = vfio_user_getfds(numfds);
> > +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> > +        } else {
> > +            reqfds = NULL;
> > +        }
> > +    }
> > +
> > +    /*
> > +     * Put the whole message into a single buffer.
> > +     */
> > +    if (isreply) {
> > +        if (hdr.size > msg->rsize) {
> > +            error_setg(&local_err,
> > +                       "vfio_user_recv reply larger than recv buffer");
> > +            goto err;
> > +        }
> > +        *msg->hdr = hdr;
> > +        data = (char *)msg->hdr + sizeof(hdr);
> > +    } else {
> > +        if (hdr.size > max_xfer_size) {
> > +            error_setg(&local_err, "vfio_user_recv request larger than max");
> > +            goto err;
> > +        }
> > +        buf = g_malloc0(hdr.size);
> > +        memcpy(buf, &hdr, sizeof(hdr));
> > +        data = buf + sizeof(hdr);
> > +        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> > +        msg->type = VFIO_MSG_REQ;
> > +    }
> > +
> > +    msgleft = hdr.size - sizeof(hdr);
> > +    while (msgleft > 0) {
> > +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> > +
> > +        /* error or would block */
> > +        if (ret < 0) {
> > +            goto fatal;
> > +        }
> 
> IIUC qio_channel_read() ends up calling qio_channel_socket_readv(), which
> can return QIO_CHANNEL_ERR_BLOCK (-2). In that case the if (ret < 0) branch
> is taken while local_err is still NULL, which causes a segfault when
> error_report_err(local_err) is called before returning from this function.

In fact, don't we need to continue if qio_channel_read() returns 
QIO_CHANNEL_ERR_BLOCK and only fail if it returns -1?

> 
> > +
> > +        msgleft -= ret;
> > +        data += ret;
> > +    }
> > +
> > +    /*
> > +     * Replies signal a waiter; if no one is waiting, just check for
> > +     * errors and free the message buffer.
> > +     *
> > +     * Requests get queued for the BH.
> > +     */
> > +    if (isreply) {
> > +        msg->complete = true;
> > +        if (msg->type == VFIO_MSG_WAIT) {
> > +            qemu_cond_signal(&msg->cv);
> > +        } else {
> > +            if (hdr.flags & VFIO_USER_ERROR) {
> > +                error_printf("vfio_user_rcv error reply on async request 
> > ");
> > +                error_printf("command %x error %s\n", hdr.command,
> > +                             strerror(hdr.error_reply));
> > +            }
> > +            /* youngest nowait msg has been ack'd */
> > +            if (proxy->last_nowait == msg) {
> > +                proxy->last_nowait = NULL;
> > +            }
> > +            vfio_user_recycle(proxy, msg);
> > +        }
> > +    } else {
> > +        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> > +        qemu_bh_schedule(proxy->req_bh);
> > +    }
> > +    return 0;
> > +
> > +    /*
> > +     * fatal means the other side closed or we don't trust the stream
> > +     * err means this message is corrupt
> > +     */
> > +fatal:
> > +    vfio_user_shutdown(proxy);
> > +    proxy->state = VFIO_PROXY_ERROR;
> > +
> > +err:
> > +    for (i = 0; i < numfds; i++) {
> > +        close(fdp[i]);
> > +    }
> > +    if (isreply && msg != NULL) {
> > +        /* force an error to keep sending thread from hanging */
> > +        vfio_user_set_error(msg->hdr, EINVAL);
> > +        msg->complete = true;
> > +        qemu_cond_signal(&msg->cv);
> > +    }
> > +    error_report_err(local_err);
> > +    return -1;
> > +}
> > +
> >  static void vfio_user_cb(void *opaque)
> >  {
> >      VFIOProxy *proxy = opaque;
> > @@ -59,6 +326,51 @@ static void vfio_user_cb(void *opaque)
> >   * Functions called by main or CPU threads
> >   */
> >
> > +/*
> > + * Process incoming requests.
> > + *
> > + * The bus-specific callback has the form:
> > + *    request(opaque, msg)
> > + * where 'opaque' was specified in vfio_user_set_handler
> > + * and 'msg' is the inbound message.
> > + *
> > + * The callback is responsible for disposing of the message buffer,
> > + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> > + * both of which free their message buffer when the reply is sent.
> > + *
> > + * If the callback uses a new buffer, it needs to free the old one.
> > + */
> > +static void vfio_user_request(void *opaque)
> > +{
> > +    VFIOProxy *proxy = opaque;
> > +    VFIOUserMsgQ new, free;
> > +    VFIOUserMsg *msg, *m1;
> > +
> > +    /* reap all incoming */
> > +    QTAILQ_INIT(&new);
> > +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> > +        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
> > +            QTAILQ_REMOVE(&proxy->incoming, msg, next);
> > +            QTAILQ_INSERT_TAIL(&new, msg, next);
> > +        }
> > +    }
> > +
> > +    /* process list */
> > +    QTAILQ_INIT(&free);
> > +    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
> > +        QTAILQ_REMOVE(&new, msg, next);
> > +        proxy->request(proxy->req_arg, msg);
> > +        QTAILQ_INSERT_HEAD(&free, msg, next);
> > +    }
> > +
> > +    /* free list */
> > +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> > +        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
> > +            vfio_user_recycle(proxy, msg);
> > +        }
> > +    }
> > +}
> > +
> >  static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
> >      QLIST_HEAD_INITIALIZER(vfio_user_sockets);
> >
> > @@ -97,6 +409,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
> >      }
> >
> >      proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> > +    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
> >
> >      QTAILQ_INIT(&proxy->outgoing);
> >      QTAILQ_INIT(&proxy->incoming);
> > @@ -107,6 +420,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
> >      return proxy;
> >  }
> >
> > +void vfio_user_set_handler(VFIODevice *vbasedev,
> > +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> > +                           void *req_arg)
> > +{
> > +    VFIOProxy *proxy = vbasedev->proxy;
> > +
> > +    proxy->request = handler;
> > +    proxy->req_arg = req_arg;
> > +    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> > +                                   vfio_user_recv, NULL, proxy);
> > +}
> > +
> >  void vfio_user_disconnect(VFIOProxy *proxy)
> >  {
> >      VFIOUserMsg *r1, *r2;
> > @@ -122,6 +447,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
> >      }
> >      object_unref(OBJECT(proxy->ioc));
> >      proxy->ioc = NULL;
> > +    qemu_bh_delete(proxy->req_bh);
> > +    proxy->req_bh = NULL;
> >
> >      proxy->state = VFIO_PROXY_CLOSING;
> >      QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> > diff --git a/MAINTAINERS b/MAINTAINERS
> > index cfaccbf..bc0ba88 100644
> > --- a/MAINTAINERS
> > +++ b/MAINTAINERS
> > @@ -1909,6 +1909,7 @@ S: Supported
> >  F: docs/devel/vfio-user.rst
> >  F: hw/vfio/user.c
> >  F: hw/vfio/user.h
> > +F: hw/vfio/user-protocol.h
> >
> >  vhost
> >  M: Michael S. Tsirkin <mst@redhat.com>
> > --
> > 1.8.3.1
> >

