+ int size;
+} Packet;
+
+int parse_packet_early(Packet *pkt);
+void connection_hashtable_reset(GHashTable *connection_track_table);
+Packet *packet_new(const void *data, int size);
+void packet_destroy(void *opaque, void *user_data);
+
+#endif /* QEMU_COLO_BASE_H */
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 0402958..7c52cc8 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -27,13 +27,38 @@
#include "sysemu/char.h"
#include "qemu/sockets.h"
#include "qapi-visit.h"
+#include "net/colo-base.h"
+#include "trace.h"
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
+#define MAX_QUEUE_SIZE 1024
+/*
+ + CompareState ++
+ | |
+ +---------------+ +---------------+ +---------------+
+ |conn list +--->conn +--------->conn |
+ +---------------+ +---------------+ +---------------+
+ | | | | | |
+ +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
+ |primary | |secondary |primary | |secondary
+ |packet | |packet + |packet | |packet +
+ +--------+ +--------+ +--------+ +--------+
+ | | | |
+ +---v----+ +---v----+ +---v----+ +---v----+
+ |primary | |secondary |primary | |secondary
+ |packet | |packet + |packet | |packet +
+ +--------+ +--------+ +--------+ +--------+
+ | | | |
+ +---v----+ +---v----+ +---v----+ +---v----+
+ |primary | |secondary |primary | |secondary
+ |packet | |packet + |packet | |packet +
+ +--------+ +--------+ +--------+ +--------+
+*/
typedef struct CompareState {
Object parent;
@@ -46,12 +71,89 @@ typedef struct CompareState {
QTAILQ_ENTRY(CompareState) next;
SocketReadState pri_rs;
SocketReadState sec_rs;
+
+ /* hashtable to save connection */
+ GHashTable *connection_track_table;
+ /* to save unprocessed_connections */
+ GQueue unprocessed_connections;
+ /* proxy current hash size */
+ uint32_t hashtable_size;
} CompareState;
typedef struct CompareClass {
ObjectClass parent_class;
} CompareClass;
+enum {
+ PRIMARY_IN = 0,
+ SECONDARY_IN,
+};
+
+static int compare_chr_send(CharDriverState *out,
+ const uint8_t *buf,
+ uint32_t size);
+
+/*
+ * Return 0 on success, if return -1 means the pkt
+ * is unsupported(arp and ipv6) and will be sent later
+ */
+static int packet_enqueue(CompareState *s, int mode)
+{
+ Packet *pkt = NULL;
+
+ if (mode == PRIMARY_IN) {
+ pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
+ } else {
+ pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
+ }
+
+ if (parse_packet_early(pkt)) {
+ packet_destroy(pkt, NULL);
+ pkt = NULL;
+ return -1;
+ }
+ /* TODO: get connection key from pkt */
+
+ /*
+ * TODO: use connection key get conn from
+ * connection_track_table
+ */
+
+ /*
+ * TODO: insert pkt to it's conn->primary_list
+ * or conn->secondary_list
+ */
+
+ return 0;
+}
+
+static int compare_chr_send(CharDriverState *out,
+ const uint8_t *buf,
+ uint32_t size)
+{
+ int ret = 0;
+ uint32_t len = htonl(size);
+
+ if (!size) {
+ return 0;
+ }
+
+ ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
+ if (ret != sizeof(len)) {
+ goto err;
+ }
+
+ ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
+ if (ret != size) {
+ goto err;
+ }
+
+ return 0;
+
+err:
+ return ret < 0 ? ret : -EIO;
+}
+
static char *compare_get_pri_indev(Object *obj, Error **errp)
{
CompareState *s = COLO_COMPARE(obj);
@@ -99,12 +201,21 @@ static void compare_set_outdev(Object *obj, const char
*value, Error **errp)
static void compare_pri_rs_finalize(SocketReadState *pri_rs)
{
- /* if packet_enqueue pri pkt failed we will send unsupported packet */
+ CompareState *s = container_of(pri_rs, CompareState, pri_rs);
+
+ if (packet_enqueue(s, PRIMARY_IN)) {
+ trace_colo_compare_main("primary: unsupported packet in");
+ compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+ }
}
static void compare_sec_rs_finalize(SocketReadState *sec_rs)
{
- /* if packet_enqueue sec pkt failed we will notify trace */
+ CompareState *s = container_of(sec_rs, CompareState, sec_rs);
+
+ if (packet_enqueue(s, SECONDARY_IN)) {
+ trace_colo_compare_main("secondary: unsupported packet in");
+ }
}
/*
@@ -156,6 +267,10 @@ static void colo_compare_complete(UserCreatable *uc, Error
**errp)
net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
+ s->hashtable_size = 0;
+
+ /* use g_hash_table_new_full() to new a hashtable */
+
return;
}
diff --git a/trace-events b/trace-events
index ca7211b..703de1a 100644
--- a/trace-events
+++ b/trace-events
@@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64
" of size %u: 0x%" PRIx32
aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 "
of size %u: 0x%" PRIx32
+
+# net/colo-compare.c
+colo_compare_main(const char *chr) ": %s"