gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (a5c4a4b1c -> 40537b048)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (a5c4a4b1c -> 40537b048)
Date: Tue, 21 Feb 2017 13:49:31 +0100

This is an automated email from the git hooks/post-receive script.

bart-polot pushed a change to branch master
in repository gnunet.

    from a5c4a4b1c prevent crash on exit
     new 291f4c74d Rewrite cadet tests using MQ API
     new 50d97ba0b - doc
     new e719c7792 - verbose log
     new 40537b048 Fix channel disconnect checking

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/cadet/.gitignore                               |   1 +
 src/cadet/Makefile.am                              |  28 +-
 src/cadet/cadet_api_new.c                          |   1 +
 .../{cadet_test_lib.c => cadet_test_lib_new.c}     | 135 ++--
 .../{cadet_test_lib.h => cadet_test_lib_new.h}     |  40 +-
 src/cadet/test_cadet.c                             |   4 +-
 src/cadet/test_cadet_local_mq.c                    |   4 +-
 src/cadet/{test_cadet.c => test_cadet_new.c}       | 724 +++++++++------------
 8 files changed, 455 insertions(+), 482 deletions(-)
 copy src/cadet/{cadet_test_lib.c => cadet_test_lib_new.c} (66%)
 copy src/cadet/{cadet_test_lib.h => cadet_test_lib_new.h} (69%)
 copy src/cadet/{test_cadet.c => test_cadet_new.c} (51%)

diff --git a/src/cadet/.gitignore b/src/cadet/.gitignore
index a38b8f495..a73006dae 100644
--- a/src/cadet/.gitignore
+++ b/src/cadet/.gitignore
@@ -21,3 +21,4 @@ test_cadet_local
 test_cadet_single
 gnunet-service-cadet-new
 test_cadet_local_mq
+test_cadet_2_forward_new
\ No newline at end of file
diff --git a/src/cadet/Makefile.am b/src/cadet/Makefile.am
index 1a51453c9..74791d66e 100644
--- a/src/cadet/Makefile.am
+++ b/src/cadet/Makefile.am
@@ -110,7 +110,7 @@ endif
 
 
 if HAVE_TESTING
- noinst_LIBRARIES = libgnunetcadettest.a $(noinst_LIB_EXP)
+ noinst_LIBRARIES = libgnunetcadettest.a libgnunetcadettestnew.a 
$(noinst_LIB_EXP)
  noinst_PROGRAMS = gnunet-cadet-profiler
 endif
 
@@ -124,6 +124,7 @@ libgnunetcadettest_a_LIBADD = \
 if HAVE_TESTING
 check_PROGRAMS = \
   test_cadet_local_mq \
+  test_cadet_2_forward_new \
   test_cadet_single \
   test_cadet_local \
   test_cadet_2_forward \
@@ -245,6 +246,31 @@ test_cadet_5_speed_reliable_backwards_SOURCES = \
 test_cadet_5_speed_reliable_backwards_LDADD = $(ld_cadet_test_lib)
 
 
+# NEW TESTS
+libgnunetcadettestnew_a_SOURCES = \
+  cadet_test_lib_new.c cadet_test_lib_new.h
+libgnunetcadettestnew_a_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la \
+ libgnunetcadetnew.la
+
+ld_cadet_test_lib_new = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/testing/libgnunettesting.la \
+  libgnunetcadetnew.la \
+  libgnunetcadettestnew.a \
+  $(top_builddir)/src/testbed/libgnunettestbed.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la
+dep_cadet_test_lib_new = \
+  libgnunetcadetnew.la \
+  libgnunetcadettestnew.a \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la
+
+test_cadet_2_forward_new_SOURCES = \
+  test_cadet_new.c
+test_cadet_2_forward_new_LDADD = $(ld_cadet_test_lib_new)
+
+
 if ENABLE_TEST_RUN
 AM_TESTS_ENVIRONMENT=export 
GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export 
PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset 
XDG_CONFIG_HOME;
 TESTS = \
diff --git a/src/cadet/cadet_api_new.c b/src/cadet/cadet_api_new.c
index 100c02a69..81bfe1f6f 100644
--- a/src/cadet/cadet_api_new.c
+++ b/src/cadet/cadet_api_new.c
@@ -1182,6 +1182,7 @@ destroy_port_cb (void *cls,
   /* struct GNUNET_CADET_Handle *handle = cls; */
   struct GNUNET_CADET_Port *port = value;
 
+  /* This is a warning, the app should have cleanly closed all open ports */
   GNUNET_break (0);
   GNUNET_CADET_close_port (port);
   return GNUNET_OK;
diff --git a/src/cadet/cadet_test_lib.c b/src/cadet/cadet_test_lib_new.c
similarity index 66%
copy from src/cadet/cadet_test_lib.c
copy to src/cadet/cadet_test_lib_new.c
index 9a70dad49..c3a1540f4 100644
--- a/src/cadet/cadet_test_lib.c
+++ b/src/cadet/cadet_test_lib_new.c
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012 GNUnet e.V.
+     Copyright (C) 2012, 2017 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -24,9 +24,10 @@
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
-#include "cadet_test_lib.h"
+#include "cadet_test_lib_new.h"
 #include "gnunet_cadet_service.h"
 
+
 /**
  * Test context for a CADET Test.
  */
@@ -40,7 +41,7 @@ struct GNUNET_CADET_TEST_Context
   /**
    * Array of handles to the CADET for each peer.
    */
-  struct GNUNET_CADET_Handle **cadetes;
+  struct GNUNET_CADET_Handle **cadets;
 
   /**
    * Operation associated with the connection to the CADET.
@@ -48,6 +49,11 @@ struct GNUNET_CADET_TEST_Context
   struct GNUNET_TESTBED_Operation **ops;
 
   /**
+   * Number of peers running, size of the arrays above.
+   */
+  unsigned int num_peers;
+
+  /**
    * Main function of the test to run once all CADETs are available.
    */
   GNUNET_CADET_TEST_AppMain app_main;
@@ -58,30 +64,35 @@ struct GNUNET_CADET_TEST_Context
   void *app_main_cls;
 
   /**
-   * Number of peers running, size of the arrays above.
+   * Handler for incoming tunnels.
    */
-  unsigned int num_peers;
+  GNUNET_CADET_ConnectEventHandler connects;
 
   /**
-   * Handler for incoming tunnels.
+   * Function called when the transmit window size changes.
    */
-  GNUNET_CADET_InboundChannelNotificationHandler *new_channel;
+  GNUNET_CADET_WindowSizeEventHandler window_changes;
 
   /**
    * Cleaner for destroyed incoming tunnels.
    */
-  GNUNET_CADET_ChannelEndHandler *cleaner;
+  GNUNET_CADET_DisconnectEventHandler disconnects;
 
   /**
    * Message handlers.
    */
-  struct GNUNET_CADET_MessageHandler* handlers;
+  struct GNUNET_MQ_MessageHandler *handlers;
 
   /**
    * Application ports.
    */
   const struct GNUNET_HashCode **ports;
 
+  /**
+   * Number of ports in #ports.
+   */
+  unsigned int port_count;
+
 };
 
 
@@ -94,6 +105,11 @@ struct GNUNET_CADET_TEST_AdapterContext
    * Peer number for the particular peer.
    */
   unsigned int peer;
+
+  /**
+   * Port handlers for open ports.
+   */
+  struct GNUNET_CADET_Port **ports;
  
   /**
    * General context.
@@ -114,26 +130,28 @@ struct GNUNET_CADET_TEST_AdapterContext
  */
 static void *
 cadet_connect_adapter (void *cls,
-                      const struct GNUNET_CONFIGURATION_Handle *cfg)
+                       const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
   struct GNUNET_CADET_TEST_Context *ctx = actx->ctx;
   struct GNUNET_CADET_Handle *h;
+  unsigned int i;
 
-  h = GNUNET_CADET_connect (cfg,
-                           (void *) (long) actx->peer,
-                           ctx->cleaner,
-                           ctx->handlers);
+  h = GNUNET_CADET_connecT (cfg);
   if (NULL == ctx->ports)
     return h;
 
-  for (int i = 0; NULL != ctx->ports[i]; i++)
+  actx->ports = GNUNET_new_array (ctx->port_count, struct GNUNET_CADET_Port *);
+  for (i = 0; i < ctx->port_count; i++)
   {
-    (void ) GNUNET_CADET_open_port (h, ctx->ports[i],
-                                    ctx->new_channel,
-                                    (void *) (long) actx->peer);
+    actx->ports[i] = GNUNET_CADET_open_porT (h,
+                                             ctx->ports[i],
+                                             ctx->connects,
+                                             (void *) (long) actx->peer,
+                                             ctx->window_changes,
+                                             ctx->disconnects,
+                                             ctx->handlers);
   }
-
   return h;
 }
 
@@ -152,6 +170,15 @@ cadet_disconnect_adapter (void *cls,
   struct GNUNET_CADET_Handle *cadet = op_result;
   struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
 
+  if (NULL != actx->ports)
+  {
+    for (int i = 0; i < actx->ctx->port_count; i++)
+    {
+      GNUNET_CADET_close_port (actx->ports[i]);
+      actx->ports[i] = NULL;
+    }
+    GNUNET_free (actx->ports);
+  }
   GNUNET_free (actx);
   GNUNET_CADET_disconnect (cadet);
 }
@@ -186,18 +213,18 @@ cadet_connect_cb (void *cls,
   for (i = 0; i < ctx->num_peers; i++)
     if (op == ctx->ops[i])
     {
-      ctx->cadetes[i] = ca_result;
+      ctx->cadets[i] = ca_result;
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "...cadet %u connected\n", i);
     }
   for (i = 0; i < ctx->num_peers; i++)
-    if (NULL == ctx->cadetes[i])
+    if (NULL == ctx->cadets[i])
       return; /* still some CADET connections missing */
   /* all CADET connections ready! */
   ctx->app_main (ctx->app_main_cls,
                  ctx,
                  ctx->num_peers,
                  ctx->peers,
-                 ctx->cadetes);
+                 ctx->cadets);
 }
 
 
@@ -213,7 +240,7 @@ GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context 
*ctx)
     ctx->ops[i] = NULL;
   }
   GNUNET_free (ctx->ops);
-  GNUNET_free (ctx->cadetes);
+  GNUNET_free (ctx->cadets);
   GNUNET_free (ctx);
   GNUNET_SCHEDULER_shutdown ();
 }
@@ -243,12 +270,23 @@ cadet_test_run (void *cls,
   struct GNUNET_CADET_TEST_Context *ctx = cls;
   unsigned int i;
 
+  if (0 != links_failed)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Some links failed (%u), ending\n",
+                links_failed);
+    exit (2);
+  }
+
   if  (num_peers != ctx->num_peers)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peers started %u/%u, ending\n",
                 num_peers, ctx->num_peers);
     exit (1);
   }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Testbed up, %u peers and %u links\n",
+              num_peers, links_succeeded);
   ctx->peers = peers;
   for (i = 0; i < num_peers; i++)
   {
@@ -270,31 +308,52 @@ cadet_test_run (void *cls,
 }
 
 
+/**
+ * Run a test using the given name, configuration file and number of peers.
+ * All cadet callbacks will receive the peer number (long) as the closure.
+ *
+ * @param testname Name of the test (for logging).
+ * @param cfgfile Name of the configuration file.
+ * @param num_peers Number of peers to start.
+ * @param tmain Main function to run once the testbed is ready.
+ * @param tmain_cls Closure for @a tmain.
+ * @param connects Handler for incoming channels.
+ * @param window_changes Handler for the window size change notification.
+ * @param disconnects Cleaner for destroyed incoming channels.
+ * @param handlers Message handlers.
+ * @param ports Ports the peers offer, NULL-terminated.
+ */
 void
-GNUNET_CADET_TEST_run (const char *testname,
-                      const char *cfgname,
-                      unsigned int num_peers,
-                      GNUNET_CADET_TEST_AppMain tmain,
-                      void *tmain_cls,
-                      GNUNET_CADET_InboundChannelNotificationHandler 
new_channel,
-                      GNUNET_CADET_ChannelEndHandler cleaner,
-                      struct GNUNET_CADET_MessageHandler* handlers,
-                      const struct GNUNET_HashCode **ports)
+GNUNET_CADET_TEST_ruN (const char *testname,
+                       const char *cfgfile,
+                       unsigned int num_peers,
+                       GNUNET_CADET_TEST_AppMain tmain,
+                       void *tmain_cls,
+                       GNUNET_CADET_ConnectEventHandler connects,
+                       GNUNET_CADET_WindowSizeEventHandler window_changes,
+                       GNUNET_CADET_DisconnectEventHandler disconnects,
+                       struct GNUNET_MQ_MessageHandler *handlers,
+                       const struct GNUNET_HashCode **ports)
 {
   struct GNUNET_CADET_TEST_Context *ctx;
 
   ctx = GNUNET_new (struct GNUNET_CADET_TEST_Context);
   ctx->num_peers = num_peers;
-  ctx->ops = GNUNET_malloc (num_peers * sizeof (struct 
GNUNET_TESTBED_Operation *));
-  ctx->cadetes = GNUNET_malloc (num_peers * sizeof (struct GNUNET_CADET_Handle 
*));
+  ctx->ops = GNUNET_new_array (num_peers, struct GNUNET_TESTBED_Operation *);
+  ctx->cadets = GNUNET_new_array (num_peers, struct GNUNET_CADET_Handle *);
   ctx->app_main = tmain;
   ctx->app_main_cls = tmain_cls;
-  ctx->new_channel = new_channel;
-  ctx->cleaner = cleaner;
-  ctx->handlers = handlers;
+  ctx->connects = connects;
+  ctx->window_changes = window_changes;
+  ctx->disconnects = disconnects;
+  ctx->handlers = GNUNET_MQ_copy_handlers (handlers);
   ctx->ports = ports;
+  ctx->port_count = 0;
+  while (NULL != ctx->ports[ctx->port_count])
+    ctx->port_count++;
+
   GNUNET_TESTBED_test_run (testname,
-                           cfgname,
+                           cfgfile,
                            num_peers,
                            0LL, NULL, NULL,
                            &cadet_test_run, ctx);
diff --git a/src/cadet/cadet_test_lib.h b/src/cadet/cadet_test_lib_new.h
similarity index 69%
copy from src/cadet/cadet_test_lib.h
copy to src/cadet/cadet_test_lib_new.h
index 464977d42..4b3a6b18d 100644
--- a/src/cadet/cadet_test_lib.h
+++ b/src/cadet/cadet_test_lib_new.h
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012 GNUnet e.V.
+     Copyright (C) 2012,2017 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -49,41 +49,41 @@ struct GNUNET_CADET_TEST_Context;
  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
  * @param num_peers Number of peers that are running.
  * @param peers Array of peers.
- * @param cadetes Handle to each of the CADETs of the peers.
+ * @param cadets Handle to each of the CADETs of the peers.
  */
 typedef void (*GNUNET_CADET_TEST_AppMain) (void *cls,
                                           struct GNUNET_CADET_TEST_Context 
*ctx,
                                           unsigned int num_peers,
                                           struct GNUNET_TESTBED_Peer **peers,
-                                          struct GNUNET_CADET_Handle 
**cadetes);
+                                          struct GNUNET_CADET_Handle **cadets);
 
 
 /**
- * Run a test using the given name, configuration file and number of
- * peers.
- * All cadet callbacks will receive the peer number as the closure.
+ * Run a test using the given name, configuration file and number of peers.
+ * All cadet callbacks will receive the peer number (long) as the closure.
  *
  * @param testname Name of the test (for logging).
- * @param cfgname Name of the configuration file.
+ * @param cfgfile Name of the configuration file.
  * @param num_peers Number of peers to start.
  * @param tmain Main function to run once the testbed is ready.
- * @param tmain_cls Closure for 'tmain'.
- * @param new_channel Handler for incoming tunnels.
- * @param cleaner Cleaner for destroyed incoming tunnels.
+ * @param tmain_cls Closure for @a tmain.
+ * @param connects Handler for incoming channels.
+ * @param window_changes Handler for the window size change notification.
+ * @param disconnects Cleaner for destroyed incoming channels.
  * @param handlers Message handlers.
  * @param ports Ports the peers offer, NULL-terminated.
  */
 void
-GNUNET_CADET_TEST_run (const char *testname,
-                      const char *cfgname,
-                      unsigned int num_peers,
-                      GNUNET_CADET_TEST_AppMain tmain,
-                      void *tmain_cls,
-                      GNUNET_CADET_InboundChannelNotificationHandler 
new_channel,
-                      GNUNET_CADET_ChannelEndHandler cleaner,
-                      struct GNUNET_CADET_MessageHandler* handlers,
-                      const struct GNUNET_HashCode **ports);
-
+GNUNET_CADET_TEST_ruN (const char *testname,
+                       const char *cfgfile,
+                       unsigned int num_peers,
+                       GNUNET_CADET_TEST_AppMain tmain,
+                       void *tmain_cls,
+                       GNUNET_CADET_ConnectEventHandler connects,
+                       GNUNET_CADET_WindowSizeEventHandler window_changes,
+                       GNUNET_CADET_DisconnectEventHandler disconnects,
+                       struct GNUNET_MQ_MessageHandler *handlers,
+                       const struct GNUNET_HashCode **ports);
 
 /**
  * Clean up the testbed.
diff --git a/src/cadet/test_cadet.c b/src/cadet/test_cadet.c
index f2e639e7a..e57c01be2 100644
--- a/src/cadet/test_cadet.c
+++ b/src/cadet/test_cadet.c
@@ -593,8 +593,8 @@ tmt_rdy (void *cls, size_t size, void *buf)
                 "sending initializer\n");
     msg_size = size_payload + 1000;
     msg->size = htons (msg_size);
-  if (SPEED_ACK == test)
-      data_sent++;
+    if (SPEED_ACK == test)
+        data_sent++;
   }
   else if ( (SPEED == test) ||
             (SPEED_ACK == test) )
diff --git a/src/cadet/test_cadet_local_mq.c b/src/cadet/test_cadet_local_mq.c
index 785a8844f..19bafbed1 100644
--- a/src/cadet/test_cadet_local_mq.c
+++ b/src/cadet/test_cadet_local_mq.c
@@ -168,8 +168,8 @@ disconnected (void *cls,
               const struct GNUNET_CADET_Channel *channel)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "disconnected cls: %p\n",
-              cls);
+              "disconnected channel %p, cls: %p\n",
+              channel, cls);
   if (channel == ch)
     ch = NULL;
 }
diff --git a/src/cadet/test_cadet.c b/src/cadet/test_cadet_new.c
similarity index 51%
copy from src/cadet/test_cadet.c
copy to src/cadet/test_cadet_new.c
index f2e639e7a..d32404815 100644
--- a/src/cadet/test_cadet.c
+++ b/src/cadet/test_cadet_new.c
@@ -18,23 +18,34 @@
      Boston, MA 02110-1301, USA.
 */
 /**
- * @file cadet/test_cadet.c
+ * @file cadet/test_cadet_mq.c
  * @author Bart Polot
  * @author Christian Grothoff
- * @brief Test for the cadet service: retransmission of traffic.
+ * @brief Test for the cadet service using mq API.
  */
 #include <stdio.h>
 #include "platform.h"
-#include "cadet_test_lib.h"
+#include "cadet_test_lib_new.h"
 #include "gnunet_cadet_service.h"
 #include "gnunet_statistics_service.h"
 #include <gauger.h>
 
 
 /**
+ * Ugly workaround to unify data handlers on incoming and outgoing channels.
+ */
+struct CadetTestChannelWrapper
+{
+  /**
+   * Channel pointer.
+   */
+  struct GNUNET_CADET_Channel *ch;
+};
+
+/**
  * How many messages to send
  */
-#define TOTAL_PACKETS 500 /* Cannot exceed 64k! */
+#define TOTAL_PACKETS 500       /* Cannot exceed 64k! */
 
 /**
  * How long until we give up on connecting the peers?
@@ -83,9 +94,9 @@ static int ok;
 static int ok_goal;
 
 /**
- * Size of each test packet
+ * Size of each test packet's payload
  */
-static size_t size_payload = sizeof (struct GNUNET_MessageHeader) + sizeof 
(uint32_t);
+static size_t size_payload = sizeof (uint32_t);
 
 /**
  * Operation to get peer ids.
@@ -158,9 +169,9 @@ static struct GNUNET_SCHEDULER_Task *disconnect_task;
 static struct GNUNET_SCHEDULER_Task *test_task;
 
 /**
- * Task runnining #data_task().
+ * Task runnining #send_next_msg().
  */
-static struct GNUNET_SCHEDULER_Task *data_job;
+static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
 
 /**
  * Cadet handle for the root peer
@@ -175,7 +186,7 @@ static struct GNUNET_CADET_Handle *h2;
 /**
  * Channel handle for the root peer
  */
-static struct GNUNET_CADET_Channel *ch;
+static struct GNUNET_CADET_Channel *outgoing_ch;
 
 /**
  * Channel handle for the dest peer
@@ -183,17 +194,6 @@ static struct GNUNET_CADET_Channel *ch;
 static struct GNUNET_CADET_Channel *incoming_ch;
 
 /**
- * Transmit handle for root data calls
- */
-static struct GNUNET_CADET_TransmitHandle *th;
-
-/**
- * Transmit handle for root data calls
- */
-static struct GNUNET_CADET_TransmitHandle *incoming_th;
-
-
-/**
  * Time we started the data transmission (after channel has been established
  * and initilized).
  */
@@ -225,20 +225,26 @@ static unsigned int ka_received;
 static unsigned int msg_dropped;
 
 
+/******************************************************************************/
+
+
+/******************************************************************************/
+
+
 /**
- * Get the client number considered as the "target" or "receiver", depending on
+ * Get the channel considered as the "target" or "receiver", depending on
  * the test type and size.
  *
- * @return Peer # of the target client, either 0 (for backward tests) or
- *         the last peer in the line (for other tests).
+ * @return Channel handle of the target client, either 0 (for backward tests)
+ *         or the last peer in the line (for other tests).
  */
-static unsigned int
-get_expected_target ()
+static struct GNUNET_CADET_Channel *
+get_target_channel ()
 {
   if (SPEED == test && GNUNET_YES == test_backwards)
-    return 0;
+    return outgoing_ch;
   else
-    return peers_requested - 1;
+    return incoming_ch;
 }
 
 
@@ -251,16 +257,13 @@ show_end_data (void)
   static struct GNUNET_TIME_Absolute end_time;
   static struct GNUNET_TIME_Relative total_time;
 
-  end_time = GNUNET_TIME_absolute_get();
-  total_time = GNUNET_TIME_absolute_get_difference(start_time, end_time);
+  end_time = GNUNET_TIME_absolute_get ();
+  total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
   FPRINTF (stderr, "\nResults of test \"%s\"\n", test_name);
   FPRINTF (stderr, "Test time %s\n",
-          GNUNET_STRINGS_relative_time_to_string (total_time,
-                                                  GNUNET_YES));
-  FPRINTF (stderr, "Test bandwidth: %f kb/s\n",
-          4 * TOTAL_PACKETS * 1.0 / (total_time.rel_value_us / 1000)); // 
4bytes * ms
-  FPRINTF (stderr, "Test throughput: %f packets/s\n\n",
-          TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000)); // 
packets * ms
+           GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
+  FPRINTF (stderr, "Test bandwidth: %f kb/s\n", 4 * TOTAL_PACKETS * 1.0 / 
(total_time.rel_value_us / 1000));    // 4bytes * ms
+  FPRINTF (stderr, "Test throughput: %f packets/s\n\n", TOTAL_PACKETS * 1000.0 
/ (total_time.rel_value_us / 1000));     // packets * ms
   GAUGER ("CADET", test_name,
           TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000),
           "packets/s");
@@ -281,29 +284,19 @@ disconnect_cadet_peers (void *cls)
 
   disconnect_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             "disconnecting cadet service of peers, called from line %ld\n",
-             line);
+              "disconnecting cadet service of peers, called from line %ld\n",
+              line);
   for (i = 0; i < 2; i++)
   {
     GNUNET_TESTBED_operation_done (t_op[i]);
   }
-  if (NULL != ch)
+  if (NULL != outgoing_ch)
   {
-    if (NULL != th)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (th);
-      th = NULL;
-    }
-    GNUNET_CADET_channel_destroy (ch);
-    ch = NULL;
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
   }
   if (NULL != incoming_ch)
   {
-    if (NULL != incoming_th)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (incoming_th);
-      incoming_th = NULL;
-    }
     GNUNET_CADET_channel_destroy (incoming_ch);
     incoming_ch = NULL;
   }
@@ -322,10 +315,10 @@ static void
 shutdown_task (void *cls)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n");
-  if (NULL != data_job)
+  if (NULL != send_next_msg_task)
   {
-    GNUNET_SCHEDULER_cancel (data_job);
-    data_job = NULL;
+    GNUNET_SCHEDULER_cancel (send_next_msg_task);
+    send_next_msg_task = NULL;
   }
   if (NULL != test_task)
   {
@@ -335,8 +328,8 @@ shutdown_task (void *cls)
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                               (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, (void *) __LINE__);
   }
 }
 
@@ -351,17 +344,11 @@ shutdown_task (void *cls)
  *          operation has executed successfully.
  */
 static void
-stats_cont (void *cls,
-            struct GNUNET_TESTBED_Operation *op,
-            const char *emsg)
+stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              " KA sent: %u, KA received: %u\n",
-              ka_sent,
-              ka_received);
-  if ( (KEEPALIVE == test) &&
-       ( (ka_sent < 2) ||
-         (ka_sent > ka_received + 1)) )
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " KA sent: %u, KA received: %u\n",
+              ka_sent, ka_received);
+  if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
   {
     GNUNET_break (0);
     ok--;
@@ -370,8 +357,7 @@ stats_cont (void *cls,
 
   if (NULL != disconnect_task)
     GNUNET_SCHEDULER_cancel (disconnect_task);
-  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                             cls);
+  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, cls);
 }
 
 
@@ -387,11 +373,8 @@ stats_cont (void *cls,
  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
  */
 static int
-stats_iterator (void *cls,
-                const struct GNUNET_TESTBED_Peer *peer,
-                const char *subsystem,
-                const char *name,
-                uint64_t value,
+stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer,
+                const char *subsystem, const char *name, uint64_t value,
                 int is_persistent)
 {
   static const char *s_sent = "# keepalives sent";
@@ -401,19 +384,15 @@ stats_iterator (void *cls,
   uint32_t i;
 
   i = GNUNET_TESTBED_get_index (peer);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "STATS PEER %u - %s [%s]: %llu\n",
-              i,
-              subsystem,
-              name,
-              (unsigned long long) value);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
+              subsystem, name, (unsigned long long) value);
   if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
     ka_sent = value;
-  if (0 == strncmp(s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
+  if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
     ka_received = value;
-  if (0 == strncmp(rdrops, name, strlen (rdrops)))
+  if (0 == strncmp (rdrops, name, strlen (rdrops)))
     msg_dropped += value;
-  if (0 == strncmp(cdrops, name, strlen (cdrops)))
+  if (0 == strncmp (cdrops, name, strlen (cdrops)))
     msg_dropped += value;
 
   return GNUNET_OK;
@@ -423,7 +402,7 @@ stats_iterator (void *cls,
 /**
  * Task to gather all statistics.
  *
- * @param cls Closure (NULL).
+ * @param cls Closure (line from which the task was scheduled).
  */
 static void
 gather_stats_and_exit (void *cls)
@@ -432,21 +411,20 @@ gather_stats_and_exit (void *cls)
 
   disconnect_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             "gathering statistics from line %d\n",
-             (int) l);
-  if (NULL != ch)
+              "gathering statistics from line %ld\n",
+              l);
+  if (NULL != outgoing_ch)
   {
-    if (NULL != th)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (th);
-      th = NULL;
-    }
-    GNUNET_CADET_channel_destroy (ch);
-    ch = NULL;
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
   }
-  stats_op = GNUNET_TESTBED_get_statistics (peers_running, testbed_peers,
-                                            "cadet", NULL,
-                                            &stats_iterator, stats_cont, cls);
+  stats_op = GNUNET_TESTBED_get_statistics (peers_running,
+                                            testbed_peers,
+                                            "cadet",
+                                            NULL,
+                                            &stats_iterator,
+                                            stats_cont,
+                                            cls);
 }
 
 
@@ -462,163 +440,126 @@ abort_test (long line)
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Aborting test from %ld\n", line);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                                (void *) line);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Aborting test from %ld\n", line);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, (void *) line);
   }
 }
 
+
 /**
- * Transmit ready callback.
+ * Send a message on the channel with the appropriate size and payload.
  *
- * @param cls Closure (message type).
- * @param size Size of the tranmist buffer.
- * @param buf Pointer to the beginning of the buffer.
+ * Update the appropriate *_sent counter.
  *
- * @return Number of bytes written to buf.
+ * @param channel Channel to send the message on.
  */
-static size_t
-tmt_rdy (void *cls, size_t size, void *buf);
+static void
+send_test_message (struct GNUNET_CADET_Channel *channel)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_MessageHeader *msg;
+  uint32_t *data;
+  int *counter;
+  int size;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending test message on channel %p\n",
+              channel);
+  size = size_payload;
+  if (GNUNET_NO == initialized)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
+    size += 1000;
+    counter = &data_sent;
+    if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
+        data_sent++;
+  }
+  else if (SPEED == test || SPEED_ACK == test)
+  {
+    counter = get_target_channel() == channel ? &ack_sent : &data_sent;
+    size += *counter;
+    *counter = *counter + 1;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending message %u\n", *counter);
+  }
+  else
+  {
+    counter =  &ack_sent;
+  }
+  env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
 
+  data = (uint32_t *) &msg[1];
+  *data = htonl (*counter);
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
+}
 
 /**
- * Task to request a new data transmission.
+ * Task to request a new data transmission in a SPEED test, without waiting
+ * for previous messages to be sent/arrive.
  *
- * @param cls Closure (peer #).
+ * @param cls Closure (unused).
  */
 static void
-data_task (void *cls)
+send_next_msg (void *cls)
 {
   struct GNUNET_CADET_Channel *channel;
-  static struct GNUNET_CADET_TransmitHandle **pth;
-  long src;
 
-  data_job = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Data task\n");
-  if (GNUNET_YES == test_backwards)
-  {
-    channel = incoming_ch;
-    pth = &incoming_th;
-    src = peers_requested - 1;
-  }
-  else
-  {
-    channel = ch;
-    pth = &th;
-    src = 0;
-  }
+  send_next_msg_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending next message: %d\n", data_sent);
 
+  channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
   GNUNET_assert (NULL != channel);
-  GNUNET_assert (NULL == *pth);
-
-  *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             size_payload + data_sent,
-                                             &tmt_rdy, (void *) src);
-  if (NULL == *pth)
+  GNUNET_assert (SPEED == test);
+  send_test_message (channel);
+  if (data_sent < TOTAL_PACKETS)
   {
-    unsigned long i = (unsigned long) cls;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Retransmission\n");
-    if (0 == i)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "  in 1 ms\n");
-      data_job = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
-                                              &data_task, (void *) 1L);
-    }
-    else
-    {
-      i++;
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "in %llu ms\n",
-                  (unsigned long long) i);
-      data_job = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                                                             i),
-                                              &data_task, (void *) i);
-    }
+    /* SPEED test: Send all messages as soon as possible */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Scheduling message %d\n",
+                data_sent + 1);
+    send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL);
   }
 }
 
 
 /**
- * Transmit ready callback
+ * Every few messages cancel the timeout task and re-schedule it again, to
+ * avoid timing out when traffic keeps coming.
  *
- * @param cls Closure (peer # which is sending the data).
- * @param size Size of the buffer we have.
- * @param buf Buffer to copy data to.
+ * @param line Code line number to log if a timeout occurs.
  */
-static size_t
-tmt_rdy (void *cls, size_t size, void *buf)
+static void
+reschedule_timeout_task (long line)
 {
-  struct GNUNET_MessageHeader *msg = buf;
-  size_t msg_size;
-  uint32_t *data;
-  long id = (long) cls;
-  unsigned int counter;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "tmt_rdy on %ld, filling buffer\n",
-              id);
-  if (0 == id)
-    th = NULL;
-  else if ((peers_requested - 1) == id)
-    incoming_th = NULL;
-  else
-    GNUNET_assert (0);
-  counter = get_expected_target () == id ? ack_sent : data_sent;
-  msg_size = size_payload + counter;
-  GNUNET_assert (msg_size > sizeof (struct GNUNET_MessageHeader));
-  if ( (size < msg_size) ||
-       (NULL == buf) )
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "size %u, buf %p, data_sent %u, ack_received %u\n",
-                (unsigned int) size,
-                buf,
-                data_sent,
-                ack_received);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ok %u, ok goal %u\n", ok, ok_goal);
-    GNUNET_break (ok >= ok_goal - 2);
-
-    return 0;
-  }
-  msg->size = htons (msg_size);
-  msg->type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
-  data = (uint32_t *) &msg[1];
-  *data = htonl (counter);
-  if (GNUNET_NO == initialized)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "sending initializer\n");
-    msg_size = size_payload + 1000;
-    msg->size = htons (msg_size);
-  if (SPEED_ACK == test)
-      data_sent++;
-  }
-  else if ( (SPEED == test) ||
-            (SPEED_ACK == test) )
+  if ((ok % 10) == 0)
   {
-    if (get_expected_target() == id)
-      ack_sent++;
-    else
-      data_sent++;
-    counter++;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                " Sent message %u size %u\n",
-                counter,
-                (unsigned int) msg_size);
-    if ( (data_sent < TOTAL_PACKETS) &&
-         (SPEED == test) )
+    if (NULL != disconnect_task)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  " Scheduling message %d\n",
-                  counter + 1);
-      data_job = GNUNET_SCHEDULER_add_now (&data_task, NULL);
+                  " reschedule timeout every 10 messages\n");
+      GNUNET_SCHEDULER_cancel (disconnect_task);
+      disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
+                                                      &gather_stats_and_exit,
+                                                      (void *) line);
     }
   }
+}
 
-  return msg_size;
+
+/**
+ * Check if payload is sane (size contains payload).
+ *
+ * @param cls should match #ch
+ * @param message The actual message.
+ * @return #GNUNET_OK to keep the channel open,
+ *         #GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+check_data (void *cls, const struct GNUNET_MessageHeader *message)
+{
+  if (sizeof (struct GNUNET_MessageHeader) >= ntohs (message->size))
+    return GNUNET_SYSERR;
+  return GNUNET_OK;             /* all is well-formed */
 }
 
 
@@ -626,75 +567,49 @@ tmt_rdy (void *cls, size_t size, void *buf)
  * Function is called whenever a message is received.
  *
  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
- * @param channel connection to the other end
- * @param channel_ctx place to store local state associated with the channel
  * @param message the actual message
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
  */
-static int
-data_callback (void *cls,
-               struct GNUNET_CADET_Channel *channel,
-               void **channel_ctx,
-               const struct GNUNET_MessageHeader *message)
+static void
+handle_data (void *cls, const struct GNUNET_MessageHeader *message)
 {
-  struct GNUNET_CADET_TransmitHandle **pth;
-  long client = (long) cls;
-  long expected_target_client;
+  struct CadetTestChannelWrapper *ch = cls;
+  struct GNUNET_CADET_Channel *channel = ch->ch;
   uint32_t *data;
   uint32_t payload;
-  unsigned int counter;
+  int *counter;
 
   ok++;
-  counter = get_expected_target () == client ? data_received : ack_received;
+  counter = get_target_channel () == channel ? &data_received : &ack_received;
 
-  GNUNET_CADET_receive_done (channel);
+  reschedule_timeout_task ((long) __LINE__);
 
-  if ((ok % 10) == 0)
+  if (channel == outgoing_ch)
   {
-    if (NULL != disconnect_task)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  " reschedule timeout\n");
-      GNUNET_SCHEDULER_cancel (disconnect_task);
-      disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                      &gather_stats_and_exit,
-                                                      (void *) __LINE__);
-    }
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n");
   }
-
-  switch (client)
+  else if (channel == incoming_ch)
   {
-  case 0L:
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n");
-    GNUNET_assert (channel == ch);
-    pth = &th;
-    break;
-  case 1L:
-  case 4L:
-    GNUNET_assert (client == peers_requested - 1);
-    GNUNET_assert (channel == incoming_ch);
-    pth = &incoming_th;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client %ld got a message.\n",
-                client);
-    break;
-  default:
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Client %ld not valid.\n", client);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client got a message.\n");
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unknown channel %p.\n", channel);
     GNUNET_assert (0);
   }
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: (%d/%d)\n", ok, ok_goal);
   data = (uint32_t *) &message[1];
   payload = ntohl (*data);
-  if (payload == counter)
+  if (payload == *counter)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " payload as expected: %u\n", payload);
   }
   else
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " payload %u, expected: %u\n",
-                payload, counter);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                " payload %u, expected: %u\n",
+                payload, *counter);
   }
-  expected_target_client = get_expected_target ();
 
   if (GNUNET_NO == initialized)
   {
@@ -702,189 +617,151 @@ data_callback (void *cls,
     start_time = GNUNET_TIME_absolute_get ();
     if (SPEED == test)
     {
-      GNUNET_assert (peers_requested - 1 == client);
-      data_job = GNUNET_SCHEDULER_add_now (&data_task, NULL);
-      return GNUNET_OK;
+      GNUNET_assert (incoming_ch == channel);
+      send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL);
+      return;
     }
   }
 
-  counter++;
-  if (client == expected_target_client) /* Normally 4 */
+  (*counter)++;
+  if (get_target_channel () == channel) /* Got "data" */
   {
-    data_received++;
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received);
     if (SPEED != test || (ok_goal - 2) == ok)
     {
       /* Send ACK */
-      GNUNET_assert (NULL == *pth);
-      *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                                 size_payload + ack_sent,
-                                                 &tmt_rdy, (void *) client);
-      return GNUNET_OK;
+      send_test_message (channel);
+      return;
     }
     else
     {
       if (data_received < TOTAL_PACKETS)
-        return GNUNET_OK;
+        return;
     }
   }
-  else /* Normally 0 */
+  else /* Got "ack" */
   {
     if (SPEED_ACK == test || SPEED == test)
     {
-      ack_received++;
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
-      /* send more data */
-      GNUNET_assert (NULL == *pth);
-      *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                                 size_payload + data_sent,
-                                                 &tmt_rdy, (void *) client);
+      /* Send more data */
+      send_test_message (channel);
       if (ack_received < TOTAL_PACKETS && SPEED != test)
-        return GNUNET_OK;
+        return;
       if (ok == 2 && SPEED == test)
-        return GNUNET_OK;
-      show_end_data();
+        return;
+      show_end_data ();
     }
     if (test == P2P_SIGNAL)
     {
-      if (NULL != incoming_th)
-      {
-        GNUNET_CADET_notify_transmit_ready_cancel (incoming_th);
-        incoming_th = NULL;
-      }
       GNUNET_CADET_channel_destroy (incoming_ch);
       incoming_ch = NULL;
     }
     else
     {
-      if (NULL != th)
-      {
-        GNUNET_CADET_notify_transmit_ready_cancel (th);
-        th = NULL;
-      }
-      GNUNET_CADET_channel_destroy (ch);
-      ch = NULL;
+      GNUNET_CADET_channel_destroy (outgoing_ch);
+      outgoing_ch = NULL;
     }
   }
-
-  return GNUNET_OK;
 }
 
 
 /**
- * Data handlers for every message type of CADET's payload.
- * {callback_function, message_type, size_expected}
- */
-static struct GNUNET_CADET_MessageHandler handlers[] = {
-  {&data_callback,
-   GNUNET_MESSAGE_TYPE_DUMMY,
-   sizeof (struct GNUNET_MessageHeader)},
-  {NULL, 0, 0}
-};
-
-
-/**
- * Method called whenever another peer has added us to a channel
- * the other peer initiated.
+ * Method called whenever a peer connects to a port in MQ-based CADET.
  *
- * @param cls Closure.
+ * @param cls Closure from #GNUNET_CADET_open_porT (peer # as long).
  * @param channel New handle to the channel.
- * @param initiator Peer that started the channel.
- * @param port Port this channel is connected to.
- * @param options channel option flags
- * @return Initial channel context for the channel
- *         (can be NULL -- that's not an error).
+ * @param source Peer that started this channel.
+ * @return Closure for the incoming @a channel. It's given to:
+ *         - The #GNUNET_CADET_DisconnectEventHandler (given to
+ *           #GNUNET_CADET_open_porT) when the channel dies.
+ *         - Each of the #GNUNET_MQ_MessageCallback handlers for each message
+ *           received on the @a channel.
  */
 static void *
-incoming_channel (void *cls,
-                  struct GNUNET_CADET_Channel *channel,
-                  const struct GNUNET_PeerIdentity *initiator,
-                  const struct GNUNET_HashCode *port,
-                  enum GNUNET_CADET_ChannelOption options)
+connect_handler (void *cls, struct GNUNET_CADET_Channel *channel,
+                 const struct GNUNET_PeerIdentity *source)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel from %s to peer %d:%s\n",
-              GNUNET_i2s (initiator),
-              (int) (long) cls, GNUNET_h2s (port));
+  struct CadetTestChannelWrapper *ch;
+  long peer = (long) cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Incoming channel from %s to peer %ld\n",
+              GNUNET_i2s (source), peer);
   ok++;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
-  if ((long) cls == peers_requested - 1)
+  if (peer == peers_requested - 1)
   {
     if (NULL != incoming_ch)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Duplicate incoming channel for client %lu\n",
-                  (long) cls);
-      GNUNET_break(0);
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Duplicate incoming channel for client %lu\n", (long) cls);
+      GNUNET_assert (0);
     }
     incoming_ch = channel;
   }
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Incoming channel for unexpected peer #%lu\n",
-                (long) cls);
-    GNUNET_break (0);
+                "Incoming channel for unexpected peer #%lu\n", (long) cls);
+    GNUNET_assert (0);
   }
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                    &gather_stats_and_exit,
-                                                    (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_delayed (SHORT_TIME, &gather_stats_and_exit,
+                                      (void *) __LINE__);
   }
 
-  return NULL;
+  /* TODO: cannot return channel as-is, in order to unify the data handlers */
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  ch->ch = channel;
+
+  return ch;
 }
 
 
 /**
- * Function called whenever an inbound channel is destroyed.  Should clean up
- * any associated state.
+ * Function called whenever an MQ-channel is destroyed, even if the destruction
+ * was requested by #GNUNET_CADET_channel_destroy.
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
  *
- * @param cls closure (set from GNUNET_CADET_connect, peer number)
- * @param channel connection to the other end (henceforth invalid)
- * @param channel_ctx place where local state associated
- *                   with the channel is stored
+ * It should clean up any associated state, including cancelling any pending
+ * transmission on this channel.
+ *
+ * @param cls Channel closure (channel wrapper).
+ * @param channel Connection to the other end (henceforth invalid).
  */
 static void
-channel_cleaner (void *cls,
-                 const struct GNUNET_CADET_Channel *channel,
-                 void *channel_ctx)
+disconnect_handler (void *cls, const struct GNUNET_CADET_Channel *channel)
 {
-  long i = (long) cls;
+  struct CadetTestChannelWrapper *ch_w = cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel disconnected at peer %ld\n",
-              i);
-  if (peers_running - 1 == i)
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Channel disconnected\n");
+  GNUNET_assert (ch_w->ch == channel);
+  if (channel == incoming_ch)
   {
     ok++;
-    GNUNET_break (channel == incoming_ch);
     incoming_ch = NULL;
   }
-  else if (0L == i)
+  else if (outgoing_ch == channel
+  )
   {
     if (P2P_SIGNAL == test)
     {
       ok++;
     }
-    GNUNET_break (channel == ch);
-    ch = NULL;
+    outgoing_ch = NULL;
   }
   else
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Unknown peer! %d\n",
-                (int) i);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unknown channel! %p\n", channel);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
 
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
-                                                (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&gather_stats_and_exit, (void *) __LINE__);
   }
 }
 
@@ -898,13 +775,20 @@ channel_cleaner (void *cls,
  * @param cls Closure (unused).
  */
 static void
-do_test (void *cls)
+start_test (void *cls)
 {
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+  struct CadetTestChannelWrapper *ch;
   enum GNUNET_CADET_ChannelOption flags;
 
   test_task = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "do_test\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test\n");
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
@@ -918,30 +802,30 @@ do_test (void *cls)
     flags |= GNUNET_CADET_OPTION_RELIABLE;
   }
 
-  ch = GNUNET_CADET_channel_create (h1,
-                                    NULL,
-                                    p_id[1],
-                                    &port,
-                                    flags);
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  outgoing_ch = GNUNET_CADET_channel_creatE (h1,
+                                             ch,
+                                             p_id[1],
+                                             &port,
+                                             flags,
+                                             NULL,
+                                             &disconnect_handler,
+                                             handlers);
+  ch->ch = outgoing_ch;
 
-  disconnect_task
-    = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                    &gather_stats_and_exit,
-                                    (void *) __LINE__);
+  disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
+                                                  &gather_stats_and_exit,
+                                                  (void *) __LINE__);
   if (KEEPALIVE == test)
-    return; /* Don't send any data. */
+    return;                     /* Don't send any data. */
+
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending data initializer...\n");
   data_received = 0;
   data_sent = 0;
   ack_received = 0;
   ack_sent = 0;
-  th = GNUNET_CADET_notify_transmit_ready (ch,
-                                           GNUNET_NO,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           size_payload + 1000,
-                                           &tmt_rdy, (void *) 0L);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending data initializer...\n");
+  send_test_message (outgoing_ch);
 }
 
 
@@ -955,35 +839,26 @@ do_test (void *cls)
  *             NULL if the operation is successfull
  */
 static void
-pi_cb (void *cls,
-       struct GNUNET_TESTBED_Operation *op,
-       const struct GNUNET_TESTBED_PeerInformation *pinfo,
-       const char *emsg)
+pi_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
+       const struct GNUNET_TESTBED_PeerInformation *pinfo, const char *emsg)
 {
   long i = (long) cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "id callback for %ld\n", i);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ID callback for %ld\n", i);
 
-  if ( (NULL == pinfo) ||
-       (NULL != emsg) )
+  if ((NULL == pinfo) || (NULL != emsg))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "pi_cb: %s\n", emsg);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg);
     abort_test (__LINE__);
     return;
   }
   p_id[i] = pinfo->result.id;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "  id: %s\n", GNUNET_i2s (p_id[i]));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  id: %s\n", GNUNET_i2s (p_id[i]));
   p_ids++;
   if (p_ids < 2)
     return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Got all IDs, starting test\n");
-  test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                            &do_test,
-                                            NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n");
+  test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL);
 }
 
 
@@ -994,7 +869,7 @@ pi_cb (void *cls,
  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
  * @param num_peers Number of peers that are running.
  * @param peers Array of peers.
- * @param cadetes Handle to each of the CADETs of the peers.
+ * @param cadets Handle to each of the CADETs of the peers.
  */
 static void
 tmain (void *cls,
@@ -1017,10 +892,12 @@ tmain (void *cls,
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
   t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
                                                  GNUNET_TESTBED_PIT_IDENTITY,
-                                                 &pi_cb, (void *) 0L);
+                                                 &pi_cb,
+                                                 (void *) 0L);
   t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
                                                  GNUNET_TESTBED_PIT_IDENTITY,
-                                                 &pi_cb, (void *) 1L);
+                                                 &pi_cb,
+                                                 (void *) 1L);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
 }
 
@@ -1031,10 +908,19 @@ tmain (void *cls,
 int
 main (int argc, char *argv[])
 {
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+
   initialized = GNUNET_NO;
   static const struct GNUNET_HashCode *ports[2];
   const char *config_file;
   char port_id[] = "test port";
+
   GNUNET_CRYPTO_hash (port_id, sizeof (port_id), &port);
 
   GNUNET_log_setup ("test", "DEBUG", NULL);
@@ -1137,22 +1023,22 @@ main (int argc, char *argv[])
   p_ids = 0;
   ports[0] = &port;
   ports[1] = NULL;
-  GNUNET_CADET_TEST_run ("test_cadet_small",
-                        config_file,
-                        peers_requested,
-                        &tmain,
-                        NULL, /* tmain cls */
-                        &incoming_channel,
-                        &channel_cleaner,
-                        handlers,
-                        ports);
+  GNUNET_CADET_TEST_ruN ("test_cadet_small",
+                         config_file,
+                         peers_requested,
+                         &tmain,
+                         NULL,        /* tmain cls */
+                         &connect_handler,
+                         NULL,
+                         &disconnect_handler,
+                         handlers,
+                         ports);
   if (NULL != strstr (argv[0], "_reliable"))
-    msg_dropped = 0; /* dropped should be retransmitted */
+    msg_dropped = 0;            /* dropped should be retransmitted */
 
   if (ok_goal > ok - msg_dropped)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "FAILED! (%d/%d)\n", ok, ok_goal);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal);
     return 1;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n");

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]