[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
Date: Wed, 15 Feb 2017 14:46:15 +0000
User-agent: Mutt/1.7.1 (2016-10-04)
* Daniel P. Berrange (address@hidden) wrote:
> On Mon, Jan 23, 2017 at 10:32:13PM +0100, Juan Quintela wrote:
> > We create new channels for each new thread created. We only send through
> > them a character to be sure that we are creating the channels in the
> > right order.
> >
> > Note: Reference count/freeing of channels is not done
> >
> > Signed-off-by: Juan Quintela <address@hidden>
> > ---
> > include/migration/migration.h | 6 +++++
> > migration/ram.c | 45 +++++++++++++++++++++++++++++++++-
> > migration/socket.c | 56
> > +++++++++++++++++++++++++++++++++++++++++--
>
> BTW, right now libvirt never uses QEMU's tcp: protocol - it does everything
> with the fd: protocol. So either we need multi-fd support for fd: protocol,
> or libvirt needs to switch to use tcp:
I thought using fd was safer than tcp: because of the race when something else
could listen on the proposed port on the incoming side between the point of
libvirt picking the port number and qemu starting.
> In fact, having said that, we're going to have to switch to use the tcp:
> protocol anyway in order to support TLS, so this is just another good
> reason for the switch.
I thought you had a way of allowing fd to work for TLS?
Dave
>
> We avoided tcp: in the past because QEMU was incapable of reporting error
> messages when the connection failed. That's fixed since
>
> commit d59ce6f34434bf47a9b26138c908650bf9a24be1
> Author: Daniel P. Berrange <address@hidden>
> Date: Wed Apr 27 11:05:00 2016 +0100
>
> migration: add reporting of errors for outgoing migration
>
> so libvirt should be ok to use tcp: now.
>
> > 3 files changed, 104 insertions(+), 3 deletions(-)
> >
> > diff --git a/include/migration/migration.h b/include/migration/migration.h
> > index f119ba0..3989bd6 100644
> > --- a/include/migration/migration.h
> > +++ b/include/migration/migration.h
> > @@ -22,6 +22,7 @@
> > #include "qapi-types.h"
> > #include "exec/cpu-common.h"
> > #include "qemu/coroutine_int.h"
> > +#include "io/channel.h"
> >
> > #define QEMU_VM_FILE_MAGIC 0x5145564d
> > #define QEMU_VM_FILE_VERSION_COMPAT 0x00000002
> > @@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char
> > *host_port, Error **errp);
> >
> > void tcp_start_outgoing_migration(MigrationState *s, const char
> > *host_port, Error **errp);
> >
> > +QIOChannel *socket_recv_channel_create(void);
> > +int socket_recv_channel_destroy(QIOChannel *recv);
> > +QIOChannel *socket_send_channel_create(void);
> > +int socket_send_channel_destroy(QIOChannel *send);
> > +
> > void unix_start_incoming_migration(const char *path, Error **errp);
> >
> > void unix_start_outgoing_migration(MigrationState *s, const char *path,
> > Error **errp);
> > diff --git a/migration/ram.c b/migration/ram.c
> > index 939f364..5ad7cb3 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)
> >
> > struct MultiFDSendParams {
> > QemuThread thread;
> > + QIOChannel *c;
> > QemuCond cond;
> > QemuMutex mutex;
> > bool quit;
> > + bool started;
> > };
> > typedef struct MultiFDSendParams MultiFDSendParams;
> >
> > @@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
> > static void *multifd_send_thread(void *opaque)
> > {
> > MultiFDSendParams *params = opaque;
> > + char start = 's';
> > +
> > + qio_channel_write(params->c, &start, 1, &error_abort);
> > + qemu_mutex_lock(&params->mutex);
> > + params->started = true;
> > + qemu_cond_signal(&params->cond);
> > + qemu_mutex_unlock(&params->mutex);
> >
> > qemu_mutex_lock(&params->mutex);
> > while (!params->quit){
> > @@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
> > qemu_thread_join(&multifd_send[i].thread);
> > qemu_mutex_destroy(&multifd_send[i].mutex);
> > qemu_cond_destroy(&multifd_send[i].cond);
> > + socket_send_channel_destroy(multifd_send[i].c);
> > }
> > g_free(multifd_send);
> > multifd_send = NULL;
> > @@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
> > qemu_mutex_init(&multifd_send[i].mutex);
> > qemu_cond_init(&multifd_send[i].cond);
> > multifd_send[i].quit = false;
> > + multifd_send[i].started = false;
> > + multifd_send[i].c = socket_send_channel_create();
> > + if(!multifd_send[i].c) {
> > + error_report("Error creating a send channel");
> > + exit(0);
> > + }
> > snprintf(thread_name, 15, "multifd_send_%d", i);
> > qemu_thread_create(&multifd_send[i].thread, thread_name,
> > multifd_send_thread, &multifd_send[i],
> > QEMU_THREAD_JOINABLE);
> > + qemu_mutex_lock(&multifd_send[i].mutex);
> > + while (!multifd_send[i].started) {
> > + qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
> > + }
> > + qemu_mutex_unlock(&multifd_send[i].mutex);
> > }
> > }
> >
> > struct MultiFDRecvParams {
> > QemuThread thread;
> > + QIOChannel *c;
> > QemuCond cond;
> > QemuMutex mutex;
> > bool quit;
> > + bool started;
> > };
> > typedef struct MultiFDRecvParams MultiFDRecvParams;
> >
> > @@ -472,7 +495,14 @@ static MultiFDRecvParams *multifd_recv;
> > static void *multifd_recv_thread(void *opaque)
> > {
> > MultiFDRecvParams *params = opaque;
> > -
> > + char start;
> > +
> > + qio_channel_read(params->c, &start, 1, &error_abort);
> > + qemu_mutex_lock(&params->mutex);
> > + params->started = true;
> > + qemu_cond_signal(&params->cond);
> > + qemu_mutex_unlock(&params->mutex);
> > +
> > qemu_mutex_lock(&params->mutex);
> > while (!params->quit){
> > qemu_cond_wait(&params->cond, &params->mutex);
> > @@ -508,6 +538,7 @@ void migrate_multifd_recv_threads_join(void)
> > qemu_thread_join(&multifd_recv[i].thread);
> > qemu_mutex_destroy(&multifd_recv[i].mutex);
> > qemu_cond_destroy(&multifd_recv[i].cond);
> > + socket_send_channel_destroy(multifd_recv[i].c);
> > }
> > g_free(multifd_recv);
> > multifd_recv = NULL;
> > @@ -526,9 +557,21 @@ void migrate_multifd_recv_threads_create(void)
> > qemu_mutex_init(&multifd_recv[i].mutex);
> > qemu_cond_init(&multifd_recv[i].cond);
> > multifd_recv[i].quit = false;
> > + multifd_recv[i].started = false;
> > + multifd_recv[i].c = socket_recv_channel_create();
> > +
> > + if(!multifd_recv[i].c) {
> > + error_report("Error creating a recv channel");
> > + exit(0);
> > + }
> > qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
> > multifd_recv_thread, &multifd_recv[i],
> > QEMU_THREAD_JOINABLE);
> > + qemu_mutex_lock(&multifd_recv[i].mutex);
> > + while (!multifd_recv[i].started) {
> > + qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
> > + }
> > + qemu_mutex_unlock(&multifd_recv[i].mutex);
> > }
> > }
> >
> > diff --git a/migration/socket.c b/migration/socket.c
> > index 11f80b1..7cd9213 100644
> > --- a/migration/socket.c
> > +++ b/migration/socket.c
> > @@ -24,6 +24,54 @@
> > #include "io/channel-socket.h"
> > #include "trace.h"
> >
> > +struct SocketArgs {
> > + QIOChannelSocket *ioc;
> > + SocketAddress *saddr;
> > + Error **errp;
> > +} socket_args;
>
> Passing data from one method to another indirectly via this random
> global var feels rather dirty, since two different pairs of methods
> are both using the same global var. It happens to be ok since one
> pair of methods is only ever called on the target, and one pair is
> only ever called on the source. It is a recipe for future unpleasant
> surprises though, so I think this needs rethinking.
>
> > +QIOChannel *socket_recv_channel_create(void)
> > +{
> > + QIOChannelSocket *sioc;
> > + Error *err = NULL;
> > +
> > + sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
> > + &err);
> > + if (!sioc) {
> > + error_report("could not accept migration connection (%s)",
> > + error_get_pretty(err));
> > + return NULL;
> > + }
> > + return QIO_CHANNEL(sioc);
> > +}
> > +
> > +int socket_recv_channel_destroy(QIOChannel *recv)
> > +{
> > + // Remove channel
> > + object_unref(OBJECT(send));
> > + return 0;
> > +}
> > +
> > +QIOChannel *socket_send_channel_create(void)
> > +{
> > + QIOChannelSocket *sioc = qio_channel_socket_new();
> > +
> > + qio_channel_socket_connect_sync(sioc, socket_args.saddr,
> > + socket_args.errp);
> > + qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> > + return QIO_CHANNEL(sioc);
> > +}
> > +
> > +int socket_send_channel_destroy(QIOChannel *send)
> > +{
> > + // Remove channel
> > + object_unref(OBJECT(send));
> > + if (socket_args.saddr) {
> > + qapi_free_SocketAddress(socket_args.saddr);
> > + socket_args.saddr = NULL;
> > + }
> > + return 0;
> > +}
> >
> > static SocketAddress *tcp_build_address(const char *host_port, Error
> > **errp)
> > {
> > @@ -96,6 +144,10 @@ static void
> > socket_start_outgoing_migration(MigrationState *s,
> > struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
> >
> > data->s = s;
> > +
> > + socket_args.saddr = saddr;
> > + socket_args.errp = errp;
> > +
> > if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
> > data->hostname = g_strdup(saddr->u.inet.data->host);
> > }
> > @@ -106,7 +158,6 @@ static void
> > socket_start_outgoing_migration(MigrationState *s,
> > socket_outgoing_migration,
> > data,
> > socket_connect_data_free);
> > - qapi_free_SocketAddress(saddr);
> > }
> >
> > void tcp_start_outgoing_migration(MigrationState *s,
> > @@ -154,7 +205,7 @@ static gboolean
> > socket_accept_incoming_migration(QIOChannel *ioc,
> >
> > out:
> > /* Close listening socket as its no longer needed */
> > - qio_channel_close(ioc, NULL);
> > +// qio_channel_close(ioc, NULL);
> > return FALSE; /* unregister */
> > }
>
> If you changed this to return TRUE, then this existing code would be
> automatically invoked when the client makes its 2nd, 3rd, etc
> connection. You'd just have to put some logic in
> migration_channel_process_incoming to take different behaviour when
> seeing the 1st vs the additional connections.
>
>
> >
> > @@ -163,6 +214,7 @@ static void
> > socket_start_incoming_migration(SocketAddress *saddr,
> > Error **errp)
> > {
> > QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> > + socket_args.ioc = listen_ioc;
> >
> > qio_channel_set_name(QIO_CHANNEL(listen_ioc),
> > "migration-socket-listener");
>
>
>
> Regards,
> Daniel
> --
> |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
> |: http://libvirt.org -o- http://virt-manager.org :|
> |: http://entangle-photo.org -o- http://search.cpan.org/~danberr/ :|
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK