qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [incomplete 1/1] Rsocket migration support [incomplete]


From: Dr. David Alan Gilbert (git)
Subject: [Qemu-devel] [incomplete 1/1] Rsocket migration support [incomplete]
Date: Mon, 19 Jan 2015 11:01:12 +0000

From: "Dr. David Alan Gilbert" <address@hidden>

Implement migration over RDMA using the 'rsocket' library. The code
appears to work for guests with < 4GB RAM, but hits what
appear to be internal library limitations above that
(riowrite always returns EAGAIN as soon as I register more RAM).
Note also that the library doesn't provide zero copy on the send
side.

The code has a few other hacks and is incomplete in places, but I thought
I'd release the source anyway for anyone else interested in
investigating.

Very lightly tested, but it did manage to migrate a 'stressapptest'
run on a 3.5GB guest successfully over 10Gb ROCE.

Note it needs the 'qemu_ram_foreach_block: pass up error value, and down
the ramblock name' patch from my postcopy world.

Signed-off-by: Dr. David Alan Gilbert <address@hidden>
---
 arch_init.c                   |   3 +-
 include/migration/migration.h |   4 +
 include/qemu/iov.h            |  18 +-
 include/qemu/sockets.h        |   4 +
 migration/Makefile.objs       |   2 +-
 migration/migration.c         |   4 +
 migration/rsocket.c           | 964 ++++++++++++++++++++++++++++++++++++++++++
 qemu-coroutine-io.c           |   3 +-
 trace-events                  |  35 ++
 util/iov.c                    |  14 +-
 util/qemu-sockets.c           |   8 +-
 11 files changed, 1042 insertions(+), 17 deletions(-)
 create mode 100644 migration/rsocket.c

diff --git a/arch_init.c b/arch_init.c
index 7680d28..5aaa51b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -847,10 +847,10 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
         qemu_put_be64(f, block->length);
     }
 
-    qemu_mutex_unlock_ramlist();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
+    qemu_mutex_unlock_ramlist();
 
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
@@ -1103,6 +1103,7 @@ static int ram_load(QEMUFile *f, void *opaque, int 
version_id)
 
                 total_ram_bytes -= length;
             }
+            ram_control_before_iterate(f, RAM_CONTROL_SETUP);
             break;
         case RAM_SAVE_FLAG_COMPRESS:
             host = host_from_stream_offset(f, addr, flags);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3cb5ba8..a54dd99 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -94,6 +94,10 @@ void rdma_start_outgoing_migration(void *opaque, const char 
*host_port, Error **
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp);
 
+void rsocket_start_outgoing_migration(MigrationState *s, const char 
*host_port, Error **errp);
+
+void rsocket_start_incoming_migration(const char *host_port, Error **errp);
+
 void migrate_fd_error(MigrationState *s);
 
 void migrate_fd_connect(MigrationState *s);
diff --git a/include/qemu/iov.h b/include/qemu/iov.h
index 68d25f2..7bd9935 100644
--- a/include/qemu/iov.h
+++ b/include/qemu/iov.h
@@ -58,6 +58,17 @@ size_t iov_memset(const struct iovec *iov, const unsigned 
int iov_cnt,
                   size_t offset, int fillc, size_t bytes);
 
 /*
+ * Helper function for iov_send_recv for standard FDs
+ */
+ssize_t iov_send_recv_fd(int sockfd, struct iovec *iov, unsigned iov_cnt,
+                         bool do_send);
+
+/*
+ * Type of iov_send_recv_fd and similar helper functions.
+ */
+typedef ssize_t (*iov_send_recv_func)(int, struct iovec *, unsigned, bool);
+
+/*
  * Send/recv data from/to iovec buffers directly
  *
  * `offset' bytes in the beginning of iovec buffer are skipped and
@@ -76,11 +87,12 @@ size_t iov_memset(const struct iovec *iov, const unsigned 
int iov_cnt,
  * should be within the iovec, not only beginning of it.
  */
 ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
-                      size_t offset, size_t bytes, bool do_send);
+                      size_t offset, size_t bytes, bool do_send,
+                      iov_send_recv_func helper);
 #define iov_recv(sockfd, iov, iov_cnt, offset, bytes) \
-  iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false)
+  iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false, iov_send_recv_fd)
 #define iov_send(sockfd, iov, iov_cnt, offset, bytes) \
-  iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true)
+  iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true, iov_send_recv_fd)
 
 /**
  * Produce a text hexdump of iovec `iov' with `iov_cnt' number of elements
diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h
index f47dae6..8611d8f 100644
--- a/include/qemu/sockets.h
+++ b/include/qemu/sockets.h
@@ -60,8 +60,11 @@ int inet_nonblocking_connect(const char *str,
                              NonBlockingConnectHandler *callback,
                              void *opaque, Error **errp);
 
+void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr);
 int inet_dgram_opts(QemuOpts *opts, Error **errp);
 NetworkAddressFamily inet_netfamily(int family);
+int inet_getport(struct addrinfo *e);
+void inet_setport(struct addrinfo *e, int port);
 
 int unix_listen_opts(QemuOpts *opts, Error **errp);
 int unix_listen(const char *path, char *ostr, int olen, Error **errp);
@@ -71,6 +74,7 @@ int unix_connect(const char *path, Error **errp);
 int unix_nonblocking_connect(const char *str,
                              NonBlockingConnectHandler *callback,
                              void *opaque, Error **errp);
+struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp);
 
 SocketAddress *socket_parse(const char *str, Error **errp);
 int socket_connect(SocketAddress *addr, Error **errp,
diff --git a/migration/Makefile.objs b/migration/Makefile.objs
index d929e96..02fe66f 100644
--- a/migration/Makefile.objs
+++ b/migration/Makefile.objs
@@ -3,7 +3,7 @@ common-obj-y += vmstate.o
 common-obj-y += qemu-file.o qemu-file-buf.o qemu-file-unix.o qemu-file-stdio.o
 common-obj-y += xbzrle.o
 
-common-obj-$(CONFIG_RDMA) += rdma.o
+common-obj-$(CONFIG_RDMA) += rdma.o rsocket.o
 common-obj-$(CONFIG_POSIX) += exec.o unix.o fd.o
 
 common-obj-y += block.o
diff --git a/migration/migration.c b/migration/migration.c
index c49a05a..d264400 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -74,6 +74,8 @@ void qemu_start_incoming_migration(const char *uri, Error 
**errp)
 #ifdef CONFIG_RDMA
     else if (strstart(uri, "rdma:", &p))
         rdma_start_incoming_migration(p, errp);
+    else if (strstart(uri, "rsocket:", &p))
+        rsocket_start_incoming_migration(p, errp);
 #endif
 #if !defined(WIN32)
     else if (strstart(uri, "exec:", &p))
@@ -442,6 +444,8 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
 #ifdef CONFIG_RDMA
     } else if (strstart(uri, "rdma:", &p)) {
         rdma_start_outgoing_migration(s, p, &local_err);
+    } else if (strstart(uri, "rsocket:", &p)) {
+        rsocket_start_outgoing_migration(s, p, &local_err);
 #endif
 #if !defined(WIN32)
     } else if (strstart(uri, "exec:", &p)) {
diff --git a/migration/rsocket.c b/migration/rsocket.c
new file mode 100644
index 0000000..59fca29
--- /dev/null
+++ b/migration/rsocket.c
@@ -0,0 +1,964 @@
+/*
+ * QEMU live migration
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ *  David Gilbert <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ * (Based on migration/tcp.c)
+ */
+
+#include <string.h>
+
+#include "qemu-common.h"
+#include "qemu/error-report.h"
+#include "qemu/iov.h"
+#include "qemu/sockets.h"
+#include "qemu/thread.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "block/block.h"
+#include "qemu/main-loop.h"
+#include "trace.h"
+
+#include "rdma/rsocket.h"
+
+
+/*
+ * State for one rsocket_set_handler() request: allocated there, handed to
+ * handler_thread(), and freed by rsocket_handler_fdcallback().
+ */
+typedef struct Rsocket_handler_thread_data {
+    int rfd;                   /* rsocket descriptor passed to rpoll() */
+    short events;              /* poll events to wait for on rfd */
+    IOHandler *callback;       /* called from rsocket_handler_fdcallback() */
+    void      *callback_data;  /* argument handed to callback */
+
+    QemuThread thread;         /* runs handler_thread() */
+    int pipe_fds[2];           /* thread writes [1]; fd handler watches [0] */
+} Rsocket_handler_thread_data;
+
+/*
+ * Called via qemu_set_fd_handler in main thread, after
+ * handler_thread has kicked it on receipt of the event.
+ *
+ * Joins the poll thread, removes the fd handler, closes the wake-up pipe
+ * and frees the per-request state before invoking the user's callback.
+ *
+ * @opaque: the Rsocket_handler_thread_data set up by rsocket_set_handler()
+ */
+static void rsocket_handler_fdcallback(void *opaque)
+{
+    Rsocket_handler_thread_data *rhtd = opaque;
+    /* Copy these out now: rhtd is freed before the callback is invoked */
+    IOHandler *callback = rhtd->callback;
+    void *callback_data = rhtd->callback_data;
+
+    trace_rsocket_handler_fdcallback();
+    /* First do some cleanup */
+    qemu_thread_join(&rhtd->thread);
+    qemu_set_fd_handler(rhtd->pipe_fds[0], NULL, NULL, NULL);
+    close(rhtd->pipe_fds[0]);
+    close(rhtd->pipe_fds[1]);
+    g_free(rhtd);
+
+    callback(callback_data);
+}
+
+/*
+ * Thread body created by rsocket_set_handler: block in rpoll() on the
+ * rsocket, then write a single byte to the pipe so that
+ * rsocket_handler_fdcallback gets called and cleans up.
+ *
+ * The thread's exit value is the result of that write().
+ */
+static void *handler_thread(void *opaque)
+{
+    Rsocket_handler_thread_data *data = opaque;
+    struct pollfd pfd = {
+        .fd = data->rfd,
+        .events = data->events,
+        .revents = 0,
+    };
+    ssize_t kicked;
+
+    trace_rsocket_handler_thread_top();
+    rpoll(&pfd, 1, -1 /* Forever - hmm */);
+
+    trace_rsocket_handler_thread_after_poll();
+    /* Kick the real handler */
+    kicked = write(data->pipe_fds[1], "K", 1);
+    return (void *)kicked;
+}
+
+/*
+ * The 'rfd' isn't a real fd, and so we can't give it to qemu_set_fd_handler,
+ * so spawn a dummy thread that waits in an rpoll, kicks a real fd which
+ * then ends up calling callback.
+ *
+ * @rfd: rsocket descriptor to wait on
+ * @events: poll events to wait for
+ * @name: name given to the helper thread
+ * @callback: called (once) with @callback_data after the event arrives
+ * @callback_data: opaque argument for @callback
+ *
+ * Returns: 0 on success, -1 if the wake-up pipe could not be created
+ * (errno is left as set by qemu_pipe()).
+ */
+static int rsocket_set_handler(int rfd, short events, const char *name,
+                               IOHandler *callback, void *callback_data)
+{
+    Rsocket_handler_thread_data *rhtd = g_malloc0(sizeof(*rhtd));
+
+    trace_rsocket_set_handler();
+    rhtd->rfd = rfd;
+    rhtd->callback = callback;
+    rhtd->callback_data = callback_data;
+    rhtd->events = events;
+    if (qemu_pipe(rhtd->pipe_fds)) {
+        /* Nothing else owns rhtd yet, so it must be released here */
+        int saved_errno = errno;
+
+        g_free(rhtd);
+        errno = saved_errno;
+        return -1;
+    }
+    qemu_set_fd_handler(rhtd->pipe_fds[0], rsocket_handler_fdcallback, NULL,
+                         rhtd);
+    qemu_thread_create(&rhtd->thread, name, handler_thread, rhtd,
+                       QEMU_THREAD_JOINABLE);
+
+    return 0;
+}
+
+/* - - - Replacements for util/qemu_socket.c functions - - - - - - - - - - */
+
+/* Struct to store connect state for non blocking connect */
+typedef struct RConnectState {
+    int fd;                         /* rsocket currently being connected */
+    struct addrinfo *addr_list;     /* head of the list; freed when done */
+    struct addrinfo *current_addr;  /* entry in addr_list being tried */
+    NonBlockingConnectHandler *callback; /* called with the final result */
+    void *opaque;                   /* argument passed to callback */
+} RConnectState;
+
+/* Add O_NONBLOCK to the file status flags of the rsocket 'rfd'. */
+static void rsocket_set_nonblock(int rfd)
+{
+    /* Must be a long: rsocket uses va_arg(..,long) to read this! */
+    long flags;
+
+    trace_rsocket_set_nonblock(rfd);
+    /* Take care, rsocket's rfcntl is very basic */
+    flags = rfcntl(rfd, F_GETFL);
+    flags |= O_NONBLOCK;
+    rfcntl(rfd, F_SETFL, flags);
+}
+
+/*
+ * rsocket_inet_connect_addr/rsocket_wait_for_connect/rsocket_inet_connect_opts
+ * deal with multiple addresses and connections that take a while to connect
+ */
+static int rsocket_inet_connect_addr(struct addrinfo *addr, bool *in_progress,
+                                     RConnectState *connect_state, Error 
**errp);
+
+/*
+ * Handler registered (via rsocket_set_handler) for a connect that was
+ * left in progress.  Reads SO_ERROR to learn the outcome; on failure it
+ * closes the rsocket and tries the remaining addresses in the list.
+ * Unless another connect is left in progress, it calls the stored
+ * callback (if any) with the resulting fd and error, then frees the
+ * RConnectState.
+ *
+ * @opaque: the RConnectState for this connection attempt
+ */
+static void rsocket_wait_for_connect(void *opaque)
+{
+    RConnectState *s = opaque;
+    int val = 0, rc = 0;
+    socklen_t valsize = sizeof(val);
+    bool in_progress;
+    Error *err = NULL;
+
+    trace_rsocket_wait_for_connect();
+    do {
+        rc = rgetsockopt(s->fd, SOL_SOCKET, SO_ERROR, &val, &valsize);
+    } while (rc == -1 && errno == EINTR);
+
+    /* update rc to contain error */
+    if (!rc && val) {
+        rc = -1;
+        errno = val;
+    }
+
+    /* connect error */
+    if (rc < 0) {
+        error_setg_errno(&err, errno, "Error connecting to rsocket");
+        rclose(s->fd);
+        /* A negative fd is what makes the retry loop below run */
+        s->fd = rc;
+    }
+
+    /* try to connect to the next address on the list */
+    if (s->current_addr) {
+        while (s->current_addr->ai_next != NULL && s->fd < 0) {
+            s->current_addr = s->current_addr->ai_next;
+            s->fd = rsocket_inet_connect_addr(s->current_addr, &in_progress, s,
+                                              NULL);
+            if (s->fd < 0) {
+                /* Replace the previous error with the most recent one */
+                error_free(err);
+                err = NULL;
+                error_setg_errno(&err, errno,
+                                 "Unable to start rsocket connect");
+            }
+            /* connect in progress */
+            if (in_progress) {
+                /*
+                 * s is kept: rsocket_inet_connect_addr registered this
+                 * function again with s as its argument.
+                 */
+                goto out;
+            }
+        }
+
+        freeaddrinfo(s->addr_list);
+    }
+
+    if (s->callback) {
+        s->callback(s->fd, err, s->opaque);
+    }
+    g_free(s);
+out:
+    error_free(err);
+}
+
+/*
+ * Create an rsocket and connect it to @addr.
+ *
+ * @addr: address to connect to
+ * @in_progress: set to true when the connect was left in progress and
+ *               rsocket_wait_for_connect has been registered to finish
+ *               it; false otherwise
+ * @connect_state: if non-NULL, an in-progress connect is allowed; its fd
+ *                 is set and it is handed to rsocket_wait_for_connect
+ * @errp: set on failure
+ *
+ * Returns: the rsocket descriptor, or -1 on error.
+ */
+static int rsocket_inet_connect_addr(struct addrinfo *addr, bool *in_progress,
+                                     RConnectState *connect_state,
+                                     Error **errp)
+{
+    int sock, rc;
+    /* Option value for SO_REUSEADDR; it must be initialised before use */
+    int reuse = 1;
+
+    trace_rsocket_inet_connect_addr();
+    *in_progress = false;
+
+    sock = rsocket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+    if (sock < 0) {
+        error_setg_errno(errp, errno, "Failed to create socket");
+        return -1;
+    }
+    rsetsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                 (const char *)&reuse, sizeof(reuse));
+    /* connect to peer */
+    do {
+        rc = 0;
+        if (rconnect(sock, addr->ai_addr, addr->ai_addrlen) < 0) {
+            rc = -errno;
+        }
+    } while (rc == -EINTR);
+
+    if (connect_state != NULL && rc == -EINPROGRESS) {
+        connect_state->fd = sock;
+        if (rsocket_set_handler(sock, POLLOUT | POLLERR , "rsocketconnect",
+                                rsocket_wait_for_connect, connect_state) < 0) {
+            /*
+             * Without the handler nobody would ever complete this
+             * connect, so report it as failed instead of in progress.
+             */
+            error_setg_errno(errp, errno,
+                             "Failed to set up rsocket connect handler");
+            connect_state->fd = -1;
+            rclose(sock);
+            return -1;
+        }
+        *in_progress = true;
+    } else if (rc < 0) {
+        /* Use the saved error code rather than whatever errno is now */
+        error_setg_errno(errp, -rc, "Failed to connect socket");
+        rclose(sock);
+        return -1;
+    }
+    return sock;
+}
+
+/*
+ * Connect an rsocket to the address described by @opts, trying each
+ * resolved address in turn until one connects or is left in progress.
+ *
+ * @opts: options handed to inet_parse_connect_opts()
+ * @errp: set on failure
+ * @callback: if non-NULL, an in-progress connect is allowed and
+ *            @callback is called with the result once it is known
+ * @opaque: argument passed to @callback
+ *
+ * Returns: the rsocket descriptor (possibly still connecting), or -1.
+ */
+static int rsocket_inet_connect_opts(QemuOpts *opts, Error **errp,
+                      NonBlockingConnectHandler *callback, void *opaque)
+{
+    Error *local_err = NULL;
+    struct addrinfo *res, *e;
+    int sock = -1;
+    bool in_progress;
+    RConnectState *connect_state = NULL;
+
+    trace_rsocket_inet_connect_opts();
+    res = inet_parse_connect_opts(opts, errp);
+    if (!res) {
+        return -1;
+    }
+
+    /* The connect state only exists when a callback was supplied */
+    if (callback != NULL) {
+        connect_state = g_malloc0(sizeof(*connect_state));
+        connect_state->addr_list = res;
+        connect_state->callback = callback;
+        connect_state->opaque = opaque;
+    }
+
+    for (e = res; e != NULL; e = e->ai_next) {
+        /* Only the error from the last attempt is kept */
+        error_free(local_err);
+        local_err = NULL;
+        if (connect_state != NULL) {
+            connect_state->current_addr = e;
+        }
+        sock = rsocket_inet_connect_addr(e, &in_progress, connect_state,
+                                         &local_err);
+        if (sock >= 0) {
+            break;
+        }
+    }
+
+    if (sock < 0) {
+        error_propagate(errp, local_err);
+    } else if (in_progress) {
+        /* wait_for_connect() will do the rest */
+        /* connect_state and res are not freed on this path */
+        return sock;
+    } else {
+        if (callback) {
+            callback(sock, NULL, opaque);
+        }
+    }
+    g_free(connect_state);
+    freeaddrinfo(res);
+    return sock;
+}
+
+/*
+ * Create a listening rsocket from the "host", "port", "to", "ipv4" and
+ * "ipv6" options in @opts.
+ *
+ * Each resolved address is tried in turn; for each one, every port from
+ * the resolved port up to "to" + @port_offset (or just the resolved port
+ * when "to" is 0) is tried until rbind() succeeds.  On success the
+ * "host", "port", "ipv4" and "ipv6" options are rewritten to describe the
+ * address actually bound.
+ *
+ * @opts: listen options (read and, on success, updated)
+ * @port_offset: added to the numeric "port" before resolving
+ * @errp: set on failure
+ *
+ * Returns: the listening rsocket descriptor, or -1 on error.
+ */
+static int rsocket_inet_listen_opts(QemuOpts *opts, int port_offset,
+                                        Error **errp)
+{
+    struct addrinfo ai,*res,*e;
+    const char *addr;
+    char port[33];
+    char uaddr[INET6_ADDRSTRLEN+1];
+    char uport[33];
+    int slisten;
+    int rc, to, port_min, port_max, p;
+
+    trace_rsocket_inet_listen_opts();
+    memset(&ai,0, sizeof(ai));
+    ai.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+    ai.ai_family = PF_UNSPEC;
+    ai.ai_socktype = SOCK_STREAM;
+
+    if ((qemu_opt_get(opts, "host") == NULL) ||
+        (qemu_opt_get(opts, "port") == NULL)) {
+        error_setg(errp, "host and/or port not specified");
+        return -1;
+    }
+    pstrcpy(port, sizeof(port), qemu_opt_get(opts, "port"));
+    addr = qemu_opt_get(opts, "host");
+
+    to = qemu_opt_get_number(opts, "to", 0);
+    if (qemu_opt_get_bool(opts, "ipv4", 0))
+        ai.ai_family = PF_INET;
+    if (qemu_opt_get_bool(opts, "ipv6", 0))
+        ai.ai_family = PF_INET6;
+
+    /* lookup */
+    if (port_offset) {
+        /* Apply the offset to the numeric port before resolving it */
+        unsigned long long baseport;
+        if (parse_uint_full(port, &baseport, 10) < 0) {
+            error_setg(errp, "can't convert to a number: %s", port);
+            return -1;
+        }
+        if (baseport > 65535 ||
+            baseport + port_offset > 65535) {
+            error_setg(errp, "port %s out of range", port);
+            return -1;
+        }
+        snprintf(port, sizeof(port), "%d", (int)baseport + port_offset);
+    }
+    /* An empty host string is passed to getaddrinfo() as NULL */
+    rc = getaddrinfo(strlen(addr) ? addr : NULL, port, &ai, &res);
+    if (rc != 0) {
+        error_setg(errp, "address resolution failed for %s:%s: %s", addr, port,
+                   gai_strerror(rc));
+        return -1;
+    }
+
+    /* create socket + bind */
+    for (e = res; e != NULL; e = e->ai_next) {
+        int tmp;
+        getnameinfo((struct sockaddr*)e->ai_addr,e->ai_addrlen,
+                       uaddr,INET6_ADDRSTRLEN,uport,32,
+                       NI_NUMERICHOST | NI_NUMERICSERV);
+        slisten = rsocket(e->ai_family, e->ai_socktype, e->ai_protocol);
+        if (slisten < 0) {
+            /* Only report the failure for the last address in the list */
+            if (!e->ai_next) {
+                error_setg_errno(errp, errno, "Failed to create socket");
+            }
+            continue;
+        }
+
+        tmp = 1;
+        rsetsockopt(slisten, SOL_SOCKET, SO_REUSEADDR,
+                     (const char *)&tmp, sizeof(tmp));
+#ifdef IPV6_V6ONLY
+        if (e->ai_family == PF_INET6) {
+            /* listen on both ipv4 and ipv6 */
+            const int off = 0;
+
+            rsetsockopt(slisten, IPPROTO_IPV6, IPV6_V6ONLY, &off,
+                            sizeof(off));
+        }
+#endif
+
+        port_min = inet_getport(e);
+        port_max = to ? to + port_offset : port_min;
+        for (p = port_min; p <= port_max; p++) {
+            inet_setport(e, p);
+            if (rbind(slisten, e->ai_addr, e->ai_addrlen) == 0) {
+                goto listen;
+            }
+            /* Only report the failure for the last port of the last address */
+            if (p == port_max) {
+                if (!e->ai_next) {
+                    error_setg_errno(errp, errno, "Failed to bind socket");
+                }
+            }
+        }
+        rclose(slisten);
+    }
+    freeaddrinfo(res);
+    return -1;
+
+listen:
+    if (rlisten(slisten,1) != 0) {
+        error_setg_errno(errp, errno, "Failed to listen on socket");
+        rclose(slisten);
+        freeaddrinfo(res);
+        return -1;
+    }
+    /* Store the bound address in opts, with port_offset subtracted again */
+    snprintf(uport, sizeof(uport), "%d", inet_getport(e) - port_offset);
+    qemu_opt_set(opts, "host", uaddr);
+    qemu_opt_set(opts, "port", uport);
+    qemu_opt_set(opts, "ipv6", (e->ai_family == PF_INET6) ? "on" : "off");
+    qemu_opt_set(opts, "ipv4", (e->ai_family != PF_INET6) ? "on" : "off");
+    freeaddrinfo(res);
+    return slisten;
+}
+
+/*
+ * Parse @str with inet_parse() and listen on the result using
+ * rsocket_inet_listen_opts().
+ *
+ * @str: address string to parse
+ * @socktype: not used by this function
+ * @port_offset: passed through to rsocket_inet_listen_opts()
+ * @errp: set on failure
+ *
+ * Returns: the listening rsocket descriptor, or -1 on error.
+ */
+static int rsocket_inet_listen(const char *str,
+                int socktype, int port_offset, Error **errp)
+{
+    InetSocketAddress *parsed;
+    QemuOpts *listen_opts;
+    int rfd;
+
+    trace_rsocket_inet_listen();
+    parsed = inet_parse(str, errp);
+    if (parsed == NULL) {
+        return -1;
+    }
+
+    listen_opts = qemu_opts_create(&socket_optslist, NULL, 0, &error_abort);
+    inet_addr_to_opts(listen_opts, parsed);
+    qapi_free_InetSocketAddress(parsed);
+    rfd = rsocket_inet_listen_opts(listen_opts, port_offset, errp);
+    qemu_opts_del(listen_opts);
+
+    return rfd;
+}
+
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
+/* Value type of QEMURsocket.RAMBlockMap: one entry per RAMBlock. */
+typedef struct RAMBlockMapEntry {
+    void    *host_addr;       /* host address of the start of the block */
+    off_t    rsocket_offset;  /* riomap() offset used as riowrite() target */
+    size_t   len;             /* length of the block in bytes */
+} RAMBlockMapEntry;
+
+/* Per-connection state behind a QEMUFile that runs over an rsocket. */
+typedef struct QEMURsocket {
+    int rfd;        /* the rsocket descriptor */
+    QEMUFile *file; /* the QEMUFile created by qemu_fopen_ops() for us */
+    bool isSource;  /* true when opened for writing (mode 'w') */
+
+    /* Used by the 'yield_thread' that waits on an rsocket for data */
+    QemuThread thread;
+    QemuSemaphore sem;  /* posted to ask the thread to wait (or to exit) */
+    int pipe_fds[2];    /* thread writes [1]; rsocket_yield reads [0] */
+    short poll_event;   /* events to rpoll() for; 0 asks the thread to exit */
+
+    /* A mapping from ram_addr_t (block_offset) to RAMBlockMapEntry */
+    GHashTable *RAMBlockMap;
+
+    /* Only used for get_fd */
+    int dummy_fd;
+} QEMURsocket;
+
+/*
+ * Body of the yield thread, which runs while the rsocket is open.
+ *
+ * Each time the semaphore is posted the thread reads rs->poll_event:
+ * zero is a request to exit; otherwise it waits in 'rpoll' for those
+ * events (plus error conditions) and then writes a byte to the pipe to
+ * wake the waiting coroutine.  The thread also exits if that write fails.
+ */
+static void *yield_thread(void *opaque)
+{
+    QEMURsocket *rs = opaque;
+    struct pollfd pfd;
+
+    for (;;) {
+        short wanted;
+
+        trace_rsocket_yield_thread_top();
+        /* Sleep until something tries to yield (or asks us to stop) */
+        qemu_sem_wait(&rs->sem);
+
+        wanted = atomic_fetch_add(&rs->poll_event, 0);
+        if (wanted == 0) {
+            /* Exit */
+            trace_rsocket_yield_thread_requested_exit();
+            break;
+        }
+
+        /* Now wait for the rsocket */
+        pfd.fd = rs->rfd;
+        pfd.events = wanted | POLLERR | POLLHUP | POLLNVAL;
+        rpoll(&pfd, 1, -1 /* Hmm */);
+
+        /* Kick the waiting coroutine */
+        if (write(rs->pipe_fds[1], "K", 1) != 1) {
+            break;
+        }
+    }
+
+    trace_rsocket_yield_thread_exit();
+    return NULL;
+}
+
+/*
+ * Create the wake-up pipe, semaphore and yield thread for 'rs'.
+ *
+ * @poll_event: the event the thread should rpoll() for
+ *
+ * Returns: 0 on success, -errno if the pipe could not be created.
+ */
+static int start_yield_thread(QEMURsocket *rs, short poll_event)
+{
+    trace_rsocket_start_yield_thread();
+
+    if (qemu_pipe(rs->pipe_fds) != 0) {
+        return -errno;
+    }
+
+    /* Everything the thread reads must be set up before it is created */
+    qemu_sem_init(&rs->sem, 0);
+    rs->poll_event = poll_event;
+    qemu_thread_create(&rs->thread, "rsocketyield", yield_thread, rs,
+                       QEMU_THREAD_JOINABLE);
+
+    return 0;
+}
+
+/*
+ * Ask the yield thread to exit, wait for it, then release the pipe and
+ * semaphore created by start_yield_thread().
+ */
+static void stop_yield_thread(QEMURsocket *rs)
+{
+    trace_rsocket_stop_yield_thread();
+    /* Tell the thread to exit */
+    /* poll_event == 0 is the exit request; the post wakes the thread */
+    atomic_and(&rs->poll_event, 0);
+    qemu_sem_post(&rs->sem);
+    
+    /* Only tear down the pipe and semaphore once the thread has gone */
+    qemu_thread_join(&rs->thread);
+    close(rs->pipe_fds[0]);
+    close(rs->pipe_fds[1]);
+    qemu_sem_destroy(&rs->sem);
+}
+
+/*
+ * Yield until the yield thread reports activity on the rsocket.
+ *
+ * Returns: the result of reading the one wake-up byte from the pipe.
+ */
+static int rsocket_yield(QEMURsocket *rs)
+{
+    char kick;
+    ssize_t got;
+
+    trace_rsocket_yield();
+    /* Ask the yield_thread to wait on the rsocket ... */
+    qemu_sem_post(&rs->sem);
+    /* ... and sleep until it kicks us through the pipe */
+    yield_until_fd_readable(rs->pipe_fds[0]);
+    /* Consume the dummy character on the pipe */
+    got = read(rs->pipe_fds[0], &kick, 1);
+    return got;
+}
+
+/*
+ * Helper function for iov_send_recv as used by rsocket_writev_buffer:
+ * send the iovec over the rsocket with rsendmsg(), retrying on EINTR.
+ * Only sending is supported (do_send must be true).
+ */
+static ssize_t rsocket_send(int rsockfd, struct iovec *iov, unsigned iov_cnt,
+                            bool do_send)
+{
+    struct msghdr hdr = {
+        .msg_iov = iov,
+        .msg_iovlen = iov_cnt,
+    };
+    ssize_t sent;
+
+    trace_rsocket_send();
+    assert(do_send);
+    for (;;) {
+        sent = rsendmsg(rsockfd, &hdr, 0);
+        if (sent >= 0 || errno != EINTR) {
+            break;
+        }
+    }
+    return sent;
+}
+
+/*
+ * QEMUFileOps .writev_buffer hook: send the whole iovec over the rsocket.
+ *
+ * @pos is not used.
+ *
+ * Returns: the number of bytes sent, or -socket_error() if fewer bytes
+ * than requested were sent.
+ */
+static ssize_t rsocket_writev_buffer(void *opaque, struct iovec *iov,
+                                     int iovcnt, int64_t pos)
+{
+    QEMURsocket *rs = opaque;
+    ssize_t total = iov_size(iov, iovcnt);
+    ssize_t sent;
+
+    trace_rsocket_writev_buffer(total);
+    sent = iov_send_recv(rs->rfd, iov, iovcnt, 0 /* offset */, total, true,
+                         rsocket_send);
+    if (sent < total) {
+        sent = -socket_error();
+    }
+    trace_rsocket_writev_buffer_end(total, sent);
+    return sent;
+}
+
+/*
+ * QEMUFileOps .get_buffer hook: receive up to @size bytes into @buf.
+ *
+ * EINTR is retried immediately; on EAGAIN we yield until the yield
+ * thread reports activity and then retry.  @pos is not used.
+ *
+ * Returns: the rrecv() result, or -socket_error() on any other error.
+ */
+static int rsocket_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
+                              int size)
+{
+    QEMURsocket *rs = opaque;
+    ssize_t got;
+
+    trace_rsocket_get_buffer(size);
+    while ((got = rrecv(rs->rfd, buf, size, 0)) == -1) {
+        if (errno == EAGAIN) {
+            rsocket_yield(rs);
+            continue;
+        }
+        if (errno != EINTR) {
+            break;
+        }
+    }
+
+    if (got == -1) {
+        got = -socket_error();
+    }
+    trace_rsocket_get_buffer_exit(size, got);
+    return got;
+}
+
+/*
+ * g_hash_table_foreach_remove() callback used by rsocket_close: release
+ * one RAMBlockMap entry.
+ *
+ * @key: the hash key (not used here)
+ * @value: the RAMBlockMapEntry to release
+ * @user_data: the owning QEMURsocket
+ *
+ * Returns: TRUE, so that the entry is removed from the hash table.
+ */
+static gboolean rsocket_close_RAMBlock_func(gpointer key, gpointer value,
+                                            gpointer user_data)
+{
+    RAMBlockMapEntry *entry = value;
+    QEMURsocket *rs = user_data;
+    bool is_source = rs->isSource;
+
+    trace_rsocket_close_RAMBlock_func(entry->host_addr, is_source);
+
+    /* rsocket segs if I don't unmap before close */
+    if (is_source) {
+        riounmap(rs->rfd, entry->host_addr, entry->len);
+    }
+    g_free(entry);
+
+    return TRUE; /* Delete from hash */
+}
+
+/*
+ * QEMUFileOps .close hook: stop the yield thread, release every
+ * RAMBlockMap entry and the table itself, close the dummy fd and free
+ * the QEMURsocket.  The rsocket itself is not closed (see the HACK
+ * comment below).
+ *
+ * Returns: always 0.
+ */
+static int rsocket_close(void *opaque)
+{
+    QEMURsocket *rs = opaque;
+    trace_rsocket_close();
+    stop_yield_thread(rs);
+    /* The callback frees each entry and asks for it to be removed */
+    g_hash_table_foreach_remove(rs->RAMBlockMap, rsocket_close_RAMBlock_func,
+                                rs);
+    g_hash_table_destroy(rs->RAMBlockMap);
+    /* rclose(rs->rfd); - HACK! I'm getting a seg in the rsocket code here; 
even with the close above */
+    close(rs->dummy_fd);
+    g_free(rs);
+
+    return 0;
+}
+
+/*
+ * QEMUFileOps .get_fd hook.
+ *
+ * Returns: the dummy fd stored in the QEMURsocket, not the rsocket.
+ */
+static int rsocket_get_fd(void *opaque)
+{
+    QEMURsocket *rs = opaque;
+
+    /* Hack! get_fd is used for one thing in the general migration code, and
+     * that's for marking it as non-blocking; that needs fixing since we don't
+     * have a real fd we can return.
+     */
+    return rs->dummy_fd;
+}
+
+/*
+ * qemu_ram_foreach_block() callback used on the destination: register one
+ * RAMBlock with the rsocket so the source can RDMA into it, remember the
+ * mapping, and send the block's name and rsocket offset to the source.
+ *
+ * On the wire each block is sent as:
+ *    byte     - length of the RAMBlock name (never 0)
+ *    byte[]   - the RAMBlock name
+ *    uint64_t - the rsocket offset, written in host byte order
+ *
+ * @block_name: name of the RAMBlock; must be 1..255 bytes long
+ * @host_addr: host address of the start of the block
+ * @block_offset: used as the key into rs->RAMBlockMap
+ * @length: length of the block
+ * @opaque: the QEMURsocket
+ *
+ * Returns: 0 on success, -1 on failure.
+ */
+static int rsocket_dest_ramblock_reg(const char *block_name, void *host_addr,
+    ram_addr_t block_offset, ram_addr_t length, void *opaque)
+{
+    QEMURsocket *rs = opaque;
+    uint64_t rsocket_offset;
+    size_t name_len = strlen(block_name);
+    uint8_t block_name_len;
+    RAMBlockMapEntry *rbme;
+
+    /*
+     * The length goes on the wire as a single byte and 0 is the list
+     * terminator, so anything else cannot be represented.
+     */
+    if (name_len == 0 || name_len > UINT8_MAX) {
+        error_report("RAMBlock name '%s' has unsupported length %zu",
+                     block_name, name_len);
+        return -1;
+    }
+    block_name_len = name_len;
+
+    /* Register every RAMBlock so that we can RDMA into it */
+    rsocket_offset = riomap(rs->rfd, host_addr, length, PROT_WRITE,
+                         0 /* flags? */, -1 /* Offset */);
+    if (rsocket_offset == (uint64_t)-1) {
+        error_report("riomap for %s", block_name);
+        return -1;
+    }
+
+    /* Allocate only after riomap succeeded so nothing leaks on failure */
+    rbme = g_new0(RAMBlockMapEntry, 1);
+    rbme->rsocket_offset = rsocket_offset;
+    rbme->len = length;
+    rbme->host_addr = host_addr;
+    /* The table now owns rbme; rsocket_close releases it */
+    g_hash_table_insert(rs->RAMBlockMap, (gpointer)block_offset, rbme);
+
+    trace_rsocket_dest_ramblock_reg(block_name, block_offset, rsocket_offset);
+
+    /* Send the block name and the key to the other side */
+    if (rwrite(rs->rfd, &block_name_len, 1) != 1) {
+        error_report("%s: block_name_len write for %s", strerror(errno),
+                     block_name);
+        return -1;
+    }
+    if (rwrite(rs->rfd, block_name, block_name_len) != block_name_len) {
+        error_report("%s: block_name write for %s", strerror(errno),
+                     block_name);
+        return -1;
+    }
+    if (rwrite(rs->rfd, &rsocket_offset, 8) != 8) {
+        error_report("%s: rsocket_offset write for %s", strerror(errno),
+                     block_name);
+        return -1;
+    }
+    return 0;
+}
+
+/*
+ * Register all the RAMBlocks as places we might want to RDMA into, then
+ * send a single zero byte to mark the end of the RAMBlock list.
+ *
+ * Returns: 0 on success, -1 on failure.
+ */
+static int rsocket_dest_ramblock_setup(QEMURsocket *rs)
+{
+    const char terminator = 0;
+
+    /*
+     * Registering each RAMBlock yields an 'offset' that the src can pass
+     * to Riowrite; rsocket_dest_ramblock_reg sends those as it goes.
+     */
+    if (qemu_ram_foreach_block(rsocket_dest_ramblock_reg, rs) != 0) {
+        error_report("Failed to map rsocket buffers");
+        return -1;
+    }
+
+    if (rwrite(rs->rfd, &terminator, 1) != 1) {
+        error_report("%s: terminator write RAMBlock list\n", strerror(errno));
+        return -1;
+    }
+
+    return 0;
+}
+
+/* Effectively this is just a RAMBlock but we don't have access to it */
+typedef struct RAMBlockSourceData {
+    void      *host_addr;     /* host address of the start of the block */
+    ram_addr_t block_offset;  /* used as the key into RAMBlockMap */
+    ram_addr_t length;        /* length of the block */
+} RAMBlockSourceData;
+
+/*
+ * qemu_ram_foreach_block() callback used on the source: record one
+ * RAMBlock in the hash table passed as @opaque, keyed by the GQuark of
+ * its name.
+ *
+ * Returns: always 0.
+ */
+static int source_ramblock_name_mapfunc(const char *block_name,
+    void *host_addr, ram_addr_t block_offset, ram_addr_t length, void *opaque)
+{
+    GHashTable *by_name = opaque;
+    gpointer name_key = (gpointer)(intptr_t)g_quark_from_string(block_name);
+    RAMBlockSourceData *info = g_new(RAMBlockSourceData, 1);
+
+    info->length = length;
+    info->block_offset = block_offset;
+    info->host_addr = host_addr;
+
+    g_hash_table_insert(by_name, name_key, info);
+    return 0;
+}
+
+/*
+ * On the source: read the list of rsocket keys sent by the destination
+ * and record, for each local RAMBlock, the rsocket offset to riowrite to.
+ *
+ * @f: the migration stream; flushed first so the destination has seen
+ *     everything sent so far
+ * @rs: the rsocket state whose RAMBlockMap is filled in
+ *
+ * Returns: 0 on success, -1 on failure.
+ */
+static int rsocket_source_ramblock_setup(QEMUFile *f, QEMURsocket *rs)
+{
+    char ram_block_name[256]; /* up to 255 name bytes plus the NUL */
+    uint8_t block_name_len;
+    uint64_t rsocket_key;
+    GHashTable *block_name_map;
+    int ret = -1;
+
+    /* Ensure the previous data sent gets to the destination, because
+     * only then will it send this response.
+     */
+    qemu_fflush(f);
+
+    /*
+     * Build a mapping from RAMBlock name (as a GQuark) to its
+     * RAMBlockSourceData.  The table owns the values: g_free() is run on
+     * each of them when the table is destroyed at the end.
+     */
+    block_name_map = g_hash_table_new_full(NULL, NULL, NULL, g_free);
+    if (qemu_ram_foreach_block(source_ramblock_name_mapfunc, block_name_map)) {
+        error_report("Failed to make source RAMBlock map");
+        goto err;
+    }
+
+    /*
+     * We're sent a list of RAMBlocks of the form:
+     *    byte   - length of RAMBlock name
+     *    byte[] - The RAMBlock name
+     *  uint64_t - The rsocket 'offset' or key for the block
+     *
+     * If the length is 0 it's the end of the list.
+     */
+    do {
+        RAMBlockMapEntry *rbme;
+        if (rread(rs->rfd, &block_name_len, 1) != 1) {
+            error_report("%s: block_name_len read", strerror(errno));
+            goto err;
+        }
+        if (block_name_len) {
+            RAMBlockSourceData *rbsd;
+            if (rread(rs->rfd, ram_block_name, block_name_len) !=
+                block_name_len) {
+                error_report("%s: block_name read", strerror(errno));
+                goto err;
+            }
+            ram_block_name[block_name_len] = 0;
+            if (rread(rs->rfd, &rsocket_key, 8) != 8) {
+                error_report("%s: rsocket_key read", strerror(errno));
+                goto err;
+            }
+            rbsd = g_hash_table_lookup(block_name_map,
+                (gpointer)(intptr_t)g_quark_from_string(ram_block_name));
+            if (!rbsd) {
+                error_report("No matching RAMBlock for %s", ram_block_name);
+                goto err;
+            }
+            /* Copy what we need: rbsd is freed with block_name_map */
+            rbme = g_new0(RAMBlockMapEntry, 1);
+            rbme->rsocket_offset = rsocket_key;
+            rbme->len = rbsd->length;
+            rbme->host_addr = rbsd->host_addr;
+            g_hash_table_insert(rs->RAMBlockMap,
+                                (gpointer)rbsd->block_offset, rbme);
+            trace_rsocket_source_ramblock_setup(ram_block_name,
+                                                rbsd->block_offset,
+                                                rsocket_key);
+        }
+    } while (block_name_len);
+
+    ret = 0; /* Good */
+err:
+    /* Frees the RAMBlockSourceData values as well as the table */
+    g_hash_table_destroy(block_name_map);
+    return ret;
+}
+
+/*
+ * QEMUFileOps .before_ram_iterate hook for the destination.
+ *
+ * For RAM_CONTROL_SETUP this registers the RAMBlocks and returns the
+ * result; for any other @flags value it does nothing and returns 0.
+ */
+static int rsocket_dest_before_ram_iterate(QEMUFile *f, void *opaque,
+                                           uint64_t flags)
+{
+    QEMURsocket *rs = opaque;
+
+    trace_rsocket_dest_before_ram_iterate(flags);
+    if (flags == RAM_CONTROL_SETUP) {
+        /*
+         * Called after we've loaded the list of RAMBlocks from the source and
+         * checked them
+         */
+        return rsocket_dest_ramblock_setup(rs);
+    }
+
+    return 0;
+}
+
+/*
+ * QEMUFileOps .before_ram_iterate hook for the source.
+ *
+ * For RAM_CONTROL_SETUP this reads the RAMBlock keys and returns the
+ * result; for any other @flags value it does nothing and returns 0.
+ */
+static int rsocket_source_before_ram_iterate(QEMUFile *f, void *opaque,
+                                             uint64_t flags)
+{
+    QEMURsocket *rs = opaque;
+
+    trace_rsocket_source_before_ram_iterate(flags);
+    if (flags == RAM_CONTROL_SETUP) {
+        /*
+         * Called after we've sent the list of RAMBlocks
+         */
+        return rsocket_source_ramblock_setup(f, rs);
+    }
+
+    return 0;
+}
+
+/*
+ * QEMUFileOps .save_page hook: write @size bytes of guest RAM straight
+ * into the registered memory on the other side using riowrite().
+ *
+ * @block_offset: key into rs->RAMBlockMap identifying the RAMBlock
+ * @offset: offset of the data within that block
+ * @size: number of bytes to write
+ * @bytes_sent: set to @size on success
+ *
+ * Returns: RAM_SAVE_CONTROL_DELAYED on success; -1 (converted to size_t)
+ * if the block is unknown or the write fails.
+ */
+static size_t rsocket_save_page(QEMUFile *f, void *opaque,
+                                ram_addr_t block_offset, ram_addr_t offset,
+                                size_t size, int *bytes_sent)
+{
+    QEMURsocket *rs = opaque;
+    RAMBlockMapEntry *rbme;
+    size_t ret;
+
+    rbme = g_hash_table_lookup(rs->RAMBlockMap, (gpointer)block_offset);
+    if (!rbme) {
+        error_report("Unable to find matching RSocket key for block " 
RAM_ADDR_FMT, block_offset);
+        return -1;
+    }
+    trace_rsocket_save_page(block_offset, offset, rbme->rsocket_offset);
+
+    /*
+     * Busy-retry while nothing was written and errno is EAGAIN.
+     * NOTE(review): errno is not cleared before the call, and it is not
+     * clear that riowrite reports EAGAIN with a 0 return - confirm
+     * against the rsocket library.
+     */
+    do {
+        ret=riowrite(rs->rfd, rbme->host_addr+offset, size, 
rbme->rsocket_offset+offset, 0);
+    } while (ret==0 && errno == EAGAIN);
+    if (ret != size) {
+        /*
+         * NOTE(review): the values are printed as size/ret, and %zd is
+         * used for size_t arguments.
+         */
+        error_report("riowrite: %s (%zd/%zd)", strerror(errno), size, ret);
+        return -1;
+    }
+    *bytes_sent = size;
+
+    return RAM_SAVE_CONTROL_DELAYED;
+}
+
+
+static const QEMUFileOps rsocket_read_ops = { /* used for non-'w' modes */
+    .get_fd             = rsocket_get_fd,
+    .get_buffer         = rsocket_get_buffer,
+    .close              = rsocket_close,
+    .before_ram_iterate = rsocket_dest_before_ram_iterate, /* dest setup */
+};
+
+static const QEMUFileOps rsocket_write_ops = { /* used for 'w' modes */
+    .writev_buffer      = rsocket_writev_buffer,
+    .close              = rsocket_close,
+    .before_ram_iterate = rsocket_source_before_ram_iterate, /* src setup */
+    .save_page          = rsocket_save_page, /* pages sent via riowrite() */
+};
+
+/*
+ * Wrap the rsocket descriptor @rfd in a QEMUFile.
+ *
+ * @mode selects the direction: a leading 'w' gives the source side
+ * (rsocket_write_ops); anything else gives the destination side
+ * (rsocket_read_ops, with @rfd switched to non-blocking).
+ *
+ * Returns the new QEMUFile, or NULL on failure.  On every failure path
+ * @rfd has been closed and all partially built state released, so the
+ * caller must not close @rfd again.
+ */
+static QEMUFile *qemu_fopen_rsocket(int rfd, const char *mode)
+{
+    QEMURsocket *s;
+
+    trace_qemu_fopen_rsocket(rfd, mode);
+    if (qemu_file_mode_is_not_valid(mode)) {
+        /* Keep the ownership rule identical to the other failure paths */
+        rclose(rfd);
+        return NULL;
+    }
+
+    s = g_malloc0(sizeof(QEMURsocket));
+    s->rfd = rfd;
+    s->RAMBlockMap = g_hash_table_new(NULL, NULL);
+    /* open() is the last call before the check so errno is still its own */
+    s->dummy_fd = open("/dev/null", O_RDWR);
+    if (s->dummy_fd == -1) {
+        error_report("Failed to open dummy_fd (%s)", strerror(errno));
+        goto err;
+    }
+    if (start_yield_thread(s, (mode[0] == 'w') ? POLLOUT : POLLIN)) {
+        goto errfd;
+    }
+    if (mode[0] == 'w') {
+        s->file = qemu_fopen_ops(s, &rsocket_write_ops);
+        s->isSource = true;
+    } else {
+        s->file = qemu_fopen_ops(s, &rsocket_read_ops);
+        s->isSource = false;
+        rsocket_set_nonblock(rfd);
+    }
+
+    return s->file;
+
+errfd:
+    close(s->dummy_fd);
+err:
+    /* The map is still empty here, so destroying it frees everything */
+    g_hash_table_destroy(s->RAMBlockMap);
+    rclose(rfd);
+    g_free(s);
+    return NULL;
+}
+
+/*
+ * Completion callback for the outgoing connect.
+ *
+ * @rfd:    connected rsocket descriptor, or a negative value on failure
+ * @err:    unused here
+ * @opaque: the MigrationState being driven
+ *
+ * The migration is only started once a QEMUFile has actually been
+ * obtained; both a failed connect and a failed qemu_fopen_rsocket() are
+ * reported through migrate_fd_error().
+ */
+static void rsocket_have_connect(int rfd, Error *err, void *opaque)
+{
+    MigrationState *s = opaque;
+
+    trace_rsocket_have_connect(rfd);
+
+    if (rfd < 0) {
+        s->file = NULL;
+        migrate_fd_error(s);
+        return;
+    }
+
+    s->file = qemu_fopen_rsocket(rfd, "wb");
+    if (!s->file) {
+        /* Never hand a NULL file to migrate_fd_connect() */
+        migrate_fd_error(s);
+        return;
+    }
+
+    migrate_fd_connect(s);
+}
+
+/*
+ * Begin an outgoing migration to @host_port over rsocket.
+ *
+ * @host_port is parsed with inet_parse(); if that returns NULL nothing
+ * further is done.  Otherwise the address is turned into a QemuOpts set
+ * and passed to rsocket_inet_connect_opts(), with rsocket_have_connect()
+ * as the callback and @s as its opaque argument.
+ */
+void rsocket_start_outgoing_migration(MigrationState *s, const char *host_port,
+                                      Error **errp)
+{
+    InetSocketAddress *saddr;
+    QemuOpts *conn_opts;
+
+    trace_rsocket_start_outgoing_migration();
+
+    saddr = inet_parse(host_port, errp);
+    if (saddr == NULL) {
+        return;
+    }
+
+    conn_opts = qemu_opts_create(&socket_optslist, NULL, 0, &error_abort);
+    inet_addr_to_opts(conn_opts, saddr);
+    /* The parsed address is not needed once it has been copied to opts */
+    qapi_free_InetSocketAddress(saddr);
+
+    rsocket_inet_connect_opts(conn_opts, errp, rsocket_have_connect, s);
+    qemu_opts_del(conn_opts);
+}
+
+/*
+ * Handler for the listening rsocket: accept one incoming connection and
+ * start processing the migration on it.
+ *
+ * @opaque carries the listening descriptor as an integer.  The listening
+ * descriptor is closed after the accept attempt whether or not it
+ * succeeded.
+ */
+static void rsocket_accept_incoming_migration(void *opaque)
+{
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    int rfd_s = (intptr_t)opaque;
+    int rfd_c;
+    QEMUFile *f;
+    int err;
+
+    trace_rsocket_accept_incoming_migration();
+    do {
+        rfd_c = raccept(rfd_s, (struct sockaddr *)&addr, &addrlen);
+        /* Save errno now; later calls may overwrite it */
+        err = errno;
+    } while (rfd_c < 0 && err == EINTR);
+    rclose(rfd_s);
+
+    if (rfd_c < 0) {
+        error_report("could not accept migration connection (%s)",
+                     strerror(err));
+        return;
+    }
+
+    f = qemu_fopen_rsocket(rfd_c, "rb");
+    if (f == NULL) {
+        /*
+         * Do not rclose(rfd_c) here: qemu_fopen_rsocket() closes the
+         * descriptor itself when it fails after allocating its state, and
+         * "rb" is a fixed mode string.  A second close would act on a
+         * stale, possibly reused, descriptor.
+         */
+        error_report("could not qemu_fopen rsocket");
+        return;
+    }
+
+    process_incoming_migration(f);
+}
+
+/*
+ * Start listening for an incoming migration on @host_port.
+ *
+ * If rsocket_inet_listen() returns a negative descriptor nothing else is
+ * done.  Otherwise rsocket_accept_incoming_migration() is registered as
+ * the handler for POLLIN | POLLERR on the listening descriptor, which is
+ * also passed to the handler as its opaque argument.
+ */
+void rsocket_start_incoming_migration(const char *host_port, Error **errp)
+{
+    int listen_rfd;
+
+    trace_rsocket_start_incoming_migration();
+
+    listen_rfd = rsocket_inet_listen(host_port, SOCK_STREAM, 0, errp);
+    if (listen_rfd >= 0) {
+        rsocket_set_handler(listen_rfd, POLLIN | POLLERR, "rsocketlisten",
+                            rsocket_accept_incoming_migration,
+                            (void *)(intptr_t)listen_rfd);
+    }
+}
diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c
index d404926..9569d17 100644
--- a/qemu-coroutine-io.c
+++ b/qemu-coroutine-io.c
@@ -37,7 +37,8 @@ qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned 
iov_cnt,
     int err;
     while (done < bytes) {
         ret = iov_send_recv(sockfd, iov, iov_cnt,
-                            offset + done, bytes - done, do_send);
+                            offset + done, bytes - done, do_send,
+                            iov_send_recv_fd);
         if (ret > 0) {
             done += ret;
         } else if (ret < 0) {
diff --git a/trace-events b/trace-events
index b5722ea..92a6dff 100644
--- a/trace-events
+++ b/trace-events
@@ -1149,6 +1149,41 @@ vmstate_load_field_error(const char *field, int ret) 
"field \"%s\" load failed,
 # qemu-file.c
 qemu_file_fclose(void) ""
 
+rsocket_handler_fdcallback(void) ""
+rsocket_handler_thread_top(void) ""
+rsocket_handler_thread_after_poll(void) ""
+rsocket_set_handler(void) ""
+rsocket_wait_for_connect(void) ""
+rsocket_inet_connect_addr(void) ""
+rsocket_inet_connect_opts(void) ""
+rsocket_inet_listen_opts(void) ""
+rsocket_inet_listen(void) ""
+rsocket_yield_thread_top(void) ""
+rsocket_yield_thread_requested_exit(void) ""
+rsocket_yield_thread_exit(void) ""
+rsocket_set_nonblock(int rfd) "%d"
+rsocket_start_yield_thread(void) ""
+rsocket_stop_yield_thread(void) ""
+rsocket_yield(void) ""
+rsocket_send(void) ""
+rsocket_writev_buffer(ssize_t size) "%zd"
+rsocket_writev_buffer_end(ssize_t size, ssize_t len) "size=%zd / return %zd"
+rsocket_get_buffer(int size) "size=%d"
+rsocket_get_buffer_exit(int size, ssize_t len) "size=%d return %zd"
+rsocket_close(void) ""
+qemu_fopen_rsocket(int rfd, const char *mode) "rfd %d mode %s"
+rsocket_have_connect(int rfd) "rfd %d"
+rsocket_start_outgoing_migration(void) ""
+rsocket_accept_incoming_migration(void) ""
+rsocket_start_incoming_migration(void) ""
+rsocket_dest_ramblock_reg(const char *block_name, uint64_t block_offset, 
uint64_t rsock_key) "%s %" PRIx64 "->%" PRIx64
+rsocket_source_ramblock_setup(const char *block_name, uint64_t block_offset, 
uint64_t rsock_key) "%s %" PRIx64 "->%" PRIx64
+rsocket_dest_before_ram_iterate(uint64_t flags) "%" PRIx64
+rsocket_save_page(uint64_t block_offset, uint64_t offset, uint64_t 
rsocket_key) "block=%" PRIx64 " offset=%" PRIx64 " rsocket_key=%" PRIx64
+rsocket_source_before_ram_iterate(uint64_t flags) "%" PRIx64
+rsocket_close_RAMBlock_func(void *host_addr, bool isSource) "%p %u"
+rsocket_save_page_riowrite_eagain(uint64_t offset) "%" PRIx64
+
 # arch_init.c
 migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64""
diff --git a/util/iov.c b/util/iov.c
index 2fb18e6..6dbf62e 100644
--- a/util/iov.c
+++ b/util/iov.c
@@ -88,9 +88,9 @@ size_t iov_size(const struct iovec *iov, const unsigned int 
iov_cnt)
     return len;
 }
 
-/* helper function for iov_send_recv() */
-static ssize_t
-do_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, bool do_send)
+/* helper function for iov_send_recv() for normal FDs */
+ssize_t iov_send_recv_fd(int sockfd, struct iovec *iov, unsigned iov_cnt,
+                         bool do_send)
 {
 #ifdef CONFIG_POSIX
     ssize_t ret;
@@ -134,8 +134,8 @@ do_send_recv(int sockfd, struct iovec *iov, unsigned 
iov_cnt, bool do_send)
 }
 
 ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
-                      size_t offset, size_t bytes,
-                      bool do_send)
+                      size_t offset, size_t bytes, bool do_send,
+                      iov_send_recv_func helper)
 {
     ssize_t total = 0;
     ssize_t ret;
@@ -174,11 +174,11 @@ ssize_t iov_send_recv(int sockfd, struct iovec *iov, 
unsigned iov_cnt,
             assert(iov[niov].iov_len > tail);
             orig_len = iov[niov].iov_len;
             iov[niov++].iov_len = tail;
-            ret = do_send_recv(sockfd, iov, niov, do_send);
+            ret = helper(sockfd, iov, niov, do_send);
             /* Undo the changes above before checking for errors */
             iov[niov-1].iov_len = orig_len;
         } else {
-            ret = do_send_recv(sockfd, iov, niov, do_send);
+            ret = helper(sockfd, iov, niov, do_send);
         }
         if (offset) {
             iov[0].iov_base -= offset;
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index a76bb3c..2d0fa60 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -58,7 +58,7 @@ QemuOptsList socket_optslist = {
     },
 };
 
-static int inet_getport(struct addrinfo *e)
+int inet_getport(struct addrinfo *e)
 {
     struct sockaddr_in *i4;
     struct sockaddr_in6 *i6;
@@ -75,7 +75,7 @@ static int inet_getport(struct addrinfo *e)
     }
 }
 
-static void inet_setport(struct addrinfo *e, int port)
+void inet_setport(struct addrinfo *e, int port)
 {
     struct sockaddr_in *i4;
     struct sockaddr_in6 *i6;
@@ -319,7 +319,7 @@ static int inet_connect_addr(struct addrinfo *addr, bool 
*in_progress,
     return sock;
 }
 
-static struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp)
+struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp)
 {
     struct addrinfo ai, *res;
     int rc;
@@ -574,7 +574,7 @@ fail:
     return NULL;
 }
 
-static void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr)
+void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr)
 {
     bool ipv4 = addr->ipv4 || !addr->has_ipv4;
     bool ipv6 = addr->ipv6 || !addr->has_ipv6;
-- 
2.1.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]