qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[RFC PATCH 5/6] job: use global job_mutex to protect struct Job


From: Emanuele Giuseppe Esposito
Subject: [RFC PATCH 5/6] job: use global job_mutex to protect struct Job
Date: Wed, 7 Jul 2021 18:58:12 +0200

This lock is going to replace most of the AioContext locks
in the job and blockjob code, so that a Job can run in an arbitrary
AioContext.

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 include/block/blockjob_int.h |   1 +
 include/qemu/job.h           |   2 +
 block/backup.c               |   4 +
 block/mirror.c               |  11 +-
 blockdev.c                   |  62 ++++----
 blockjob.c                   |  67 +++++++--
 job-qmp.c                    |  55 +++----
 job.c                        | 284 +++++++++++++++++++++++++++--------
 qemu-img.c                   |  15 +-
 9 files changed, 350 insertions(+), 151 deletions(-)

diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
index 6633d83da2..8b91126506 100644
--- a/include/block/blockjob_int.h
+++ b/include/block/blockjob_int.h
@@ -53,6 +53,7 @@ struct BlockJobDriver {
      */
     void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
 
+    /* Called with job mutex *not* held. */
     void (*set_speed)(BlockJob *job, int64_t speed);
 };
 
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 4421d08d93..359f4e6b3a 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -49,6 +49,8 @@ typedef struct Job {
     /**
      * The type of this job.
      * Set it in job_create and just read.
+     * All calls to the driver functions must be made without holding
+     * job_mutex, to avoid deadlocks.
      */
     const JobDriver *driver;
 
diff --git a/block/backup.c b/block/backup.c
index bd3614ce70..80ce956299 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -315,6 +315,10 @@ static void coroutine_fn backup_pause(Job *job)
     }
 }
 
+/*
+ * Called with job mutex *not* held (we don't want to call block_copy_kick
+ * with the lock held!)
+ */
 static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
 {
     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
diff --git a/block/mirror.c b/block/mirror.c
index 49aaaafffa..deefaa6a39 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1150,9 +1150,11 @@ static void mirror_complete(Job *job, Error **errp)
     s->should_complete = true;
 
     /* If the job is paused, it will be re-entered when it is resumed */
+    job_lock();
     if (!job_is_paused(job)) {
-        job_enter(job);
+        job_enter_locked(job);
     }
+    job_unlock();
 }
 
 static void coroutine_fn mirror_pause(Job *job)
@@ -1171,10 +1173,13 @@ static bool mirror_drained_poll(BlockJob *job)
      * from one of our own drain sections, to avoid a deadlock waiting for
      * ourselves.
      */
-    if (!job_is_paused(&s->common.job) && !job_is_cancelled(&s->common.job) &&
-        !s->in_drain) {
+    job_lock();
+    if (!job_is_paused(&s->common.job) &&
+        !job_is_cancelled_locked(&s->common.job) && !s->in_drain) {
+        job_unlock();
         return true;
     }
+    job_unlock();
 
     return !!s->in_flight;
 }
diff --git a/blockdev.c b/blockdev.c
index 8e2c15370e..9255aea6a2 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -150,9 +150,11 @@ void blockdev_mark_auto_del(BlockBackend *blk)
             AioContext *aio_context = job_get_aiocontext(&job->job);
             aio_context_acquire(aio_context);
 
+            job_lock();
             job_cancel(&job->job, false);
 
             aio_context_release(aio_context);
+            job_unlock();
         }
     }
 
@@ -3309,48 +3311,44 @@ out:
     aio_context_release(aio_context);
 }
 
-/* Get a block job using its ID and acquire its AioContext */
-static BlockJob *find_block_job(const char *id, AioContext **aio_context,
-                                Error **errp)
+/* Get a block job by ID; on success the global job_mutex is left held. */
+static BlockJob *find_block_job(const char *id, Error **errp)
 {
     BlockJob *job;
 
     assert(id != NULL);
 
-    *aio_context = NULL;
-
+    job_lock();
     job = block_job_get(id);
 
     if (!job) {
         error_set(errp, ERROR_CLASS_DEVICE_NOT_ACTIVE,
                   "Block job '%s' not found", id);
+        job_unlock();
         return NULL;
     }
 
-    *aio_context = blk_get_aio_context(job->blk);
-    aio_context_acquire(*aio_context);
-
     return job;
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_set_speed(const char *device, int64_t speed, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
     }
 
     block_job_set_speed(job, speed, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_cancel(const char *device,
                           bool has_force, bool force, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3369,13 +3367,13 @@ void qmp_block_job_cancel(const char *device,
     trace_qmp_block_job_cancel(job);
     job_user_cancel(&job->job, force, errp);
 out:
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_pause(const char *device, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3383,13 +3381,13 @@ void qmp_block_job_pause(const char *device, Error **errp)
 
     trace_qmp_block_job_pause(job);
     job_user_pause(&job->job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_resume(const char *device, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3397,13 +3395,13 @@ void qmp_block_job_resume(const char *device, Error **errp)
 
     trace_qmp_block_job_resume(job);
     job_user_resume(&job->job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_complete(const char *device, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3411,13 +3409,13 @@ void qmp_block_job_complete(const char *device, Error **errp)
 
     trace_qmp_block_job_complete(job);
     job_complete(&job->job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_finalize(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(id, &aio_context, errp);
+    BlockJob *job = find_block_job(id, errp);
 
     if (!job) {
         return;
@@ -3427,20 +3425,14 @@ void qmp_block_job_finalize(const char *id, Error **errp)
     job_ref(&job->job);
     job_finalize(&job->job, errp);
 
-    /*
-     * Job's context might have changed via job_finalize (and job_txn_apply
-     * automatically acquires the new one), so make sure we release the correct
-     * one.
-     */
-    aio_context = blk_get_aio_context(job->blk);
     job_unref(&job->job);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_dismiss(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *bjob = find_block_job(id, &aio_context, errp);
+    BlockJob *bjob = find_block_job(id, errp);
     Job *job;
 
     if (!bjob) {
@@ -3450,7 +3442,7 @@ void qmp_block_job_dismiss(const char *id, Error **errp)
     trace_qmp_block_job_dismiss(bjob);
     job = &bjob->job;
     job_dismiss(&job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_change_backing_file(const char *device,
diff --git a/blockjob.c b/blockjob.c
index 7f49f03ec7..e7b289089b 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -42,15 +42,16 @@
  * The first includes functions used by the monitor.  The monitor is
  * peculiar in that it accesses the block job list with block_job_get, and
  * therefore needs consistency across block_job_get and the actual operation
- * (e.g. block_job_set_speed).  The consistency is achieved with
- * aio_context_acquire/release.  These functions are declared in blockjob.h.
+ * (e.g. block_job_set_speed).  To achieve this consistency, the caller
+ * calls job_lock/job_unlock itself around the whole operation.
+ * These functions are declared in blockjob.h.
  *
  * The second includes functions used by the block job drivers and sometimes
- * by the core block layer.  These do not care about locking, because the
- * whole coroutine runs under the AioContext lock, and are declared in
- * blockjob_int.h.
+ * by the core block layer. These delegate the locking to the callee instead,
+ * and are declared in blockjob_int.h.
  */
 
+/* Does not need job_mutex. Value is never modified */
 static bool is_block_job(Job *job)
 {
     return job_type(job) == JOB_TYPE_BACKUP ||
@@ -59,6 +60,7 @@ static bool is_block_job(Job *job)
            job_type(job) == JOB_TYPE_STREAM;
 }
 
+/* Called with job_mutex *not* held. */
 BlockJob *block_job_next(BlockJob *bjob)
 {
     Job *job = bjob ? &bjob->job : NULL;
@@ -70,6 +72,7 @@ BlockJob *block_job_next(BlockJob *bjob)
     return job ? container_of(job, BlockJob, job) : NULL;
 }
 
+/* Called with job_mutex held. */
 BlockJob *block_job_get(const char *id)
 {
     Job *job = job_get(id);
@@ -97,24 +100,31 @@ static char *child_job_get_parent_desc(BdrvChild *c)
     return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
 }
 
+/* Called with job_mutex *not* held. */
 static void child_job_drained_begin(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
+    job_lock();
     job_pause(&job->job);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 static bool child_job_drained_poll(BdrvChild *c)
 {
     BlockJob *bjob = c->opaque;
     Job *job = &bjob->job;
     const BlockJobDriver *drv = block_job_driver(bjob);
 
+    job_lock();
     /* An inactive or completed job doesn't have any pending requests. Jobs
      * with !job->busy are either already paused or have a pause point after
      * being reentered, so no job driver code will run before they pause. */
-    if (!job_is_busy(job) || job_is_completed(job)) {
+    if (!job_is_busy(job) || job_is_completed_locked(job)) {
+        job_unlock();
         return false;
     }
+    job_unlock();
 
     /* Otherwise, assume that it isn't fully stopped yet, but allow the job to
      * override this assumption. */
@@ -125,10 +135,13 @@ static bool child_job_drained_poll(BdrvChild *c)
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void child_job_drained_end(BdrvChild *c, int *drained_end_counter)
 {
     BlockJob *job = c->opaque;
+    job_lock();
     job_resume(&job->job);
+    job_unlock();
 }
 
 static bool child_job_can_set_aio_ctx(BdrvChild *c, AioContext *ctx,
@@ -246,11 +259,15 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
     return 0;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 static void block_job_on_idle(Notifier *n, void *opaque)
 {
+    job_unlock();
     aio_wait_kick();
+    job_lock();
 }
 
+/* Does not need job_mutex. Value is never modified */
 bool block_job_is_internal(BlockJob *job)
 {
     return (job->job.id == NULL);
@@ -267,6 +284,7 @@ static bool job_timer_pending(Job *job)
     return timer_pending(&job->sleep_timer);
 }
 
+/* Called with job_mutex held. May temporarily release the lock. */
 bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
 {
     const BlockJobDriver *drv = block_job_driver(job);
@@ -286,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
     job->speed = speed;
 
     if (drv->set_speed) {
+        job_unlock();
         drv->set_speed(job, speed);
+        job_lock();
     }
 
     if (speed && speed <= old_speed) {
@@ -304,6 +324,7 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
     return ratelimit_calculate_delay(&job->limit, n);
 }
 
+/* Called with job_mutex *not* held. */
 BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
 {
     BlockJobInfo *info;
@@ -319,6 +340,7 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
     progress_get_snapshot(&job->progress, &progress_current,
                           &progress_total);
 
+    job_lock();
     info = g_new0(BlockJobInfo, 1);
     info->type      = g_strdup(job_type_str(job));
     info->device    = g_strdup(job->id);
@@ -328,11 +350,11 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
     info->len       = progress_total;
     info->speed     = blkjob->speed;
     info->io_status = blkjob->iostatus;
-    info->ready     = job_is_ready(job);
+    info->ready     = job_is_ready_locked(job);
     info->status    = job_get_status(job);
     info->auto_finalize = job->auto_finalize;
     info->auto_dismiss = job->auto_dismiss;
-    job_ret = job_get_ret(job);
+    job_ret = job_get_ret_locked(job);
     if (job_ret) {
         Error *job_err = job_get_err(job);
         info->has_error = true;
@@ -340,9 +362,11 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
                         g_strdup(error_get_pretty(job_err)) :
                         g_strdup(strerror(-job_ret));
     }
+    job_unlock();
     return info;
 }
 
+/* Called with job_mutex held. */
 static void block_job_iostatus_set_err(BlockJob *job, int error)
 {
     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
@@ -351,6 +375,7 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
     }
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_cancelled(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
@@ -370,6 +395,7 @@ static void block_job_event_cancelled(Notifier *n, void *opaque)
                                         job->speed);
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_completed(Notifier *n, void *opaque)
 {
     BlockJob *blkjob = opaque;
@@ -381,7 +407,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
         return;
     }
 
-    if (job_get_ret(job) < 0) {
+    if (job_get_ret_locked(job) < 0) {
         msg = error_get_pretty(job_get_err(job));
     }
 
@@ -397,6 +423,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
                                         msg);
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_pending(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
@@ -409,6 +436,7 @@ static void block_job_event_pending(Notifier *n, void *opaque)
                                       job->job.id);
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_ready(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
@@ -430,10 +458,11 @@ static void block_job_event_ready(Notifier *n, void *opaque)
 
 
 /*
- * API for block job drivers and the block layer.  These functions are
- * declared in blockjob_int.h.
+ * API for block job drivers and the block layer, who do not know about
+ * job_mutex.  These functions are declared in blockjob_int.h.
  */
 
+/* Called with job_mutex *not* held; takes it and temporarily releases it. */
 void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                        JobTxn *txn, BlockDriverState *bs, uint64_t perm,
                        uint64_t shared_perm, int64_t speed, int flags,
@@ -472,6 +501,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->ready_notifier.notify = block_job_event_ready;
     job->idle_notifier.notify = block_job_on_idle;
 
+    job_lock();
+
     notifier_list_add(&job->job.on_finalize_cancelled,
                       &job->finalize_cancelled_notifier);
     notifier_list_add(&job->job.on_finalize_completed,
@@ -482,7 +513,11 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
 
     error_setg(&job->blocker, "block device is in use by block job: %s",
                job_type_str(&job->job));
+
+    job_unlock();
+    /* calls drain and friends, that already take the lock */
     block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
+    job_lock();
 
     bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
 
@@ -493,27 +528,35 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
 
     if (!block_job_set_speed(job, speed, errp)) {
         job_early_fail(&job->job);
+        job_unlock();
         return NULL;
     }
 
+    job_unlock();
     return job;
 }
 
+/* Called with job_mutex *not* held. */
 void block_job_iostatus_reset(BlockJob *job)
 {
+    job_lock();
     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+        job_unlock();
         return;
     }
     assert(job_user_paused(&job->job) && job_should_pause(&job->job));
     job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void block_job_user_resume(Job *job)
 {
     BlockJob *bjob = container_of(job, BlockJob, job);
     block_job_iostatus_reset(bjob);
 }
 
+/* Called with job_mutex *not* held. */
 BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                         int is_read, int error)
 {
@@ -544,12 +587,14 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                         action);
     }
     if (action == BLOCK_ERROR_ACTION_STOP) {
+        job_lock();
         if (!job_user_paused(&job->job)) {
             job_pause(&job->job);
             /* make the pause user visible, which will be resumed from QMP. */
             job_set_user_paused(&job->job);
         }
         block_job_iostatus_set_err(job, error);
+        job_unlock();
     }
     return action;
 }
diff --git a/job-qmp.c b/job-qmp.c
index 12238a1643..03f3946490 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -29,29 +29,26 @@
 #include "qapi/error.h"
 #include "trace/trace-root.h"
 
-/* Get a job using its ID and acquire its AioContext */
-static Job *find_job(const char *id, AioContext **aio_context, Error **errp)
+/* Get a job by ID; on success the global job_mutex is left held. */
+static Job *find_job(const char *id, Error **errp)
 {
     Job *job;
 
-    *aio_context = NULL;
+    job_lock();
 
     job = job_get(id);
     if (!job) {
         error_setg(errp, "Job not found");
+        job_unlock();
         return NULL;
     }
 
-    *aio_context = job_get_aiocontext(job);
-    aio_context_acquire(*aio_context);
-
     return job;
 }
 
 void qmp_job_cancel(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -59,13 +56,12 @@ void qmp_job_cancel(const char *id, Error **errp)
 
     trace_qmp_job_cancel(job);
     job_user_cancel(job, true, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_pause(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -73,13 +69,12 @@ void qmp_job_pause(const char *id, Error **errp)
 
     trace_qmp_job_pause(job);
     job_user_pause(job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_resume(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -87,13 +82,12 @@ void qmp_job_resume(const char *id, Error **errp)
 
     trace_qmp_job_resume(job);
     job_user_resume(job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_complete(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -101,13 +95,12 @@ void qmp_job_complete(const char *id, Error **errp)
 
     trace_qmp_job_complete(job);
     job_complete(job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_finalize(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -117,20 +110,13 @@ void qmp_job_finalize(const char *id, Error **errp)
     job_ref(job);
     job_finalize(job, errp);
 
-    /*
-     * Job's context might have changed via job_finalize (and job_txn_apply
-     * automatically acquires the new one), so make sure we release the correct
-     * one.
-     */
-    aio_context = job_get_aiocontext(job);
     job_unref(job);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_dismiss(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -138,9 +124,10 @@ void qmp_job_dismiss(const char *id, Error **errp)
 
     trace_qmp_job_dismiss(job);
     job_dismiss(&job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex held. */
 static JobInfo *job_query_single(Job *job, Error **errp)
 {
     JobInfo *info;
@@ -175,15 +162,15 @@ JobInfoList *qmp_query_jobs(Error **errp)
 
     for (job = job_next(NULL); job; job = job_next(job)) {
         JobInfo *value;
-        AioContext *aio_context;
 
         if (job_is_internal(job)) {
             continue;
         }
-        aio_context = job_get_aiocontext(job);
-        aio_context_acquire(aio_context);
+
+        job_lock();
         value = job_query_single(job, errp);
-        aio_context_release(aio_context);
+        job_unlock();
+
         if (!value) {
             qapi_free_JobInfoList(head);
             return NULL;
diff --git a/job.c b/job.c
index 48b304c3ff..e2006532b5 100644
--- a/job.c
+++ b/job.c
@@ -93,19 +93,22 @@ static void __attribute__((__constructor__)) job_init(void)
     qemu_mutex_init(&job_mutex);
 }
 
+/* Does not need job_mutex */
 AioContext *job_get_aiocontext(Job *job)
 {
-    return job->aio_context;
+    return qatomic_read(&job->aio_context);
 }
 
+/* Does not need job_mutex */
 void job_set_aiocontext(Job *job, AioContext *aio)
 {
-    job->aio_context = aio;
+    qatomic_set(&job->aio_context, aio);
 }
 
+/* Called with job_mutex held. */
 bool job_is_busy(Job *job)
 {
-    return qatomic_read(&job->busy);
+    return job->busy;
 }
 
 /* Called with job_mutex held. */
@@ -124,59 +127,75 @@ int job_get_ret(Job *job)
     return ret;
 }
 
+/* Called with job_mutex held. */
 Error *job_get_err(Job *job)
 {
     return job->err;
 }
 
+/* Called with job_mutex held. */
 JobStatus job_get_status(Job *job)
 {
     return job->status;
 }
-
+/* Called with job_mutex *not* held. */
 void job_set_cancelled(Job *job, bool cancel)
 {
+    job_lock();
     job->cancelled = cancel;
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 bool job_is_force_cancel(Job *job)
 {
-    return job->force_cancel;
+    bool ret;
+    job_lock();
+    ret = job->force_cancel;
+    job_unlock();
+    return ret;
 }
 
+/* Does not need job_mutex */
 JobTxn *job_txn_new(void)
 {
     JobTxn *txn = g_new0(JobTxn, 1);
     QLIST_INIT(&txn->jobs);
-    txn->refcnt = 1;
+    qatomic_set(&txn->refcnt, 1);
     return txn;
 }
 
+/* Does not need job_mutex */
 static void job_txn_ref(JobTxn *txn)
 {
-    txn->refcnt++;
+    qatomic_inc(&txn->refcnt);
 }
 
+/* Does not need job_mutex */
 void job_txn_unref(JobTxn *txn)
 {
-    if (txn && --txn->refcnt == 0) {
+    if (txn && qatomic_dec_fetch(&txn->refcnt) == 0) {
         g_free(txn);
     }
 }
 
+/* Called with job_mutex *not* held. */
 void job_txn_add_job(JobTxn *txn, Job *job)
 {
     if (!txn) {
         return;
     }
 
+    job_lock();
     assert(!job->txn);
     job->txn = txn;
 
     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    job_unlock();
     job_txn_ref(txn);
 }
 
+/* Called with job_mutex held. */
 static void job_txn_del_job(Job *job)
 {
     if (job->txn) {
@@ -186,6 +205,7 @@ static void job_txn_del_job(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 static int job_txn_apply(Job *job, int fn(Job *))
 {
     AioContext *inner_ctx;
@@ -221,11 +241,13 @@ static int job_txn_apply(Job *job, int fn(Job *))
     return rc;
 }
 
+/* Does not need job_mutex */
 bool job_is_internal(Job *job)
 {
     return (job->id == NULL);
 }
 
+/* Called with job_mutex held. */
 static void job_state_transition(Job *job, JobStatus s1)
 {
     JobStatus s0 = job->status;
@@ -241,6 +263,7 @@ static void job_state_transition(Job *job, JobStatus s1)
     }
 }
 
+/* Called with job_mutex held. */
 int job_apply_verb(Job *job, JobVerb verb, Error **errp)
 {
     JobStatus s0 = job->status;
@@ -255,11 +278,13 @@ int job_apply_verb(Job *job, JobVerb verb, Error **errp)
     return -EPERM;
 }
 
+/* Does not need job_mutex. Value is never modified */
 JobType job_type(const Job *job)
 {
     return job->driver->job_type;
 }
 
+/* Does not need job_mutex. Value is never modified */
 const char *job_type_str(const Job *job)
 {
     return JobType_str(job_type(job));
@@ -353,24 +378,34 @@ static bool job_started(Job *job)
     return job->co;
 }
 
+/* Called with job_mutex held. */
 bool job_should_pause(Job *job)
 {
     return job->pause_count > 0;
 }
 
+/* Called with job_mutex held. */
 bool job_is_paused(Job *job)
 {
     return job->paused;
 }
 
+/* Called with job_mutex *not* held. */
 Job *job_next(Job *job)
 {
+    Job *ret;
+    job_lock();
     if (!job) {
-        return QLIST_FIRST(&jobs);
+        ret = QLIST_FIRST(&jobs);
+        job_unlock();
+        return ret;
     }
-    return QLIST_NEXT(job, job_list);
+    ret = QLIST_NEXT(job, job_list);
+    job_unlock();
+    return ret;
 }
 
+/* Called with job_mutex held. */
 Job *job_get(const char *id)
 {
     Job *job;
@@ -388,13 +423,14 @@ Job *job_get(const char *id)
     return NULL;
 }
 
+/* Called with job_mutex *not* held. */
 static void job_sleep_timer_cb(void *opaque)
 {
     Job *job = opaque;
-
     job_enter(job);
 }
 
+/* Called with job_mutex *not* held. */
 void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                  AioContext *ctx, int flags, BlockCompletionFunc *cb,
                  void *opaque, Error **errp)
@@ -449,6 +485,7 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                    job_sleep_timer_cb, job);
 
     QLIST_INSERT_HEAD(&jobs, job, job_list);
+    job_unlock();
 
     /* Single jobs are modeled as single-job transactions for sake of
      * consolidating the job management logic */
@@ -463,11 +500,13 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
     return job;
 }
 
+/* Called with job_mutex held. */
 void job_ref(Job *job)
 {
     ++job->refcnt;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 void job_unref(Job *job)
 {
     if (--job->refcnt == 0) {
@@ -476,7 +515,9 @@ void job_unref(Job *job)
         assert(!job->txn);
 
         if (job->driver->free) {
+            job_unlock();
             job->driver->free(job);
+            job_lock();
         }
 
         QLIST_REMOVE(job, job_list);
@@ -488,46 +529,55 @@ void job_unref(Job *job)
     }
 }
 
+/* API is thread safe */
 void job_progress_update(Job *job, uint64_t done)
 {
     progress_work_done(&job->progress, done);
 }
 
+/* API is thread safe */
 void job_progress_set_remaining(Job *job, uint64_t remaining)
 {
     progress_set_remaining(&job->progress, remaining);
 }
 
+/* API is thread safe */
 void job_progress_increase_remaining(Job *job, uint64_t delta)
 {
     progress_increase_remaining(&job->progress, delta);
 }
 
+/* Called with job_mutex held. */
 void job_event_cancelled(Job *job)
 {
     notifier_list_notify(&job->on_finalize_cancelled, job);
 }
 
+/* Called with job_mutex held. */
 void job_event_completed(Job *job)
 {
     notifier_list_notify(&job->on_finalize_completed, job);
 }
 
+/* Called with job_mutex held. */
 static void job_event_pending(Job *job)
 {
     notifier_list_notify(&job->on_pending, job);
 }
 
+/* Called with job_mutex held. */
 static void job_event_ready(Job *job)
 {
     notifier_list_notify(&job->on_ready, job);
 }
 
+/* Called with job_mutex held. */
 static void job_event_idle(Job *job)
 {
     notifier_list_notify(&job->on_idle, job);
 }
 
+/* Called with job_mutex held, but releases it temporarily. */
 void job_enter_cond(Job *job, bool(*fn)(Job *job))
 {
     if (!job_started(job)) {
@@ -537,14 +587,11 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
         return;
     }
 
-    job_lock();
     if (job->busy) {
-        job_unlock();
         return;
     }
 
     if (fn && !fn(job)) {
-        job_unlock();
         return;
     }
 
@@ -552,7 +599,8 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
     timer_del(&job->sleep_timer);
     job->busy = true;
     job_unlock();
-    aio_co_enter(job->aio_context, job->co);
+    aio_co_enter(job_get_aiocontext(job), job->co);
+    job_lock();
 }
 
 /* Called with job_mutex held. */
@@ -565,7 +613,7 @@ void job_enter_locked(Job *job)
 void job_enter(Job *job)
 {
     job_lock();
-    job_enter_locked(job, NULL);
+    job_enter_locked(job);
     job_unlock();
 }
 
@@ -574,7 +622,11 @@ void job_enter(Job *job)
  * is allowed and cancels the timer.
  *
  * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
- * called explicitly. */
+ * called explicitly.
+ *
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
 {
     job_lock();
@@ -587,86 +639,122 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
     qemu_coroutine_yield();
 
     /* Set by job_enter_cond() before re-entering the coroutine.  */
+    job_lock();
     assert(job->busy);
+    job_unlock();
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 void coroutine_fn job_pause_point(Job *job)
 {
     assert(job && job_started(job));
 
+    job_lock();
     if (!job_should_pause(job)) {
+        job_unlock();
         return;
     }
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
+        job_unlock();
         return;
     }
 
     if (job->driver->pause) {
+        job_unlock();
         job->driver->pause(job);
+        job_lock();
     }
 
-    if (job_should_pause(job) && !job_is_cancelled(job)) {
+    if (job_should_pause(job) && !job_is_cancelled_locked(job)) {
         JobStatus status = job->status;
         job_state_transition(job, status == JOB_STATUS_READY
                                   ? JOB_STATUS_STANDBY
                                   : JOB_STATUS_PAUSED);
         job->paused = true;
+        job_unlock();
         job_do_yield(job, -1);
+        job_lock();
         job->paused = false;
         job_state_transition(job, status);
     }
+    job_unlock();
 
     if (job->driver->resume) {
         job->driver->resume(job);
     }
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 void job_yield(Job *job)
 {
+    bool res;
+    job_lock();
     assert(job->busy);
 
     /* Check cancellation *before* setting busy = false, too!  */
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
+        job_unlock();
         return;
     }
 
-    if (!job_should_pause(job)) {
+    res = job_should_pause(job);
+    job_unlock();
+
+    if (!res) {
         job_do_yield(job, -1);
     }
 
     job_pause_point(job);
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
 {
+    bool res;
+    job_lock();
     assert(job->busy);
 
     /* Check cancellation *before* setting busy = false, too!  */
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
+        job_unlock();
         return;
     }
 
-    if (!job_should_pause(job)) {
+    res = job_should_pause(job);
+    job_unlock();
+
+    if (!res) {
         job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
     }
 
     job_pause_point(job);
 }
 
-/* Assumes the block_job_mutex is held */
+/* Called with job_mutex held. */
 static bool job_timer_not_pending(Job *job)
 {
     return !timer_pending(&job->sleep_timer);
 }
 
+/* Called with job_mutex held. */
 void job_pause(Job *job)
 {
     job->pause_count++;
     if (!job->paused) {
-        job_enter(job);
+        job_enter_locked(job);
     }
 }
 
+/* Called with job_mutex held. */
 void job_resume(Job *job)
 {
     assert(job->pause_count > 0);
@@ -679,6 +767,7 @@ void job_resume(Job *job)
     job_enter_cond(job, job_timer_not_pending);
 }
 
+/* Called with job_mutex held. */
 void job_user_pause(Job *job, Error **errp)
 {
     if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
@@ -692,16 +781,19 @@ void job_user_pause(Job *job, Error **errp)
     job_pause(job);
 }
 
+/* Called with job_mutex held. */
 bool job_user_paused(Job *job)
 {
     return job->user_paused;
 }
 
+/* Called with job_mutex held. */
 void job_set_user_paused(Job *job)
 {
     job->user_paused = true;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 void job_user_resume(Job *job, Error **errp)
 {
     assert(job);
@@ -713,12 +805,15 @@ void job_user_resume(Job *job, Error **errp)
         return;
     }
     if (job->driver->user_resume) {
+        job_unlock();
         job->driver->user_resume(job);
+        job_lock();
     }
     job->user_paused = false;
     job_resume(job);
 }
 
+/* Called with job_mutex held. */
 static void job_do_dismiss(Job *job)
 {
     assert(job);
@@ -732,6 +827,7 @@ static void job_do_dismiss(Job *job)
     job_unref(job);
 }
 
+/* Called with job_mutex held. */
 void job_dismiss(Job **jobptr, Error **errp)
 {
     Job *job = *jobptr;
@@ -761,9 +857,10 @@ static void job_conclude(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 static void job_update_rc(Job *job)
 {
-    if (!job->ret && job_is_cancelled(job)) {
+    if (!job->ret && job_is_cancelled_locked(job)) {
         job->ret = -ECANCELED;
     }
     if (job->ret) {
@@ -774,22 +871,25 @@ static void job_update_rc(Job *job)
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void job_commit(Job *job)
 {
-    assert(!job->ret);
+    assert(!job_get_ret(job));
     if (job->driver->commit) {
         job->driver->commit(job);
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void job_abort(Job *job)
 {
-    assert(job->ret);
+    assert(job_get_ret(job));
     if (job->driver->abort) {
         job->driver->abort(job);
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void job_clean(Job *job)
 {
     if (job->driver->clean) {
@@ -797,14 +897,18 @@ static void job_clean(Job *job)
     }
 }
 
+/* Called with job_mutex held, but releases it temporarily. */
 static int job_finalize_single(Job *job)
 {
-    assert(job_is_completed(job));
+    int ret;
+    assert(job_is_completed_locked(job));
 
     /* Ensure abort is called for late-transactional failures */
     job_update_rc(job);
 
-    if (!job->ret) {
+    ret = job->ret;
+    job_unlock();
+    if (!ret) {
         job_commit(job);
     } else {
         job_abort(job);
@@ -812,12 +916,13 @@ static int job_finalize_single(Job *job)
     job_clean(job);
 
     if (job->cb) {
-        job->cb(job->opaque, job->ret);
+        job->cb(job->opaque, ret);
     }
+    job_lock();
 
     /* Emit events only if we actually started */
     if (job_started(job)) {
-        if (job_is_cancelled(job)) {
+        if (job_is_cancelled_locked(job)) {
             job_event_cancelled(job);
         } else {
             job_event_completed(job);
@@ -829,15 +934,20 @@ static int job_finalize_single(Job *job)
     return 0;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 static void job_cancel_async(Job *job, bool force)
 {
     if (job->driver->cancel) {
+        job_unlock();
         job->driver->cancel(job, force);
+        job_lock();
     }
     if (job->user_paused) {
         /* Do not call job_enter here, the caller will handle it.  */
         if (job->driver->user_resume) {
+            job_unlock();
             job->driver->user_resume(job);
+            job_lock();
         }
         job->user_paused = false;
         assert(job->pause_count > 0);
@@ -848,27 +958,21 @@ static void job_cancel_async(Job *job, bool force)
     job->force_cancel |= force;
 }
 
+/* Called with job_mutex held. */
 static void job_completed_txn_abort(Job *job)
 {
-    AioContext *outer_ctx = job->aio_context;
     AioContext *ctx;
     JobTxn *txn = job->txn;
     Job *other_job;
 
-    if (txn->aborting) {
+    if (qatomic_cmpxchg(&txn->aborting, false, true)) {
         /*
          * We are cancelled by another job, which will handle everything.
          */
         return;
     }
-    txn->aborting = true;
     job_txn_ref(txn);
 
-    /* We can only hold the single job's AioContext lock while calling
-     * job_finalize_single() because the finalization callbacks can involve
-     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
-    aio_context_release(outer_ctx);
-
     /* Other jobs are effectively cancelled by us, set the status for
      * them; this job, however, may or may not be cancelled, depending
      * on the caller, so leave it. */
@@ -884,33 +988,39 @@ static void job_completed_txn_abort(Job *job)
         other_job = QLIST_FIRST(&txn->jobs);
         ctx = other_job->aio_context;
         aio_context_acquire(ctx);
-        if (!job_is_completed(other_job)) {
-            assert(job_is_cancelled(other_job));
+        if (!job_is_completed_locked(other_job)) {
+            assert(job_is_cancelled_locked(other_job));
             job_finish_sync(other_job, NULL, NULL);
         }
         job_finalize_single(other_job);
         aio_context_release(ctx);
     }
 
-    aio_context_acquire(outer_ctx);
-
     job_txn_unref(txn);
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 static int job_prepare(Job *job)
 {
+    int ret;
+
     if (job->ret == 0 && job->driver->prepare) {
-        job->ret = job->driver->prepare(job);
+        job_unlock();
+        ret = job->driver->prepare(job);
+        job_lock();
+        job->ret = ret;
         job_update_rc(job);
     }
     return job->ret;
 }
 
+/* Does not need job_mutex */
 static int job_needs_finalize(Job *job)
 {
     return !job->auto_finalize;
 }
 
+/* Called with job_mutex held. */
 static void job_do_finalize(Job *job)
 {
     int rc;
@@ -925,6 +1035,7 @@ static void job_do_finalize(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 void job_finalize(Job *job, Error **errp)
 {
     assert(job && job->id);
@@ -934,6 +1045,7 @@ void job_finalize(Job *job, Error **errp)
     job_do_finalize(job);
 }
 
+/* Called with job_mutex held. */
 static int job_transition_to_pending(Job *job)
 {
     job_state_transition(job, JOB_STATUS_PENDING);
@@ -943,17 +1055,22 @@ static int job_transition_to_pending(Job *job)
     return 0;
 }
 
+/* Called with job_mutex *not* held. */
 void job_transition_to_ready(Job *job)
 {
+    job_lock();
     job_state_transition(job, JOB_STATUS_READY);
     job_event_ready(job);
+    job_unlock();
 }
 
+/* Called with job_mutex held. */
 static void job_completed_txn_success(Job *job)
 {
-    JobTxn *txn = job->txn;
+    JobTxn *txn;
     Job *other_job;
 
+    txn = job->txn;
     job_state_transition(job, JOB_STATUS_WAITING);
 
     /*
@@ -961,7 +1078,7 @@ static void job_completed_txn_success(Job *job)
      * txn.
      */
     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (!job_is_completed(other_job)) {
+        if (!job_is_completed_locked(other_job)) {
             return;
         }
         assert(other_job->ret == 0);
@@ -975,9 +1092,10 @@ static void job_completed_txn_success(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 static void job_completed(Job *job)
 {
-    assert(job && job->txn && !job_is_completed(job));
+    assert(job && job->txn && !job_is_completed_locked(job));
 
     job_update_rc(job);
     trace_job_completed(job, job->ret);
@@ -988,14 +1106,16 @@ static void job_completed(Job *job)
     }
 }
 
-/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+/**
+ * Useful only as a type shim for aio_bh_schedule_oneshot.
+ * Called with job_mutex *not* held.
+ */
 static void job_exit(void *opaque)
 {
     Job *job = (Job *)opaque;
-    AioContext *ctx;
 
+    job_lock();
     job_ref(job);
-    aio_context_acquire(job->aio_context);
 
     /* This is a lie, we're not quiescent, but still doing the completion
      * callbacks. However, completion callbacks tend to involve operations that
@@ -1012,29 +1132,40 @@ static void job_exit(void *opaque)
      * acquiring the new lock, and we ref/unref to avoid job_completed freeing
      * the job underneath us.
      */
-    ctx = job->aio_context;
     job_unref(job);
-    aio_context_release(ctx);
+    job_unlock();
 }
 
 /**
  * All jobs must allow a pause point before entering their job proper. This
  * ensures that jobs can be paused prior to being started, then resumed later.
+ *
+ * Called with job_mutex *not* held.
  */
 static void coroutine_fn job_co_entry(void *opaque)
 {
     Job *job = opaque;
+    Error *local_error = NULL;
+    int ret;
 
     assert(job && job->driver && job->driver->run);
     job_pause_point(job);
-    job->ret = job->driver->run(job, &job->err);
+    ret = job->driver->run(job, &local_error);
+    job_lock();
+    if (local_error) {
+        error_propagate(&job->err, local_error);
+    }
+    job->ret = ret;
     job->deferred_to_main_loop = true;
     job->busy = true;
+    job_unlock();
     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
 }
 
+/* Called with job_mutex *not* held. */
 void job_start(Job *job)
 {
+    job_lock();
     assert(job && !job_started(job) && job->paused &&
            job->driver && job->driver->run);
     job->co = qemu_coroutine_create(job_co_entry, job);
@@ -1042,9 +1173,11 @@ void job_start(Job *job)
     job->busy = true;
     job->paused = false;
     job_state_transition(job, JOB_STATUS_RUNNING);
-    aio_co_enter(job->aio_context, job->co);
+    job_unlock();
+    aio_co_enter(job_get_aiocontext(job), job->co);
 }
 
+/* Called with job_mutex held. */
 void job_cancel(Job *job, bool force)
 {
     if (job->status == JOB_STATUS_CONCLUDED) {
@@ -1057,10 +1190,11 @@ void job_cancel(Job *job, bool force)
     } else if (job->deferred_to_main_loop) {
         job_completed_txn_abort(job);
     } else {
-        job_enter(job);
+        job_enter_locked(job);
     }
 }
 
+/* Called with job_mutex held. */
 void job_user_cancel(Job *job, bool force, Error **errp)
 {
     if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
@@ -1069,19 +1203,36 @@ void job_user_cancel(Job *job, bool force, Error **errp)
     job_cancel(job, force);
 }
 
-/* A wrapper around job_cancel() taking an Error ** parameter so it may be
+/*
+ * A wrapper around job_cancel() taking an Error ** parameter so it may be
  * used with job_finish_sync() without the need for (rather nasty) function
- * pointer casts there. */
+ * pointer casts there.
+ *
+ * Called with job_mutex held.
+ */
 static void job_cancel_err(Job *job, Error **errp)
 {
     job_cancel(job, false);
 }
 
+/*
+ * Called with job_mutex *not* held, unlike most other APIs consumed
+ * by the monitor!
+ */
 int job_cancel_sync(Job *job)
 {
-    return job_finish_sync(job, &job_cancel_err, NULL);
+    int ret;
+
+    job_lock();
+    ret = job_finish_sync(job, &job_cancel_err, NULL);
+    job_unlock();
+    return ret;
 }
 
+/*
+ * Called with job_mutex *not* held, unlike most other APIs consumed
+ * by the monitor!
+ */
 void job_cancel_sync_all(void)
 {
     Job *job;
@@ -1095,11 +1246,13 @@ void job_cancel_sync_all(void)
     }
 }
 
+/* Called with job_mutex held. */
 int job_complete_sync(Job *job, Error **errp)
 {
     return job_finish_sync(job, job_complete, errp);
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 void job_complete(Job *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
@@ -1107,15 +1260,18 @@ void job_complete(Job *job, Error **errp)
     if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
         return;
     }
-    if (job_is_cancelled(job) || !job->driver->complete) {
+    if (job_is_cancelled_locked(job) || !job->driver->complete) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
     }
 
+    job_unlock();
     job->driver->complete(job, errp);
+    job_lock();
 }
 
+/* Called with job_mutex held. */
 int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
 {
     Error *local_err = NULL;
@@ -1132,10 +1288,12 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
         return -EBUSY;
     }
 
-    AIO_WAIT_WHILE(job->aio_context,
-                   (job_enter(job), !job_is_completed(job)));
+    job_unlock();
+    AIO_WAIT_WHILE(NULL, (job_enter(job), !job_is_completed(job)));
+    job_lock();
 
-    ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
+    ret = (job_is_cancelled_locked(job) && job->ret == 0) ?
+           -ECANCELED : job->ret;
     job_unref(job);
     return ret;
 }
diff --git a/qemu-img.c b/qemu-img.c
index d16bd367d9..82debde038 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -898,17 +898,19 @@ static void common_block_job_cb(void *opaque, int ret)
     }
 }
 
+/* Called with job_mutex held. Releases it temporarily. */
 static void run_block_job(BlockJob *job, Error **errp)
 {
     uint64_t progress_current, progress_total;
     AioContext *aio_context = blk_get_aio_context(job->blk);
     int ret = 0;
 
-    aio_context_acquire(aio_context);
     job_ref(&job->job);
     do {
         float progress = 0.0f;
+        job_unlock();
         aio_poll(aio_context, true);
+        job_lock();
 
         progress_get_snapshot(&job->job.progress, &progress_current,
                               &progress_total);
@@ -916,15 +918,15 @@ static void run_block_job(BlockJob *job, Error **errp)
             progress = (float)progress_current / progress_total * 100.f;
         }
         qemu_progress_print(progress, 0);
-    } while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
+    } while (!job_is_ready_locked(&job->job) &&
+             !job_is_completed_locked(&job->job));
 
-    if (!job_is_completed(&job->job)) {
+    if (!job_is_completed_locked(&job->job)) {
         ret = job_complete_sync(&job->job, errp);
     } else {
-        ret = job_get_ret(&job->job);
+        ret = job_get_ret_locked(&job->job);
     }
     job_unref(&job->job);
-    aio_context_release(aio_context);
 
     /* publish completion progress only when success */
     if (!ret) {
@@ -1076,9 +1078,12 @@ static int img_commit(int argc, char **argv)
         bdrv_ref(bs);
     }
 
+    job_lock();
     job = block_job_get("commit");
     assert(job);
     run_block_job(job, &local_err);
+    job_unlock();
+
     if (local_err) {
         goto unref_backing;
     }
-- 
2.31.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]