qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v4 09/14] block: add block job transactions


From: Max Reitz
Subject: Re: [Qemu-devel] [PATCH v4 09/14] block: add block job transactions
Date: Mon, 3 Aug 2015 20:06:47 +0200
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.1.0

On 30.07.2015 08:39, Fam Zheng wrote:
Sometimes block jobs must execute as a transaction group.  Finishing
jobs wait until all other jobs are ready to complete successfully.
Failure or cancellation of one job cancels the other jobs in the group.

Signed-off-by: Stefan Hajnoczi <address@hidden>
[Rewrite the implementation, which is now contained in block_job_completed().
--Fam]
Signed-off-by: Fam Zheng <address@hidden>
---
  blockjob.c               | 115 ++++++++++++++++++++++++++++++++++++++++++++++-
  include/block/block.h    |   1 +
  include/block/blockjob.h |  29 ++++++++++++
  3 files changed, 143 insertions(+), 2 deletions(-)

diff --git a/blockjob.c b/blockjob.c
index 36c18e0..c78ad0a 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -36,6 +36,16 @@
  #include "qemu/timer.h"
  #include "qapi-event.h"

+/* Transactional group of block jobs */
+struct BlockJobTxn {
+
+    /* Is this txn being cancelled? */
+    bool aborting;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
  void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
                         int64_t speed, BlockCompletionFunc *cb,
                         void *opaque, Error **errp)
@@ -90,6 +100,83 @@ void block_job_unref(BlockJob *job)
      }
  }

+static void block_job_completed_single(BlockJob *job)
+{
+    if (!job->ret) {
+        if (job->driver->commit) {
+            job->driver->commit(job);
+        }
+    } else {
+        if (job->driver->abort) {
+            job->driver->abort(job);
+        }
+    }
+    job->cb(job->opaque, job->ret);
+    block_job_unref(job);
+}
+
+static void block_job_completed_txn_abort(BlockJob *job)
+{
+    AioContext *ctx;
+    BlockJobTxn *txn = job->txn;
+    BlockJob *other_job, *next;
+
+    if (txn->aborting) {
+        /*
+         * We are cancelled by another job, who will handle everything.

s/who/which/

+         */
+        return;
+    }
+    txn->aborting = true;
+    /* We are the first failed job. Cancel other jobs. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+    }
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (other_job == job || other_job->completed) {
+            /* Other are "effectively" cancelled by us, so set the status for

s/Other/Other jobs/?

+             * them; this job, however, may or may not be cancelled, depending
+             * on the caller, so leave it. */
+            other_job->cancelled = other_job != job;

If other_job == job, job->cancelled is not left as it was (contrary to what the comment says), but set to false.

+            continue;
+        }
+        block_job_cancel_sync(other_job);
+        assert(other_job->completed);
+    }
+    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+        ctx = bdrv_get_aio_context(other_job->bs);
+        block_job_completed_single(other_job);
+        aio_context_release(ctx);
+    }
+    g_free(txn);
+}
+
+static void block_job_completed_txn_success(BlockJob *job)
+{
+    AioContext *ctx;
+    BlockJobTxn *txn = job->txn;
+    BlockJob *other_job, *next;
+    /*
+     * Successful completion, see if there are other running jobs in this
+     * txn.
+     **/
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (!other_job->completed) {
+            return;
+        }
+    }
+    /* We are the last completed job, commit the transaction. */
+    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+        assert(other_job->ret == 0);
+        block_job_completed_single(other_job);
+        aio_context_release(ctx);
+    }
+    g_free(txn);
+}
+
  void block_job_completed(BlockJob *job, int ret)
  {
      BlockDriverState *bs = job->bs;
@@ -98,8 +185,13 @@ void block_job_completed(BlockJob *job, int ret)
      assert(!job->completed);
      job->completed = true;
      job->ret = ret;
-    job->cb(job->opaque, ret);
-    block_job_unref(job);
+    if (!job->txn) {
+        block_job_completed_single(job);
+    } else if (ret < 0 || block_job_is_cancelled(job)) {
+        block_job_completed_txn_abort(job);
+    } else {
+        block_job_completed_txn_success(job);
+    }
  }

  void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
@@ -398,3 +490,22 @@ void block_job_defer_to_main_loop(BlockJob *job,

      qemu_bh_schedule(data->bh);
  }
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
+    QLIST_INIT(&txn->jobs);
+    return txn;
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+}
diff --git a/include/block/block.h b/include/block/block.h
index b1116e9..b7e06a5 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -14,6 +14,7 @@ typedef struct BlockDriver BlockDriver;
  typedef struct BlockJob BlockJob;
  typedef struct BdrvChild BdrvChild;
  typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;

  typedef struct BlockDriverInfo {
      /* in bytes, 0 if irrelevant */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index f6e4c86..c6b1bbb 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -152,6 +152,9 @@ struct BlockJob {
       */
      int ret;

+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
  };

  /**
@@ -395,4 +398,30 @@ void block_job_defer_to_main_loop(BlockJob *job,
                                    BlockJobDeferToMainLoopFn *fn,
                                    void *opaque);

+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().
+ *
+ * The transaction is automatically freed when the last job completes or is
+ * cancelled.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_prepare_to_complete() before
+ * final cleanup and completion.

I see this function, block_job_txn_prepare_to_complete(), mentioned only here, only in this patch. That doesn't seem right to me.

Max

+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
  #endif





reply via email to

[Prev in Thread] Current Thread [Next in Thread]