qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in compare_chr_send


From: Lukas Straub
Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in compare_chr_send
Date: Fri, 8 May 2020 08:08:04 +0200

On Fri, 8 May 2020 02:19:00 +0000
"Zhang, Chen" <address@hidden> wrote:
> > > No need to init the notify_sendco each time, because the notify dev just  
> > an optional parameter.  
> > > You can use the if (s->notify_dev) here. Just Xen use the chr_notify_dev. 
> > >  
> > 
> > Ok, I will change that and the code below in the next version.
> >   
> > > Overall, make the chr_send job to coroutine is a good idea. It looks good 
> > >  
> > for me.  
> > > And your patch inspired me, it looks we can re-use the compare_chr_send  
> > code on filter mirror/redirector too.
> > 
> > I already have patch for that, but I don't think it is a good idea, because 
> > the
> > guest then can send packets faster than colo-compare can process. This leads
> > bufferbloat and the performance drops in my tests:
> > Client-to-server tcp:
> > without patch: ~66 Mbit/s
> > with patch: ~59 Mbit/s
> > Server-to-client tcp:
> > without patch: ~702 Kbit/s
> > with patch: ~328 Kbit/s  
> 
> Oh, a big performance drop, is that caused by memcpy/zero_copy parts ? 
> 
> Thanks
> Zhang Chen

No, there is no memcpy overhead with this patch, see below.

Regards,
Lukas Straub

---
 net/filter-mirror.c | 142 +++++++++++++++++++++++++++++++++-----------
 1 file changed, 106 insertions(+), 36 deletions(-)

diff --git a/net/filter-mirror.c b/net/filter-mirror.c
index d83e815545..6bcd317502 100644
--- a/net/filter-mirror.c
+++ b/net/filter-mirror.c
@@ -20,6 +20,8 @@
 #include "chardev/char-fe.h"
 #include "qemu/iov.h"
 #include "qemu/sockets.h"
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
 
 #define FILTER_MIRROR(obj) \
     OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR)
@@ -31,6 +33,18 @@
 #define TYPE_FILTER_REDIRECTOR "filter-redirector"
 #define REDIRECTOR_MAX_LEN NET_BUFSIZE
 
+typedef struct SendCo {
+    Coroutine *co;
+    GQueue send_list;
+    bool done;
+    int ret;
+} SendCo;
+
+typedef struct SendEntry {
+    ssize_t size;
+    uint8_t buf[];
+} SendEntry;
+
 typedef struct MirrorState {
     NetFilterState parent_obj;
     char *indev;
@@ -38,59 +52,101 @@ typedef struct MirrorState {
     CharBackend chr_in;
     CharBackend chr_out;
     SocketReadState rs;
+    SendCo sendco;
     bool vnet_hdr;
 } MirrorState;
 
-static int filter_send(MirrorState *s,
-                       const struct iovec *iov,
-                       int iovcnt)
+static void coroutine_fn _filter_send(void *opaque)
 {
+    MirrorState *s = opaque;
+    SendCo *sendco = &s->sendco;
     NetFilterState *nf = NETFILTER(s);
     int ret = 0;
-    ssize_t size = 0;
-    uint32_t len = 0;
-    char *buf;
-
-    size = iov_size(iov, iovcnt);
-    if (!size) {
-        return 0;
-    }
 
-    len = htonl(size);
-    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-    if (ret != sizeof(len)) {
-        goto err;
-    }
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        uint32_t len = htonl(entry->size);
 
-    if (s->vnet_hdr) {
-        /*
-         * If vnet_hdr = on, we send vnet header len to make other
-         * module(like colo-compare) know how to parse net
-         * packet correctly.
-         */
-        ssize_t vnet_hdr_len;
+        ret = qemu_chr_fe_write_all(&s->chr_out,
+                                    (uint8_t *)&len,
+                                    sizeof(len));
+        if (ret != sizeof(len)) {
+            g_free(entry);
+            goto err;
+        }
 
-        vnet_hdr_len = nf->netdev->vnet_hdr_len;
+        if (s->vnet_hdr) {
+            /*
+             * If vnet_hdr = on, we send vnet header len to make other
+             * module(like colo-compare) know how to parse net
+             * packet correctly.
+             */
+
+            len = htonl(nf->netdev->vnet_hdr_len);
+            ret = qemu_chr_fe_write_all(&s->chr_out,
+                                        (uint8_t *)&len,
+                                        sizeof(len));
+            if (ret != sizeof(len)) {
+                g_free(entry);
+                goto err;
+            }
+        }
 
-        len = htonl(vnet_hdr_len);
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-        if (ret != sizeof(len)) {
+        ret = qemu_chr_fe_write_all(&s->chr_out,
+                                    (uint8_t *)entry->buf,
+                                    entry->size);
+        if (ret != entry->size) {
+            g_free(entry);
             goto err;
         }
-    }
 
-    buf = g_malloc(size);
-    iov_to_buf(iov, iovcnt, 0, buf, size);
-    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
-    g_free(buf);
-    if (ret != size) {
-        goto err;
+        g_free(entry);
     }
 
-    return 0;
+    sendco->ret = 0;
+    goto out;
 
 err:
-    return ret < 0 ? ret : -EIO;
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        g_free(entry);
+    }
+    sendco->ret = ret < 0 ? ret : -EIO;
+out:
+    sendco->co = NULL;
+    sendco->done = true;
+    aio_wait_kick();
+}
+
+static int filter_send(MirrorState *s,
+                       const struct iovec *iov,
+                       int iovcnt)
+{
+    SendCo *sendco = &s->sendco;
+    SendEntry *entry;
+
+    ssize_t size = iov_size(iov, iovcnt);
+    if (!size) {
+        return 0;
+    }
+
+    entry = g_malloc(sizeof(SendEntry) + size);
+    entry->size = size;
+    iov_to_buf(iov, iovcnt, 0, entry->buf, size);
+    g_queue_push_head(&sendco->send_list, entry);
+
+    if (sendco->done) {
+        sendco->co = qemu_coroutine_create(_filter_send, s);
+        sendco->done = false;
+        qemu_coroutine_enter(sendco->co);
+        if (sendco->done) {
+            /* report early errors */
+            return sendco->ret;
+        }
+    }
+
+    /* assume success */
+    return 0;
 }
 
 static void redirector_to_filter(NetFilterState *nf,
@@ -194,6 +250,10 @@ static void filter_mirror_cleanup(NetFilterState *nf)
 {
     MirrorState *s = FILTER_MIRROR(nf);
 
+    AIO_WAIT_WHILE(NULL, !s->sendco.done);
+
+    g_queue_clear(&s->sendco.send_list);
+
     qemu_chr_fe_deinit(&s->chr_out, false);
 }
 
@@ -201,6 +261,10 @@ static void filter_redirector_cleanup(NetFilterState *nf)
 {
     MirrorState *s = FILTER_REDIRECTOR(nf);
 
+    AIO_WAIT_WHILE(NULL, !s->sendco.done);
+
+    g_queue_clear(&s->sendco.send_list);
+
     qemu_chr_fe_deinit(&s->chr_in, false);
     qemu_chr_fe_deinit(&s->chr_out, false);
 }
@@ -224,6 +288,9 @@ static void filter_mirror_setup(NetFilterState *nf, Error 
**errp)
     }
 
     qemu_chr_fe_init(&s->chr_out, chr, errp);
+
+    s->sendco.done = true;
+    g_queue_init(&s->sendco.send_list);
 }
 
 static void redirector_rs_finalize(SocketReadState *rs)
@@ -281,6 +348,9 @@ static void filter_redirector_setup(NetFilterState *nf, 
Error **errp)
             return;
         }
     }
+
+    s->sendco.done = true;
+    g_queue_init(&s->sendco.send_list);
 }
 
 static void filter_mirror_class_init(ObjectClass *oc, void *data)
-- 
2.20.1

Attachment: pgpbiCiMrhRCU.pgp
Description: OpenPGP digital signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]