qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 8/10] Introduce a buffered QEMUFile wrapper


From: Anthony Liguori
Subject: [Qemu-devel] [PATCH 8/10] Introduce a buffered QEMUFile wrapper
Date: Tue, 9 Sep 2008 14:50:00 -0500

This patch introduces a buffered QEMUFile wrapper.  This allows QEMUFiles to be
rate limited.  It also makes it easier to implement a QEMUFile that is
asynchronous.

The only real non-obvious part of the API is the "frozen" concept.  If the
backend returns EAGAIN, the QEMUFile is said to be "frozen".  This means no
additional output will be sent to the backend until the file is unfrozen.
qemu_file_put_notify can be used to unfreeze a frozen file.

A synchronous interface is also provided to wait for an unfreeze event.  This is
used during the final part of live migration when the VM is no longer running.

Signed-off-by: Anthony Liguori <address@hidden>

diff --git a/Makefile.target b/Makefile.target
index 2e8e0a0..4b4cdd3 100644
--- a/Makefile.target
+++ b/Makefile.target
@@ -472,7 +472,7 @@ endif #CONFIG_DARWIN_USER
 # System emulator target
 ifndef CONFIG_USER_ONLY
 
-OBJS=vl.o osdep.o monitor.o pci.o loader.o isa_mmio.o machine.o net-checksum.o
+OBJS=vl.o osdep.o monitor.o pci.o loader.o isa_mmio.o machine.o net-checksum.o migration.o
 ifdef CONFIG_WIN32
 OBJS+=block-raw-win32.o
 else
diff --git a/hw/hw.h b/hw/hw.h
index b7958e4..10fc70a 100644
--- a/hw/hw.h
+++ b/hw/hw.h
@@ -11,8 +11,8 @@
  * The pos argument can be ignored if the file is only being used for
  * streaming.  The handler should try to write all of the data it can.
  */
-typedef void (QEMUFilePutBufferFunc)(void *opaque, const uint8_t *buf,
-                                     int64_t pos, int size);
+typedef int (QEMUFilePutBufferFunc)(void *opaque, const uint8_t *buf,
+                                    int64_t pos, int size);
 
 /* Read a chunk of data from a file at the given position.  The pos argument
  * can be ignored if the file is only be used for streaming.  The number of
diff --git a/migration.c b/migration.c
new file mode 100644
index 0000000..507c9d9
--- /dev/null
+++ b/migration.c
@@ -0,0 +1,200 @@
+#include "qemu-common.h"
+#include "hw/hw.h"
+#include "qemu-timer.h"
+#include "sysemu.h"
+#include "qemu-char.h"
+#include "migration.h"
+
+typedef struct QEMUFileBuffered
+{
+    BufferedPutFunc *put_buffer;
+    BufferedPutReadyFunc *put_ready;
+    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
+    BufferedCloseFunc *close;
+    void *opaque;
+    QEMUFile *file;
+    int has_error;
+    int freeze_output;
+    size_t bytes_xfer;
+    size_t xfer_limit;
+    uint8_t *buffer;
+    size_t buffer_size;
+    size_t buffer_capacity;
+    QEMUTimer *timer;
+} QEMUFileBuffered;
+
+static void buffered_append(QEMUFileBuffered *s,
+                            const uint8_t *buf, size_t size)
+{
+    if (size > (s->buffer_capacity - s->buffer_size)) {
+        void *tmp;
+
+        s->buffer_capacity += size + 1024;
+
+        tmp = qemu_realloc(s->buffer, s->buffer_capacity);
+        if (tmp == NULL) {
+            fprintf(stderr, "qemu file buffer expansion failed\n");
+            exit(1);
+        }
+
+        s->buffer = tmp;
+    }
+
+    memcpy(s->buffer + s->buffer_size, buf, size);
+    s->buffer_size += size;
+}
+
+static void buffered_flush(QEMUFileBuffered *s)
+{
+    size_t offset = 0;
+
+    if (s->has_error)
+        return;
+
+    while (offset < s->buffer_size) {
+        ssize_t ret;
+
+        ret = s->put_buffer(s->opaque, s->buffer + offset,
+                            s->buffer_size - offset);
+        if (ret == -EAGAIN) {
+            s->freeze_output = 1;
+            break;
+        }
+
+        if (ret <= 0)
+            break;
+
+        offset += ret;
+    }
+
+    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
+    s->buffer_size -= offset;
+}
+
+static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileBuffered *s = opaque;
+    size_t offset = 0;
+    ssize_t ret;
+
+    if (s->has_error)
+        return -EINVAL;
+
+    s->freeze_output = 0;
+
+    buffered_flush(s);
+
+    while (offset < size) {
+        if (s->bytes_xfer > s->xfer_limit)
+            break;
+
+        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
+        if (ret == -EAGAIN) {
+            s->freeze_output = 1;
+            break;
+        }
+
+        if (ret <= 0) {
+            s->has_error = 1;
+            break;
+        }
+
+        offset += ret;
+        s->bytes_xfer += ret;
+    }
+
+    buffered_append(s, buf + offset, size - offset);
+
+    return offset;
+}
+
+static int buffered_close(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+    int ret;
+
+    if (s->has_error)
+        return -1;
+
+    while (s->buffer_size) {
+        buffered_flush(s);
+        if (s->freeze_output)
+            s->wait_for_unfreeze(s);
+    }
+
+    ret = s->close(s->opaque);
+
+    qemu_del_timer(s->timer);
+    qemu_free_timer(s->timer);
+    qemu_free(s->buffer);
+    qemu_free(s);
+
+    return ret;
+}
+
+static int buffered_rate_limit(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+
+    if (s->has_error)
+        return 0;
+
+    if (s->freeze_output)
+        return 1;
+
+    if (s->bytes_xfer > s->xfer_limit)
+        return 1;
+
+    return 0;
+}
+
+static void buffered_rate_tick(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+
+    if (s->has_error)
+        return;
+
+    qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100);
+
+    if (s->freeze_output)
+        return;
+
+    s->bytes_xfer = 0;
+
+    buffered_flush(s);
+
+    /* Add some checks around this */
+    s->put_ready(s->opaque);
+}
+
+QEMUFile *qemu_fopen_ops_buffered(void *opaque,
+                                  size_t bytes_per_sec,
+                                  BufferedPutFunc *put_buffer,
+                                  BufferedPutReadyFunc *put_ready,
+                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
+                                  BufferedCloseFunc *close)
+{
+    QEMUFileBuffered *s;
+
+    s = qemu_mallocz(sizeof(*s));
+    if (s == NULL)
+        return NULL;
+
+    s->opaque = opaque;
+    s->xfer_limit = bytes_per_sec / 10;
+    s->put_buffer = put_buffer;
+    s->put_ready = put_ready;
+    s->wait_for_unfreeze = wait_for_unfreeze;
+    s->close = close;
+
+    s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
+                             buffered_close, buffered_rate_limit);
+
+    s->timer = qemu_new_timer(rt_clock, buffered_rate_tick, s);
+
+    qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100);
+
+    return s->file;
+}
+
diff --git a/migration.h b/migration.h
new file mode 100644
index 0000000..3994fbb
--- /dev/null
+++ b/migration.h
@@ -0,0 +1,17 @@
+#ifndef QEMU_MIGRATION_H
+#define QEMU_MIGRATION_H
+
+#include "hw/hw.h"
+
+typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedPutReadyFunc)(void *opaque);
+typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
+typedef int (BufferedCloseFunc)(void *opaque);
+
+QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
+                                  BufferedPutFunc *put_buffer,
+                                  BufferedPutReadyFunc *put_ready,
+                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
+                                  BufferedCloseFunc *close);
+
+#endif
diff --git a/vl.c b/vl.c
index d02194e..d89435a 100644
--- a/vl.c
+++ b/vl.c
@@ -6275,12 +6275,13 @@ typedef struct QEMUFileUnix
     FILE *outfile;
 } QEMUFileUnix;
 
-static void file_put_buffer(void *opaque, const uint8_t *buf,
-                            int64_t pos, int size)
+static int file_put_buffer(void *opaque, const uint8_t *buf,
+                           int64_t pos, int size)
 {
     QEMUFileUnix *s = opaque;
     fseek(s->outfile, pos, SEEK_SET);
     fwrite(buf, 1, size, s->outfile);
+    return size;
 }
 
 static int file_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
@@ -6328,11 +6329,12 @@ typedef struct QEMUFileBdrv
     int64_t base_offset;
 } QEMUFileBdrv;
 
-static void bdrv_put_buffer(void *opaque, const uint8_t *buf,
-                            int64_t pos, int size)
+static int bdrv_put_buffer(void *opaque, const uint8_t *buf,
+                           int64_t pos, int size)
 {
     QEMUFileBdrv *s = opaque;
     bdrv_pwrite(s->bs, s->base_offset + pos, buf, size);
+    return size;
 }
 
 static int bdrv_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]