qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v3 16/27] migration: convert RDMA to use QIOChan


From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v3 16/27] migration: convert RDMA to use QIOChannel interface
Date: Thu, 10 Mar 2016 17:00:20 +0000
User-agent: Mutt/1.5.24 (2015-08-30)

* Daniel P. Berrange (address@hidden) wrote:
> This converts the RDMA code to provide a subclass of
> QIOChannel that uses RDMA for the data transport.
> 
> This implementation of RDMA does not correctly
> handle non-blocking mode. Reads might block
> if there was not already some pending data
> and writes will block until all data is sent.
> This flawed behaviour was already present in
> the existing impl, so appears to not be a
> critical problem at this time. It should be
> on the list of things to fix in the future
> though.
> 
> The RDMA code would be much better off it it could
> be split up in a generic RDMA layer, a QIOChannel
> impl based on RMDA, and then the RMDA migration
> glue. This is left as a future exercise for the brave.
> 
> Signed-off-by: Daniel P. Berrange <address@hidden>

Reviewed-by: Dr. David Alan Gilbert <address@hidden>

(I don't know that much about the GSource stuff, but it looks
consistent)

> +static gboolean
> +qio_channel_rdma_source_prepare(GSource *source,
> +                                gint *timeout)
> +{
> +    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
> +    RDMAContext *rdma = rsource->rioc->rdma;
> +    GIOCondition cond = 0;
> +    *timeout = -1;
> +
> +    if (rdma->wr_data[0].control_len) {
> +        cond |= G_IO_IN;
> +    }
> +    cond |= G_IO_OUT;
> +
> +    return cond & rsource->condition;
> +}

I guess you could make that:
   *timeout = -1;
   return qio_channel_rdma_source_check(source);

Dave
 
> +static gboolean
> +qio_channel_rdma_source_check(GSource *source)
> +{
> +    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
> +    RDMAContext *rdma = rsource->rioc->rdma;
> +    GIOCondition cond = 0;
> +
> +    if (rdma->wr_data[0].control_len) {
> +        cond |= G_IO_IN;
> +    }
> +    cond |= G_IO_OUT;
> +
> +    return cond & rsource->condition;
> +}
> +
> +static gboolean
> +qio_channel_rdma_source_dispatch(GSource *source,
> +                                 GSourceFunc callback,
> +                                 gpointer user_data)
> +{
> +    QIOChannelFunc func = (QIOChannelFunc)callback;
> +    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
> +    RDMAContext *rdma = rsource->rioc->rdma;
> +    GIOCondition cond = 0;
> +
> +    if (rdma->wr_data[0].control_len) {
> +        cond |= G_IO_IN;
> +    }
> +    cond |= G_IO_OUT;
> +
> +    return (*func)(QIO_CHANNEL(rsource->rioc),
> +                   (cond & rsource->condition),
> +                   user_data);
> +}
> +
> +static void
> +qio_channel_rdma_source_finalize(GSource *source)
> +{
> +    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
> +
> +    object_unref(OBJECT(ssource->rioc));
> +}
> +
> +GSourceFuncs qio_channel_rdma_source_funcs = {
> +    qio_channel_rdma_source_prepare,
> +    qio_channel_rdma_source_check,
> +    qio_channel_rdma_source_dispatch,
> +    qio_channel_rdma_source_finalize
> +};
> +
> +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
> +                                              GIOCondition condition)
>  {
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +    QIOChannelRDMASource *ssource;
> +    GSource *source;
> +
> +    source = g_source_new(&qio_channel_rdma_source_funcs,
> +                          sizeof(QIOChannelRDMASource));
> +    ssource = (QIOChannelRDMASource *)source;
> +
> +    ssource->rioc = rioc;
> +    object_ref(OBJECT(rioc));
> +
> +    ssource->condition = condition;
> +
> +    return source;
> +}
> +
> +
> +static int qio_channel_rdma_close(QIOChannel *ioc,
> +                                  Error **errp)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>      trace_qemu_rdma_close();
> -    QEMUFileRDMA *r = opaque;
> -    if (r->rdma) {
> -        qemu_rdma_cleanup(r->rdma);
> -        g_free(r->rdma);
> +    if (rioc->rdma) {
> +        qemu_rdma_cleanup(rioc->rdma);
> +        g_free(rioc->rdma);
> +        rioc->rdma = NULL;
>      }
> -    g_free(r);
>      return 0;
>  }
>  
> @@ -2694,8 +2850,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void 
> *opaque,
>                                    ram_addr_t block_offset, ram_addr_t offset,
>                                    size_t size, uint64_t *bytes_sent)
>  {
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>      int ret;
>  
>      CHECK_ERROR_STATE();
> @@ -2949,8 +3105,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, 
> void *opaque)
>                               };
>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>                                   .repeat = 1 };
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>      RDMALocalBlocks *local = &rdma->local_ram_blocks;
>      RDMAControlHeader head;
>      RDMARegister *reg, *registers;
> @@ -3205,9 +3361,10 @@ out:
>   * We've already built our local RAMBlock list, but not yet sent the list to
>   * the source.
>   */
> -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char 
> *name)
> +static int
> +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
>  {
> -    RDMAContext *rdma = rfile->rdma;
> +    RDMAContext *rdma = rioc->rdma;
>      int curr;
>      int found = -1;
>  
> @@ -3249,8 +3406,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, 
> uint64_t flags, void *data)
>  static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>                                          uint64_t flags, void *data)
>  {
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>  
>      CHECK_ERROR_STATE();
>  
> @@ -3269,8 +3426,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
> void *opaque,
>                                         uint64_t flags, void *data)
>  {
>      Error *local_err = NULL, **errp = &local_err;
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>      int ret = 0;
>  
> @@ -3366,55 +3523,74 @@ err:
>      return ret;
>  }
>  
> -static int qemu_rdma_get_fd(void *opaque)
> -{
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> -
> -    return rdma->comp_channel->fd;
> -}
> -
> -static const QEMUFileOps rdma_read_ops = {
> -    .get_buffer    = qemu_rdma_get_buffer,
> -    .get_fd        = qemu_rdma_get_fd,
> -    .close         = qemu_rdma_close,
> -};
> -
>  static const QEMUFileHooks rdma_read_hooks = {
>      .hook_ram_load = rdma_load_hook,
>  };
>  
> -static const QEMUFileOps rdma_write_ops = {
> -    .put_buffer         = qemu_rdma_put_buffer,
> -    .close              = qemu_rdma_close,
> -};
> -
>  static const QEMUFileHooks rdma_write_hooks = {
>      .before_ram_iterate = qemu_rdma_registration_start,
>      .after_ram_iterate  = qemu_rdma_registration_stop,
>      .save_page          = qemu_rdma_save_page,
>  };
>  
> -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
> +
> +static void qio_channel_rdma_finalize(Object *obj)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
> +    if (rioc->rdma) {
> +        qemu_rdma_cleanup(rioc->rdma);
> +        g_free(rioc->rdma);
> +        rioc->rdma = NULL;
> +    }
> +}
> +
> +static void qio_channel_rdma_class_init(ObjectClass *klass,
> +                                        void *class_data G_GNUC_UNUSED)
> +{
> +    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
> +
> +    ioc_klass->io_writev = qio_channel_rdma_writev;
> +    ioc_klass->io_readv = qio_channel_rdma_readv;
> +    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
> +    ioc_klass->io_close = qio_channel_rdma_close;
> +    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
> +}
> +
> +static const TypeInfo qio_channel_rdma_info = {
> +    .parent = TYPE_QIO_CHANNEL,
> +    .name = TYPE_QIO_CHANNEL_RDMA,
> +    .instance_size = sizeof(QIOChannelRDMA),
> +    .instance_finalize = qio_channel_rdma_finalize,
> +    .class_init = qio_channel_rdma_class_init,
> +};
> +
> +static void qio_channel_rdma_register_types(void)
> +{
> +    type_register_static(&qio_channel_rdma_info);
> +}
> +
> +type_init(qio_channel_rdma_register_types);
> +
> +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>  {
> -    QEMUFileRDMA *r;
> +    QIOChannelRDMA *rioc;
>  
>      if (qemu_file_mode_is_not_valid(mode)) {
>          return NULL;
>      }
>  
> -    r = g_new0(QEMUFileRDMA, 1);
> -    r->rdma = rdma;
> +    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> +    rioc->rdma = rdma;
>  
>      if (mode[0] == 'w') {
> -        r->file = qemu_fopen_ops(r, &rdma_write_ops);
> -        qemu_file_set_hooks(r->file, &rdma_write_hooks);
> +        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
> +        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
>      } else {
> -        r->file = qemu_fopen_ops(r, &rdma_read_ops);
> -        qemu_file_set_hooks(r->file, &rdma_read_hooks);
> +        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
> +        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
>      }
>  
> -    return r->file;
> +    return rioc->file;
>  }
>  
>  static void rdma_accept_incoming_migration(void *opaque)
> -- 
> 2.5.0
> 
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK



reply via email to

[Prev in Thread] Current Thread [Next in Thread]