Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path

From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path
Date: Wed, 1 Apr 2015 16:14:05 +0100
User-agent: Mutt/1.5.23 (2014-03-12)

* David Gibson (address@hidden) wrote:
> On Fri, Mar 20, 2015 at 06:17:31PM +0000, Dr. David Alan Gilbert wrote:
> > * David Gibson (address@hidden) wrote:
> > > > On Wed, Feb 25, 2015 at 04:51:35PM +0000, Dr. David Alan Gilbert (git) wrote:
> > > > From: "Dr. David Alan Gilbert" <address@hidden>
> > > >
> > > > Open a return path, and handle messages that are received upon it.
> > > >
> > > > Signed-off-by: Dr. David Alan Gilbert <address@hidden>
> > > > ---
> > > > include/migration/migration.h | 8 ++
> > > > > migration/migration.c | 178 +++++++++++++++++++++++++++++++++++++++++-
> > > > trace-events | 13 +++
> > > > 3 files changed, 198 insertions(+), 1 deletion(-)
> > > >
> > > > > diff --git a/include/migration/migration.h b/include/migration/migration.h
> > > > index 6775747..5242ead 100644
> > > > --- a/include/migration/migration.h
> > > > +++ b/include/migration/migration.h
> > > > @@ -73,6 +73,14 @@ struct MigrationState
> > > >
> > > > int state;
> > > > MigrationParams params;
> > > > +
> > > > + /* State related to return path */
> > > > + struct {
> > > > + QEMUFile *file;
> > > > + QemuThread rp_thread;
> > > > + bool error;
> > > > + } rp_state;
> > > > +
> > > > double mbps;
> > > > int64_t total_time;
> > > > int64_t downtime;
> > > > diff --git a/migration/migration.c b/migration/migration.c
> > > > index 80d234c..34cd4fe 100644
> > > > --- a/migration/migration.c
> > > > +++ b/migration/migration.c
> > > > @@ -237,6 +237,23 @@ MigrationCapabilityStatusList
> > > > *qmp_query_migrate_capabilities(Error **errp)
> > > > return head;
> > > > }
> > > >
> > > > +/*
> > > > + * Return true if we're already in the middle of a migration
> > > > + * (i.e. any of the active or setup states)
> > > > + */
> > > > +static bool migration_already_active(MigrationState *ms)
> > > > +{
> > > > + switch (ms->state) {
> > > > + case MIG_STATE_ACTIVE:
> > > > + case MIG_STATE_SETUP:
> > > > + return true;
> > > > +
> > > > + default:
> > > > + return false;
> > > > +
> > > > + }
> > > > +}
> > > > +
> > > > static void get_xbzrle_cache_stats(MigrationInfo *info)
> > > > {
> > > > if (migrate_use_xbzrle()) {
> > > > @@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s,
> > > > int old_state, int new_state)
> > > > }
> > > > }
> > > >
> > > > +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> > > > +{
> > > > + QEMUFile *rp = ms->rp_state.file;
> > > > +
> > > > + /*
> > > > > + * When stuff goes wrong (e.g. failing destination) on the rp, it can get
> > > > > + * cleaned up from a few threads; make sure not to do it twice in parallel
> > > > + */
> > > > + rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
> > >
> > > A cmpxchg seems dangerously subtle for such a basic and infrequent
> > > operation, but ok.
> >
> > I'll take other suggestions; but I'm trying to just do
> > 'if the qemu_file still exists close it', and it didn't seem
> > worth introducing another state variable to atomically update
> > when we've already got the file pointer itself.
>
> Yes, I see the rationale. My concern is just that the more atomicity
> mechanisms are scattered through the code, the harder it is to analyze
> and be sure you haven't missed race cases (or introduced them with a
> future change).
>
> In short, I prefer to see a simple-as-possible, and preferably
> documented, consistent overall concurrency scheme for a data
> structure, rather than scattered atomic ops for various variables where
> it's difficult to see how all the pieces might relate together.
>
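
For reference, the "close it only if it still exists" idea can be shown in isolation. This is a minimal sketch, assuming C11 <stdatomic.h> and an unconditional atomic_exchange() rather than the atomic_cmpxchg() used in the patch; struct fake_file, struct rp_state and cleanup_rp() are hypothetical names made up for the illustration, not anything in the tree.

```c
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for a QEMUFile; counts how often it was closed. */
struct fake_file {
    int close_count;
};

/* Hypothetical stand-in for the rp_state member of MigrationState. */
struct rp_state {
    _Atomic(struct fake_file *) file;
};

static void fake_close(struct fake_file *f)
{
    f->close_count++;
}

/*
 * Claim the pointer by swapping in NULL.  Whichever caller gets the
 * non-NULL value back owns the close; every other caller sees NULL and
 * does nothing.  Returns 1 if this call did the close, 0 otherwise.
 */
static int cleanup_rp(struct rp_state *rs)
{
    struct fake_file *f = atomic_exchange(&rs->file, (struct fake_file *)NULL);

    if (f) {
        fake_close(f);
        return 1;
    }
    return 0;
}
```

Calling cleanup_rp() twice on the same state closes the file once; the second call finds NULL and returns 0. The exchange form has no separate read-then-compare step to reason about, which may or may not be enough to address the concern about scattered atomics.
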
> > > > + if (rp) {
> > > > + trace_migrate_fd_cleanup_src_rp();
> > > > + qemu_fclose(rp);
> > > > + }
> > > > +}
> > > > +
> > > > static void migrate_fd_cleanup(void *opaque)
> > > > {
> > > > MigrationState *s = opaque;
> > > > @@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
> > > > qemu_bh_delete(s->cleanup_bh);
> > > > s->cleanup_bh = NULL;
> > > >
> > > > + migrate_fd_cleanup_src_rp(s);
> > > > +
> > > > if (s->file) {
> > > > trace_migrate_fd_cleanup();
> > > > qemu_mutex_unlock_iothread();
> > > > @@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
> > > > QEMUFile *f = migrate_get_current()->file;
> > > > trace_migrate_fd_cancel();
> > > >
> > > > + if (s->rp_state.file) {
> > > > > + /* shutdown the rp socket, so causing the rp thread to shutdown */
> > > > + qemu_file_shutdown(s->rp_state.file);
> > >
> > > I missed where qemu_file_shutdown() was implemented. Does this
> > > introduce a leftover socket dependency?
> >
> > No, it shouldn't. The shutdown() causes a shutdown(2) syscall to
> > be issued on the socket stopping anything blocking on it; it then
> > gets closed at the end after the rp thread has exited.
>
>
> Sorry, that's not what I meant. I mean is this a hole in the
> abstraction of the QemuFile, because it assumes that what you're
> dealing with here is indeed a socket, rather than something else?

It's just a dependency that we have a shutdown method on the qemu_file
we're using; if it's not a socket then, whatever it is, if we're going
to use it for an rp it needs to implement something equivalent.

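
To illustrate what "has a shutdown method" means for the abstraction, here is a minimal sketch, assuming a backend described by a table of function pointers in which the shutdown hook is optional. struct chan, struct chan_ops and chan_shutdown() are hypothetical names for the illustration and are not the QEMUFile definitions.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical backend operations; a backend may leave shut_down NULL. */
struct chan_ops {
    int (*shut_down)(void *opaque);
};

struct chan {
    const struct chan_ops *ops;
    void *opaque;
};

/*
 * Generic entry point: callers never look at what the backend is,
 * they only learn whether it can be shut down.
 */
static int chan_shutdown(struct chan *c)
{
    if (!c->ops->shut_down) {
        return -ENOSYS;
    }
    return c->ops->shut_down(c->opaque);
}

/* A toy backend whose "shutdown" just sets a flag. */
static int flag_shut_down(void *opaque)
{
    *(int *)opaque = 1;
    return 0;
}

static const struct chan_ops flag_ops = { .shut_down = flag_shut_down };
static const struct chan_ops plain_ops = { .shut_down = NULL };
```

A channel built on flag_ops can be shut down without the caller knowing anything about sockets, while one built on plain_ops reports -ENOSYS, so it could be rejected as a return path up front.
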
> > > > + }
> > > > +
> > > > do {
> > > > old_state = s->state;
> > > > > if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
> > > > @@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void)
> > > > return s->xbzrle_cache_size;
> > > > }
> > > >
> > > > -/* migration thread support */
> > > > +/*
> > > > + * Something bad happened to the RP stream, mark an error
> > > > + * The caller shall print something to indicate why
> > > > + */
> > > > +static void source_return_path_bad(MigrationState *s)
> > > > +{
> > > > + s->rp_state.error = true;
> > > > + migrate_fd_cleanup_src_rp(s);
> > > > +}
> > > > +
> > > > +/*
> > > > + * Handles messages sent on the return path towards the source VM
> > > > + *
> > > > + */
> > > > +static void *source_return_path_thread(void *opaque)
> > > > +{
> > > > + MigrationState *ms = opaque;
> > > > + QEMUFile *rp = ms->rp_state.file;
> > > > + uint16_t expected_len, header_len, header_com;
> > > > + const int max_len = 512;
> > > > + uint8_t buf[max_len];
> > > > + uint32_t tmp32;
> > > > + int res;
> > > > +
> > > > + trace_source_return_path_thread_entry();
> > > > + while (rp && !qemu_file_get_error(rp) &&
> > > > + migration_already_active(ms)) {
> > > > + trace_source_return_path_thread_loop_top();
> > > > + header_com = qemu_get_be16(rp);
> > > > + header_len = qemu_get_be16(rp);
> > > > +
> > > > + switch (header_com) {
> > > > + case MIG_RP_CMD_SHUT:
> > > > + case MIG_RP_CMD_PONG:
> > > > + expected_len = 4;
> > >
> > > Could the knowledge of expected lengths be folded into the switch
> > > below? Switching twice on the same thing is a bit icky.
> >
> > No, because the length at this point is used to validate the
> > length field in the header prior to reading the body.
> > The other switch processes the contents of the body that
> > have been read.
>
> Ok.
>
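
The reason for the two passes is easier to see stand-alone. This is a minimal sketch, assuming messages arrive in a byte buffer with a 2-byte big-endian command followed by a 2-byte big-endian length. The command numbers, expected_body_len() and parse_msg() are hypothetical, and the sketch insists on an exact length match, whereas the patch compares with '>'.

```c
#include <stdint.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical command numbers, for illustration only. */
enum { CMD_SHUT = 1, CMD_PONG = 2 };

#define MAX_BODY 512

/* Step 1: map a command to the body length it must carry, or -1. */
static int expected_body_len(uint16_t cmd)
{
    switch (cmd) {
    case CMD_SHUT:
    case CMD_PONG:
        return 4;
    default:
        return -1;
    }
}

static uint16_t get_be16(const uint8_t *p)
{
    return (uint16_t)((p[0] << 8) | p[1]);
}

/*
 * Parse one message from in[0..in_len).  The header is checked before a
 * single body byte is copied.  Returns the body length on success,
 * -1 for a bad header, -2 for a short read.
 */
static int parse_msg(const uint8_t *in, size_t in_len,
                     uint16_t *cmd, uint8_t body[MAX_BODY])
{
    int want;
    uint16_t len;

    if (in_len < 4) {
        return -2;
    }
    *cmd = get_be16(in);
    len = get_be16(in + 2);

    want = expected_body_len(*cmd);
    if (want < 0 || len != want) {
        return -1;
    }
    if (in_len - 4 < len) {
        return -2;
    }

    /* Step 2: only now touch the body. */
    memcpy(body, in + 4, len);
    return len;
}
```

The first lookup runs before any body bytes are consumed, so a bogus length can never drive the read; interpreting the body is left to a second switch once the data is in hand.
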
> > > > + break;
> > > > +
> > > > + default:
> > > > > + error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> > > > + header_com, header_len);
> > > > + source_return_path_bad(ms);
> > > > + goto out;
> > > > + }
> > > >
> > > > + if (header_len > expected_len) {
> > > > > + error_report("RP: Received command 0x%04x with "
> > > > + "incorrect length %d expecting %d",
> > > > + header_com, header_len,
> > > > + expected_len);
> > > > + source_return_path_bad(ms);
> > > > + goto out;
> > > > + }
> > > > +
> > > > + /* We know we've got a valid header by this point */
> > > > + res = qemu_get_buffer(rp, buf, header_len);
> > > > + if (res != header_len) {
> > > > + trace_source_return_path_thread_failed_read_cmd_data();
> > > > + source_return_path_bad(ms);
> > > > + goto out;
> > > > + }
> > > > +
> > > > + /* OK, we have the command and the data */
> > > > + switch (header_com) {
> > > > + case MIG_RP_CMD_SHUT:
> > > > + tmp32 = be32_to_cpup((uint32_t *)buf);
> > > > + trace_source_return_path_thread_shut(tmp32);
> > > > + if (tmp32) {
> > > > + error_report("RP: Sibling indicated error %d", tmp32);
> > > > + source_return_path_bad(ms);
> > > > + }
> > > > + /*
> > > > + * We'll let the main thread deal with closing the RP
> > > > + * we could do a shutdown(2) on it, but we're the only user
> > > > + * anyway, so there's nothing gained.
> > > > + */
> > > > + goto out;
> > > > +
> > > > + case MIG_RP_CMD_PONG:
> > > > + tmp32 = be32_to_cpup((uint32_t *)buf);
> > > > + trace_source_return_path_thread_pong(tmp32);
> > > > + break;
> > > > +
> > > > + default:
> > > > > + /* This shouldn't happen because we should catch this above */
> > > > + trace_source_return_path_bad_header_com();
> > > > + }
> > > > > + /* Latest command processed, now leave a gap for the next one */
> > > > + header_com = MIG_RP_CMD_INVALID;
> > >
> > > This assignment will always get overwritten.
> >
> > Thanks; gone - it's a left over from an old version.
> >
> > > > + }
> > > > + if (rp && qemu_file_get_error(rp)) {
> > > > + trace_source_return_path_thread_bad_end();
> > > > + source_return_path_bad(ms);
> > > > + }
> > > > +
> > > > + trace_source_return_path_thread_end();
> > > > +out:
> > > > + return NULL;
> > > > +}
> > > > +
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static int open_outgoing_return_path(MigrationState *ms)
> > >
> > > Uh.. surely this should be open_incoming_return_path(); it's designed
> > > to be used on the source side, AFAICT.
> > >
> > > > +{
> > > > +
> > > > + ms->rp_state.file = qemu_file_get_return_path(ms->file);
> > > > + if (!ms->rp_state.file) {
> > > > + return -1;
> > > > + }
> > > > +
> > > > + trace_open_outgoing_return_path();
> > > > + qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> > > > > + source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> > > > +
> > > > + trace_open_outgoing_return_path_continue();
> > > > +
> > > > + return 0;
> > > > +}
> > > > +
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static void await_outgoing_return_path_close(MigrationState *ms)
> > >
> > > Likewise "incoming" here, surely.
> >
> > I've changed those two to open_source_return_path() which seems less
> > ambiguous; that OK?
>
> Uh.. not really, it just moves the ambiguity to a different place (is
> "source return path" the return path *on* the source or *to* the
> source).
>
> Perhaps "open_return_path_on_source" and
> "await_return_path_close_on_source"? I'm not particularly fond of
> those, but they're the best I've come up with yet.

Done.

Dave

>
> --
> David Gibson | I'll have my music baroque, and my code
> david AT gibson.dropbear.id.au | minimalist, thank you. NOT _the_ _other_
> | _way_ _around_!
> http://www.ozlabs.org/~dgibson
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK