gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated (b7abb3a11 -> 4eeb7fbf0)


From: gnunet
Subject: [gnunet] branch master updated (b7abb3a11 -> 4eeb7fbf0)
Date: Mon, 25 Jan 2021 15:58:02 +0100

This is an automated email from the git hooks/post-receive script.

t3sserakt pushed a change to branch master
in repository gnunet.

    from b7abb3a11 fixes
     new ac7116582 - fixed bug in tcp com challenge logic. added test case for 
bidirectional test.
     new 4eeb7fbf0 Merge branch 'master' of ssh://gnunet.org/gnunet

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .buildbot/firefly-x86_64-amdepyc_test_tng.sh |   2 +-
 src/transport/Makefile.am                    |  18 +-
 src/transport/gnunet-communicator-tcp.c      | 654 ++++++++++++++-------------
 src/transport/gnunet-communicator-udp.c      |   1 +
 src/transport/test_communicator_basic.c      | 570 ++++++++++++++---------
 5 files changed, 706 insertions(+), 539 deletions(-)

diff --git a/.buildbot/firefly-x86_64-amdepyc_test_tng.sh 
b/.buildbot/firefly-x86_64-amdepyc_test_tng.sh
index ff504ae57..f247f5a69 100755
--- a/.buildbot/firefly-x86_64-amdepyc_test_tng.sh
+++ b/.buildbot/firefly-x86_64-amdepyc_test_tng.sh
@@ -3,6 +3,6 @@
 # echo "Skipped"
 
 pushd src/transport
-make check TESTS='test_communicator_basic-tcp test_communicator_rekey-tcp 
test_communicator_basic-unix test_communicator_basic-udp 
test_communicator_backchannel-udp test_communicator_rekey-udp'
+make check TESTS='test_communicator_basic-tcp test_communicator_rekey-tcp 
test_communicator_basic-unix test_communicator_basic-udp 
test_communicator_backchannel-udp test_communicator_rekey-udp 
test_communicator_bidirect-tcp'
 pkill --signal 9 -U buildbot gnunet
 popd
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index a2fc3811e..0251b001e 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -618,7 +618,8 @@ check_PROGRAMS += \
   test_communicator_basic-udp \
   test_communicator_rekey-tcp \
   test_communicator_rekey-udp \
-  test_communicator_backchannel-udp
+  test_communicator_backchannel-udp \
+  test_communicator_bidirect-tcp
 endif
 endif
 
@@ -696,7 +697,8 @@ TESTS += \
   test_communicator_basic-udp \
   test_communicator_rekey-tcp \
   test_communicator_rekey-udp \
-  test_communicator_backchannel-udp
+  test_communicator_backchannel-udp \
+  test_communicator_bidirect-tcp
 endif
 endif
 
@@ -857,6 +859,14 @@ test_communicator_backchannel_udp_LDADD = \
  $(top_builddir)/src/testing/libgnunettesting.la \
  $(top_builddir)/src/util/libgnunetutil.la \
  $(top_builddir)/src/statistics/libgnunetstatistics.la
+
+test_communicator_bidirect_tcp_SOURCES = \
+ test_communicator_basic.c
+test_communicator_bidirect_tcp_LDADD = \
+ libgnunettransporttesting2.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la
 endif
 
 test_plugin_unix_SOURCES = \
@@ -1567,4 +1577,6 @@ test_communicator_tcp_rekey_peer2.conf \
 test_communicator_udp_rekey_peer1.conf \
 test_communicator_udp_rekey_peer2.conf \
 test_communicator_udp_backchannel_peer1.conf \
-test_communicator_udp_backchannel_peer2.conf
+test_communicator_udp_backchannel_peer2.conf \
+test_communicator_tcp_bidirect_peer1.conf \
+test_communicator_tcp_bidirect_peer2.conf
diff --git a/src/transport/gnunet-communicator-tcp.c 
b/src/transport/gnunet-communicator-tcp.c
index 0c79fc1b4..ed82dba9f 100644
--- a/src/transport/gnunet-communicator-tcp.c
+++ b/src/transport/gnunet-communicator-tcp.c
@@ -589,6 +589,11 @@ struct Queue
    */
   struct ChallengeNonceP challenge;
 
+  /**
+   * Challenge value received. For an inbound connection we have to remember the value, because we send the challenge back later, after we have received the GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_CONFIRMATION_ACK.
+   */
+  struct ChallengeNonceP challenge_received;
+
   /**
    * Iteration Context for retrieving the monotonic time send with key for 
rekeying.
    */
@@ -834,7 +839,7 @@ int addrs_lens;
  * Size of data received without KX challenge played back.
  */
 // TODO remove?
-// size_t unverified_size;
+size_t unverified_size;
 
 /**
  * Database for peer's HELLOs.
@@ -1188,23 +1193,6 @@ setup_cipher (const struct GNUNET_HashCode *dh,
                                     0));
 }
 
-
-/**
- * Setup cipher of @a queue for decryption.
- *
- * @param ephemeral ephemeral key we received from the other peer
- * @param queue[in,out] queue to initialize decryption cipher for
- */
-static void
-setup_in_cipher (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
-                 struct Queue *queue)
-{
-  struct GNUNET_HashCode dh;
-
-  GNUNET_CRYPTO_eddsa_ecdh (my_private_key, ephemeral, &dh);
-  setup_cipher (&dh, &my_identity, &queue->in_cipher, &queue->in_hmac);
-}
-
 /**
  * Callback called when peerstore store operation for rekey monotime value is 
finished.
  * @param cls Queue context the store operation was executed.
@@ -1278,6 +1266,23 @@ rekey_monotime_cb (void *cls,
                                                      queue);
 }
 
+/**
+ * Setup cipher of @a queue for decryption.
+ *
+ * @param ephemeral ephemeral key we received from the other peer
+ * @param queue[in,out] queue to initialize decryption cipher for
+ */
+static void
+setup_in_cipher (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
+                 struct Queue *queue)
+{
+  struct GNUNET_HashCode dh;
+
+  GNUNET_CRYPTO_eddsa_ecdh (my_private_key, ephemeral, &dh);
+  setup_cipher (&dh, &my_identity, &queue->in_cipher, &queue->in_hmac);
+}
+
+
 /**
  * Handle @a rekey message on @a queue. The message was already
  * HMAC'ed, but we should additionally still check the signature.
@@ -1415,6 +1420,220 @@ handshake_ack_monotime_cb (void *cls,
                                                              queue);
 }
 
+/**
+ * Sending challenge with TcpConfirmationAck back to sender of ephemeral key.
+ *
+ * @param challenge The challenge nonce to send back in the TCPConfirmationAck.
+ * @param queue The queue context.
+ */
+static void
+send_challenge (struct ChallengeNonceP challenge, struct Queue *queue)
+{
+  struct TCPConfirmationAck tca;
+  struct TcpHandshakeAckSignature thas;
+
+  GNUNET_log_from_nocheck (GNUNET_ERROR_TYPE_DEBUG,
+                           "transport",
+                           "sending challenge\n");
+
+  tca.header.type = ntohs (
+    GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_CONFIRMATION_ACK);
+  tca.header.size = ntohs (sizeof(tca));
+  tca.challenge = challenge;
+  tca.sender = my_identity;
+  tca.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+  thas.purpose.purpose = htonl (
+    GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE_ACK);
+  thas.purpose.size = htonl (sizeof(thas));
+  thas.sender = my_identity;
+  thas.receiver = queue->target;
+  thas.monotonic_time = tca.monotonic_time;
+  thas.challenge = tca.challenge;
+  GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                            &thas,
+                            &tca.sender_sig);
+  GNUNET_assert (0 ==
+                 gcry_cipher_encrypt (queue->out_cipher,
+                                      &queue->cwrite_buf[queue->cwrite_off],
+                                      sizeof(tca),
+                                      &tca,
+                                      sizeof(tca)));
+  queue->cwrite_off += sizeof(tca);
+  GNUNET_log_from_nocheck (GNUNET_ERROR_TYPE_DEBUG,
+                           "transport",
+                           "sending challenge done\n");
+}
+
+/**
+ * Setup cipher for outgoing data stream based on target and
+ * our ephemeral private key.
+ *
+ * @param queue queue to setup outgoing (encryption) cipher for
+ */
+static void
+setup_out_cipher (struct Queue *queue)
+{
+  struct GNUNET_HashCode dh;
+
+  GNUNET_CRYPTO_ecdh_eddsa (&queue->ephemeral, &queue->target.public_key, &dh);
+  /* we don't need the private key anymore, drop it! */
+  memset (&queue->ephemeral, 0, sizeof(queue->ephemeral));
+  setup_cipher (&dh, &queue->target, &queue->out_cipher, &queue->out_hmac);
+  queue->rekey_time = GNUNET_TIME_relative_to_absolute (rekey_interval);
+  queue->rekey_left_bytes =
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, REKEY_MAX_BYTES);
+}
+
+
+/**
+ * Inject a `struct TCPRekey` message into the queue's plaintext
+ * buffer.
+ *
+ * @param queue queue to perform rekeying on
+ */
+static void
+inject_rekey (struct Queue *queue)
+{
+  struct TCPRekey rekey;
+  struct TcpRekeySignature thp;
+
+  GNUNET_assert (0 == queue->pwrite_off);
+  memset (&rekey, 0, sizeof(rekey));
+  GNUNET_CRYPTO_ecdhe_key_create (&queue->ephemeral);
+  rekey.header.type = ntohs (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY);
+  rekey.header.size = ntohs (sizeof(rekey));
+  GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, &rekey.ephemeral);
+  rekey.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+  thp.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_REKEY);
+  thp.purpose.size = htonl (sizeof(thp));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "inject_rekey size %u\n",
+              thp.purpose.size);
+  thp.sender = my_identity;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "sender %s\n",
+              GNUNET_p2s (&thp.sender.public_key));
+  thp.receiver = queue->target;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "receiver %s\n",
+              GNUNET_p2s (&thp.receiver.public_key));
+  thp.ephemeral = rekey.ephemeral;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "ephemeral %s\n",
+              GNUNET_e2s (&thp.ephemeral));
+  thp.monotonic_time = rekey.monotonic_time;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "time %s\n",
+              GNUNET_STRINGS_absolute_time_to_string (
+                GNUNET_TIME_absolute_ntoh (thp.monotonic_time)));
+  GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                            &thp,
+                            &rekey.sender_sig);
+  calculate_hmac (&queue->out_hmac, &rekey, sizeof(rekey), &rekey.hmac);
+  /* Encrypt rekey message with 'old' cipher */
+  GNUNET_assert (0 ==
+                 gcry_cipher_encrypt (queue->out_cipher,
+                                      &queue->cwrite_buf[queue->cwrite_off],
+                                      sizeof(rekey),
+                                      &rekey,
+                                      sizeof(rekey)));
+  queue->cwrite_off += sizeof(rekey);
+  /* Setup new cipher for successive messages */
+  gcry_cipher_close (queue->out_cipher);
+  setup_out_cipher (queue);
+}
+
+/**
+ * We have been notified that our socket is ready to write.
+ * Then reschedule this function to be called again once more is available.
+ *
+ * @param cls a `struct Queue`
+ */
+static void
+queue_write (void *cls)
+{
+  struct Queue *queue = cls;
+  ssize_t sent;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "In queue write\n");
+  queue->write_task = NULL;
+  if (0 != queue->cwrite_off)
+  {
+    sent = GNUNET_NETWORK_socket_send (queue->sock,
+                                       queue->cwrite_buf,
+                                       queue->cwrite_off);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sent %lu bytes to TCP queue\n", sent);
+    if ((-1 == sent) && (EAGAIN != errno) && (EINTR != errno))
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+      queue_destroy (queue);
+      return;
+    }
+    if (sent > 0)
+    {
+      size_t usent = (size_t) sent;
+      queue->cwrite_off -= usent;
+      memmove (queue->cwrite_buf,
+               &queue->cwrite_buf[usent],
+               queue->cwrite_off);
+      reschedule_queue_timeout (queue);
+    }
+  }
+  /* can we encrypt more? (always encrypt full messages, needed
+     such that #mq_cancel() can work!) */
+  if ((0 < queue->rekey_left_bytes) &&
+      (queue->pwrite_off > 0) &&
+      (queue->cwrite_off + queue->pwrite_off <= BUF_SIZE))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Encrypting %lu bytes\n", queue->pwrite_off);
+    GNUNET_assert (0 ==
+                   gcry_cipher_encrypt (queue->out_cipher,
+                                        &queue->cwrite_buf[queue->cwrite_off],
+                                        queue->pwrite_off,
+                                        queue->pwrite_buf,
+                                        queue->pwrite_off));
+    if (queue->rekey_left_bytes > queue->pwrite_off)
+      queue->rekey_left_bytes -= queue->pwrite_off;
+    else
+      queue->rekey_left_bytes = 0;
+    queue->cwrite_off += queue->pwrite_off;
+    queue->pwrite_off = 0;
+  }
+  // if ((-1 != unverified_size)&& ((0 == queue->pwrite_off) &&
+  if (((0 == queue->pwrite_off) &&
+       ((0 == queue->rekey_left_bytes) ||
+        (0 ==
+         GNUNET_TIME_absolute_get_remaining (
+           queue->rekey_time).rel_value_us))))
+  {
+    inject_rekey (queue);
+  }
+  if ((0 == queue->pwrite_off) && (! queue->finishing) &&
+      (GNUNET_YES == queue->mq_awaits_continue))
+  {
+    queue->mq_awaits_continue = GNUNET_NO;
+    GNUNET_MQ_impl_send_continue (queue->mq);
+  }
+  /* did we just finish writing 'finish'? */
+  if ((0 == queue->cwrite_off) && (GNUNET_YES == queue->finishing))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Finishing queue\n");
+    queue_destroy (queue);
+    return;
+  }
+  /* do we care to write more? */
+  if ((0 < queue->cwrite_off) || (0 < queue->pwrite_off))
+    queue->write_task =
+      GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                      queue->sock,
+                                      &queue_write,
+                                      queue);
+}
+
 /**
  * Test if we have received a full message in plaintext.
  * If so, handle it.
@@ -1450,16 +1669,16 @@ try_handle_plaintext (struct Queue *queue)
     return 0; /* not even a header */
   }
 
-  /* if ((-1 != unverified_size) && (unverified_size > INITIAL_CORE_KX_SIZE)) 
*/
-  /* { */
-  /*   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, */
-  /*               "Already received data of size %lu bigger than KX size 
%lu!\n", */
-  /*               unverified_size, */
-  /*               INITIAL_CORE_KX_SIZE); */
-  /*   GNUNET_break_op (0); */
-  /*   queue_finish (queue); */
-  /*   return 0; */
-  /* } */
+  if ((-1 != unverified_size) && (unverified_size > INITIAL_CORE_KX_SIZE))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Already received data of size %lu bigger than KX size %lu!\n",
+                unverified_size,
+                INITIAL_CORE_KX_SIZE);
+    GNUNET_break_op (0);
+    queue_finish (queue);
+    return 0;
+  }
 
   type = ntohs (hdr->type);
   switch (type)
@@ -1520,43 +1739,53 @@ try_handle_plaintext (struct Queue *queue)
                                                                   queue);
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Handling plaintext, ack processed!");
-
-    // unverified_size = -1;
-
-    /* char *foreign_addr; */
-
-    /* switch (queue->address->sa_family) */
-    /* { */
-    /* case AF_INET: */
-    /*   GNUNET_asprintf (&foreign_addr, */
-    /*                    "%s-%s", */
-    /*                    COMMUNICATOR_ADDRESS_PREFIX, */
-    /*                    GNUNET_a2s (queue->address, queue->address_len)); */
-    /*   break; */
-
-    /* case AF_INET6: */
-    /*   GNUNET_asprintf (&foreign_addr, */
-    /*                    "%s-%s", */
-    /*                    COMMUNICATOR_ADDRESS_PREFIX, */
-    /*                    GNUNET_a2s (queue->address, queue->address_len)); */
-    /*   break; */
-
-    /* default: */
-    /*   GNUNET_assert (0); */
-    /* } */
-
-    /* queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch, */
-    /*                                                   &queue->target, */
-    /*                                                   foreign_addr, */
-    /*                                                   0 /\* no MTU *\/, */
-    /*                                                   
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, */
-    /*                                                   0, /\* Priority *\/ */
-    /*                                                   queue->nt, */
-    /*                                                   queue->cs, */
-    /*                                                   queue->mq); */
-
-    /* GNUNET_free (foreign_addr); */
+                "Handling plaintext, ack processed!\n");
+
+    if (GNUNET_TRANSPORT_CS_INBOUND ==     queue->cs)
+    {
+      send_challenge (queue->challenge_received, queue);
+      queue->write_task =
+        GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                        queue->sock,
+                                        &queue_write,
+                                        queue);
+    }
+
+    unverified_size = -1;
+
+    char *foreign_addr;
+
+    switch (queue->address->sa_family)
+    {
+    case AF_INET:
+      GNUNET_asprintf (&foreign_addr,
+                       "%s-%s",
+                       COMMUNICATOR_ADDRESS_PREFIX,
+                       GNUNET_a2s (queue->address, queue->address_len));
+      break;
+
+    case AF_INET6:
+      GNUNET_asprintf (&foreign_addr,
+                       "%s-%s",
+                       COMMUNICATOR_ADDRESS_PREFIX,
+                       GNUNET_a2s (queue->address, queue->address_len));
+      break;
+
+    default:
+      GNUNET_assert (0);
+    }
+
+    queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
+                                                      &queue->target,
+                                                      foreign_addr,
+                                                      0 /* no MTU */,
+                                                      
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+                                                      0, /* Priority */
+                                                      queue->nt,
+                                                      queue->cs,
+                                                      queue->mq);
+
+    GNUNET_free (foreign_addr);
 
     size = ntohs (hdr->size);
     break;
@@ -1633,8 +1862,8 @@ try_handle_plaintext (struct Queue *queue)
     return 0;
   }
   GNUNET_assert (0 != size);
-  /* if (-1 != unverified_size) */
-  /*   unverified_size += size; */
+  if (-1 != unverified_size)
+    unverified_size += size;
   return size;
 }
 
@@ -2043,178 +2272,6 @@ tcp_address_to_sockaddr (const char *bindto, socklen_t 
*sock_len)
   return in;
 }
 
-
-/**
- * Setup cipher for outgoing data stream based on target and
- * our ephemeral private key.
- *
- * @param queue queue to setup outgoing (encryption) cipher for
- */
-static void
-setup_out_cipher (struct Queue *queue)
-{
-  struct GNUNET_HashCode dh;
-
-  GNUNET_CRYPTO_ecdh_eddsa (&queue->ephemeral, &queue->target.public_key, &dh);
-  /* we don't need the private key anymore, drop it! */
-  memset (&queue->ephemeral, 0, sizeof(queue->ephemeral));
-  setup_cipher (&dh, &queue->target, &queue->out_cipher, &queue->out_hmac);
-  queue->rekey_time = GNUNET_TIME_relative_to_absolute (rekey_interval);
-  queue->rekey_left_bytes =
-    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, REKEY_MAX_BYTES);
-}
-
-
-/**
- * Inject a `struct TCPRekey` message into the queue's plaintext
- * buffer.
- *
- * @param queue queue to perform rekeying on
- */
-static void
-inject_rekey (struct Queue *queue)
-{
-  struct TCPRekey rekey;
-  struct TcpRekeySignature thp;
-
-  GNUNET_assert (0 == queue->pwrite_off);
-  memset (&rekey, 0, sizeof(rekey));
-  GNUNET_CRYPTO_ecdhe_key_create (&queue->ephemeral);
-  rekey.header.type = ntohs (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY);
-  rekey.header.size = ntohs (sizeof(rekey));
-  GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, &rekey.ephemeral);
-  rekey.monotonic_time =
-    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-  thp.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_REKEY);
-  thp.purpose.size = htonl (sizeof(thp));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "inject_rekey size %u\n",
-              thp.purpose.size);
-  thp.sender = my_identity;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "sender %s\n",
-              GNUNET_p2s (&thp.sender.public_key));
-  thp.receiver = queue->target;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "receiver %s\n",
-              GNUNET_p2s (&thp.receiver.public_key));
-  thp.ephemeral = rekey.ephemeral;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "ephemeral %s\n",
-              GNUNET_e2s (&thp.ephemeral));
-  thp.monotonic_time = rekey.monotonic_time;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "time %s\n",
-              GNUNET_STRINGS_absolute_time_to_string (
-                GNUNET_TIME_absolute_ntoh (thp.monotonic_time)));
-  GNUNET_CRYPTO_eddsa_sign (my_private_key,
-                            &thp,
-                            &rekey.sender_sig);
-  calculate_hmac (&queue->out_hmac, &rekey, sizeof(rekey), &rekey.hmac);
-  /* Encrypt rekey message with 'old' cipher */
-  GNUNET_assert (0 ==
-                 gcry_cipher_encrypt (queue->out_cipher,
-                                      &queue->cwrite_buf[queue->cwrite_off],
-                                      sizeof(rekey),
-                                      &rekey,
-                                      sizeof(rekey)));
-  queue->cwrite_off += sizeof(rekey);
-  /* Setup new cipher for successive messages */
-  gcry_cipher_close (queue->out_cipher);
-  setup_out_cipher (queue);
-}
-
-
-/**
- * We have been notified that our socket is ready to write.
- * Then reschedule this function to be called again once more is available.
- *
- * @param cls a `struct Queue`
- */
-static void
-queue_write (void *cls)
-{
-  struct Queue *queue = cls;
-  ssize_t sent;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "In queue write\n");
-  queue->write_task = NULL;
-  if (0 != queue->cwrite_off)
-  {
-    sent = GNUNET_NETWORK_socket_send (queue->sock,
-                                       queue->cwrite_buf,
-                                       queue->cwrite_off);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sent %lu bytes to TCP queue\n", sent);
-    if ((-1 == sent) && (EAGAIN != errno) && (EINTR != errno))
-    {
-      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
-      queue_destroy (queue);
-      return;
-    }
-    if (sent > 0)
-    {
-      size_t usent = (size_t) sent;
-      queue->cwrite_off -= usent;
-      memmove (queue->cwrite_buf,
-               &queue->cwrite_buf[usent],
-               queue->cwrite_off);
-      reschedule_queue_timeout (queue);
-    }
-  }
-  /* can we encrypt more? (always encrypt full messages, needed
-     such that #mq_cancel() can work!) */
-  if ((0 < queue->rekey_left_bytes) &&
-      (queue->pwrite_off > 0) &&
-      (queue->cwrite_off + queue->pwrite_off <= BUF_SIZE))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Encrypting %lu bytes\n", queue->pwrite_off);
-    GNUNET_assert (0 ==
-                   gcry_cipher_encrypt (queue->out_cipher,
-                                        &queue->cwrite_buf[queue->cwrite_off],
-                                        queue->pwrite_off,
-                                        queue->pwrite_buf,
-                                        queue->pwrite_off));
-    if (queue->rekey_left_bytes > queue->pwrite_off)
-      queue->rekey_left_bytes -= queue->pwrite_off;
-    else
-      queue->rekey_left_bytes = 0;
-    queue->cwrite_off += queue->pwrite_off;
-    queue->pwrite_off = 0;
-  }
-  // if ((-1 != unverified_size)&& ((0 == queue->pwrite_off) &&
-  if (((0 == queue->pwrite_off) &&
-       ((0 == queue->rekey_left_bytes) ||
-        (0 ==
-         GNUNET_TIME_absolute_get_remaining (
-           queue->rekey_time).rel_value_us))))
-  {
-    inject_rekey (queue);
-  }
-  if ((0 == queue->pwrite_off) && (! queue->finishing) &&
-      (GNUNET_YES == queue->mq_awaits_continue))
-  {
-    queue->mq_awaits_continue = GNUNET_NO;
-    GNUNET_MQ_impl_send_continue (queue->mq);
-  }
-  /* did we just finish writing 'finish'? */
-  if ((0 == queue->cwrite_off) && (GNUNET_YES == queue->finishing))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Finishing queue\n");
-    queue_destroy (queue);
-    return;
-  }
-  /* do we care to write more? */
-  if ((0 < queue->cwrite_off) || (0 < queue->pwrite_off))
-    queue->write_task =
-      GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                      queue->sock,
-                                      &queue_write,
-                                      queue);
-}
-
-
 /**
  * Signature of functions implementing the sending functionality of a
  * message queue.
@@ -2348,39 +2405,39 @@ boot_queue (struct Queue *queue)
                                              NULL,
                                              &mq_error,
                                              queue);
-  {
-    char *foreign_addr;
-
-    switch (queue->address->sa_family)
-    {
-    case AF_INET:
-      GNUNET_asprintf (&foreign_addr,
-                       "%s-%s",
-                       COMMUNICATOR_ADDRESS_PREFIX,
-                       GNUNET_a2s (queue->address, queue->address_len));
-      break;
-
-    case AF_INET6:
-      GNUNET_asprintf (&foreign_addr,
-                       "%s-%s",
-                       COMMUNICATOR_ADDRESS_PREFIX,
-                       GNUNET_a2s (queue->address, queue->address_len));
-      break;
-
-    default:
-      GNUNET_assert (0);
-    }
-    queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
-                                                      &queue->target,
-                                                      foreign_addr,
-                                                      0 /* no MTU */,
-                                                      
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
-                                                      0, /* Priority */
-                                                      queue->nt,
-                                                      queue->cs,
-                                                      queue->mq);
-    GNUNET_free (foreign_addr);
-  }
+  /* { */
+  /*   char *foreign_addr; */
+
+  /*   switch (queue->address->sa_family) */
+  /*   { */
+  /*   case AF_INET: */
+  /*     GNUNET_asprintf (&foreign_addr, */
+  /*                      "%s-%s", */
+  /*                      COMMUNICATOR_ADDRESS_PREFIX, */
+  /*                      GNUNET_a2s (queue->address, queue->address_len)); */
+  /*     break; */
+
+  /*   case AF_INET6: */
+  /*     GNUNET_asprintf (&foreign_addr, */
+  /*                      "%s-%s", */
+  /*                      COMMUNICATOR_ADDRESS_PREFIX, */
+  /*                      GNUNET_a2s (queue->address, queue->address_len)); */
+  /*     break; */
+
+  /*   default: */
+  /*     GNUNET_assert (0); */
+  /*   } */
+  /*   queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch, */
+  /*                                                     &queue->target, */
+  /*                                                     foreign_addr, */
+  /*                                                     0 /\* no MTU *\/, */
+  /*                                                     
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, */
+  /*                                                     0, /\* Priority *\/ */
+  /*                                                     queue->nt, */
+  /*                                                     queue->cs, */
+  /*                                                     queue->mq); */
+  /*   GNUNET_free (foreign_addr); */
+  /* } */
 }
 
 
@@ -2594,48 +2651,6 @@ free_proto_queue (struct ProtoQueue *pq)
   GNUNET_free (pq);
 }
 
-/**
- * Sending challenge with TcpConfirmationAck back to sender of ephemeral key.
- *
- * @param tc The TCPConfirmation originally send.
- * @param queue The queue context.
- */
-static void
-send_challenge (struct ChallengeNonceP challenge, struct Queue *queue)
-{
-  struct TCPConfirmationAck tca;
-  struct TcpHandshakeAckSignature thas;
-
-  GNUNET_log_from_nocheck (GNUNET_ERROR_TYPE_DEBUG,
-                           "transport",
-                           "sending challenge\n");
-
-  tca.header.type = ntohs (
-    GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_CONFIRMATION_ACK);
-  tca.header.size = ntohs (sizeof(tca));
-  tca.challenge = challenge;
-  tca.sender = my_identity;
-  tca.monotonic_time =
-    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-  thas.purpose.purpose = htonl (
-    GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE_ACK);
-  thas.purpose.size = htonl (sizeof(thas));
-  thas.sender = my_identity;
-  thas.receiver = queue->target;
-  thas.monotonic_time = tca.monotonic_time;
-  thas.challenge = tca.challenge;
-  GNUNET_CRYPTO_eddsa_sign (my_private_key,
-                            &thas,
-                            &tca.sender_sig);
-  GNUNET_assert (0 ==
-                 gcry_cipher_encrypt (queue->out_cipher,
-                                      &queue->cwrite_buf[queue->cwrite_off],
-                                      sizeof(tca),
-                                      &tca,
-                                      sizeof(tca)));
-  queue->cwrite_off += sizeof(tca);
-}
-
 /**
  * Read from the socket of the proto queue until we have enough data
  * to upgrade to full queue.
@@ -2722,7 +2737,8 @@ proto_read_kx (void *cls)
                                     &queue_write,
                                     queue);
   // TODO To early! Move it somewhere else.
-  // send_challenge (tc, queue);
+  // send_challenge (tc.challenge, queue);
+  queue->challenge_received = tc.challenge;
 
   GNUNET_CONTAINER_DLL_remove (proto_head, proto_tail, pq);
   GNUNET_free (pq);
@@ -2853,6 +2869,12 @@ queue_read_kx (void *cls)
     return;
   }
   send_challenge (tc.challenge, queue);
+  queue->write_task =
+    GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    queue->sock,
+                                    &queue_write,
+                                    queue);
+
   /* update queue timeout */
   reschedule_queue_timeout (queue);
   /* prepare to continue with regular read task immediately */
@@ -2866,7 +2888,7 @@ queue_read_kx (void *cls)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "cread_off set to %lu bytes\n",
               queue->cread_off);
-  if (0 < queue->cread_off)
+  if (0 <= queue->cread_off)
     queue->read_task = GNUNET_SCHEDULER_add_now (&queue_read, queue);
 }
 
diff --git a/src/transport/gnunet-communicator-udp.c 
b/src/transport/gnunet-communicator-udp.c
index 018da8f0e..2e09bc9d2 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -1882,6 +1882,7 @@ consider_ss_ack (struct SharedSecret *ss, int initial)
 
     // kce_generate (ss, ++ss->sequence_allowed);
     // kce_generate (ss, ++ss->sequence_allowed);
+    // TODO This task must be per sender!
     kce_task = GNUNET_SCHEDULER_add_delayed (WORKING_QUEUE_INTERVALL,
                                              kce_generate_cb,
                                              ss);
diff --git a/src/transport/test_communicator_basic.c 
b/src/transport/test_communicator_basic.c
index 0250de474..ffc21e47a 100644
--- a/src/transport/test_communicator_basic.c
+++ b/src/transport/test_communicator_basic.c
@@ -42,7 +42,7 @@
 
 #define NUM_PEERS 2
 
-static struct GNUNET_SCHEDULER_Task *to_task;
+static struct GNUNET_SCHEDULER_Task *to_task[NUM_PEERS];
 
 static int queue_est = GNUNET_NO;
 
@@ -59,27 +59,29 @@ static struct GNUNET_STATISTICS_Handle *stats[NUM_PEERS];
 
 static char *cfg_peers_name[NUM_PEERS];
 
+static int finished[NUM_PEERS];
+
 static int ret;
 
 static int bidirect = GNUNET_NO;
 
 static size_t long_message_size;
 
-static struct GNUNET_TIME_Absolute start_short;
+static struct GNUNET_TIME_Absolute start_short[NUM_PEERS];
 
-static struct GNUNET_TIME_Absolute start_long;
+static struct GNUNET_TIME_Absolute start_long[NUM_PEERS];
 
-static struct GNUNET_TIME_Absolute timeout;
+static struct GNUNET_TIME_Absolute timeout[NUM_PEERS];
 
-static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
+// static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
 
 static char *communicator_name;
 
 static char *test_name;
 
-static struct GNUNET_STATISTICS_GetHandle *box_stats;
+static struct GNUNET_STATISTICS_GetHandle *box_stats[NUM_PEERS];
 
-static struct GNUNET_STATISTICS_GetHandle *rekey_stats;
+static struct GNUNET_STATISTICS_GetHandle *rekey_stats[NUM_PEERS];
 
 #define TEST_SECTION "test-setup"
 
@@ -97,7 +99,7 @@ static struct GNUNET_STATISTICS_GetHandle *rekey_stats;
 
 #define PEER_B 1
 
-static unsigned int iterations_left = TOTAL_ITERATIONS;
+static unsigned int iterations_left[NUM_PEERS];
 
 #define TIMEOUT_MULTIPLIER 1
 
@@ -118,11 +120,11 @@ enum TestPhase
   TP_SIZE_CHECK
 };
 
-static unsigned int phase_short;
+static unsigned int phase_short[NUM_PEERS];
 
-static unsigned int phase_long;
+static unsigned int phase_long[NUM_PEERS];
 
-static unsigned int phase_size;
+static unsigned int phase_size[NUM_PEERS];
 
 static long long unsigned int allowed_packet_loss_short;
 
@@ -140,26 +142,23 @@ static struct GNUNET_TIME_Relative delay_short;
 
 static struct GNUNET_TIME_Relative delay_long;
 
-static size_t num_sent_short = 0;
-
-static size_t num_sent_long = 0;
-
-static size_t num_sent_size = 0;
+static size_t num_sent_short[NUM_PEERS];
 
-static uint32_t ack = 0;
+static size_t num_sent_long[NUM_PEERS];
 
-static enum TestPhase phase;
+static size_t num_sent_size[NUM_PEERS];
 
-static size_t num_received_short = 0;
+static uint32_t ack[NUM_PEERS];
 
-static size_t num_received_long = 0;
+static enum TestPhase phase[NUM_PEERS];
 
-static size_t num_received_size = 0;
+static size_t num_received_short[NUM_PEERS];
 
-static uint64_t avg_latency = 0;
+static size_t num_received_long[NUM_PEERS];
 
-static struct GNUNET_TIME_Relative duration;
+static size_t num_received_size[NUM_PEERS];
 
+static uint64_t avg_latency[NUM_PEERS];
 
 static void
 communicator_available_cb (
@@ -284,43 +283,87 @@ make_payload (size_t payload_size)
   return payload;
 }
 
+static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
+get_tc_h (unsigned int peer_nr)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got peer %u\n",
+       peer_nr);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p peer 0\n",
+       tc_hs[0]);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p peer 1\n",
+       tc_hs[1]);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p get\n",
+       tc_hs[peer_nr]);
+
+  return tc_hs[peer_nr];
+}
+
+
+static unsigned int
+get_peer_nr_from_tc (struct
+                     GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
+{
+  if (tc_h == get_tc_h (0))
+    return PEER_A;
+  else
+    return PEER_B;
+}
+
+static unsigned int
+get_peer_nr (void *cls, unsigned int get_the_other_one)
+{
+  if (0 == strcmp ((char*) cls, cfg_peers_name[0]))
+    return get_the_other_one ? PEER_B : PEER_A;
+  else
+    return get_the_other_one ? PEER_A : PEER_B;
+}
 
 static void
 latency_timeout (void *cls)
 {
 
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
   size_t num_sent = 0;
   size_t num_received = 0;
 
-  to_task = NULL;
-  if (GNUNET_TIME_absolute_get_remaining (timeout).rel_value_us > 0)
+  peer_nr = get_peer_nr_from_tc (tc_h);
+  to_task[peer_nr] = NULL;
+  if (GNUNET_TIME_absolute_get_remaining (timeout[peer_nr]).rel_value_us > 0)
   {
-    to_task = GNUNET_SCHEDULER_add_at (timeout,
-                                       &latency_timeout,
-                                       NULL);
+    to_task[peer_nr] = GNUNET_SCHEDULER_add_at (timeout[peer_nr],
+                                                &latency_timeout,
+                                                cls);
     return;
   }
-  switch (phase)
+  switch (phase[peer_nr])
   {
   case TP_INIT:
     GNUNET_assert (0);
     break;
   case TP_BURST_SHORT:
-    num_sent = num_sent_short;
-    num_received = num_received_short;
+    num_sent = num_sent_short[peer_nr];
+    num_received = num_received_short[peer_nr];
     break;
   case TP_BURST_LONG:
-    num_sent = num_sent_long;
-    num_received = num_received_long;
+    num_sent = num_sent_long[peer_nr];
+    num_received = num_received_long[peer_nr];
     break;
   case TP_SIZE_CHECK:
-    num_sent = num_sent_size;
-    num_received = num_received_size;
+    num_sent = num_sent_size[peer_nr];
+    num_received = num_received_size[peer_nr];
     break;
   }
   LOG (GNUNET_ERROR_TYPE_ERROR,
        "Latency too high. Test failed. (Phase: %d. Sent: %lu, Received: %lu)\n",
-       phase, num_sent, num_received);
+       phase[peer_nr], num_sent, num_received);
   ret = 2;
   GNUNET_SCHEDULER_shutdown ();
 }
@@ -328,31 +371,36 @@ latency_timeout (void *cls)
 static void
 size_test (void *cls)
 {
+  unsigned int peer_nr;
   char *payload;
   size_t max_size = 64000;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
 
+  peer_nr = get_peer_nr_from_tc (tc_h);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "size_test_cb %u\n",
-       (unsigned int) num_sent_size);
-  GNUNET_assert (TP_SIZE_CHECK == phase);
+       (unsigned int) num_sent_size[peer_nr]);
+  GNUNET_assert (TP_SIZE_CHECK == phase[peer_nr]);
   if (LONG_MESSAGE_SIZE != long_message_size)
     max_size = long_message_size;
-  if (ack + 10 > max_size)
+  if (ack[peer_nr] + 10 > max_size)
     return; /* Leave some room for our protocol, so not 2^16 exactly */
-  ack += 10;
-  payload = make_payload (ack);
-  num_sent_size++;
-  GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
-                                                        (ack < max_size)
+  ack[peer_nr] += 10;
+  payload = make_payload (ack[peer_nr]);
+  num_sent_size[peer_nr]++;
+  GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
+                                                        (ack[peer_nr] <
+                                                         max_size)
                                                         ? &size_test
                                                         : NULL,
-                                                        NULL,
+                                                        cls,
                                                         payload,
-                                                        ack);
+                                                        ack[peer_nr]);
   GNUNET_free (payload);
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
 }
 
 
@@ -362,24 +410,28 @@ long_test (void *cls);
 static void
 long_test_cb (void *cls)
 {
+  unsigned int peer_nr;
   char *payload;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "long_test_cb %u/%u\n",
-       (unsigned int) num_sent_long,
-       (unsigned int) num_received_long);
+       (unsigned int) num_sent_long[peer_nr],
+       (unsigned int) num_received_long[peer_nr]);
   payload = make_payload (long_message_size);
-  num_sent_long++;
-  GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
+  num_sent_long[peer_nr]++;
+  GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
                                                         (burst_packets_long ==
-                                                         num_sent_long)
+                                                         num_sent_long[peer_nr])
                                                         ? NULL
                                                         : &long_test,
-                                                        NULL,
+                                                        cls,
                                                         payload,
                                                         long_message_size);
   GNUNET_free (payload);
-  timeout = GNUNET_TIME_relative_to_absolute (
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
     GNUNET_TIME_relative_multiply (
       GNUNET_TIME_UNIT_SECONDS,
       TIMEOUT_MULTIPLIER));
@@ -391,7 +443,7 @@ long_test (void *cls)
 {
   GNUNET_SCHEDULER_add_delayed (delay_long,
                                 &long_test_cb,
-                                NULL);
+                                cls);
 }
 
 
@@ -402,26 +454,33 @@ short_test (void *cls);
 static void
 short_test_cb (void *cls)
 {
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
   char *payload;
 
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "short_test_cb %u/%u\n",
-       (unsigned int) num_sent_short,
-       (unsigned int) num_received_short);
+       "short_test_cb %u/%u for peer %u and handle %p\n",
+       (unsigned int) num_sent_short[peer_nr],
+       (unsigned int) num_received_short[peer_nr],
+       peer_nr,
+       tc_h);
   payload = make_payload (SHORT_MESSAGE_SIZE);
-  num_sent_short++;
-  GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
+  num_sent_short[peer_nr]++;
+  GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
                                                         (burst_packets_short ==
-                                                         num_sent_short)
+                                                         num_sent_short[peer_nr])
                                                         ? NULL
                                                         : &short_test,
-                                                        NULL,
+                                                        cls,
                                                         payload,
                                                         SHORT_MESSAGE_SIZE);
   GNUNET_free (payload);
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
 }
 
 
@@ -430,7 +489,7 @@ short_test (void *cls)
 {
   GNUNET_SCHEDULER_add_delayed (delay_short,
                                 &short_test_cb,
-                                NULL);
+                                cls);
 }
 
 
@@ -462,9 +521,14 @@ short_test (void *cls)
 static void
 process_statistics_box_done (void *cls, int success)
 {
-  if (NULL != box_stats)
-    box_stats = NULL;
-  if (NULL == rekey_stats)
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
+  if (NULL != box_stats[peer_nr])
+    box_stats[peer_nr] = NULL;
+  if (NULL == rekey_stats[peer_nr])
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Finished\n");
@@ -476,9 +540,14 @@ process_statistics_box_done (void *cls, int success)
 static void
 process_statistics_rekey_done (void *cls, int success)
 {
-  if (NULL != rekey_stats)
-    rekey_stats = NULL;
-  if (NULL == box_stats)
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
+  if (NULL != rekey_stats[peer_nr])
+    rekey_stats[peer_nr] = NULL;
+  if (NULL == box_stats[peer_nr])
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Finished\n");
@@ -533,24 +602,37 @@ process_statistics (void *cls,
 }
 
 static void
-choose_phase ()
+choose_phase (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
 {
-  if (GNUNET_YES == phase_short)
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
+  if (GNUNET_YES == phase_short[peer_nr])
   {
-    phase =  TP_BURST_SHORT;
-    start_short = GNUNET_TIME_absolute_get ();
-    short_test (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Choose phase short with peer %u and Handle %p\n",
+                peer_nr,
+                tc_h);
+    phase[peer_nr] =  TP_BURST_SHORT;
+    start_short[peer_nr] = GNUNET_TIME_absolute_get ();
+    short_test (tc_h);
   }
-  else if (GNUNET_YES == phase_long)
+  else if (GNUNET_YES == phase_long[peer_nr])
   {
-    phase =  TP_BURST_LONG;
-    start_long = GNUNET_TIME_absolute_get ();
-    long_test (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Choose phase long with peer %u\n",
+                peer_nr);
+    phase[peer_nr] =  TP_BURST_LONG;
+    start_long[peer_nr] = GNUNET_TIME_absolute_get ();
+    long_test (tc_h);
   }
-  else if (GNUNET_YES == phase_size)
+  else if (GNUNET_YES == phase_size[peer_nr])
   {
-    phase =  TP_SIZE_CHECK;
-    size_test (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Choose phase size\n");
+    phase[peer_nr] =  TP_SIZE_CHECK;
+    size_test (tc_h);
   }
   else
   {
@@ -560,28 +642,33 @@ choose_phase ()
                                                           "backchannel",
                                                           test_name))) )
     {
-      if (NULL != box_stats)
-        GNUNET_STATISTICS_get_cancel (box_stats);
-      box_stats = GNUNET_STATISTICS_get (stats[1],
-                                         "C-UDP",
-                                         "# messages decrypted with BOX",
-                                         process_statistics_box_done,
-                                         &process_statistics,
-                                         NULL);
-      if (NULL != rekey_stats)
-        GNUNET_STATISTICS_get_cancel (rekey_stats);
-      rekey_stats = GNUNET_STATISTICS_get (stats[0],
-                                           "C-UDP",
-                                           "# rekeying successful",
-                                           process_statistics_rekey_done,
-                                           &process_statistics,
-                                           NULL);
+      if (NULL != box_stats[peer_nr])
+        GNUNET_STATISTICS_get_cancel (box_stats[peer_nr]);
+      box_stats[peer_nr] = GNUNET_STATISTICS_get (stats[1],
+                                                  "C-UDP",
+                                                  "# messages decrypted with BOX",
+                                                  process_statistics_box_done,
+                                                  &process_statistics,
+                                                  tc_h);
+      if (NULL != rekey_stats[peer_nr])
+        GNUNET_STATISTICS_get_cancel (rekey_stats[peer_nr]);
+      rekey_stats[peer_nr] = GNUNET_STATISTICS_get (stats[0],
+                                                    "C-UDP",
+                                                    "# rekeying successful",
+                                                    process_statistics_rekey_done,
+                                                    &process_statistics,
+                                                    tc_h);
     }
     else
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Finished\n");
-      GNUNET_SCHEDULER_shutdown ();
+      if (((PEER_A == peer_nr) && finished[PEER_B]) || ((PEER_B == peer_nr) &&
+                                                        finished[PEER_A]))
+      {
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Finished\n");
+        GNUNET_SCHEDULER_shutdown ();
+      }
+      finished[peer_nr] = GNUNET_YES;
     }
   }
 }
@@ -604,37 +691,54 @@ add_queue_cb (void *cls,
               tc_queue,
               size_t mtu)
 {
-  if (TP_INIT != phase)
-    return;
-  if (0 != strcmp ((char*) cls, cfg_peers_name[0]))
+
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr (cls, GNUNET_NO);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p add %u %u\n",
+       tc_h,
+       peer_nr,
+       get_peer_nr_from_tc (tc_h));
+
+  if ((GNUNET_NO == bidirect)&&(0 != strcmp ((char*) cls, cfg_peers_name[0])))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue available at receiving peer\n");
     return; // TODO?
+  }
+  else if (TP_INIT != phase[peer_nr])
+    return;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queue established, starting test...\n");
   // start_short = GNUNET_TIME_absolute_get ();
-  my_tc = tc_h;
+  // my_tc = tc_h;
   if (0 != mtu) /* Message header overhead */
     long_message_size = mtu - sizeof(struct GNUNET_TRANSPORT_SendMessageTo)
                         - sizeof(struct GNUNET_MessageHeader);
   else
     long_message_size = LONG_MESSAGE_SIZE;
   // phase = TP_BURST_SHORT;
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
-  GNUNET_assert (NULL == to_task);
-  to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
-                                            GNUNET_TIME_UNIT_SECONDS,
-                                            TIMEOUT_MULTIPLIER),
-                                          &latency_timeout,
-                                          NULL);
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
+  GNUNET_assert (NULL == to_task[peer_nr]);
+  to_task[peer_nr] = GNUNET_SCHEDULER_add_delayed (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER),
+    &latency_timeout,
+    tc_h);
   // prepare_test (NULL);
   // short_test (NULL);
-  choose_phase ();
+  choose_phase (tc_h);
 }
 
 
 static void
-update_avg_latency (const char*payload)
+update_avg_latency (const char *payload, unsigned int peer_nr)
 {
   struct GNUNET_TIME_AbsoluteNBO *ts_n;
   struct GNUNET_TIME_Absolute ts;
@@ -645,31 +749,33 @@ update_avg_latency (const char*payload)
   ts = GNUNET_TIME_absolute_ntoh (*ts_n);
   latency = GNUNET_TIME_absolute_get_duration (ts);
 
-  switch (phase)
+  switch (phase[peer_nr])
   {
   case TP_INIT:
     GNUNET_assert (0);
     break;
   case TP_BURST_SHORT:
-    num_received = num_received_short;
+    num_received = num_received_short[peer_nr];
     break;
   case TP_BURST_LONG:
-    num_received = num_received_long;
+    num_received = num_received_long[peer_nr];
     break;
   case TP_SIZE_CHECK:
-    num_received = num_received_size;
+    num_received = num_received_size[peer_nr];
     break;
   }
   if (1 >= num_received)
-    avg_latency = latency.rel_value_us;
+    avg_latency[peer_nr] = latency.rel_value_us;
   else
-    avg_latency = ((avg_latency * (num_received - 1)) + latency.rel_value_us)
-                  / num_received;
+    avg_latency[peer_nr] = ((avg_latency[peer_nr] * (num_received - 1))
+                            + latency.rel_value_us)
+                           / num_received;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Latency of received packet: %s with avg latency %lu\n",
+       "Latency of received packet by peer %u: %s with avg latency %lu\n",
+       peer_nr,
        GNUNET_STRINGS_relative_time_to_string (latency,
                                                GNUNET_YES),
-       avg_latency);
+       avg_latency[peer_nr]);
 }
 
 
@@ -679,25 +785,31 @@ static void
 load_phase_config ()
 {
 
-  phase_short =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
-                                                       TEST_SECTION,
-                                                       "PHASE_SHORT");
-  if (GNUNET_SYSERR == phase_short)
-    phase_short = GNUNET_YES;
+  phase_short[0] =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
+                                                          TEST_SECTION,
+                                                          "PHASE_SHORT");
+  if (GNUNET_SYSERR == phase_short[0])
+    phase_short[0] = GNUNET_YES;
 
-  phase_long =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
-                                                      TEST_SECTION,
-                                                      "PHASE_LONG");
+  phase_short[1] = phase_short[0];
 
-  if (GNUNET_SYSERR == phase_long)
-    phase_long = GNUNET_YES;
+  phase_long[0] =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
+                                                         TEST_SECTION,
+                                                         "PHASE_LONG");
 
-  phase_size =   GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
-                                                       TEST_SECTION,
-                                                       "PHASE_SIZE");
+  if (GNUNET_SYSERR == phase_long[0])
+    phase_long[0] = GNUNET_YES;
 
-  if (GNUNET_SYSERR == phase_size)
-    phase_size = GNUNET_YES;
+  phase_long[1] = phase_long[0];
+
+  phase_size[0] =   GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
+                                                          TEST_SECTION,
+                                                          "PHASE_SIZE");
+
+  if (GNUNET_SYSERR == phase_size[0])
+    phase_size[0] = GNUNET_YES;
+
+  phase_size[1] = phase_size[0];
 }
 
 /**
@@ -716,18 +828,24 @@ incoming_message_cb (
   const char *payload,
   size_t payload_len)
 {
-  if (0 != strcmp ((char*) cls,
-                   cfg_peers_name[NUM_PEERS - 1]))
+  unsigned int peer_nr;
+  static struct GNUNET_TIME_Relative duration;
+
+  peer_nr = get_peer_nr (cls, GNUNET_YES);
+
+  if ((GNUNET_NO == bidirect)&&(0 != strcmp ((char*) cls,
+                                             cfg_peers_name[NUM_PEERS - 1])))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "unexpected receiver...\n");
     return;
   }
   /* Reset timeout */
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
-  switch (phase)
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
+  switch (phase[peer_nr])
   {
   case TP_INIT:
     GNUNET_break (0);
@@ -735,34 +853,37 @@ incoming_message_cb (
   case TP_BURST_SHORT:
     {
       GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
-      num_received_short++;
-      duration = GNUNET_TIME_absolute_get_duration (start_short);
-      update_avg_latency (payload);
-      if ((num_sent_short == burst_packets_short) && (num_received_short >
-                                                      burst_packets_short / 100
-                                                      *
-                                                      allowed_packet_loss_short) )
+      num_received_short[peer_nr]++;
+      duration = GNUNET_TIME_absolute_get_duration (start_short[peer_nr]);
+      update_avg_latency (payload, peer_nr);
+      if ((num_sent_short[peer_nr] == burst_packets_short) &&
+          (num_received_short[peer_nr] >
+           burst_packets_short
+           / 100
+           *
+           allowed_packet_loss_short) )
       {
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
-             "Short size packet test done.\n");
+             "Short size packet test for peer %u done.\n",
+             peer_nr);
         char *goodput = GNUNET_STRINGS_byte_size_fancy (
-          (SHORT_MESSAGE_SIZE * num_received_short * 1000 * 1000)
+          (SHORT_MESSAGE_SIZE * num_received_short[peer_nr] * 1000 * 1000)
           / duration.rel_value_us);
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
              "%lu/%lu packets in %llu us (%s/s) -- avg latency: %llu us\n",
-             (unsigned long) num_received_short,
-             (unsigned long) num_sent_short,
+             (unsigned long) num_received_short[peer_nr],
+             (unsigned long) num_sent_short[peer_nr],
              (unsigned long long) duration.rel_value_us,
              goodput,
-             (unsigned long long) avg_latency);
+             (unsigned long long) avg_latency[peer_nr]);
         GNUNET_free (goodput);
         // start_long = GNUNET_TIME_absolute_get ();
         // phase = TP_BURST_LONG;
         // num_sent_short = 0;
-        avg_latency = 0;
+        avg_latency[peer_nr] = 0;
         // num_received = 0;
-        phase_short = GNUNET_NO;
-        choose_phase ();
+        phase_short[peer_nr] = GNUNET_NO;
+        choose_phase (get_tc_h (peer_nr));
         // long_test (NULL);
       }
       break;
@@ -775,37 +896,40 @@ incoming_message_cb (
              "Ignoring packet with wrong length\n");
         return;   // Ignore
       }
-      num_received_long++;
-      duration = GNUNET_TIME_absolute_get_duration (start_long);
-      update_avg_latency (payload);
-      if ((num_sent_long == burst_packets_long) && (num_received_long >
-                                                    burst_packets_long
-                                                    / 100
-                                                    * allowed_packet_loss_short) )
+      num_received_long[peer_nr]++;
+      duration = GNUNET_TIME_absolute_get_duration (start_long[peer_nr]);
+      update_avg_latency (payload, peer_nr);
+      if ((num_sent_long[peer_nr] == burst_packets_long) &&
+          (num_received_long[peer_nr] >
+           burst_packets_long
+           / 100
+           *
+           allowed_packet_loss_short) )
       {
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
-             "Long size packet test done.\n");
+             "Long size packet test  for peer %u done.\n",
+             peer_nr);
         char *goodput = GNUNET_STRINGS_byte_size_fancy (
-          (long_message_size * num_received_long * 1000 * 1000)
+          (long_message_size * num_received_long[peer_nr] * 1000 * 1000)
           / duration.
           rel_value_us);
 
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
              "%lu/%lu packets in %llu us (%s/s) -- avg latency: %llu us\n",
-             (unsigned long) num_received_long,
-             (unsigned long) num_sent_long,
+             (unsigned long) num_received_long[peer_nr],
+             (unsigned long) num_sent_long[peer_nr],
              (unsigned long long) duration.rel_value_us,
              goodput,
-             (unsigned long long) avg_latency);
+             (unsigned long long) avg_latency[peer_nr]);
         GNUNET_free (goodput);
-        ack = 0;
+        ack[peer_nr] = 0;
         // phase = TP_SIZE_CHECK;
         // num_received = 0;
         // num_sent_long = 0;
-        avg_latency = 0;
+        avg_latency[peer_nr] = 0;
         // size_test (NULL);
-        phase_long = GNUNET_NO;
-        choose_phase ();
+        phase_long[peer_nr] = GNUNET_NO;
+        choose_phase (get_tc_h (peer_nr));
       }
       break;
     }
@@ -813,39 +937,44 @@ incoming_message_cb (
     {
       size_t max_size = 64000;
 
-      GNUNET_assert (TP_SIZE_CHECK == phase);
+      GNUNET_assert (TP_SIZE_CHECK == phase[peer_nr]);
       if (LONG_MESSAGE_SIZE != long_message_size)
         max_size = long_message_size;
-      num_received_size++;
-      update_avg_latency (payload);
-      if (num_received_size >= (max_size) / 10)
+      num_received_size[peer_nr]++;
+      update_avg_latency (payload, peer_nr);
+      if ((GNUNET_YES == phase_size[peer_nr]) && (num_received_size[peer_nr] >=
+                                                  (max_size) / 10) )
       {
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
-             "Size packet test done.\n");
+             "Size packet test  for peer %u done.\n",
+             peer_nr);
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
              "%lu/%lu packets -- avg latency: %llu us\n",
-             (unsigned long) num_received_size,
-             (unsigned long) num_sent_size,
-             (unsigned long long) avg_latency);
-        num_received_size = 0;
-        num_sent_size = 0;
-        avg_latency = 0;
-        iterations_left--;
-        if (0 != iterations_left)
+             (unsigned long) num_received_size[peer_nr],
+             (unsigned long) num_sent_size[peer_nr],
+             (unsigned long long) avg_latency[peer_nr]);
+        iterations_left[peer_nr]--;
+        phase_size[peer_nr] = GNUNET_NO;
+        if (0 != iterations_left[peer_nr])
         {
           // start_short = GNUNET_TIME_absolute_get ();
           // phase = TP_BURST_SHORT;
-          num_sent_short = 0;
-          num_sent_long = 0;
-          num_received_short = 0;
-          num_received_long = 0;
+          num_received_size[peer_nr] = 0;
+          num_sent_size[peer_nr] = 0;
+          avg_latency[peer_nr] = 0;
+          num_sent_short[peer_nr] = 0;
+          num_sent_long[peer_nr] = 0;
+          num_received_short[peer_nr] = 0;
+          num_received_long[peer_nr] = 0;
           // short_test (NULL);
-          load_phase_config ();
-          choose_phase ();
-          break;
+          if (((PEER_A == peer_nr) && finished[PEER_B]) || ((PEER_B ==
+                                                             peer_nr) &&
+                                                            finished[PEER_A]))
+          {
+            load_phase_config ();
+          }
         }
-        phase_size = GNUNET_NO;
-        choose_phase ();
+        choose_phase (get_tc_h (peer_nr));
       }
       break;
     }
@@ -859,23 +988,23 @@ do_shutdown (void *cls)
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "shuting down test.\n");
 
-  if (NULL != box_stats)
-  {
-    GNUNET_STATISTICS_get_cancel (box_stats);
-    box_stats = NULL;
-  }
-  if (NULL != rekey_stats)
-  {
-    GNUNET_STATISTICS_get_cancel (rekey_stats);
-    rekey_stats = NULL;
-  }
-  if (NULL != to_task)
-  {
-    GNUNET_SCHEDULER_cancel (to_task);
-    to_task = NULL;
-  }
   for (unsigned int i = 0; i < NUM_PEERS; i++)
   {
+    if (NULL != box_stats[i])
+    {
+      GNUNET_STATISTICS_get_cancel (box_stats[i]);
+      box_stats[i] = NULL;
+    }
+    if (NULL != rekey_stats[i])
+    {
+      GNUNET_STATISTICS_get_cancel (rekey_stats[i]);
+      rekey_stats[i] = NULL;
+    }
+    if (NULL != to_task[i])
+    {
+      GNUNET_SCHEDULER_cancel (to_task[i]);
+      to_task[i] = NULL;
+    }
     GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (tc_hs[i]);
     GNUNET_STATISTICS_destroy (stats[i], GNUNET_NO);
   }
@@ -935,7 +1064,10 @@ main (int argc,
   char *test_mode;
   char *cfg_peer;
 
-  phase = TP_INIT;
+  iterations_left[0] = TOTAL_ITERATIONS;
+  iterations_left[1] = TOTAL_ITERATIONS;
+  phase[0] = TP_INIT;
+  phase[1] = TP_INIT;
   ret = 1;
   test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
   communicator_name = strchr (test_name, '-');

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]