[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [RFC patch v1 2/3] qemu-file: add buffered mode
From: Dr. David Alan Gilbert
Subject: Re: [RFC patch v1 2/3] qemu-file: add buffered mode
Date: Mon, 27 Apr 2020 13:14:33 +0100
User-agent: Mutt/1.13.4 (2020-02-15)
* Denis Plotnikov (address@hidden) wrote:
> The patch adds the ability for qemu-file to write data
> asynchronously to improve write performance.
> Previously, only synchronous writing was supported.
>
> The asynchronous mode is enabled via the new
> "enable_buffered" callback.
It's a bit invasive, isn't it? It changes a lot of functions in a lot of
places!
The multifd code separated the control headers from the data onto separate
fds, but that doesn't help your case.
Is there any chance you could do this by using the existing 'save_page'
hook (the one RDMA uses)?
In the cover letter you mention direct qemu_fflush calls - have we got a
few too many in some places that you think we can clean out?
Dave
> Signed-off-by: Denis Plotnikov <address@hidden>
> ---
> include/qemu/typedefs.h | 1 +
> migration/qemu-file.c | 351 +++++++++++++++++++++++++++++++++++++++++++++---
> migration/qemu-file.h | 9 ++
> 3 files changed, 339 insertions(+), 22 deletions(-)
>
> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> index 88dce54..9b388c8 100644
> --- a/include/qemu/typedefs.h
> +++ b/include/qemu/typedefs.h
> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
> typedef struct QemuConsole QemuConsole;
> typedef struct QEMUFile QEMUFile;
> typedef struct QEMUFileBuffer QEMUFileBuffer;
> +typedef struct QEMUFileAioTask QEMUFileAioTask;
> typedef struct QemuLockable QemuLockable;
> typedef struct QemuMutex QemuMutex;
> typedef struct QemuOpt QemuOpt;
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 285c6ef..f42f949 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -29,19 +29,25 @@
> #include "qemu-file.h"
> #include "trace.h"
> #include "qapi/error.h"
> +#include "block/aio_task.h"
>
> -#define IO_BUF_SIZE 32768
> +#define IO_BUF_SIZE (1024 * 1024)
> #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
> +#define IO_BUF_NUM 2
> +#define IO_BUF_ALIGNMENT 512
>
> -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>
> struct QEMUFileBuffer {
> int buf_index;
> - int buf_size; /* 0 when writing */
> + int buf_size; /* 0 when non-buffered writing */
> uint8_t *buf;
> unsigned long *may_free;
> struct iovec *iov;
> unsigned int iovcnt;
> + QLIST_ENTRY(QEMUFileBuffer) link;
> };
>
> struct QEMUFile {
> @@ -60,6 +66,22 @@ struct QEMUFile {
> bool shutdown;
> /* currently used buffer */
> QEMUFileBuffer *current_buf;
> + /*
> + * with buffered_mode enabled all the data copied to 512 byte
> + * aligned buffer, including iov data. Then the buffer is passed
> + * to writev_buffer callback.
> + */
> + bool buffered_mode;
> + /* for async buffer writing */
> + AioTaskPool *pool;
> + /* the list of free buffers, currently used on is NOT there */
> + QLIST_HEAD(, QEMUFileBuffer) free_buffers;
> +};
> +
> +struct QEMUFileAioTask {
> + AioTask task;
> + QEMUFile *f;
> + QEMUFileBuffer *fb;
> };
>
> /*
> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
> f->opaque = opaque;
> f->ops = ops;
>
> - f->current_buf = g_new0(QEMUFileBuffer, 1);
> - f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> - f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> - f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> + if (f->ops->enable_buffered) {
> + f->buffered_mode = f->ops->enable_buffered(f->opaque);
> + }
> +
> + if (f->buffered_mode && qemu_file_is_writable(f)) {
> + int i;
> + /*
> + * in buffered_mode we don't use internal io vectors
> + * and may_free bitmap, because we copy the data to be
> + * written right away to the buffer
> + */
> + f->pool = aio_task_pool_new(IO_BUF_NUM);
> +
> + /* allocate io buffers */
> + for (i = 0; i < IO_BUF_NUM; i++) {
> + QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
> +
> + fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
> + fb->buf_size = IO_BUF_SIZE;
> +
> + /*
> + * put the first buffer to the current buf and the rest
> + * to the list of free buffers
> + */
> + if (i == 0) {
> + f->current_buf = fb;
> + } else {
> + QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> + }
> + }
> + } else {
> + f->current_buf = g_new0(QEMUFileBuffer, 1);
> + f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> + f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> + f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> + }
>
> return f;
> }
> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
> unsigned long idx;
> QEMUFileBuffer *fb = f->current_buf;
>
> + assert(!f->buffered_mode);
> +
> /* Find and release all the contiguous memory ranges marked as may_free.
> */
> idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
> if (idx >= fb->iovcnt) {
> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
> bitmap_zero(fb->may_free, MAX_IOV_SIZE);
> }
>
> +static void advance_buf_ptr(QEMUFile *f, size_t size)
> +{
> + QEMUFileBuffer *fb = f->current_buf;
> + /* must not advance to 0 */
> + assert(size);
> + /* must not overflow buf_index (int) */
> + assert(fb->buf_index + size <= INT_MAX);
> + /* must not exceed buf_size */
> + assert(fb->buf_index + size <= fb->buf_size);
> +
> + fb->buf_index += size;
> +}
> +
> +static size_t get_buf_free_size(QEMUFile *f)
> +{
> + QEMUFileBuffer *fb = f->current_buf;
> + /* buf_index can't be greated than buf_size */
> + assert(fb->buf_size >= fb->buf_index);
> + return fb->buf_size - fb->buf_index;
> +}
> +
> +static size_t get_buf_used_size(QEMUFile *f)
> +{
> + QEMUFileBuffer *fb = f->current_buf;
> + return fb->buf_index;
> +}
> +
> +static uint8_t *get_buf_ptr(QEMUFile *f)
> +{
> + QEMUFileBuffer *fb = f->current_buf;
> + /* protects from out of bound reading */
> + assert(fb->buf_index <= IO_BUF_SIZE);
> + return fb->buf + fb->buf_index;
> +}
> +
> +static bool buf_is_full(QEMUFile *f)
> +{
> + return get_buf_free_size(f) == 0;
> +}
> +
> +static void reset_buf(QEMUFile *f)
> +{
> + QEMUFileBuffer *fb = f->current_buf;
> + fb->buf_index = 0;
> +}
> +
> +static int write_task_fn(AioTask *task)
> +{
> + int ret;
> + Error *local_error = NULL;
> + QEMUFileAioTask *t = (QEMUFileAioTask *) task;
> + QEMUFile *f = t->f;
> + QEMUFileBuffer *fb = t->fb;
> + uint64_t pos = f->pos;
> + struct iovec v = (struct iovec) {
> + .iov_base = fb->buf,
> + .iov_len = fb->buf_index,
> + };
> +
> + assert(f->buffered_mode);
> +
> + /*
> + * Increment file position.
> + * This needs to be here before calling writev_buffer, because
> + * writev_buffer is asynchronous and there could be more than one
> + * writev_buffer started simultaniously. Each writev_buffer should
> + * use its own file pos to write to. writev_buffer may write less
> + * than buf_index bytes but we treat this situation as an error.
> + * If error appeared, further file using is meaningless.
> + * We expect that, the most of the time the full buffer is written,
> + * (when buf_size == buf_index). The only case when the non-full
> + * buffer is written (buf_size != buf_index) is file close,
> + * when we need to flush the rest of the buffer content.
> + */
> + f->pos += fb->buf_index;
> +
> + ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
> +
> + /* return the just written buffer to the free list */
> + QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> +
> + /* check that we have written everything */
> + if (ret != fb->buf_index) {
> + qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
> + }
> +
> + /*
> + * always return 0 - don't use task error handling, relay on
> + * qemu file error handling
> + */
> + return 0;
> +}
> +
> +static void qemu_file_switch_current_buf(QEMUFile *f)
> +{
> + /*
> + * if the list is empty, wait until some task returns a buffer
> + * to the list of free buffers.
> + */
> + if (QLIST_EMPTY(&f->free_buffers)) {
> + aio_task_pool_wait_slot(f->pool);
> + }
> +
> + /*
> + * sanity check that the list isn't empty
> + * if the free list was empty, we waited for a task complition,
> + * and the pompleted task must return a buffer to a list of free buffers
> + */
> + assert(!QLIST_EMPTY(&f->free_buffers));
> +
> + /* set the current buffer for using from the free list */
> + f->current_buf = QLIST_FIRST(&f->free_buffers);
> + reset_buf(f);
> +
> + QLIST_REMOVE(f->current_buf, link);
> +}
> +
> +/**
> + * Asynchronously flushes QEMUFile buffer
> + *
> + * This will flush all pending data. If data was only partially flushed, it
> + * will set an error state. The function may return before the data actually
> + * written.
> + */
> +static void flush_buffer(QEMUFile *f)
> +{
> + QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
> +
> + *t = (QEMUFileAioTask) {
> + .task.func = &write_task_fn,
> + .f = f,
> + .fb = f->current_buf,
> + };
> +
> + /* aio_task_pool should free t for us */
> + aio_task_pool_start_task(f->pool, (AioTask *) t);
> +
> + /* if no errors this will switch the buffer */
> + qemu_file_switch_current_buf(f);
> +}
> +
> /**
> * Flushes QEMUFile buffer
> *
> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
> if (f->shutdown) {
> return;
> }
> +
> + if (f->buffered_mode) {
> + return;
> + }
> +
> if (fb->iovcnt > 0) {
> + /* this is non-buffered mode */
> expect = iov_size(fb->iov, fb->iovcnt);
> ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
> &local_error);
> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>
> void qemu_update_position(QEMUFile *f, size_t size)
> {
> + assert(!f->buffered_mode);
> f->pos += size;
> }
>
> @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
> int qemu_fclose(QEMUFile *f)
> {
> int ret;
> - qemu_fflush(f);
> +
> + if (qemu_file_is_writable(f) && f->buffered_mode) {
> + ret = qemu_file_get_error(f);
> + if (!ret) {
> + flush_buffer(f);
> + }
> + /* wait until all tasks are done */
> + aio_task_pool_wait_all(f->pool);
> + } else {
> + qemu_fflush(f);
> + }
> +
> ret = qemu_file_get_error(f);
>
> if (f->ops->close) {
> @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
> ret = f->last_error;
> }
> error_free(f->last_error_obj);
> - g_free(f->current_buf->buf);
> - g_free(f->current_buf->iov);
> - g_free(f->current_buf->may_free);
> - g_free(f->current_buf);
> +
> + if (f->buffered_mode) {
> + QEMUFileBuffer *fb, *next;
> + /*
> + * put the current back to the free buffers list
> + * to destroy all the buffers in one loop
> + */
> + QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
> +
> + /* destroy all the buffers */
> + QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
> + QLIST_REMOVE(fb, link);
> + /* looks like qemu_vfree pairs with qemu_memalign */
> + qemu_vfree(fb->buf);
> + g_free(fb);
> + }
> + g_free(f->pool);
> + } else {
> + g_free(f->current_buf->buf);
> + g_free(f->current_buf->iov);
> + g_free(f->current_buf->may_free);
> + g_free(f->current_buf);
> + }
> +
> g_free(f);
> trace_qemu_file_fclose();
> return ret;
> }
>
> /*
> + * Copy an external buffer to the intenal current buffer.
> + */
> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
> + bool may_free)
> +{
> + size_t data_size = size;
> + const uint8_t *src_ptr = buf;
> +
> + assert(f->buffered_mode);
> + assert(size <= INT_MAX);
> +
> + while (data_size > 0) {
> + size_t chunk_size;
> +
> + if (buf_is_full(f)) {
> + flush_buffer(f);
> + if (qemu_file_get_error(f)) {
> + return;
> + }
> + }
> +
> + chunk_size = MIN(get_buf_free_size(f), data_size);
> +
> + memcpy(get_buf_ptr(f), src_ptr, chunk_size);
> +
> + advance_buf_ptr(f, chunk_size);
> +
> + src_ptr += chunk_size;
> + data_size -= chunk_size;
> + f->bytes_xfer += chunk_size;
> + }
> +
> + if (may_free) {
> + if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
> + error_report("migrate: madvise DONTNEED failed %p %zd: %s",
> + buf, size, strerror(errno));
> + }
> + }
> +}
> +
> +/*
> * Add buf to iovec. Do flush if iovec is full.
> *
> * Return values:
> @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
> static void add_buf_to_iovec(QEMUFile *f, size_t len)
> {
> QEMUFileBuffer *fb = f->current_buf;
> +
> + assert(!f->buffered_mode);
> +
> if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
> fb->buf_index += len;
> if (fb->buf_index == IO_BUF_SIZE) {
> @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
> return;
> }
>
> - f->bytes_xfer += size;
> - add_to_iovec(f, buf, size, may_free);
> + if (f->buffered_mode) {
> + copy_buf(f, buf, size, may_free);
> + } else {
> + f->bytes_xfer += size;
> + add_to_iovec(f, buf, size, may_free);
> + }
> }
>
> void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
> @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
> return;
> }
>
> + if (f->buffered_mode) {
> + copy_buf(f, buf, size, false);
> + return;
> + }
> +
> while (size > 0) {
> l = IO_BUF_SIZE - fb->buf_index;
> if (l > size) {
> @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
> return;
> }
>
> - fb->buf[fb->buf_index] = v;
> - f->bytes_xfer++;
> - add_buf_to_iovec(f, 1);
> + if (f->buffered_mode) {
> + copy_buf(f, (const uint8_t *) &v, 1, false);
> + } else {
> + fb->buf[fb->buf_index] = v;
> + add_buf_to_iovec(f, 1);
> + f->bytes_xfer++;
> + }
> }
>
> void qemu_file_skip(QEMUFile *f, int size)
> {
> QEMUFileBuffer *fb = f->current_buf;
>
> + assert(!f->buffered_mode);
> +
> if (fb->buf_index + size <= fb->buf_size) {
> fb->buf_index += size;
> }
> @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
> {
> int64_t ret = f->pos;
> int i;
> - QEMUFileBuffer *fb = f->current_buf;
>
> - for (i = 0; i < fb->iovcnt; i++) {
> - ret += fb->iov[i].iov_len;
> + if (f->buffered_mode) {
> + ret += get_buf_used_size(f);
> + } else {
> + QEMUFileBuffer *fb = f->current_buf;
> + for (i = 0; i < fb->iovcnt; i++) {
> + ret += fb->iov[i].iov_len;
> + }
> }
>
> return ret;
> @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>
> int64_t qemu_ftell(QEMUFile *f)
> {
> - qemu_fflush(f);
> - return f->pos;
> + if (f->buffered_mode) {
> + return qemu_ftell_fast(f);
> + } else {
> + qemu_fflush(f);
> + return f->pos;
> + }
> }
>
> int qemu_file_rate_limit(QEMUFile *f)
> @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
> QEMUFileBuffer *fb = f->current_buf;
> ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
>
> + assert(!f->buffered_mode);
> +
> if (blen < compressBound(size)) {
> return -1;
> }
> @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
> int len = 0;
> QEMUFileBuffer *fb_src = f_src->current_buf;
>
> + assert(!f_des->buffered_mode);
> + assert(!f_src->buffered_mode);
> +
> if (fb_src->buf_index > 0) {
> len = fb_src->buf_index;
> qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
> index a9b6d6c..08655d2 100644
> --- a/migration/qemu-file.h
> +++ b/migration/qemu-file.h
> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
> typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
> Error **errp);
>
> +/*
> + * Enables or disables the buffered mode
> + * Existing blocking reads/writes must be woken
> + * Returns true if the buffered mode has to be enabled,
> + * false if it has to be disabled.
> + */
> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
> +
> typedef struct QEMUFileOps {
> QEMUFileGetBufferFunc *get_buffer;
> QEMUFileCloseFunc *close;
> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
> QEMUFileWritevBufferFunc *writev_buffer;
> QEMURetPathFunc *get_return_path;
> QEMUFileShutdownFunc *shut_down;
> + QEMUFileEnableBufferedFunc *enable_buffered;
> } QEMUFileOps;
>
> typedef struct QEMUFileHooks {
> --
> 1.8.3.1
>
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK
[RFC patch v1 1/3] qemu-file: introduce current buffer, Denis Plotnikov, 2020/04/13
Re: [RFC patch v1 0/3] qemu-file writing performance improving, Denis V. Lunev, 2020/04/13
Re: [RFC patch v1 0/3] qemu-file writing performance improving, no-reply, 2020/04/13
Re: [RFC patch v1 0/3] qemu-file writing performance improving, Denis Plotnikov, 2020/04/21