[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 8/10] Introduce a buffered QEMUFile wrapper
From: |
Anthony Liguori |
Subject: |
[Qemu-devel] [PATCH 8/10] Introduce a buffered QEMUFile wrapper |
Date: |
Tue, 9 Sep 2008 14:50:00 -0500 |
This patch introduces a buffered QEMUFile wrapper. This allows QEMUFile's to be
rate limited. It also allows makes it easier to implement a QEMUFile that is
asynchronous.
The only real non-obvious part of the API is the "frozen" concept. If the
backend
returns EAGAIN, the QEMUFile is said to be "frozen". This means no additional
output will be sent to the backend until the file is unfrozen.
qemu_file_put_notify
can be used to unfreeze a frozen file.
A synchronous interface is also provided to wait for an unfreeze event. This is
used during the final part of live migration when the VM is no longer running.
Signed-off-by: Anthony Liguori <address@hidden>
diff --git a/Makefile.target b/Makefile.target
index 2e8e0a0..4b4cdd3 100644
--- a/Makefile.target
+++ b/Makefile.target
@@ -472,7 +472,7 @@ endif #CONFIG_DARWIN_USER
# System emulator target
ifndef CONFIG_USER_ONLY
-OBJS=vl.o osdep.o monitor.o pci.o loader.o isa_mmio.o machine.o net-checksum.o
+OBJS=vl.o osdep.o monitor.o pci.o loader.o isa_mmio.o machine.o net-checksum.o
migration.o
ifdef CONFIG_WIN32
OBJS+=block-raw-win32.o
else
diff --git a/hw/hw.h b/hw/hw.h
index b7958e4..10fc70a 100644
--- a/hw/hw.h
+++ b/hw/hw.h
@@ -11,8 +11,8 @@
* The pos argument can be ignored if the file is only being used for
* streaming. The handler should try to write all of the data it can.
*/
-typedef void (QEMUFilePutBufferFunc)(void *opaque, const uint8_t *buf,
- int64_t pos, int size);
+typedef int (QEMUFilePutBufferFunc)(void *opaque, const uint8_t *buf,
+ int64_t pos, int size);
/* Read a chunk of data from a file at the given position. The pos argument
* can be ignored if the file is only be used for streaming. The number of
diff --git a/migration.c b/migration.c
new file mode 100644
index 0000000..507c9d9
--- /dev/null
+++ b/migration.c
@@ -0,0 +1,200 @@
+#include "qemu-common.h"
+#include "hw/hw.h"
+#include "qemu-timer.h"
+#include "sysemu.h"
+#include "qemu-char.h"
+#include "migration.h"
+
+typedef struct QEMUFileBuffered
+{
+ BufferedPutFunc *put_buffer;
+ BufferedPutReadyFunc *put_ready;
+ BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
+ BufferedCloseFunc *close;
+ void *opaque;
+ QEMUFile *file;
+ int has_error;
+ int freeze_output;
+ size_t bytes_xfer;
+ size_t xfer_limit;
+ uint8_t *buffer;
+ size_t buffer_size;
+ size_t buffer_capacity;
+ QEMUTimer *timer;
+} QEMUFileBuffered;
+
+static void buffered_append(QEMUFileBuffered *s,
+ const uint8_t *buf, size_t size)
+{
+ if (size > (s->buffer_capacity - s->buffer_size)) {
+ void *tmp;
+
+ s->buffer_capacity += size + 1024;
+
+ tmp = qemu_realloc(s->buffer, s->buffer_capacity);
+ if (tmp == NULL) {
+ fprintf(stderr, "qemu file buffer expansion failed\n");
+ exit(1);
+ }
+
+ s->buffer = tmp;
+ }
+
+ memcpy(s->buffer + s->buffer_size, buf, size);
+ s->buffer_size += size;
+}
+
+static void buffered_flush(QEMUFileBuffered *s)
+{
+ size_t offset = 0;
+
+ if (s->has_error)
+ return;
+
+ while (offset < s->buffer_size) {
+ ssize_t ret;
+
+ ret = s->put_buffer(s->opaque, s->buffer + offset,
+ s->buffer_size - offset);
+ if (ret == -EAGAIN) {
+ s->freeze_output = 1;
+ break;
+ }
+
+ if (ret <= 0)
+ break;
+
+ offset += ret;
+ }
+
+ memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
+ s->buffer_size -= offset;
+}
+
+static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos,
int size)
+{
+ QEMUFileBuffered *s = opaque;
+ size_t offset = 0;
+ ssize_t ret;
+
+ if (s->has_error)
+ return -EINVAL;
+
+ s->freeze_output = 0;
+
+ buffered_flush(s);
+
+ while (offset < size) {
+ if (s->bytes_xfer > s->xfer_limit)
+ break;
+
+ ret = s->put_buffer(s->opaque, buf + offset, size - offset);
+ if (ret == -EAGAIN) {
+ s->freeze_output = 1;
+ break;
+ }
+
+ if (ret <= 0) {
+ s->has_error = 1;
+ break;
+ }
+
+ offset += ret;
+ s->bytes_xfer += ret;
+ }
+
+ buffered_append(s, buf + offset, size - offset);
+
+ return offset;
+}
+
+static int buffered_close(void *opaque)
+{
+ QEMUFileBuffered *s = opaque;
+ int ret;
+
+ if (s->has_error)
+ return -1;
+
+ while (s->buffer_size) {
+ buffered_flush(s);
+ if (s->freeze_output)
+ s->wait_for_unfreeze(s);
+ }
+
+ ret = s->close(s->opaque);
+
+ qemu_del_timer(s->timer);
+ qemu_free_timer(s->timer);
+ qemu_free(s->buffer);
+ qemu_free(s);
+
+ return ret;
+}
+
+static int buffered_rate_limit(void *opaque)
+{
+ QEMUFileBuffered *s = opaque;
+
+ if (s->has_error)
+ return 0;
+
+ if (s->freeze_output)
+ return 1;
+
+ if (s->bytes_xfer > s->xfer_limit)
+ return 1;
+
+ return 0;
+}
+
+static void buffered_rate_tick(void *opaque)
+{
+ QEMUFileBuffered *s = opaque;
+
+ if (s->has_error)
+ return;
+
+ qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100);
+
+ if (s->freeze_output)
+ return;
+
+ s->bytes_xfer = 0;
+
+ buffered_flush(s);
+
+ /* Add some checks around this */
+ s->put_ready(s->opaque);
+}
+
+QEMUFile *qemu_fopen_ops_buffered(void *opaque,
+ size_t bytes_per_sec,
+ BufferedPutFunc *put_buffer,
+ BufferedPutReadyFunc *put_ready,
+ BufferedWaitForUnfreezeFunc
*wait_for_unfreeze,
+ BufferedCloseFunc *close)
+{
+ QEMUFileBuffered *s;
+
+ s = qemu_mallocz(sizeof(*s));
+ if (s == NULL)
+ return NULL;
+
+ s->opaque = opaque;
+ s->xfer_limit = bytes_per_sec / 10;
+ s->put_buffer = put_buffer;
+ s->put_ready = put_ready;
+ s->wait_for_unfreeze = wait_for_unfreeze;
+ s->close = close;
+
+ s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
+ buffered_close, buffered_rate_limit);
+
+ s->timer = qemu_new_timer(rt_clock, buffered_rate_tick, s);
+
+ qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100);
+
+ return s->file;
+}
+
diff --git a/migration.h b/migration.h
new file mode 100644
index 0000000..3994fbb
--- /dev/null
+++ b/migration.h
@@ -0,0 +1,17 @@
+#ifndef QEMU_MIGRATION_H
+#define QEMU_MIGRATION_H
+
+#include "hw/hw.h"
+
+typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedPutReadyFunc)(void *opaque);
+typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
+typedef int (BufferedCloseFunc)(void *opaque);
+
+QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
+ BufferedPutFunc *put_buffer,
+ BufferedPutReadyFunc *put_ready,
+ BufferedWaitForUnfreezeFunc
*wait_for_unfreeze,
+ BufferedCloseFunc *close);
+
+#endif
diff --git a/vl.c b/vl.c
index d02194e..d89435a 100644
--- a/vl.c
+++ b/vl.c
@@ -6275,12 +6275,13 @@ typedef struct QEMUFileUnix
FILE *outfile;
} QEMUFileUnix;
-static void file_put_buffer(void *opaque, const uint8_t *buf,
- int64_t pos, int size)
+static int file_put_buffer(void *opaque, const uint8_t *buf,
+ int64_t pos, int size)
{
QEMUFileUnix *s = opaque;
fseek(s->outfile, pos, SEEK_SET);
fwrite(buf, 1, size, s->outfile);
+ return size;
}
static int file_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
@@ -6328,11 +6329,12 @@ typedef struct QEMUFileBdrv
int64_t base_offset;
} QEMUFileBdrv;
-static void bdrv_put_buffer(void *opaque, const uint8_t *buf,
- int64_t pos, int size)
+static int bdrv_put_buffer(void *opaque, const uint8_t *buf,
+ int64_t pos, int size)
{
QEMUFileBdrv *s = opaque;
bdrv_pwrite(s->bs, s->base_offset + pos, buf, size);
+ return size;
}
static int bdrv_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
Re: [Qemu-devel] [PATCH 2/10] Allow the monitor to be suspended during non-blocking op, Daniel P. Berrange, 2008/09/10
[Qemu-devel] [PATCH 10/10] TCP based live migration, Anthony Liguori, 2008/09/09
[Qemu-devel] [PATCH 8/10] Introduce a buffered QEMUFile wrapper,
Anthony Liguori <=
[Qemu-devel] [PATCH 9/10] Introduce the UI components for live migration, Anthony Liguori, 2008/09/09
[Qemu-devel] [PATCH 6/10] Introduce v3 of savevm protocol, Anthony Liguori, 2008/09/09
[Qemu-devel] [PATCH 5/10] Add network announce function, Anthony Liguori, 2008/09/09
[Qemu-devel] [PATCH 4/10] Add dirty tracking for live migration, Anthony Liguori, 2008/09/09