[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH RFC 08/12] migration/rdma: register memory for multiRDMA channels
From: |
Zhimin Feng |
Subject: |
[PATCH RFC 08/12] migration/rdma: register memory for multiRDMA channels |
Date: |
Thu, 9 Jan 2020 12:59:18 +0800 |
From: fengzhimin <address@hidden>
Register memory for the multiRDMA channels and transmit the destination
keys (including the virtual addresses) to the source for use.
Signed-off-by: fengzhimin <address@hidden>
---
migration/rdma.c | 192 ++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 190 insertions(+), 2 deletions(-)
diff --git a/migration/rdma.c b/migration/rdma.c
index 518a21b0fe..6ecc870844 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -3847,6 +3847,15 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
return rdma_block_notification_handle(opaque, data);
case RAM_CONTROL_HOOK:
+ if (migrate_use_multiRDMA()) {
+ int i;
+ int thread_count = migrate_multiRDMA_channels();
+ /* Inform dest recv_thread to poll */
+ for (i = 0; i < thread_count; i++) {
+ qemu_sem_post(&multiRDMA_recv_state->params[i].sem);
+ }
+ }
+
return qemu_rdma_registration_handle(f, opaque);
default:
@@ -3920,6 +3929,17 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
trace_qemu_rdma_registration_stop_ram();
+ if (migrate_use_multiRDMA()) {
+ /*
+ * Inform the multiRDMA channels to register memory
+ */
+ int i;
+ int thread_count = migrate_multiRDMA_channels();
+ for (i = 0; i < thread_count; i++) {
+ qemu_sem_post(&multiRDMA_send_state->params[i].sem);
+ }
+ }
+
/*
* Make sure that we parallelize the pinning on both sides.
* For very large guests, doing this serially takes a really
@@ -3985,6 +4005,15 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
head.type = RDMA_CONTROL_REGISTER_FINISHED;
ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
+ if (migrate_use_multiRDMA()) {
+ /* Inform src send_thread to send FINISHED signal */
+ int i;
+ int thread_count = migrate_multiRDMA_channels();
+ for (i = 0; i < thread_count; i++) {
+ qemu_sem_post(&multiRDMA_send_state->params[i].sem);
+ }
+ }
+
if (ret < 0) {
goto err;
}
@@ -4150,18 +4179,119 @@ err:
return msg;
}
+static int qemu_multiRDMA_registration_handle(void *opaque)
+{
+ RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
+ .repeat = 1 };
+ MultiRDMARecvParams *p = opaque;
+ RDMAContext *rdma = p->rdma;
+ RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ RDMAControlHeader head;
+ int ret = 0;
+ int i = 0;
+
+ CHECK_ERROR_STATE();
+
+ do {
+ ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
+
+ if (ret < 0) {
+ break;
+ }
+
+ if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
+ error_report("rdma: Too many requests in this message (%d)."
+ "Bailing.", head.repeat);
+ ret = -EIO;
+ break;
+ }
+
+ switch (head.type) {
+ case RDMA_CONTROL_REGISTER_FINISHED:
+ goto out;
+ case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
+ qsort(rdma->local_ram_blocks.block,
+ rdma->local_ram_blocks.nb_blocks,
+ sizeof(RDMALocalBlock), dest_ram_sort_func);
+
+ if (rdma->pin_all) {
+ ret = qemu_rdma_reg_whole_ram_blocks(rdma);
+ if (ret) {
+ error_report("rdma migration: error dest "
+ "registering ram blocks");
+ goto out;
+ }
+ }
+
+ for (i = 0; i < local->nb_blocks; i++) {
+ /*
+ * The multiRDMA threads only register ram block
+ * to send data, other blocks are sent by main RDMA thread.
+ */
+ if (strcmp(local->block[i].block_name, "mach-virt.ram") == 0) {
+ rdma->dest_blocks[i].remote_host_addr =
+ (uintptr_t)(local->block[i].local_host_addr);
+
+ if (rdma->pin_all) {
+ rdma->dest_blocks[i].remote_rkey =
+ local->block[i].mr->rkey;
+ }
+
+ rdma->dest_blocks[i].offset = local->block[i].offset;
+ rdma->dest_blocks[i].length = local->block[i].length;
+
+ dest_block_to_network(&rdma->dest_blocks[i]);
+
+ break;
+ }
+ }
+
+ blocks.len = rdma->local_ram_blocks.nb_blocks
+ * sizeof(RDMADestBlock);
+
+ ret = qemu_rdma_post_send_control(rdma,
+ (uint8_t *) rdma->dest_blocks,
+ &blocks);
+
+ if (ret < 0) {
+ error_report("rdma migration: error sending remote info");
+ goto out;
+ }
+
+ break;
+ default:
+            error_report("Unknown control message %s",
+                         control_desc(head.type));
+ ret = -EIO;
+ goto out;
+ }
+ } while (1);
+out:
+ if (ret < 0) {
+ rdma->error_state = ret;
+ }
+ return ret;
+}
+
static void *multiRDMA_recv_thread(void *opaque)
{
MultiRDMARecvParams *p = opaque;
+ int ret;
while (true) {
+ qemu_sem_wait(&p->sem);
+
qemu_mutex_lock(&p->mutex);
if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
}
qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem);
+
+ ret = qemu_multiRDMA_registration_handle(opaque);
+ if (ret < 0) {
+ qemu_file_set_error(p->file, ret);
+ break;
+ }
}
qemu_mutex_lock(&p->mutex);
@@ -4378,18 +4508,76 @@ static void migration_rdma_send_initial_packet(QEMUFile *f, uint8_t id)
static void *multiRDMA_send_thread(void *opaque)
{
MultiRDMASendParams *p = opaque;
+ RDMAContext *rdma = p->rdma;
+ int ret;
/* send the multiRDMA channels magic */
migration_rdma_send_initial_packet(p->file, p->id);
+ /* wait for semaphore notification to register memory */
+ qemu_sem_wait(&p->sem);
+
+ RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
+ RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ int reg_result_idx, i, nb_dest_blocks;
+ RDMAControlHeader head = { .len = 0, .repeat = 1 };
+
+ head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
+
+    ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
+                                  &reg_result_idx, rdma->pin_all ?
+                                  qemu_rdma_reg_whole_ram_blocks : NULL);
+ if (ret < 0) {
+ return NULL;
+ }
+
+ nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
+
+ if (local->nb_blocks != nb_dest_blocks) {
+ rdma->error_state = -EINVAL;
+ return NULL;
+ }
+
+ qemu_rdma_move_header(rdma, reg_result_idx, &resp);
+ memcpy(rdma->dest_blocks,
+ rdma->wr_data[reg_result_idx].control_curr, resp.len);
+
+ for (i = 0; i < nb_dest_blocks; i++) {
+ /*
+ * The multiRDMA threads only register ram block
+ * to send data, other blocks are sent by main RDMA thread.
+ */
+ if (strcmp(local->block[i].block_name, "mach-virt.ram") == 0) {
+ network_to_dest_block(&rdma->dest_blocks[i]);
+
+ /* We require that the blocks are in the same order */
+ if (rdma->dest_blocks[i].length != local->block[i].length) {
+ rdma->error_state = -EINVAL;
+ return NULL;
+ }
+ local->block[i].remote_host_addr =
+ rdma->dest_blocks[i].remote_host_addr;
+ local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
+ break;
+ }
+ }
+
while (true) {
+ qemu_sem_wait(&p->sem);
+
qemu_mutex_lock(&p->mutex);
if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
}
qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem);
+
+ /* Send FINISHED to the destination */
+ head.type = RDMA_CONTROL_REGISTER_FINISHED;
+ ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
+ if (ret < 0) {
+ return NULL;
+ }
}
qemu_mutex_lock(&p->mutex);
--
2.19.1