[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA
From: |
858585 jemmy |
Subject: |
Re: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA QIOChannel |
Date: |
Fri, 27 Apr 2018 15:56:38 +0800 |
On Fri, Apr 27, 2018 at 1:36 AM, Dr. David Alan Gilbert
<address@hidden> wrote:
> * Lidong Chen (address@hidden) wrote:
>> This patch implements bi-directional RDMA QIOChannel. Because different
>> threads may access RDMAQIOChannel concurrently, this patch use RCU to
>> protect it.
>>
>> Signed-off-by: Lidong Chen <address@hidden>
>
> I'm a bit confused by this.
>
> I can see it's adding RCU to protect the rdma structures against
> deletion from multiple threads; that I'm OK with in principal; is that
> the only locking we need? (I guess the two directions are actually
> separate RDMAContext's so maybe).
The qio_channel_rdma_close may be invoked by the migration thread and the
return path thread concurrently, so I use a mutex to protect it.
If one thread invokes qio_channel_rdma_writev while another thread invokes
qio_channel_rdma_readv, the two threads will use separate RDMAContexts,
so no lock is needed.
If two threads invoke qio_channel_rdma_writev concurrently, a lock will be
needed to protect it.
But I find that on the source qemu, only the migration thread invokes
qio_channel_rdma_writev, and only the return path
thread invokes qio_channel_rdma_readv.
The destination qemu only invokes qio_channel_rdma_readv, by the main
thread before postcopy or by the
listen thread after postcopy.
The destination qemu has already protected it by using
qemu_mutex_lock(&mis->rp_mutex) when writing data to the
source qemu.
But should we use qemu_mutex_lock to protect qio_channel_rdma_writev
and qio_channel_rdma_readv anyway,
to guard against some future change invoking qio_channel_rdma_writev or
qio_channel_rdma_readv concurrently?
>
> But is there nothing else to make the QIOChannel bidirectional?
>
> Also, a lot seems dependent on listen_id, can you explain how that's
> being used.
The destination qemu is the server side, so its listen_id is not zero. The
source qemu is the client side,
so its listen_id is zero.
I use listen_id to determine whether qemu is the destination or the source.
For the destination qemu, if it writes data to the source, it needs to use
the return_path rdma, like this:
if (rdma->listen_id) {
rdma = rdma->return_path;
}
For the source qemu, if it reads data from the destination, it also needs
to use the return_path rdma.
if (!rdma->listen_id) {
rdma = rdma->return_path;
}
>
> Finally, I don't think you have anywhere that destroys the new mutex you
> added.
I will fix this in the next version.
>
> Dave
> P.S. Please cc Daniel Berrange on this series, since it's so much
> IOChannel stuff.
>
>> ---
>> migration/rdma.c | 162
>> +++++++++++++++++++++++++++++++++++++++++++++++++------
>> 1 file changed, 146 insertions(+), 16 deletions(-)
>>
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index f5c1d02..0652224 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -86,6 +86,7 @@ static uint32_t known_capabilities =
>> RDMA_CAPABILITY_PIN_ALL;
>> " to abort!"); \
>> rdma->error_reported = 1; \
>> } \
>> + rcu_read_unlock(); \
>> return rdma->error_state; \
>> } \
>> } while (0)
>> @@ -405,6 +406,7 @@ struct QIOChannelRDMA {
>> RDMAContext *rdma;
>> QEMUFile *file;
>> bool blocking; /* XXX we don't actually honour this yet */
>> + QemuMutex lock;
>> };
>>
>> /*
>> @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel
>> *ioc,
>> {
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> QEMUFile *f = rioc->file;
>> - RDMAContext *rdma = rioc->rdma;
>> + RDMAContext *rdma;
>> int ret;
>> ssize_t done = 0;
>> size_t i;
>> size_t len = 0;
>>
>> + rcu_read_lock();
>> + rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> + if (rdma->listen_id) {
>> + rdma = rdma->return_path;
>> + }
>> +
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> CHECK_ERROR_STATE();
>>
>> /*
>> @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>> ret = qemu_rdma_write_flush(f, rdma);
>> if (ret < 0) {
>> rdma->error_state = ret;
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>
>> if (ret < 0) {
>> rdma->error_state = ret;
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>> }
>> }
>>
>> + rcu_read_unlock();
>> return done;
>> }
>>
>> @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel
>> *ioc,
>> Error **errp)
>> {
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> - RDMAContext *rdma = rioc->rdma;
>> + RDMAContext *rdma;
>> RDMAControlHeader head;
>> int ret = 0;
>> ssize_t i;
>> size_t done = 0;
>>
>> + rcu_read_lock();
>> + rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> + if (!rdma->listen_id) {
>> + rdma = rdma->return_path;
>> + }
>> +
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> CHECK_ERROR_STATE();
>>
>> for (i = 0; i < niov; i++) {
>> @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>> * were given and dish out the bytes until we run
>> * out of bytes.
>> */
>> - ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> + ret = qemu_rdma_fill(rdma, data, want, 0);
>> done += ret;
>> want -= ret;
>> /* Got what we needed, so go to next iovec */
>> @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel
>> *ioc,
>>
>> if (ret < 0) {
>> rdma->error_state = ret;
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> /*
>> * SEND was received with new bytes, now try again.
>> */
>> - ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> + ret = qemu_rdma_fill(rdma, data, want, 0);
>> done += ret;
>> want -= ret;
>>
>> /* Still didn't get enough, so lets just return */
>> if (want) {
>> if (done == 0) {
>> + rcu_read_unlock();
>> return QIO_CHANNEL_ERR_BLOCK;
>> } else {
>> break;
>> }
>> }
>> }
>> + rcu_read_unlock();
>> return done;
>> }
>>
>> @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
>> GIOCondition cond = 0;
>> *timeout = -1;
>>
>> + if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> + (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> + rdma = rdma->return_path;
>> + }
>> +
>> + if (!rdma) {
>> + error_report("RDMAContext is NULL when prepare Gsource");
>> + return FALSE;
>> + }
>> +
>> if (rdma->wr_data[0].control_len) {
>> cond |= G_IO_IN;
>> }
>> @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
>> RDMAContext *rdma = rsource->rioc->rdma;
>> GIOCondition cond = 0;
>>
>> + if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> + (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> + rdma = rdma->return_path;
>> + }
>> +
>> + if (!rdma) {
>> + error_report("RDMAContext is NULL when check Gsource");
>> + return FALSE;
>> + }
>> +
>> if (rdma->wr_data[0].control_len) {
>> cond |= G_IO_IN;
>> }
>> @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
>> RDMAContext *rdma = rsource->rioc->rdma;
>> GIOCondition cond = 0;
>>
>> + if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> + (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> + rdma = rdma->return_path;
>> + }
>> +
>> + if (!rdma) {
>> + error_report("RDMAContext is NULL when dispatch Gsource");
>> + return FALSE;
>> + }
>> +
>> if (rdma->wr_data[0].control_len) {
>> cond |= G_IO_IN;
>> }
>> @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
>> Error **errp)
>> {
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> + RDMAContext *rdma;
>> trace_qemu_rdma_close();
>> - if (rioc->rdma) {
>> - if (!rioc->rdma->error_state) {
>> - rioc->rdma->error_state = qemu_file_get_error(rioc->file);
>> - }
>> - qemu_rdma_cleanup(rioc->rdma);
>> - g_free(rioc->rdma);
>> - rioc->rdma = NULL;
>> +
>> + qemu_mutex_lock(&rioc->lock);
>> + rdma = rioc->rdma;
>> + if (!rdma) {
>> + qemu_mutex_unlock(&rioc->lock);
>> + return 0;
>> + }
>> + atomic_rcu_set(&rioc->rdma, NULL);
>> + qemu_mutex_unlock(&rioc->lock);
>> +
>> + if (!rdma->error_state) {
>> + rdma->error_state = qemu_file_get_error(rioc->file);
>> + }
>> + qemu_rdma_cleanup(rdma);
>> +
>> + if (rdma->return_path) {
>> + qemu_rdma_cleanup(rdma->return_path);
>> + g_free(rdma->return_path);
>> }
>> +
>> + g_free(rdma);
>> return 0;
>> }
>>
>> @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void
>> *opaque,
>> size_t size, uint64_t *bytes_sent)
>> {
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> - RDMAContext *rdma = rioc->rdma;
>> + RDMAContext *rdma;
>> int ret;
>>
>> + rcu_read_lock();
>> + rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> CHECK_ERROR_STATE();
>>
>> if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> + rcu_read_unlock();
>> return RAM_SAVE_CONTROL_NOT_SUPP;
>> }
>>
>> @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void
>> *opaque,
>> }
>> }
>>
>> + rcu_read_unlock();
>> return RAM_SAVE_CONTROL_DELAYED;
>> err:
>> rdma->error_state = ret;
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f,
>> void *opaque)
>> RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>> .repeat = 1 };
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> - RDMAContext *rdma = rioc->rdma;
>> - RDMALocalBlocks *local = &rdma->local_ram_blocks;
>> + RDMAContext *rdma;
>> + RDMALocalBlocks *local;
>> RDMAControlHeader head;
>> RDMARegister *reg, *registers;
>> RDMACompress *comp;
>> @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f,
>> void *opaque)
>> int count = 0;
>> int i = 0;
>>
>> + rcu_read_lock();
>> + rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> CHECK_ERROR_STATE();
>>
>> + local = &rdma->local_ram_blocks;
>> do {
>> trace_qemu_rdma_registration_handle_wait();
>>
>> @@ -3469,6 +3575,7 @@ out:
>> if (ret < 0) {
>> rdma->error_state = ret;
>> }
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f,
>> void *opaque,
>> uint64_t flags, void *data)
>> {
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> - RDMAContext *rdma = rioc->rdma;
>> + RDMAContext *rdma;
>> +
>> + rcu_read_lock();
>> + rdma = atomic_rcu_read(&rioc->rdma);
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>>
>> CHECK_ERROR_STATE();
>>
>> if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> + rcu_read_unlock();
>> return 0;
>> }
>>
>> @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f,
>> void *opaque,
>> qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>> qemu_fflush(f);
>>
>> + rcu_read_unlock();
>> return 0;
>> }
>>
>> @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f,
>> void *opaque,
>> {
>> Error *local_err = NULL, **errp = &local_err;
>> QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> - RDMAContext *rdma = rioc->rdma;
>> + RDMAContext *rdma;
>> RDMAControlHeader head = { .len = 0, .repeat = 1 };
>> int ret = 0;
>>
>> + rcu_read_lock();
>> + rdma = atomic_rcu_read(&rioc->rdma);
>> + if (!rdma) {
>> + rcu_read_unlock();
>> + return -EIO;
>> + }
>> +
>> CHECK_ERROR_STATE();
>>
>> if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> + rcu_read_unlock();
>> return 0;
>> }
>>
>> @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f,
>> void *opaque,
>> qemu_rdma_reg_whole_ram_blocks : NULL);
>> if (ret < 0) {
>> ERROR(errp, "receiving remote info!");
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f,
>> void *opaque,
>> "not identical on both the source and destination.",
>> local->nb_blocks, nb_dest_blocks);
>> rdma->error_state = -EINVAL;
>> + rcu_read_unlock();
>> return -EINVAL;
>> }
>>
>> @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f,
>> void *opaque,
>> local->block[i].length,
>> rdma->dest_blocks[i].length);
>> rdma->error_state = -EINVAL;
>> + rcu_read_unlock();
>> return -EINVAL;
>> }
>> local->block[i].remote_host_addr =
>> @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f,
>> void *opaque,
>> goto err;
>> }
>>
>> + rcu_read_unlock();
>> return 0;
>> err:
>> rdma->error_state = ret;
>> + rcu_read_unlock();
>> return ret;
>> }
>>
>> @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma,
>> const char *mode)
>>
>> rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>> rioc->rdma = rdma;
>> + qemu_mutex_init(&rioc->lock);
>>
>> if (mode[0] == 'w') {
>> rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
>> --
>> 1.8.3.1
>>
> --
> Dr. David Alan Gilbert / address@hidden / Manchester, UK
- Re: [Qemu-devel] [PATCH v2 2/5] migration: create a dedicated connection for rdma return path, (continued)