qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 16/26] FVD: add impl for buffered journal updates


From: Chunqiang Tang
Subject: [Qemu-devel] [PATCH 16/26] FVD: add impl for buffered journal updates
Date: Fri, 25 Feb 2011 17:37:56 -0500

This patch is part of the Fast Virtual Disk (FVD) proposal.
See http://wiki.qemu.org/Features/FVD.

This patch enhances FVD's journal with the capability of buffering
multiple metadata updates and sending them to the journal in a single write.

Signed-off-by: Chunqiang Tang <address@hidden>
---
 block/fvd-journal-buf.c |  336 ++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 333 insertions(+), 3 deletions(-)

diff --git a/block/fvd-journal-buf.c b/block/fvd-journal-buf.c
index 3efdd47..b4077ce 100644
--- a/block/fvd-journal-buf.c
+++ b/block/fvd-journal-buf.c
@@ -20,15 +20,345 @@
  * case for cache!=writethrough.
  *============================================================================*/
 
+/* Forward declarations. bjnl_write_buf() is called by bjnl_write_next_buf()
+ * before its definition further down in this file. */
+static inline int bjnl_write_buf(FvdAIOCB *acb);
+static void bjnl_send_current_buf_to_write_queue(BlockDriverState *bs);
+
+/*
+ * Tear down a buffered-journal write acb once its write attempt is over.
+ *
+ * Frees the journal buffer held in acb->jcb.iov, unlinks the acb from
+ * s->bjnl.queued_bufs and releases the acb. A non-zero 'ret' additionally
+ * sets s->metadata_err_prohibit_write. The acb must not be touched by the
+ * caller after this function returns.
+ */
+static inline void bjnl_finish_write_buf(FvdAIOCB *acb, int ret)
+{
+    BDRVFvdState *s;
+
+    ASSERT (acb->type == OP_BJNL_BUF_WRITE);
+    s = acb->common.bs->opaque;
+
+    QDEBUG("JOURNAL: bjnl_finish_write_buf acb%llu-%p\n", acb->uuid, acb);
+
+    /* Drop the buffer first, then the queue link, then the acb itself. */
+    my_qemu_vfree(acb->jcb.iov.iov_base);
+    QTAILQ_REMOVE(&s->bjnl.queued_bufs, acb, jcb.bjnl_next_queued_buf);
+    my_qemu_aio_release(acb);
+
+    if (ret) {
+        /* Record the failure in the driver state (acb is gone by now). */
+        s->metadata_err_prohibit_write = true;
+    }
+}
+
+/*
+ * Start writing the buffer at the head of s->bjnl.queued_bufs.
+ *
+ * If bjnl_write_buf() returns non-zero for the head, try whatever is at the
+ * head of the queue afterwards. Stops as soon as one write has been started
+ * (bjnl_write_buf() returned 0) or the queue is empty.
+ */
+static inline void bjnl_write_next_buf(BDRVFvdState *s)
+{
+    for (;;) {
+        FvdAIOCB *head = QTAILQ_FIRST(&s->bjnl.queued_bufs);
+
+        if (!head || bjnl_write_buf(head) == 0) {
+            break;
+        }
+    }
+}
+
+/*
+ * Completion callback for a flush request.
+ *
+ * 'opaque' is the FvdAIOCB of the flush; 'ret' is the completion status.
+ * Unless the acb is being cancelled, forwards 'ret' to the callback stored
+ * in acb->common.cb and then releases the acb.
+ */
+static inline void bjnl_aio_flush_cb(void *opaque, int ret)
+{
+    FvdAIOCB *flush_acb = opaque;
+
+    if (!flush_acb->cancel_in_progress) {
+        QDEBUG("JOURNAL: bjnl_aio_flush_cb acb%llu-%p\n", flush_acb->uuid,
+               flush_acb);
+
+        /* Invoke the callback initially provided to fvd_aio_flush(). */
+        flush_acb->common.cb(flush_acb->common.opaque, ret);
+        my_qemu_aio_release(flush_acb);
+    }
+}
+
+/*
+ * Completion callback for the journal write submitted by
+ * bjnl_write_buf_start().
+ *
+ * Unless the acb is being cancelled, finishes the acb with status 'ret' and
+ * then starts the next queued buffer, if any.
+ */
+static inline void bjnl_write_buf_cb(void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+    BDRVFvdState *s;
+
+    if (acb->cancel_in_progress) {
+        return;
+    }
+
+    /* Look up the driver state now: the acb is released just below. */
+    s = acb->common.bs->opaque;
+
+    QDEBUG("JOURNAL: bjnl_write_buf_cb acb%llu-%p\n", acb->uuid, acb);
+    bjnl_finish_write_buf(acb, ret);
+    bjnl_write_next_buf(s);
+}
+
+/* PRINT_JRECORDS(buf, len): with ENABLE_QDEBUG defined it forwards to
+ * print_jrecords(); otherwise it expands to an empty statement. */
+#ifdef ENABLE_QDEBUG
+static void print_jrecords(const uint8_t *buf, size_t len);
+#  define PRINT_JRECORDS print_jrecords
+#else
+#  define PRINT_JRECORDS(buf,len) do{}while(0)
+#endif
+
+/*
+ * Pick the journal sectors for the acb's buffer and submit the asynchronous
+ * write to s->fvd_metadata.
+ *
+ * Returns 0 when the write was submitted; bjnl_write_buf_cb() is the
+ * completion callback. On failure (recycle_journal() error, or -EIO when
+ * bdrv_aio_writev() returns NULL) the acb has already been finished and
+ * released through bjnl_finish_write_buf() and the error code is returned.
+ */
+static int bjnl_write_buf_start(FvdAIOCB *acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+    int nb_sectors = acb->jcb.iov.iov_len / 512;
+    int64_t journal_sec;
+    int ret;
+
+    ASSERT (nb_sectors <= s->journal_size);
+    QDEBUG("JOURNAL: bjnl_write_buf_start acb%llu-%p\n", acb->uuid, acb);
+
+    if (s->next_journal_sector + nb_sectors > s->journal_size) {
+        /* The buffer does not fit in what is left of the journal: call
+         * recycle_journal() and place the buffer at journal sector 0. */
+        ret = recycle_journal(bs);
+        if (ret) {
+            bjnl_finish_write_buf(acb, ret);
+            return ret;
+        }
+        journal_sec = 0;
+        s->next_journal_sector = nb_sectors;
+    } else {
+        /* Append right after the previously used journal sectors. */
+        journal_sec = s->next_journal_sector;
+        s->next_journal_sector += nb_sectors;
+    }
+
+    PRINT_JRECORDS(acb->jcb.iov.iov_base, acb->jcb.iov.iov_len);
+
+    acb->jcb.hd_acb = bdrv_aio_writev(s->fvd_metadata,
+                                      s->journal_offset + journal_sec,
+                                      &acb->jcb.qiov, nb_sectors,
+                                      bjnl_write_buf_cb, acb);
+    if (!acb->jcb.hd_acb) {
+        ret = -EIO;
+        bjnl_finish_write_buf(acb, ret);
+        return ret;
+    }
+
+    return 0;
+}
+
+/*
+ * Completion callback for the bdrv_aio_flush() of s->fvd_data issued by
+ * bjnl_write_buf() for a buffer that contains bitmap updates.
+ *
+ * 'opaque' is the FvdAIOCB of the queued journal buffer; 'ret' is the status
+ * of the data flush. If the flush failed, the acb is finished with that
+ * error. Otherwise the journal write is started; if it starts successfully
+ * there is nothing more to do here. In both failure cases the acb has been
+ * released, and the next queued buffer (if any) is started.
+ */
+static void bjnl_flush_data_before_update_bitmap_cb(void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+    BDRVFvdState *s;
+
+    if (acb->cancel_in_progress) {
+        return;
+    }
+
+    QDEBUG("JOURNAL: bjnl_flush_data_before_update_bitmap_cb acb%llu-%p\n",
+           acb->uuid, acb);
+
+    /* Fetch the driver state while the acb is still valid. Both failure
+     * paths below end in bjnl_finish_write_buf(), which releases the acb, so
+     * it must not be dereferenced afterwards. */
+    s = acb->common.bs->opaque;
+
+    if (ret != 0) {
+        bjnl_finish_write_buf(acb, ret);
+    } else if (bjnl_write_buf_start(acb) == 0) {
+        return;
+    }
+
+    bjnl_write_next_buf(s);
+}
+
+/*
+ * Begin writing one queued journal buffer.
+ *
+ * A buffer without bitmap updates goes straight to bjnl_write_buf_start().
+ * A buffer with bitmap updates first triggers a flush of s->fvd_data; the
+ * journal write is then started from
+ * bjnl_flush_data_before_update_bitmap_cb().
+ *
+ * Returns 0 if an asynchronous operation was started. Returns non-zero if
+ * nothing could be started; in that case the acb has already been finished
+ * and released.
+ */
+static inline int bjnl_write_buf(FvdAIOCB *acb)
+{
+    BDRVFvdState *s = acb->common.bs->opaque;
+
+    QDEBUG("JOURNAL: bjnl_write_buf acb%llu-%p\n", acb->uuid, acb);
+
+    if (acb->jcb.bitmap_updated) {
+        /* The data in fvd_data must be flushed before bitmap changes are
+         * committed. Otherwise, if the host crashes after the bitmap
+         * metadata are updated but before the corresponding data are
+         * persisted on disk, the VM will get corrupted data, as the correct
+         * data may be in the base image. */
+        acb->jcb.hd_acb = bdrv_aio_flush(s->fvd_data,
+                                         bjnl_flush_data_before_update_bitmap_cb,
+                                         acb);
+        if (!acb->jcb.hd_acb) {
+            bjnl_finish_write_buf(acb, -1);
+            return -1;
+        }
+        return 0;
+    }
+
+    return bjnl_write_buf_start(acb);
+}
+
+/*
+ * Hand the current in-memory journal buffer (s->bjnl.buf) over to the queue
+ * of buffers waiting to be written to the journal.
+ *
+ * Does nothing when there is no current buffer; frees the buffer when it is
+ * unused. Otherwise wraps the buffer in a new OP_BJNL_BUF_WRITE acb, appends
+ * it to s->bjnl.queued_bufs and, if it is the only queued buffer, either
+ * starts the write right away (async context 0) or schedules the clean-buf
+ * timer to start it. On return s->bjnl.buf is NULL: ownership of the buffer
+ * has moved to the acb (or the buffer has been freed).
+ */
+static void bjnl_send_current_buf_to_write_queue(BlockDriverState *bs)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    if (!s->bjnl.buf) {
+        return;
+    }
+    if (s->bjnl.buf_used == 0) {
+        my_qemu_vfree (s->bjnl.buf);
+        s->bjnl.buf = NULL;
+        return;
+    }
+    if (s->bjnl.buf_used < s->bjnl.buf_size) {
+        /* Mark the end of the buffer as EMPTY_JRECORD. */
+        *((uint32_t*)(s->bjnl.buf + s->bjnl.buf_used)) = EMPTY_JRECORD;
+    }
+
+    /* Create a new acb and put it in the queue of bjnl.queued_bufs. */
+    FvdAIOCB *acb = my_qemu_aio_get(&fvd_aio_pool, bs, NULL, NULL);
+    if (!acb) {
+        s->metadata_err_prohibit_write = true;
+        my_qemu_vfree (s->bjnl.buf);
+        s->bjnl.buf = NULL;
+        return;
+    }
+
+    acb->type = OP_BJNL_BUF_WRITE;
+    acb->cancel_in_progress = false;
+    acb->jcb.iov.iov_base = s->bjnl.buf;
+    /* Always write full journal sectors. */
+    acb->jcb.iov.iov_len = ROUND_UP(s->bjnl.buf_used, 512);
+    acb->jcb.hd_acb = NULL;
+    acb->jcb.bitmap_updated = s->bjnl.buf_contains_bitmap_update;
+    s->bjnl.buf_contains_bitmap_update = false;
+    qemu_iovec_init_external(&acb->jcb.qiov, &acb->jcb.iov, 1);
+    QTAILQ_INSERT_TAIL(&s->bjnl.queued_bufs, acb, jcb.bjnl_next_queued_buf);
+
+    PRINT_JRECORDS(acb->jcb.iov.iov_base, acb->jcb.iov.iov_len);
+
+    /* Log while the acb is guaranteed to be valid: if bjnl_write_buf() below
+     * fails synchronously, it finishes and releases the acb. */
+    QDEBUG("JOURNAL: acb%llu-%p  added to bjnl_write_queue\n", acb->uuid, acb);
+
+    /* If no ongoing journal write, start this one. */
+    if (acb == QTAILQ_FIRST(&s->bjnl.queued_bufs)) {
+        /* Since this acb is not owned by any VM-generated request, it can
+         * only be started in context_id 0. Otherwise, qemu_aio_wait() may
+         * never process the callbacks generated by bjnl_write_buf(). */
+        if (get_async_context_id() == 0) {
+            bjnl_write_buf(acb);
+        } else {
+            /* Start journal write in the timer callback. */
+            qemu_mod_timer(s->bjnl.clean_buf_timer, qemu_get_clock(rt_clock));
+            s->bjnl.timer_scheduled = true;
+        }
+    }
+
+    /* The buffer now belongs to the acb (or has already been freed with it);
+     * do not touch the acb past this point. */
+    s->bjnl.buf = NULL;
+}
+
+/*
+ * Reserve 'record_size' bytes for journal records in the in-memory journal
+ * buffer and return a pointer to the reserved space.
+ *
+ * Makes sure the clean-buf timer is scheduled. A reservation that does not
+ * fit in the rest of the current 512-byte sector of the buffer is placed at
+ * the start of the next sector, and the unused tail of the current sector is
+ * marked with EMPTY_JRECORD. If the current buffer cannot hold the
+ * reservation (or there is no current buffer), the current buffer is sent to
+ * the write queue and a new buffer is allocated.
+ *
+ * 'update_bitmap' records that the buffer contains a bitmap update.
+ */
 static uint8_t * bjnl_alloc_journal_records_from_buf(BlockDriverState *bs,
                                                 bool update_bitmap,
                                                 size_t record_size)
 {
-    return NULL;
-}
+    BDRVFvdState *s = bs->opaque;
+
+    if (!s->bjnl.timer_scheduled) {
+        QDEBUG("JOURNAL: bjnl_start_timer\n");
+        /* Now we have dirty data. Start a timer to write it out later. */
+        int64_t expire = qemu_get_clock(rt_clock) + s->bjnl.clean_buf_period;
+        qemu_mod_timer(s->bjnl.clean_buf_timer, expire);
+        s->bjnl.timer_scheduled = true;
+    }
+
+    if (s->bjnl.buf && s->bjnl.buf_used + record_size <= s->bjnl.buf_size) {
+        /* Bytes left in the current 512-byte sector of the buffer (512 when
+         * buf_used is sector-aligned). */
+        size_t current_sector_left = 512 - s->bjnl.buf_used % 512;
+        if (current_sector_left >= record_size) {
+            /* Continue to use current sector.*/
+use_current_buf:
+            /* Also reached by the goto below, after buf_used has been
+             * advanced to the start of the next sector. */
+            QDEBUG("JOURNAL: bjnl_alloc_buf buf_used=%zu new=%zu limit=%zu\n",
+                   s->bjnl.buf_used, record_size, s->bjnl.buf_size);
+            uint8_t *buf = s->bjnl.buf + s->bjnl.buf_used;
+            s->bjnl.buf_used += record_size;
+            if (update_bitmap) {
+                s->bjnl.buf_contains_bitmap_update = true;
+            }
+            return buf;
+        }
+
+        /* Mark the end of the valid section of the current buffer sector
+         * and start to use the next sector.*/
+        *((uint32_t*)(s->bjnl.buf + s->bjnl.buf_used)) = EMPTY_JRECORD;
+        s->bjnl.buf_used += current_sector_left;
 
+        /* Re-check: skipping to the next sector may have used up the room. */
+        if (s->bjnl.buf_used + record_size <= s->bjnl.buf_size) {
+            goto use_current_buf;
+        }
+    }
+
+    if (s->bjnl.buf) {
+        QDEBUG("JOURNAL: bjnl_buf_full_start_new buf_used=%zu new=%zu "
+               "limit=%zu\n", s->bjnl.buf_used, record_size, s->bjnl.buf_size);
+    } else {
+        QDEBUG("JOURNAL: bjnl_buf_full_start_new no_current_buf\n");
+    }
+
+    /* Need to start a new buffer. Send current buffer to write queue first. */
+    bjnl_send_current_buf_to_write_queue(bs);
+
+    /* The new buffer starts out holding exactly this reservation. Its size
+     * is the larger of the sector-rounded reservation and def_buf_size. */
+    s->bjnl.buf_used = record_size;
+    record_size = ROUND_UP(record_size, 512);
+    s->bjnl.buf_size = MAX(record_size, s->bjnl.def_buf_size);
+    s->bjnl.buf_contains_bitmap_update = update_bitmap;
+    s->bjnl.buf = my_qemu_blockalign(s->fvd_metadata, s->bjnl.buf_size);
+
+    return s->bjnl.buf;
+}
 
 static void bjnl_clean_buf_timer_cb(BlockDriverState * bs)
 {
-    /* To be implemented. */
+    BDRVFvdState *s = bs->opaque;
+    FvdAIOCB *acb;
+
+    ASSERT (get_async_context_id() == 0);
+    QDEBUG("JOURNAL: bjnl_timer_expired\n");
+
+    /* Clean the current buffer. */
+    if (s->bjnl.buf && s->bjnl.buf_used > 0) {
+        QDEBUG ("JOURNAL: timer bjnl_send_current_buf_to_write_queue\n");
+        bjnl_send_current_buf_to_write_queue(bs);
+    }
+
+    /* Start writing the first buffer if it is not already started. */
+    while ((acb = QTAILQ_FIRST(&s->bjnl.queued_bufs)) && !acb->jcb.hd_acb) {
+        QDEBUG("JOURNAL: acb%llu-%p  bjnl_write_buf by timer",
+               acb->uuid, acb);
+        if (bjnl_write_buf(acb) == 0) {
+            break;
+        }
+    }
+
+    /* The timer is no longer scheduled. It will be scheduled when needed. */
+    s->bjnl.timer_scheduled = false;
+}
+
+#ifdef ENABLE_QDEBUG
+/*
+ * Debug helper, compiled only when ENABLE_QDEBUG is defined: print every
+ * journal record found in a buffer through QDEBUG.
+ *
+ * 'sector' points to the buffer and 'len' is its size in bytes, which must
+ * be a multiple of 512 (asserted). The buffer is walked one 512-byte sector
+ * at a time; within a sector, records are decoded back to back until a
+ * record that is neither BITMAP_JRECORD nor TABLE_JRECORD is found, which is
+ * asserted to be EMPTY_JRECORD. Fields are converted with
+ * le32_to_cpu()/le64_to_cpu().
+ */
+static void print_jrecords(const uint8_t *sector, size_t len)
+{
+    const uint8_t *end = sector + len;
+    ASSERT(len % 512 == 0);
+
+    QDEBUG("JOURNAL: write bjnl_records\n");
+    while (sector < end) {
+        uint32_t *type = (uint32_t *) sector;   /* Journal record type. */
+        while ((uint8_t *) type < (sector + 512)) {
+            if (le32_to_cpu(*type) == BITMAP_JRECORD) {
+                /* Layout as decoded here: 32-bit type, 32-bit nb_sectors,
+                 * 64-bit sector_num. */
+                uint32_t *nb_sectors = type + 1;
+                int64_t *sector_num = (int64_t *) (type + 2);
+
+                QDEBUG("JOURNAL: write BITMAP_JRECORD sector_num=%" PRId64
+                       " nb_sectors=%u\n", le64_to_cpu(*sector_num),
+                       le32_to_cpu(*nb_sectors));
+
+                /* First field of the next journal record. */
+                type = (uint32_t *) (sector_num + 1);
+            } else if (le32_to_cpu(*type) == TABLE_JRECORD) {
+                /* Layout as decoded here: 32-bit type, 64-bit epoch, 32-bit
+                 * count, 32-bit offset, then 'count' 32-bit table entries. */
+                uint64_t *epoch = (uint64_t *) (type + 1);
+                uint32_t *count = (uint32_t *) (epoch + 1);
+                uint32_t *offset = count + 1;
+                uint32_t *content = offset + 1;
+                const uint32_t chunk = le32_to_cpu(*offset);
+                const uint64_t epo = le64_to_cpu(*epoch);
+                const uint32_t n = le32_to_cpu(*count);
+                uint32_t i;
+
+                QDEBUG("JOURNAL: write TABLE_JRECORD epoch=%" PRIu64
+                       " chunk_start=%u " "nb_chunks=%u\n", epo, chunk, n);
+                /* NOTE(review): 'n' comes from the buffer and is not checked
+                 * against the end of the sector or buffer here - confirm
+                 * that callers only pass well-formed buffers. */
+                for (i = 0; i < n; i++) {
+                    QDEBUG("\tMap chunk %u to %u\n", chunk + i,
+                           READ_TABLE(content[i]));
+                }
+
+                type = content + n;     /* First field of the next record. */
+            } else {
+                /* End of valid records in this journal sector. */
+                ASSERT(le32_to_cpu(*type) == EMPTY_JRECORD);
+                break;
+            }
+        }
+
+        /* Continue with the next 512-byte sector of the buffer. */
+        sector += 512;
+    }
 }
+#endif
-- 
1.7.0.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]