|
From: | Ori Mamluk |
Subject: | [Qemu-devel] [RFC PATCH v3 8/9] repagent: Work on review of patch V2 - moved to QTAILQ instead of array, replaced pthreads with qemu-thread |
Date: | Thu, 5 Apr 2012 15:20:15 +0300 |
Changed volumes list to QTAILQ instead of an array. Replaced pthreads with qemu-thread. Fixed license text. --- block.c | 9 --- block/repagent/qemu-repagent.txt | 2 + block/repagent/repagent.c | 142 ++++++++++++++++++-------------------- block/repagent/repagent.h | 5 +- block/repagent/repagent_client.c | 2 +- block/repagent/repagent_client.h | 4 +- block/repagent/repagent_drv.c | 6 +- block/repagent/repcmd_listener.c | 2 +- block/repagent/repcmd_listener.h | 2 +- block/repagent/rephub_cmds.h | 2 +- block/repagent/rephub_defs.h | 2 +- 11 files changed, 82 insertions(+), 96 deletions(-)
diff --git a/block.c b/block.c index 8e11c03..b77ca0f 100644 --- a/block.c +++ b/block.c @@ -1486,13 +1486,6 @@ static int bdrv_rw_co(BlockDriverState *bs, int64_t sector_num, uint8_t *buf, qemu_aio_wait(); } } - /* orim todo remove */ - printf("IO done. is_write %d sec %lld size %d ", is_write, - (long long int) sector_num, nb_sectors); - if (bs->drv != NULL) { - printf("Drv %s, ", bs->drv->format_name); - } - printf("file %s, ret %d\n", bs->filename, rwco.ret); return rwco.ret; }
@@ -1966,8 +1959,6 @@ int64_t bdrv_getlength(BlockDriverState *bs) ret = drv->bdrv_getlength(bs); } } - /* orim todo remove */ - printf("bdrv_getlength returned %d", (int)ret); return ret; }
diff --git a/block/repagent/qemu-repagent.txt b/block/repagent/qemu-repagent.txt index 0f9dc03..af1245c 100644 --- a/block/repagent/qemu-repagent.txt +++ b/block/repagent/qemu-repagent.txt @@ -1,5 +1,7 @@ repagent - replication agent - a Qemu module for enabling continuous async replication of VM volumes
+Ori Mamluk, 2012 + Introduction This document describes a feature in Qemu - a replication agent (named Repagent). The Repagent is a new module that exposes an API to an external replication system (AKA Rephub). diff --git a/block/repagent/repagent.c b/block/repagent/repagent.c index 2e70853..a5c0636 100644 --- a/block/repagent/repagent.c +++ b/block/repagent/repagent.c @@ -1,7 +1,7 @@ /* * QEMU System Emulator replication agent * - * Copyright (c) 2003 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,7 +24,6 @@ #include <string.h> #include <stdlib.h> #include <stdio.h> -#include <pthread.h> #include <stdint.h>
#include "block.h" @@ -33,21 +32,24 @@ #include "repagent_client.h" #include "repagent.h" #include "rephub_cmds.h" +#include "qemu-queue.h" +#include "qemu-thread.h"
#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj)) -#define REPAGENT_MAX_NUM_VOLUMES (64) #define REPAGENT_VOLUME_ID_NONE (0)
typedef struct RepagentVolume { uint64_t vol_id; - const char *vol_path; + char *vol_path; BlockDriverState *driver_ptr; + QTAILQ_ENTRY(RepagentVolume) list; } RepagentVolume;
struct RepAgentState { bool is_init; int num_volumes; - RepagentVolume *volumes[REPAGENT_MAX_NUM_VOLUMES]; + QTAILQ_HEAD(RepagentVolumesList, RepagentVolume) volumes; + QemuThread disconnectThread; };
typedef struct RepagentReadVolIo { @@ -57,9 +59,9 @@ typedef struct RepagentReadVolIo { struct timeval start_time; } RepagentReadVolIo;
-static int repagent_get_volume_by_driver( +static RepagentVolume *repagent_get_volume_by_driver( BlockDriverState *bs); -static int repagent_get_volume_by_name(const char *name); +static RepagentVolume *repagent_get_volume_by_name(const char *name); static void repagent_report_volumes_to_hub(void); static void repagent_remote_io_done(void *opaque, int ret); static struct timeval tsub(struct timeval t1, struct timeval t2); @@ -68,6 +70,10 @@ RepAgentState g_rep_agent = { 0 };
void repagent_init(const char *hubname, int port) { + QTAILQ_HEAD(RepagentVolumesList, RepagentVolume) tmpHead = + QTAILQ_HEAD_INITIALIZER(tmpHead); + memcpy(&g_rep_agent.volumes, &tmpHead, sizeof(g_rep_agent.volumes)); + /* It is the responsibility of the thread to free this struct */ rephub_params *pParams = g_malloc(sizeof(rephub_params)); if (hubname == NULL) { @@ -82,70 +88,57 @@ void repagent_init(const char *hubname, int port) pParams->port = port; pParams->name = g_strdup(hubname);
- pthread_t thread_id = 0;
/* Create the repagent client listener thread */ - pthread_create(&thread_id, 0, repagent_listen, (void *) pParams); - pthread_detach(thread_id); + qemu_thread_create(&g_rep_agent.disconnectThread, repagent_listen, + (void *) pParams, QEMU_THREAD_DETACHED); }
void repagent_register_drive(const char *drive_path, BlockDriverState *driver_ptr) { /* Assert that the volume is not registered yet */ - int i = repagent_get_volume_by_name(drive_path); - assert(i == -1); - - /*Add the volume at the last place*/ - assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES); - - i = g_rep_agent.num_volumes; - g_rep_agent.num_volumes++; + RepagentVolume *vol = repagent_get_volume_by_name(drive_path); + assert(vol == NULL);
printf("zerto repagent: Registering drive. Num drives %d, path %s\n", g_rep_agent.num_volumes, drive_path); - g_rep_agent.volumes[i] = - (RepagentVolume *)g_malloc(sizeof(RepagentVolume)); - g_rep_agent.volumes[i]->driver_ptr = driver_ptr; + RepagentVolume *new_vol = g_malloc(sizeof(RepagentVolume)); + new_vol->driver_ptr = driver_ptr; /* orim todo strcpy? */ - g_rep_agent.volumes[i]->vol_path = drive_path; + new_vol->vol_path = g_strdup(drive_path);
+ + QTAILQ_INSERT_HEAD(&g_rep_agent.volumes, new_vol, list); /* Orim todo thread-safety? */ g_rep_agent.num_volumes++;
repagent_report_volumes_to_hub(); }
-void repagent_deregister_drive(const char *drive_path, - BlockDriverState *driver_ptr) +void repagent_deregister_drive(BlockDriverState *driver_ptr) { /* Orim todo thread-safety? */ - int i = repagent_get_volume_by_driver(driver_ptr); - assert(i != -1); - - RepagentVolume *vol = g_rep_agent.volumes[i]; - /* Put the last volume in the cell of the removed volume to maintain a - * contiguous array */ - g_rep_agent.volumes[i] = g_rep_agent.volumes[g_rep_agent.num_volumes - 1]; - g_rep_agent.volumes[g_rep_agent.num_volumes - 1] = NULL; + RepagentVolume *vol = repagent_get_volume_by_driver(driver_ptr); + assert(vol != NULL); + + QTAILQ_REMOVE(&g_rep_agent.volumes, vol, list); + g_free(vol->vol_path); g_rep_agent.num_volumes--; g_free(vol); - } /* orim todo destruction? */
-static int repagent_get_volume_by_driver( +static RepagentVolume *repagent_get_volume_by_driver( BlockDriverState *bs) { - /* orim todo optimize search */ - int i = 0; - for (i = 0; i < g_rep_agent.num_volumes ; i++) { - RepagentVolume *p_vol = g_rep_agent.volumes[i]; - if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) { - return i; + RepagentVolume *vol = NULL; + QTAILQ_FOREACH(vol, &g_rep_agent.volumes, list) { + if (vol != NULL && vol->driver_ptr == (void *)bs) { + return vol; } } - return -1; + return NULL; }
void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num, @@ -160,15 +153,13 @@ void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num, printf("\n");
/* orim todo thread safety? */ - int i = repagent_get_volume_by_driver(bs); - if (i == -1 || g_rep_agent.volumes[i]->vol_id == REPAGENT_VOLUME_ID_NONE) { + RepagentVolume *vol = repagent_get_volume_by_driver(bs); + if (vol == NULL || vol->vol_id == REPAGENT_VOLUME_ID_NONE) { /* Unprotected */ printf("Got a write to an unprotected volume.\n"); return; }
- RepagentVolume *p_vol = g_rep_agent.volumes[i]; - /* Report IO to rephub */
int data_size = qiov->size; @@ -184,7 +175,7 @@ void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num, qemu_iovec_to_buffer(qiov, pdata); }
- p_cmd->volume_id = p_vol->vol_id; + p_cmd->volume_id = vol->vol_id; p_cmd->offset_sectors = sector_num; p_cmd->size_sectors = nb_sectors; p_cmd->ret_status = ret_status; @@ -197,23 +188,26 @@ void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num, static void repagent_report_volumes_to_hub(void) { /* Report IO to rephub */ - int i; RepCmdDataReportVmVolumes *p_cmd_data = NULL; RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new( REPHUB_CMD_REPORT_VM_VOLUMES, g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo), (uint8_t **) &p_cmd_data); + RepagentVolume *vol = QTAILQ_FIRST(&g_rep_agent.volumes); p_cmd->num_volumes = g_rep_agent.num_volumes; printf("reporting %u volumes\n", g_rep_agent.num_volumes); + + int i; for (i = 0; i < g_rep_agent.num_volumes ; i++) { - assert(g_rep_agent.volumes[i] != NULL); + assert(vol != NULL); printf("reporting volume %s size %u\n", - g_rep_agent.volumes[i]->vol_path, + vol->vol_path, (uint32_t) sizeof(p_cmd_data->volumes[i].name)); strncpy((char *) p_cmd_data->volumes[i].name, - g_rep_agent.volumes[i]->vol_path, + vol->vol_path, sizeof(p_cmd_data->volumes[i].name)); - p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id; + p_cmd_data->volumes[i].volume_id = vol->vol_id; + vol = QTAILQ_NEXT(vol, list); } if (repagent_client_send((RepCmd *) p_cmd) != 0) { printf("Error sending command\n"); @@ -225,51 +219,49 @@ bool repaget_start_protect(RepCmdStartProtect *pcmd, { printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name, (unsigned long long) pcmd->volume_id); - int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name); + RepagentVolume *vol = repagent_get_volume_by_name(pcmd_data->volume_name); if (g_rep_agent.num_volumes > 0 && strcmp(pcmd_data->volume_name, "stam") == 0) { /* Choose the first one for rephub */ - vol_index = 0; + vol = QTAILQ_LAST(&g_rep_agent.volumes, RepagentVolumesList); } - if (vol_index < 0) { + if (vol == NULL) { printf("The volume doesn't 
exist\n"); return true; } /* orim todo protect */ - g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id; + vol->vol_id = pcmd->volume_id;
return true; }
-static int repagent_get_volume_by_name(const char *name) +static RepagentVolume *repagent_get_volume_by_name(const char *name) { - int i = 0; - for (i = 0; i < g_rep_agent.num_volumes ; i++) { - if (g_rep_agent.volumes[i] != NULL - && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) { - return i; + RepagentVolume *vol = NULL; + QTAILQ_FOREACH(vol, &g_rep_agent.volumes, list) { + if (vol != NULL && strcmp(name, vol->vol_path) == 0) { + return vol; } } - return -1; + return NULL; }
-static int repagent_get_volume_by_id(uint64_t vol_id) +static RepagentVolume *repagent_get_volume_by_id(uint64_t vol_id) { - int i = 0; - for (i = 0; i < g_rep_agent.num_volumes ; i++) { - if (g_rep_agent.volumes[i] != NULL - && g_rep_agent.volumes[i]->vol_id == vol_id) { - return i; + RepagentVolume *vol = NULL; + QTAILQ_FOREACH(vol, &g_rep_agent.volumes, list) { + if (vol != NULL && vol->vol_id == vol_id) { + return vol; } } - return -1; + return NULL; }
bool repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata) { - int index = repagent_get_volume_by_id(pcmd->volume_id); + RepagentVolume *vol = repagent_get_volume_by_id(pcmd->volume_id); int size_bytes = pcmd->size_sectors * 512; - if (index < 0) { + if (vol == NULL) { printf("Vol read - Could not find vol id %llx\n", (unsigned long long int) pcmd->volume_id); RepCmdRemoteIoRes *p_res_cmd = (RepCmdRemoteIoRes *) repcmd_new( @@ -282,7 +274,7 @@ bool repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata) }
printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n", - g_rep_agent.volumes[index]->driver_ptr, + vol->driver_ptr, (unsigned long long int) pcmd->volume_id, (unsigned long long int) pcmd->offset_sectors, pcmd->size_sectors);
@@ -296,7 +288,7 @@ bool repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata) qemu_iovec_init(&io_xaction->qiov, 1);
/*read_xact->buf = - qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr, size_bytes); */ + qemu_blockalign(vol->driver_ptr, size_bytes); */ io_xaction->buf = (uint8_t *) g_malloc(size_bytes); io_xaction->rep_cmd = *pcmd; qemu_iovec_add(&io_xaction->qiov, io_xaction->buf, size_bytes); @@ -305,12 +297,12 @@ bool repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata) /* orim TODO - use the returned acb to cancel the request on shutdown */ /*acb = */ if (pcmd->is_read) { - bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr, + bdrv_aio_readv(vol->driver_ptr, io_xaction->rep_cmd.offset_sectors, &io_xaction->qiov, io_xaction->rep_cmd.size_sectors, repagent_remote_io_done, io_xaction); } else { - bdrv_aio_writev(g_rep_agent.volumes[index]->driver_ptr, + bdrv_aio_writev(vol->driver_ptr, io_xaction->rep_cmd.offset_sectors, &io_xaction->qiov, io_xaction->rep_cmd.size_sectors, repagent_remote_io_done, io_xaction); diff --git a/block/repagent/repagent.h b/block/repagent/repagent.h index 2863ffc..157a9b6 100644 --- a/block/repagent/repagent.h +++ b/block/repagent/repagent.h @@ -1,7 +1,7 @@ /* * QEMU System Emulator * - * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -42,8 +42,7 @@ void repagent_handle_protected_write(BlockDriverState *bs, int nb_sectors, QEMUIOVector *qiov, int ret_status); void repagent_register_drive(const char *drive_path, BlockDriverState *driver_ptr); -void repagent_deregister_drive(const char *drive_path, - BlockDriverState *driver_ptr); +void repagent_deregister_drive(BlockDriverState *driver_ptr); bool repaget_start_protect(RepCmdStartProtect *pcmd, RepCmdDataStartProtect *pcmd_data); bool repagent_remote_io(struct RepCmdRemoteIoReq *pcmd, uint8_t *pdata); diff --git a/block/repagent/repagent_client.c b/block/repagent/repagent_client.c index 
9d826c4..2e57ed0 100644 --- a/block/repagent/repagent_client.c +++ b/block/repagent/repagent_client.c @@ -1,7 +1,7 @@ /* * QEMU System Emulator replication agent - socket client * - * Copyright (c) 2003 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/block/repagent/repagent_client.h b/block/repagent/repagent_client.h index 62a5377..6eaafed 100644 --- a/block/repagent/repagent_client.h +++ b/block/repagent/repagent_client.h @@ -1,7 +1,7 @@ /* - * QEMU System Emulator + * QEMU System Emulator - replication agent client * - * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/block/repagent/repagent_drv.c b/block/repagent/repagent_drv.c index 4775166..1795de1 100644 --- a/block/repagent/repagent_drv.c +++ b/block/repagent/repagent_drv.c @@ -1,7 +1,7 @@ /* * QEMU System Emulator replication agent - repagent block driver * - * Copyright (c) 2003 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -68,6 +68,7 @@ static int coroutine_fn repagent_co_writev(BlockDriverState *bs, static void repagent_close(BlockDriverState *bs) { printf("%s\n", __func__); + repagent_deregister_drive(bs); }
static int coroutine_fn repagent_co_flush(BlockDriverState *bs) @@ -88,11 +89,12 @@ static int repagent_truncate(BlockDriverState *bs, int64_t offset) return bdrv_truncate(bs->file, offset); }
+/* orim todo maybe use probe for planting repagent in every driver */ static int repagent_probe(const uint8_t *buf, int buf_size, const char *filename) { printf("%s\n", __func__); - return 1; /* everything can be opened as raw image */ + return 0; /* everything can be opened as raw image */ }
static int coroutine_fn repagent_co_discard(BlockDriverState *bs, diff --git a/block/repagent/repcmd_listener.c b/block/repagent/repcmd_listener.c index e6b4d74..0998f44 100644 --- a/block/repagent/repcmd_listener.c +++ b/block/repagent/repcmd_listener.c @@ -1,7 +1,7 @@ /* * QEMU System Emulator replication agent - socket commands layer * - * Copyright (c) 2003 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/block/repagent/repcmd_listener.h b/block/repagent/repcmd_listener.h index 19b9ea9..629cdb5 100644 --- a/block/repagent/repcmd_listener.h +++ b/block/repagent/repcmd_listener.h @@ -1,7 +1,7 @@ /* * QEMU System Emulator * - * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/block/repagent/rephub_cmds.h b/block/repagent/rephub_cmds.h index cb737e6..5075f33 100644 --- a/block/repagent/rephub_cmds.h +++ b/block/repagent/rephub_cmds.h @@ -1,7 +1,7 @@ /* * QEMU System Emulator * - * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/block/repagent/rephub_defs.h b/block/repagent/rephub_defs.h index f036f58..575e9f8 100644 --- a/block/repagent/rephub_defs.h +++ b/block/repagent/rephub_defs.h @@ -1,7 +1,7 @@ /* * QEMU System Emulator * - * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2012 Ori Mamluk * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal -- 1.7.6.5 |
[Prev in Thread] | Current Thread | [Next in Thread] |