qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH 1/2] colo: compare the packet based on the tcp s


From: Zhang Chen
Subject: Re: [Qemu-devel] [PATCH 1/2] colo: compare the packet based on the tcp sequence number
Date: Mon, 4 Dec 2017 09:41:25 +0800

On Tue, Nov 28, 2017 at 8:04 PM, Mao Zhongyi <address@hidden>
wrote:

>     The primary and secondary guest has the same TCP stream, but the
>     the packet sizes are different due to the different fragmentation.
>
>     In the current impletation, compare the packet with the size of
>     payload, but packets of the same size and payload are very few,
>     so it triggers checkopint frequently, which leads to a very low
>     performance of the tcp packet comparison. In addtion, the method
>     of comparing the size of packet is not correct in itself.
>
>     like that:
>     We send this payload:
>     ------------------------------
>     | header |1|2|3|4|5|6|7|8|9|0|
>     ------------------------------
>
>     primary:
>     ppkt1:
>     ----------------
>     | header |1|2|3|
>     ----------------
>     ppkt2:
>     ------------------------
>     | header |4|5|6|7|8|9|0|
>     ------------------------
>
>     secondary:
>     spkt1:
>     ------------------------------
>     | header |1|2|3|4|5|6|7|8|9|0|
>     ------------------------------
>
>     In the original method, ppkt1 and ppkt2 are diffrent in size and
>     spkt1, so they can't compare and trigger the checkpoint.
>
>     I have tested FTP get 200M and 1G file many times, I found that
>     the performance was less than 1% of the native.
>
>     Now I reconstructed the comparison of TCP packets based on the
>     TCP sequence number. first of all, ppkt1 and spkt1 have the same
>     starting sequence number, so they can compare, even though their
>     length is different. And then ppkt1 with a smaller payload length
>     is used as the comparison length, if the payload is same, send
>     out the ppkt1 and record the offset(the length of ppkt1 payload)
>     in spkt1. The next comparison, ppkt2 and spkt1 can be compared
>     from the recorded position of spkt1.
>
>     like that:
>     ----------------
>     | header |1|2|3| ppkt1
>     ---------|-----|
>              |     |
>     ---------v-----v--------------
>     | header |1|2|3|4|5|6|7|8|9|0| spkt1
>     ---------------|\------------|
>                    | \offset     |
>           ---------v-------------v
>           | header |4|5|6|7|8|9|0| ppkt2
>           ------------------------
>
>     In this way, the performance can reach native 20% in my multiple
>     tests.
>
> Cc: Zhang Chen <address@hidden>
> Cc: Li Zhijian <address@hidden>
> Cc: Jason Wang <address@hidden>
>
> Reported-by: Zhang Chen <address@hidden>
> Signed-off-by: Mao Zhongyi <address@hidden>
> Signed-off-by: Li Zhijian <address@hidden>
> ---
>  net/colo-compare.c | 300 ++++++++++++++++++++++++++++++
> +++--------------------
>  net/colo.c         |   8 ++
>  net/colo.h         |  14 +++
>  3 files changed, 211 insertions(+), 111 deletions(-)
>
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 1ce195f..0752e9f 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -38,6 +38,9 @@
>  #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>  #define MAX_QUEUE_SIZE 1024
>
> +#define COLO_COMPARE_FREE_PRIMARY     0x01
> +#define COLO_COMPARE_FREE_SECONDARY   0x02
> +
>  /* TODO: Should be configurable */
>  #define REGULAR_PACKET_CHECK_MS 3000
>
> @@ -112,14 +115,31 @@ static gint seq_sorter(Packet *a, Packet *b,
> gpointer data)
>      return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
>  }
>
> +static void fill_pkt_seq(void *data, uint32_t *max_ack)
> +{
> +    Packet *pkt = data;
> +    struct tcphdr *tcphd;
> +
> +    tcphd = (struct tcphdr *)pkt->transport_header;
> +
> +    pkt->tcp_seq = ntohl(tcphd->th_seq);
> +    pkt->tcp_ack = ntohl(tcphd->th_ack);
> +    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
> +    pkt->hdsize = pkt->transport_header - (uint8_t *)pkt->data
> +                  + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
> +    pkt->pdsize = pkt->size - pkt->hdsize;
>


In this function you are not just "fill_pkt_seq", use "fill_pkt_tcp_info"
is more suitable.
And use "header_size" and "payload_size" instead of "hdsize" and "pdsize"
maybe better to read.



> +    pkt->seq_end = pkt->tcp_seq + pkt->pdsize;
> +}
> +
>  /*
>   * Return 1 on success, if return 0 means the
>   * packet will be dropped
>   */
> -static int colo_insert_packet(GQueue *queue, Packet *pkt)
> +static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t
> *max_ack)
>  {
>      if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
>          if (pkt->ip->ip_p == IPPROTO_TCP) {
> +            fill_pkt_seq(pkt, max_ack);
>              g_queue_insert_sorted(queue,
>                                    pkt,
>                                    (GCompareDataFunc)seq_sorter,
> @@ -169,12 +189,12 @@ static int packet_enqueue(CompareState *s, int mode,
> Connection **con)
>      }
>
>      if (mode == PRIMARY_IN) {
> -        if (!colo_insert_packet(&conn->primary_list, pkt)) {
> +        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
>              error_report("colo compare primary queue size too big,"
>                           "drop packet");
>          }
>      } else {
> -        if (!colo_insert_packet(&conn->secondary_list, pkt)) {
> +        if (!colo_insert_packet(&conn->secondary_list, pkt,
> &conn->sack)) {
>              error_report("colo compare secondary queue size too big,"
>                           "drop packet");
>          }
> @@ -184,6 +204,167 @@ static int packet_enqueue(CompareState *s, int mode,
> Connection **con)
>      return 0;
>  }
>
> +static inline bool after(uint32_t seq1, uint32_t seq2)
> +{
> +        return (int32_t)(seq1 - seq2) > 0;
> +}
> +
> +static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
> +{
> +    int ret;
> +    ret = compare_chr_send(s,
> +                           pkt->data,
> +                           pkt->size,
> +                           pkt->vnet_hdr_len);
> +    if (ret < 0) {
> +        error_report("colo send primary packet failed");
> +    }
> +    trace_colo_compare_main("packet same and release packet");
> +    packet_destroy(pkt, NULL);
> +}
>

This function codes duplicate with that in colo_compare_connection(),
We'd better reuse it.



> +
> +static bool colo_compare_payload(Packet *ppkt, Packet *spkt,
> +                                 uint16_t poff, uint16_t soff,
> +                                 uint16_t len)
> +{
> +    if (memcmp(ppkt->data + poff, spkt->data + soff, len)) {
> +        trace_colo_compare_main("the payload is not same");
> +        return false;
> +    }
> +    return true;
> +}
>

This function looks like colo_packet_compare_common(),
Why we need add a new one?



> +
> +/*
> + * return true means that the payload is consist and
> + * need to make the next comparison, false means do
> + * the checkpoint
> + */
> +static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
> +                              int8_t *mark, uint32_t max_ack)
> +{
> +    *mark = 0;
> +
> +    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end)
> {
> +        if (colo_compare_payload(ppkt, spkt, ppkt->hdsize, spkt->hdsize,
> +                                 ppkt->hdsize)) {
> +            *mark = COLO_COMPARE_FREE_SECONDARY |
> COLO_COMPARE_FREE_PRIMARY;
> +            return true;
> +        }
> +    }
> +
> +    /* one part of secondary packet payload still need to be compared */
> +    if (!after(ppkt->seq_end, spkt->seq_end)) {
> +        if (colo_compare_payload(ppkt, spkt, ppkt->hdsize + ppkt->offset,
> +                                 spkt->hdsize + spkt->offset,
> +                                 ppkt->pdsize - ppkt->offset)) {
> +            if (!after(ppkt->tcp_ack, max_ack)) {
> +                *mark = COLO_COMPARE_FREE_PRIMARY;
> +                spkt->offset += ppkt->pdsize - ppkt->offset;
> +                return true;
> +            } else {
> +                /* secondary guest hasn't ack the data, don't send
> +                 * out this packet
> +                 */
> +                return false;
> +            }
> +        }
> +    } else {
> +        /* primary packet is longer than secondary packet, compare
> +         * the same part and mark the primary packet offset
> +         */
> +        if (colo_compare_payload(ppkt, spkt, ppkt->hdsize + ppkt->offset,
> +                                 spkt->hdsize + spkt->offset,
> +                                 spkt->pdsize - spkt->offset)) {
> +            *mark = COLO_COMPARE_FREE_SECONDARY;
> +            ppkt->offset += spkt->pdsize - spkt->offset;
> +            return true;
> +        }
> +    }
> +
> +    return false;
> +}
> +
> +static void colo_compare_tcp(CompareState *s, Connection *conn)
> +{
> +    Packet *ppkt = NULL, *spkt = NULL;
> +    int8_t mark;
>


You should add more comments about the "max_ack".


> +    uint32_t max_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
> +
> +pri:
> +    if (g_queue_is_empty(&conn->primary_list)) {
> +        return;
> +    }
> +    ppkt = g_queue_pop_head(&conn->primary_list);
> +sec:
> +    if (g_queue_is_empty(&conn->secondary_list)) {
> +        g_queue_push_head(&conn->primary_list, ppkt);
> +        return;
> +    }
> +    spkt = g_queue_pop_head(&conn->secondary_list);
> +
> +    if (ppkt->tcp_seq == ppkt->seq_end) {
> +        colo_release_primary_pkt(s, ppkt);
> +        ppkt = NULL;
> +    }
> +
> +    if (ppkt && conn->compare_seq && !after(ppkt->seq_end,
> conn->compare_seq)) {
> +        trace_colo_compare_main("pri: pkt has compared & posted,
> destroy");
> +        packet_destroy(ppkt, NULL);
> +        ppkt = NULL;
> +    }
> +
> +    if (spkt->tcp_seq == spkt->seq_end) {
> +        packet_destroy(spkt, NULL);
> +        if (!ppkt) {
> +            goto pri;
> +        } else {
> +            goto sec;
> +        }
> +    } else {
> +        if (conn->compare_seq && !after(spkt->seq_end,
> conn->compare_seq)) {
> +            trace_colo_compare_main("sec: pkt has compared & posted,
> destroy");
> +            packet_destroy(spkt, NULL);
> +            if (!ppkt) {
> +                goto pri;
> +            } else {
> +                goto sec;
> +            }
> +        }
> +        if (!ppkt) {
> +            g_queue_push_head(&conn->secondary_list, spkt);
> +            goto pri;
> +        }
> +    }
> +
> +    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, max_ack)) {
> +        if (mark == COLO_COMPARE_FREE_PRIMARY) {
> +            conn->compare_seq = ppkt->seq_end;
> +            colo_release_primary_pkt(s, ppkt);
> +            g_queue_push_head(&conn->secondary_list, spkt);
> +            goto pri;
> +        }
> +        if (mark == COLO_COMPARE_FREE_SECONDARY) {
> +            conn->compare_seq = spkt->seq_end;
> +            packet_destroy(spkt, NULL);
> +            goto sec;
> +        }
> +        if (mark == (COLO_COMPARE_FREE_PRIMARY |
> COLO_COMPARE_FREE_SECONDARY)) {
> +            conn->compare_seq = ppkt->seq_end;
> +            colo_release_primary_pkt(s, ppkt);
> +            packet_destroy(spkt, NULL);
> +            goto pri;
> +        }
> +    } else {
> +        g_queue_push_head(&conn->primary_list, ppkt);
> +        g_queue_push_head(&conn->secondary_list, spkt);
> +
> +        /*
> +         * colo_compare_inconsistent_notify();
> +         * TODO: notice to checkpoint();
> +         */
> +    }
> +}
> +
>  /*
>   * The IP packets sent by primary and secondary
>   * will be compared in here
> @@ -224,110 +405,6 @@ static int colo_packet_compare_common(Packet *ppkt,
>
>  /*
>   * Called from the compare thread on the primary
> - * for compare tcp packet
> - * compare_tcp copied from Dr. David Alan Gilbert's branch
> - */
> -static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
> -{
> -    struct tcphdr *ptcp, *stcp;
> -    int res;
> -
> -    trace_colo_compare_main("compare tcp");
> -
> -    ptcp = (struct tcphdr *)ppkt->transport_header;
> -    stcp = (struct tcphdr *)spkt->transport_header;
> -
> -    /*
> -     * The 'identification' field in the IP header is *very* random
> -     * it almost never matches.  Fudge this by ignoring differences in
> -     * unfragmented packets; they'll normally sort themselves out if
> different
> -     * anyway, and it should recover at the TCP level.
> -     * An alternative would be to get both the primary and secondary to
> rewrite
> -     * somehow; but that would need some sync traffic to sync the state
> -     */
> -    if (ntohs(ppkt->ip->ip_off) & IP_DF) {
> -        spkt->ip->ip_id = ppkt->ip->ip_id;
> -        /* and the sum will be different if the IDs were different */
> -        spkt->ip->ip_sum = ppkt->ip->ip_sum;
> -    }
> -
> -    /*
> -     * Check tcp header length for tcp option field.
> -     * th_off > 5 means this tcp packet have options field.
> -     * The tcp options maybe always different.
> -     * for example:
> -     * From RFC 7323.
> -     * TCP Timestamps option (TSopt):
> -     * Kind: 8
> -     *
> -     * Length: 10 bytes
> -     *
> -     *    +-------+-------+---------------------+---------------------+
> -     *    |Kind=8 |  10   |   TS Value (TSval)  |TS Echo Reply (TSecr)|
> -     *    +-------+-------+---------------------+---------------------+
> -     *       1       1              4                     4
> -     *
> -     * In this case the primary guest's timestamp always different with
> -     * the secondary guest's timestamp. COLO just focus on payload,
> -     * so we just need skip this field.
> -     */
> -    if (ptcp->th_off > 5) {
> -        ptrdiff_t ptcp_offset, stcp_offset;
> -
> -        ptcp_offset = ppkt->transport_header - (uint8_t *)ppkt->data
> -                      + (ptcp->th_off * 4) - ppkt->vnet_hdr_len;
> -        stcp_offset = spkt->transport_header - (uint8_t *)spkt->data
> -                      + (stcp->th_off * 4) - spkt->vnet_hdr_len;
> -
> -        /*
> -         * When network is busy, some tcp options(like sack) will
> unpredictable
> -         * occur in primary side or secondary side. it will make packet
> size
> -         * not same, but the two packet's payload is identical. colo just
> -         * care about packet payload, so we skip the option field.
> -         */
> -        res = colo_packet_compare_common(ppkt, spkt, ptcp_offset,
> stcp_offset);
> -    } else if (ptcp->th_sum == stcp->th_sum) {
> -        res = colo_packet_compare_common(ppkt, spkt, ETH_HLEN, ETH_HLEN);
> -    } else {
> -        res = -1;
> -    }
> -
> -    if (res != 0 &&
> -        trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
> -        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20],
> sec_ip_dst[20];
> -
> -        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
> -        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
> -        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
> -        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
> -
> -        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
> -                                   pri_ip_dst, spkt->size,
> -                                   sec_ip_src, sec_ip_dst);
> -
> -        trace_colo_compare_tcp_info("pri tcp packet",
> -                                    ntohl(ptcp->th_seq),
> -                                    ntohl(ptcp->th_ack),
> -                                    res, ptcp->th_flags,
> -                                    ppkt->size);
> -
> -        trace_colo_compare_tcp_info("sec tcp packet",
> -                                    ntohl(stcp->th_seq),
> -                                    ntohl(stcp->th_ack),
> -                                    res, stcp->th_flags,
> -                                    spkt->size);
> -
> -        qemu_hexdump((char *)ppkt->data, stderr,
> -                     "colo-compare ppkt", ppkt->size);
> -        qemu_hexdump((char *)spkt->data, stderr,
> -                     "colo-compare spkt", spkt->size);
> -    }
> -
> -    return res;
> -}
> -
> -/*
> - * Called from the compare thread on the primary
>   * for compare udp packet
>   */
>  static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
> @@ -492,14 +569,15 @@ static void colo_compare_connection(void *opaque,
> void *user_data)
>      GList *result = NULL;
>      int ret;
>
> +    if (conn->ip_proto == IPPROTO_TCP) {
> +        colo_compare_tcp(s, conn);
> +        return;
> +    }
> +
>      while (!g_queue_is_empty(&conn->primary_list) &&
>             !g_queue_is_empty(&conn->secondary_list)) {
>          pkt = g_queue_pop_head(&conn->primary_list);
>          switch (conn->ip_proto) {
> -        case IPPROTO_TCP:
> -            result = g_queue_find_custom(&conn->secondary_list,
> -                     pkt, (GCompareFunc)colo_packet_compare_tcp);
> -            break;
>

I think we should put colo_compare_tcp in here, like other protocol.
If this compare loop can't satisfy your needs(like goto pri/sec), you can
fix this loop that make it more general,
rather than give TCP a privilege between other protocol(like compare
firstly).


Thanks
Zhang Chen



>          case IPPROTO_UDP:
>              result = g_queue_find_custom(&conn->secondary_list,
>                       pkt, (GCompareFunc)colo_packet_compare_udp);
> diff --git a/net/colo.c b/net/colo.c
> index a39d600..1743522 100644
> --- a/net/colo.c
> +++ b/net/colo.c
> @@ -138,6 +138,8 @@ Connection *connection_new(ConnectionKey *key)
>      conn->processing = false;
>      conn->offset = 0;
>      conn->syn_flag = 0;
> +    conn->pack = 0;
> +    conn->sack = 0;
>      g_queue_init(&conn->primary_list);
>      g_queue_init(&conn->secondary_list);
>
> @@ -163,6 +165,12 @@ Packet *packet_new(const void *data, int size, int
> vnet_hdr_len)
>      pkt->size = size;
>      pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>      pkt->vnet_hdr_len = vnet_hdr_len;
> +    pkt->tcp_seq = 0;
> +    pkt->tcp_ack = 0;
> +    pkt->seq_end = 0;
> +    pkt->hdsize = 0;
> +    pkt->pdsize = 0;
> +    pkt->offset = 0;
>
>      return pkt;
>  }
> diff --git a/net/colo.h b/net/colo.h
> index 0658e86..97bc41e 100644
> --- a/net/colo.h
> +++ b/net/colo.h
> @@ -45,6 +45,14 @@ typedef struct Packet {
>      int64_t creation_ms;
>      /* Get vnet_hdr_len from filter */
>      uint32_t vnet_hdr_len;
> +    uint32_t tcp_seq; /* sequence number */
> +    uint32_t tcp_ack; /* acknowledgement number */
> +    /* the sequence number of the last byte of the packet */
> +    uint32_t seq_end;
> +    uint8_t hdsize;  /* the header length */
> +    uint16_t pdsize; /* the payload length */
> +    /* record the payload offset(the length that has been compared) */
> +    uint16_t offset;
>  } Packet;
>
>  typedef struct ConnectionKey {
> @@ -64,6 +72,12 @@ typedef struct Connection {
>      /* flag to enqueue unprocessed_connections */
>      bool processing;
>      uint8_t ip_proto;
> +    /* record the sequence number that has been compared */
> +    uint32_t compare_seq;
> +    /* the maximum of acknowledgement number in primary_list queue */
> +    uint32_t pack;
> +    /* the maximum of acknowledgement number in secondary_list queue */
> +    uint32_t sack;
>      /* offset = secondary_seq - primary_seq */
>      tcp_seq  offset;
>      /*
> --
> 2.9.4
>
>
>
>


reply via email to

[Prev in Thread] Current Thread [Next in Thread]