qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet compariso


From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread
Date: Wed, 30 Mar 2016 12:41:43 +0100
User-agent: Mutt/1.5.24 (2015-08-30)

* Zhang Chen (address@hidden) wrote:
> if packets are same, we send primary packet and drop secondary
> packet, otherwise notify COLO do checkpoint.
> 
> Signed-off-by: Zhang Chen <address@hidden>
> Signed-off-by: Li Zhijian <address@hidden>
> Signed-off-by: Wen Congyang <address@hidden>
> ---
>  net/colo-compare.c | 122 
> ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 121 insertions(+), 1 deletion(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 0bb5a51..1debc0e 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -36,6 +36,7 @@
>  static QTAILQ_HEAD(, CompareState) net_compares =
>         QTAILQ_HEAD_INITIALIZER(net_compares);
>  static ssize_t hashtable_max_size;
> +static int colo_need_checkpoint;
>  
>  typedef struct ReadState {
>      int state; /* 0 = getting length, 1 = getting data */
> @@ -91,6 +92,13 @@ typedef struct CompareState {
>      GQueue unprocessed_connections;
>      /* proxy current hash size */
>      ssize_t hashtable_size;
> +
> +    /* notify compare thread */
> +    QemuEvent event;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    int thread_status;
> +
>  } CompareState;
>  
>  typedef struct Packet {
> @@ -129,6 +137,15 @@ enum {
>      SECONDARY_IN,
>  };
>  
> +enum {
> +    /* compare thread isn't started */
> +    COMPARE_THREAD_NONE,
> +    /* compare thread is running */
> +    COMPARE_THREAD_RUNNING,
> +    /* compare thread exit */
> +    COMPARE_THREAD_EXIT,
> +};
> +
>  static void packet_destroy(void *opaque, void *user_data);
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int 
> size);
>  
> @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, 
> void *user_data)
>      qemu_mutex_unlock(&conn->list_lock);
>  }
>  
> +static void colo_notify_checkpoint(void)
> +{
> +    colo_need_checkpoint = true;
> +}
> +
> +/* TODO colo_do_checkpoint() {
> + * we flush the connections and reset 'colo_need_checkpoint'
> + * }
> + */
> +
> +static inline void colo_dump_packet(Packet *pkt)
> +{
> +    int i;
> +    for (i = 0; i < pkt->size; i++) {
> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
> +    }
> +    printf("\n");
> +}
> +
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    colo_dump_packet(ppkt);
> +    colo_dump_packet(spkt);

Obviously those need to become conditional on something.

> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare);

I think the order of parameters passed to the colo_packet_compare
is the wrong way around - although it doesn't really matter with your current
simple comparison;  
https://developer.gnome.org/glib/stable/glib-Double-ended-Queues.html
says that

    'The function takes two gconstpointer arguments, the GQueue element's data 
as the
     first argument and the given user data as the second argument'

  so that makes the first argument the element out of the secondary_list and
the second argument the 'pkt' that you popped off the primary.

> +
> +        if (result) {
> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            g_queue_remove(&conn->secondary_list, result);
> +        } else {
> +            g_queue_push_head(&conn->primary_list, pkt);
> +            colo_notify_checkpoint();
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
> +        qemu_event_wait(&s->event);
> +        qemu_event_reset(&s->event);
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
> +        qemu_mutex_unlock(&s->conn_list_lock);

Interesting; holding the 'conn_list_lock' around the whole of the comparison
is probably quite expensive if you've got a lot of packets coming in then
the lock could be held for most of the time.
I'm not sure of a better solution; maybe use the qemu/rcu_queue.h ?

> +    }
> +
> +    return NULL;
> +}
> +
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int 
> size)
>  {
>      int ret = 0;
> @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const 
> uint8_t *buf, int size)
>      if (ret == 1) {
>          if (packet_enqueue(s, PRIMARY_IN)) {
>              error_report("primary: unsupported packet in");
> -            compare_chr_send(s->chr_out, buf, size);
> +            compare_chr_send(s->chr_out, s->pri_rs.buf, 
> s->pri_rs.packet_len);

Doesn't that change belong in an earlier patch?

> +        } else {
> +            qemu_event_set(&s->event);

Also these - why are these in this patch?

>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> @@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const 
> uint8_t *buf, int size)
>      if (ret == 1) {
>          if (packet_enqueue(s, SECONDARY_IN)) {
>              error_report("secondary: unsupported packet in");
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, 
> Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
>      struct sysinfo si;
> +    char thread_name[64];
> +    static int compare_id;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, 
> Error **errp)
>      g_queue_init(&s->conn_list);
>      qemu_mutex_init(&s->conn_list_lock);
>  
> +    colo_need_checkpoint = false;
>      s->hashtable_size = 0;
>      /*
>       * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
> @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, 
> Error **errp)
>                                                        g_free,
>                                                        connection_destroy);
>  
> +    s->thread_status = COMPARE_THREAD_RUNNING;
> +    sprintf(thread_name, "proxy compare %d", compare_id);

As with my comment from last month; the thread names are limited
to 14 characters on Linux (and most other Unixes) so keep this short;
I use "proxy:%s" and the device name.

> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
>      return;
>  
>  out:
> @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, 
> void *data)
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
>      qemu_mutex_destroy(&s->conn_list_lock);
> +
> +    if (s->thread.thread) {
> +        s->thread_status = COMPARE_THREAD_EXIT;
> +        qemu_event_set(&s->event);
> +        qemu_thread_join(&s->thread);
> +    }
> +    qemu_event_destroy(&s->event);
>  }
>  
>  static void colo_compare_init(Object *obj)
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK



reply via email to

[Prev in Thread] Current Thread [Next in Thread]