+ * and secondary.
+ */
+ uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
+
+pri:
+ if (g_queue_is_empty(&conn->primary_list)) {
+ return;
}
+ ppkt = g_queue_pop_head(&conn->primary_list);
+sec:
+ if (g_queue_is_empty(&conn->secondary_list)) {
+ g_queue_push_head(&conn->primary_list, ppkt);
+ return;
+ }
+ spkt = g_queue_pop_head(&conn->secondary_list);
- /*
- * Check tcp header length for tcp option field.
- * th_off > 5 means this tcp packet have options field.
- * The tcp options maybe always different.
- * for example:
- * From RFC 7323.
- * TCP Timestamps option (TSopt):
- * Kind: 8
- *
- * Length: 10 bytes
- *
- * +-------+-------+---------------------+---------------------+
- * |Kind=8 | 10 | TS Value (TSval) |TS Echo Reply (TSecr)|
- * +-------+-------+---------------------+---------------------+
- * 1 1 4 4
- *
- * In this case the primary guest's timestamp always different with
- * the secondary guest's timestamp. COLO just focus on payload,
- * so we just need skip this field.
- */
+ if (ppkt->tcp_seq == ppkt->seq_end) {
+ colo_release_primary_pkt(s, ppkt);
+ ppkt = NULL;
+ }
- ptrdiff_t ptcp_offset, stcp_offset;
+ if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
+ trace_colo_compare_main("pri: this packet has compared");
+ colo_release_primary_pkt(s, ppkt);
+ ppkt = NULL;
+ }
- ptcp_offset = ppkt->transport_header - (uint8_t *)ppkt->data
- + (ptcp->th_off << 2) - ppkt->vnet_hdr_len;
- stcp_offset = spkt->transport_header - (uint8_t *)spkt->data
- + (stcp->th_off << 2) - spkt->vnet_hdr_len;
- if (ppkt->size - ptcp_offset == spkt->size - stcp_offset) {
- res = colo_compare_packet_payload(ppkt, spkt,
- ptcp_offset, stcp_offset,
- ppkt->size - ptcp_offset);
+ if (spkt->tcp_seq == spkt->seq_end) {
+ packet_destroy(spkt, NULL);
+ if (!ppkt) {
+ goto pri;
+ } else {
+ goto sec;
+ }
} else {
- trace_colo_compare_main("TCP: payload size of packets are different");
- res = -1;
+ if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
+ trace_colo_compare_main("sec: this packet has compared");
+ packet_destroy(spkt, NULL);
+ if (!ppkt) {
+ goto pri;
+ } else {
+ goto sec;
+ }
+ }
+ if (!ppkt) {
+ g_queue_push_head(&conn->secondary_list, spkt);
+ goto pri;
+ }
}
- if (res != 0 &&
- trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
- char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
-
- strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
- strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
- strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
- strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
-
- trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
- pri_ip_dst, spkt->size,
- sec_ip_src, sec_ip_dst);
-
- trace_colo_compare_tcp_info("pri tcp packet",
- ntohl(ptcp->th_seq),
- ntohl(ptcp->th_ack),
- res, ptcp->th_flags,
- ppkt->size);
-
- trace_colo_compare_tcp_info("sec tcp packet",
- ntohl(stcp->th_seq),
- ntohl(stcp->th_ack),
- res, stcp->th_flags,
- spkt->size);
+ if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
+ trace_colo_compare_tcp_info("pri",
+ ppkt->tcp_seq, ppkt->tcp_ack,
+ ppkt->header_size, ppkt->payload_size,
+ ppkt->offset, ppkt->flags);
+
+ trace_colo_compare_tcp_info("sec",
+ spkt->tcp_seq, spkt->tcp_ack,
+ spkt->header_size, spkt->payload_size,
+ spkt->offset, spkt->flags);
+
+ if (mark == COLO_COMPARE_FREE_PRIMARY) {
+ conn->compare_seq = ppkt->seq_end;
+ colo_release_primary_pkt(s, ppkt);
+ g_queue_push_head(&conn->secondary_list, spkt);
+ goto pri;
+ }
+ if (mark == COLO_COMPARE_FREE_SECONDARY) {
+ conn->compare_seq = spkt->seq_end;
+ packet_destroy(spkt, NULL);
+ goto sec;
+ }
+ if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
+ conn->compare_seq = ppkt->seq_end;
+ colo_release_primary_pkt(s, ppkt);
+ packet_destroy(spkt, NULL);
+ goto pri;
+ }
+ } else {
+ g_queue_push_head(&conn->primary_list, ppkt);
+ g_queue_push_head(&conn->secondary_list, spkt);
qemu_hexdump((char *)ppkt->data, stderr,
"colo-compare ppkt", ppkt->size);
qemu_hexdump((char *)spkt->data, stderr,
"colo-compare spkt", spkt->size);
- }
- return res;
+ /*
+ * colo_compare_inconsistent_notify();
+ * TODO: notify checkpoint();
+ */
+ }
}
+
/*
* Called from the compare thread on the primary
* for compare udp packet
@@ -477,53 +588,22 @@ static void colo_old_packet_check(void *opaque)
(GCompareFunc)colo_old_packet_check_one_conn);
}
-/*
- * Called from the compare thread on the primary
- * for compare packet with secondary list of the
- * specified connection when a new packet was
- * queued to it.
- */
-static void colo_compare_connection(void *opaque, void *user_data)
+static void colo_compare_packet(CompareState *s, Connection *conn,
+ int (*HandlePacket)(Packet *spkt,
+ Packet *ppkt))
{
- CompareState *s = user_data;
- Connection *conn = opaque;
Packet *pkt = NULL;
GList *result = NULL;
- int ret;
while (!g_queue_is_empty(&conn->primary_list) &&
!g_queue_is_empty(&conn->secondary_list)) {
pkt = g_queue_pop_head(&conn->primary_list);
- switch (conn->ip_proto) {
- case IPPROTO_TCP:
- result = g_queue_find_custom(&conn->secondary_list,
- pkt, (GCompareFunc)colo_packet_compare_tcp);
- break;
- case IPPROTO_UDP:
- result = g_queue_find_custom(&conn->secondary_list,
- pkt, (GCompareFunc)colo_packet_compare_udp);
- break;
- case IPPROTO_ICMP:
- result = g_queue_find_custom(&conn->secondary_list,
- pkt, (GCompareFunc)colo_packet_compare_icmp);
- break;
- default:
- result = g_queue_find_custom(&conn->secondary_list,
- pkt, (GCompareFunc)colo_packet_compare_other);
- break;
- }
+ result = g_queue_find_custom(&conn->secondary_list,
+ pkt, (GCompareFunc)HandlePacket);
if (result) {
- ret = compare_chr_send(s,
- pkt->data,
- pkt->size,
- pkt->vnet_hdr_len);
- if (ret < 0) {
- error_report("colo_send_primary_packet failed");
- }
- trace_colo_compare_main("packet same and release packet");
+ colo_release_primary_pkt(s, pkt);
g_queue_remove(&conn->secondary_list, result->data);
- packet_destroy(pkt, NULL);
} else {
/*
* If one packet arrive late, the secondary_list or
@@ -538,6 +618,33 @@ static void colo_compare_connection(void *opaque, void *user_data)
}
}
+/*
+ * Called from the compare thread on the primary
+ * to compare packets against the secondary list of
+ * the specified connection when a new packet has
+ * been queued to it.
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+ CompareState *s = user_data;
+ Connection *conn = opaque;
+
+ switch (conn->ip_proto) {
+ case IPPROTO_TCP:
+ colo_compare_tcp(s, conn);
+ break;
+ case IPPROTO_UDP:
+ colo_compare_packet(s, conn, colo_packet_compare_udp);
+ break;
+ case IPPROTO_ICMP:
+ colo_compare_packet(s, conn, colo_packet_compare_icmp);
+ break;
+ default:
+ colo_compare_packet(s, conn, colo_packet_compare_other);
+ break;
+ }
+}
+
static int compare_chr_send(CompareState *s,
const uint8_t *buf,
uint32_t size,
diff --git a/net/colo.c b/net/colo.c
index a39d600..8426265 100644
--- a/net/colo.c
+++ b/net/colo.c
@@ -138,6 +138,8 @@ Connection *connection_new(ConnectionKey *key)
conn->processing = false;
conn->offset = 0;
conn->syn_flag = 0;
+ conn->pack = 0;
+ conn->sack = 0;
g_queue_init(&conn->primary_list);
g_queue_init(&conn->secondary_list);
@@ -163,6 +165,13 @@ Packet *packet_new(const void *data, int size, int vnet_hdr_len)
pkt->size = size;
pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
pkt->vnet_hdr_len = vnet_hdr_len;
+ pkt->tcp_seq = 0;
+ pkt->tcp_ack = 0;
+ pkt->seq_end = 0;
+ pkt->header_size = 0;
+ pkt->payload_size = 0;
+ pkt->offset = 0;
+ pkt->flags = 0;
return pkt;
}
diff --git a/net/colo.h b/net/colo.h
index 0658e86..da6c36d 100644
--- a/net/colo.h
+++ b/net/colo.h
@@ -45,6 +45,15 @@ typedef struct Packet {
int64_t creation_ms;
/* Get vnet_hdr_len from filter */
uint32_t vnet_hdr_len;
+ uint32_t tcp_seq; /* sequence number */
+ uint32_t tcp_ack; /* acknowledgement number */
+ /* the sequence number of the last byte of the packet */
+ uint32_t seq_end;
+ uint8_t header_size; /* the header length */
+ uint16_t payload_size; /* the payload length */
+ /* record the payload offset (the length that has been compared) */
+ uint16_t offset;
+ uint8_t flags; /* Flags(aka Control bits) */
} Packet;
typedef struct ConnectionKey {
@@ -64,6 +73,12 @@ typedef struct Connection {
/* flag to enqueue unprocessed_connections */
bool processing;
uint8_t ip_proto;
+ /* record the sequence number that has been compared */
+ uint32_t compare_seq;
+ /* the maximum acknowledgement number in the primary_list queue */
+ uint32_t pack;
+ /* the maximum acknowledgement number in the secondary_list queue */
+ uint32_t sack;
/* offset = secondary_seq - primary_seq */
tcp_seq offset;
/*
diff --git a/net/trace-events b/net/trace-events
index 938263d..7b594cf 100644
--- a/net/trace-events
+++ b/net/trace-events
@@ -13,7 +13,7 @@ colo_compare_icmp_miscompare(const char *sta, int size) ": %s = %d"
colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
colo_old_packet_check_found(int64_t old_time) "%" PRId64
colo_compare_miscompare(void) ""
-colo_compare_tcp_info(const char *pkt, uint32_t seq, uint32_t ack, int res, uint32_t flag, int size) "side: %s seq/ack= %u/%u res= %d flags= 0x%x pkt_size: %d\n"
+colo_compare_tcp_info(const char *pkt, uint32_t seq, uint32_t ack, int hdlen, int pdlen, int offset, int flags) "%s: seq/ack= %u/%u hdlen= %d pdlen= %d offset= %d flags=%d\n"
# net/filter-rewriter.c
colo_filter_rewriter_debug(void) ""
--
2.9.4