#include "qemu/osdep.h" #include "libqtest.h" #include #include "qemu/osdep.h" #include #include "qemu/cutils.h" #include "qemu/bitops.h" #include "qemu/bitmap.h" #include "qemu/main-loop.h" #include "migration/ram.h" #include "migration/migration.h" #include "migration/register.h" #include "migration/misc.h" #include "migration/page_cache.h" #include "qemu/error-report.h" #include "qapi/error.h" #include "qapi/qapi-events-migration.h" #include "qapi/qmp/qerror.h" #include "trace.h" //#include "exec/ram_addr.h" #include "exec/target_page.h" #include "qemu/rcu_queue.h" #include "migration/colo.h" #include "migration/block.h" #include "migration/threads.h" #include "migration/qemu-file.h" #include "migration/threads.h" CompressionStats compression_counters; #define PAGE_SIZE 4096 #define PAGE_MASK ~(PAGE_SIZE - 1) static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt, int64_t pos) { int i, size = 0; for (i = 0; i < iovcnt; i++) { size += iov[i].iov_len; } return size; } static int test_fclose(void *opaque) { return 0; } static const QEMUFileOps test_write_ops = { .writev_buffer = test_writev_buffer, .close = test_fclose }; QEMUFile *dest_file; static const QEMUFileOps empty_ops = { }; static int do_compress_ram_page(QEMUFile *f, z_stream *stream, uint8_t *ram_addr, ram_addr_t offset, uint8_t *source_buf) { int bytes_sent = 0, blen; uint8_t *p = ram_addr; /* * copy it to a internal buffer to avoid it being modified by VM * so that we can catch up the error during compression and * decompression */ memcpy(source_buf, p, PAGE_SIZE); blen = qemu_put_compression_data(f, stream, source_buf, PAGE_SIZE); if (blen < 0) { bytes_sent = 0; qemu_file_set_error(dest_file, blen); error_report("compressed data failed!"); } else { printf("Compressed size %d.\n", blen); bytes_sent += blen; } return bytes_sent; } struct CompressData { /* filled by migration thread.*/ uint8_t *ram_addr; ram_addr_t offset; /* filled by compress thread. */ QEMUFile *file; z_stream stream; uint8_t *originbuf; ThreadRequest data; }; typedef struct CompressData CompressData; static ThreadRequest *compress_thread_data_init(void) { CompressData *cd = g_new0(CompressData, 1); cd->originbuf = g_try_malloc(PAGE_SIZE); if (!cd->originbuf) { goto exit; } if (deflateInit(&cd->stream, 1) != Z_OK) { g_free(cd->originbuf); goto exit; } cd->file = qemu_fopen_ops(NULL, &empty_ops); return &cd->data; exit: g_free(cd); return NULL; } static void compress_thread_data_fini(ThreadRequest *data) { CompressData *cd = container_of(data, CompressData, data); qemu_fclose(cd->file); deflateEnd(&cd->stream); g_free(cd->originbuf); g_free(cd); } static void compress_thread_data_handler(ThreadRequest *data) { CompressData *cd = container_of(data, CompressData, data); /* * if compression fails, it will indicate by * migrate_get_current()->to_dst_file. */ do_compress_ram_page(cd->file, &cd->stream, cd->ram_addr, cd->offset, cd->originbuf); } static void compress_thread_data_done(ThreadRequest *data) { CompressData *cd = container_of(data, CompressData, data); int bytes_xmit; bytes_xmit = qemu_put_qemu_file(dest_file, cd->file); /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */ compression_counters.reduced_size += 4096 - bytes_xmit + 8; compression_counters.pages++; } static Threads *compress_threads; static void flush_compressed_data(void) { threads_wait_done(compress_threads); } static void compress_threads_save_cleanup(void) { if (!compress_threads) { return; } threads_destroy(compress_threads); compress_threads = NULL; qemu_fclose(dest_file); dest_file = NULL; } static int compress_threads_save_setup(void) { dest_file = qemu_fopen_ops(NULL, &test_write_ops); compress_threads = threads_create(16, "compress", compress_thread_data_init, compress_thread_data_fini, compress_thread_data_handler, compress_thread_data_done); assert(compress_threads); return 0; } static int compress_page_with_multi_thread(uint8_t *addr) { CompressData *cd; ThreadRequest *thread_data; thread_data = threads_submit_request_prepare(compress_threads); if (!thread_data) { compression_counters.busy++; return -1; } cd = container_of(thread_data, CompressData, data); cd->ram_addr = addr; threads_submit_request_commit(compress_threads, thread_data); return 1; } #define MEM_SIZE (30ULL << 30) #define COUNT 5 static void run(void) { void *mem = qemu_memalign(PAGE_SIZE, MEM_SIZE); uint8_t *ptr = mem, *end = mem + MEM_SIZE; uint64_t start_time, total_time = 0, spend, total_busy = 0; int i; memset(mem, 0, MEM_SIZE); start_time = g_get_monotonic_time(); for (i = 0; i < COUNT; i++) { ptr = mem; start_time = g_get_monotonic_time(); while (ptr < end) { *ptr = 0x10; compress_page_with_multi_thread(ptr); ptr += PAGE_SIZE; } flush_compressed_data(); spend = g_get_monotonic_time() - start_time; total_time += spend; printf("RUN %d: BUSY %ld Time Cost %ld.\n", i, compression_counters.busy, spend); total_busy += compression_counters.busy; compression_counters.busy = 0; } printf("AVG: BUSY %ld Time Cost %ld.\n", total_busy / COUNT, total_time / COUNT); } static void compare_zero_and_compression(void) { ThreadRequest *data = compress_thread_data_init(); CompressData *cd; uint64_t start_time, zero_time, compress_time; char page[PAGE_SIZE]; if (!data) { printf("Init compression failed.\n"); return; } cd = container_of(data, CompressData, data); cd->ram_addr = (uint8_t *)page; memset(page, 0, sizeof(page)); dest_file = qemu_fopen_ops(NULL, &test_write_ops); start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); buffer_is_zero(page, PAGE_SIZE); zero_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time; start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); compress_thread_data_handler(data); compress_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time; printf("Zero %ld ns Compression: %ld ns.\n", zero_time, compress_time); compress_thread_data_fini(data); } static void migration_threads(void) { int i; printf("Zero Test vs. compression.\n"); for (i = 0; i < 10; i++) { compare_zero_and_compression(); } printf("test migration threads.\n"); compress_threads_save_setup(); run(); compress_threads_save_cleanup(); } int main(int argc, char **argv) { QTestState *s = NULL; int ret; g_test_init(&argc, &argv, NULL); qtest_add_func("/migration/threads", migration_threads); ret = g_test_run(); if (s) { qtest_quit(s); } return ret; }