qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [RFC PATCH V3 3/4] colo-compare: introduce packet comparison thread


From: Jason Wang
Subject: Re: [Qemu-devel] [RFC PATCH V3 3/4] colo-compare: introduce packet comparison thread
Date: Thu, 28 Apr 2016 15:58:38 +0800
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.6.0


On 04/18/2016 07:11 PM, Zhang Chen wrote:
> if packets are same, we send primary packet and drop secondary
> packet, otherwise notify COLO do checkpoint.
>
> Signed-off-by: Zhang Chen <address@hidden>
> Signed-off-by: Li Zhijian <address@hidden>
> Signed-off-by: Wen Congyang <address@hidden>
> ---
>  net/colo-compare.c | 126 
> +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  trace-events       |   2 +
>  2 files changed, 128 insertions(+)
>
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index dc57eac..4b5a2d4 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -26,6 +26,7 @@
>  #include "qemu/jhash.h"
>  #include "net/eth.h"
>  
> +#define DEBUG_TCP_COMPARE 1
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
>      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
> @@ -90,6 +91,13 @@ typedef struct CompareState {
>      GQueue unprocessed_connections;
>      /* proxy current hash size */
>      uint32_t hashtable_size;
> +
> +    /* notify compare thread */
> +    QemuEvent event;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    int thread_status;
> +
>  } CompareState;
>  
>  typedef struct CompareClass {
> @@ -132,6 +140,15 @@ enum {
>      SECONDARY_IN,
>  };
>  
> +enum {
> +    /* compare thread isn't started */
> +    COMPARE_THREAD_NONE,
> +    /* compare thread is running */
> +    COMPARE_THREAD_RUNNING,
> +    /* compare thread exit */
> +    COMPARE_THREAD_EXIT,
> +};
> +
>  static void packet_destroy(void *opaque, void *user_data);
>  static int compare_chr_send(CharDriverState *out,
>                              const uint8_t *buf,
> @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data)
>      g_slice_free(Packet, pkt);
>  }
>  
> +static inline void colo_dump_packet(Packet *pkt)
> +{
> +    int i;
> +    for (i = 0; i < pkt->size; i++) {
> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
> +    }
> +    printf("\n");

Can we use something like qemu_hexdump() here?

> +}
> +
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    trace_colo_compare_with_int("ppkt size", ppkt->size);
> +    trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src));
> +    trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst));
> +    trace_colo_compare_with_int("spkt size", spkt->size);
> +    trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src));
> +    trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst));

Can we use a single tracepoint here instead?

> +
> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
> +{
> +    trace_colo_compare_main("compare all");
> +    return colo_packet_compare(ppkt, spkt);

Why is this wrapper needed?

> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * for compare connection
> + */
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare_all);
> +
> +        if (result) {
> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            trace_colo_compare_main("packet same and release packet");
> +            g_queue_remove(&conn->secondary_list, result->data);
> +        } else {
> +            trace_colo_compare_main("packet different");
> +            g_queue_push_head(&conn->primary_list, pkt);

Is it possible that the packet from the secondary simply has not arrived
yet? If so, do we still need to notify COLO to do a checkpoint here?

> +            /* TODO: colo_notify_checkpoint();*/
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
> +        qemu_event_wait(&s->event);
> +        qemu_event_reset(&s->event);
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
> +        qemu_mutex_unlock(&s->conn_list_lock);
> +    }
> +
> +    return NULL;
> +}
> +
>  static int compare_chr_send(CharDriverState *out,
>                              const uint8_t *buf,
>                              uint32_t size)
> @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const 
> uint8_t *buf, int size)
>          if (packet_enqueue(s, PRIMARY_IN)) {
>              trace_colo_compare_main("primary: unsupported packet in");
>              compare_chr_send(s->chr_out, s->pri_rs.buf, 
> s->pri_rs.packet_len);
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const 
> uint8_t *buf, int size)
>              trace_colo_compare_main("secondary: unsupported packet in");
>              /* should we send sec arp pkt? */
>              compare_chr_send(s->chr_out, s->sec_rs.buf, 
> s->sec_rs.packet_len);
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char 
> *value, Error **errp)
>  static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
> +    char thread_name[64];
> +    static int compare_id;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, 
> Error **errp)
>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
>  
>      g_queue_init(&s->conn_list);
> +    qemu_event_init(&s->event, false);
>      qemu_mutex_init(&s->conn_list_lock);
>      s->hashtable_size = 0;
>  
> @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, 
> Error **errp)
>                                                        g_free,
>                                                        connection_destroy);
>  
> +    s->thread_status = COMPARE_THREAD_RUNNING;
> +    sprintf(thread_name, "compare %d", compare_id);
> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
>      return;
>  }
>  
> @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, 
> void *data)
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
>      qemu_mutex_destroy(&s->conn_list_lock);
> +
> +    if (s->thread.thread) {
> +        s->thread_status = COMPARE_THREAD_EXIT;
> +        qemu_event_set(&s->event);
> +        qemu_thread_join(&s->thread);
> +    }
> +    qemu_event_destroy(&s->event);
>  }
>  
>  static void colo_compare_init(Object *obj)
> diff --git a/trace-events b/trace-events
> index 8862288..978c47f 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, 
> uint32_t data) "To 0x%" PRIx64
>  
>  # net/colo-compare.c
>  colo_compare_main(const char *chr) "chr: %s"
> +colo_compare_with_int(const char *sta, int size) ": %s = %d"
> +colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"




reply via email to

[Prev in Thread] Current Thread [Next in Thread]