gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: data structures for defragm


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: data structures for defragmentation
Date: Sat, 26 Jan 2019 12:27:55 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new cb26df28b data structures for defragmentation
cb26df28b is described below

commit cb26df28be6f46898c34d7e8957baa86fa56ed11
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Jan 26 12:27:50 2019 +0100

    data structures for defragmentation
---
 .../gnunet_transport_communication_service.h       |   4 +
 src/transport/gnunet-service-tng.c                 | 623 ++++++++++++++++++++-
 2 files changed, 599 insertions(+), 28 deletions(-)

diff --git a/src/include/gnunet_transport_communication_service.h 
b/src/include/gnunet_transport_communication_service.h
index 307f6688a..1cfc82540 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -308,6 +308,10 @@ GNUNET_TRANSPORT_communicator_address_remove (struct 
GNUNET_TRANSPORT_AddressIde
  * only be done if the communicator is uni-directional (i.e. cannot
  * send the message back itself).
  *
+ * While backchannel messages are signed and encrypted, communicators
+ * must protect against replay attacks when using this backchannel
+ * communication!
+ *
  * @param ch handle of this communicator
  * @param pid peer to send the message to
  * @param comm name of the communicator to send the message to
diff --git a/src/transport/gnunet-service-tng.c 
b/src/transport/gnunet-service-tng.c
index cb6fcebdc..ac4a262d7 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,8 +33,11 @@
  *       transport-to-transport traffic)
  *
  * Implement:
- * - manage defragmentation, retransmission, track RTT, loss, etc.
- * - DV data structures, learning, forgetting, using them!
+ * - data structures for defragmentation
+ * - manage defragmentation
+ * - ACK handling / retransmission
+ * - track RTT, distance, loss, etc.
+ * - DV data structures, learning, forgetting & using them!
  *
  * Easy:
  * - use ATS bandwidth allocation callback and schedule transmissions!
@@ -83,6 +86,7 @@
 #include "gnunet_peerstore_service.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_ats_transport_service.h"
+#include "gnunet_signatures.h"
 #include "transport.h"
 
 
@@ -99,6 +103,16 @@
  */
 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5)
 
+/**
+ * How long are ephemeral keys valid?
+ */
+#define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_HOURS, 4)
+
+/**
+ * How long do we keep partially reassembled messages around before giving up?
+ */
+#define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MINUTES, 4)
+
 /**
  * How many messages can we have pending for a given communicator
  * process before we start to throttle that communicator?
@@ -169,8 +183,7 @@ struct TransportBackchannelEncapsulationMessage
 
 
 /**
- * Body by which a peqer confirms that it is using an ephemeral
- * key.
+ * Body by which a peer confirms that it is using an ephemeral key.
  */
 struct EphemeralConfirmation
 {
@@ -182,9 +195,23 @@ struct EphemeralConfirmation
 
   /**
    * How long is this signature over the ephemeral key valid?
+   * Note that the receiver MUST IGNORE the absolute time, and
+   * only interpret the value as a monotonic time and reject
+   * "older" values than the last one observed.  Even with this,
+   * there is no real guarantee against replay achieved here,
+   * as the latest timestamp is not persisted.  This is 
+   * necessary as we do not want to require synchronized 
+   * clocks and may not have a bidirectional communication
+   * channel.  Communicators must protect against replay
+   * attacks when using backchannel communication!
    */
   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
 
+  /**
+   * Target's peer identity.
+   */
+  struct GNUNET_PeerIdentity target;
+
   /**
    * Ephemeral key setup by the sender for @e target, used
    * to encrypt the payload.
@@ -376,6 +403,12 @@ struct TransportFragmentAckMessage
    * average transmission time of the sender minus this value.
    */
   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
+
+  /**
+   * How long until the receiver will stop trying reassembly
+   * of this message?
+   */
+  struct GNUNET_TIME_RelativeNBO reassembly_timeout;
 };
 
 
@@ -548,6 +581,11 @@ struct EphemeralCacheEntry
    */
   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
 
+  /**
+   * Our private ephemeral key.
+   */
+  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
+
   /**
    * Node in the ephemeral cache for this entry.
    * Used for expiration.
@@ -726,6 +764,96 @@ struct GNUNET_ATS_Session
 };
 
 
+/**
+ * Information we keep for a message that we are reassembling.
+ */ 
+struct ReassemblyContext
+{
+
+  /**
+   * Original message ID of the message that all the
+   * fragments belong to.
+   */ 
+  struct GNUNET_ShortHashCode msg_uuid;
+
+  /**
+   * Which neighbour is this context for?
+   */
+  struct Neighbour *neighbour;
+
+  /**
+   * Entry in the reassembly heap (sorted by expiration).
+   */ 
+  struct GNUNET_CONTAINER_HeapNode *hn;
+
+  /**
+   * Bitfield with @e msg_size bits representing the positions
+   * where we have received fragments.  When we receive a fragment,
+   * we check the bits in @e bitfield before decrementing @e msg_missing.
+   *
+   * Allocated after the reassembled message.
+   */
+  uint8_t *bitfield;
+
+  /**
+   * Task for sending ACK. We may send ACKs either because of hitting
+   * the @e extra_acks limit, or based on time and @e num_acks.  This
+   * task is for the latter case.
+   */
+  struct GNUNET_SCHEDULER_Task *ack_task;
+  
+  /**
+   * At what time will we give up reassembly of this message?
+   */
+  struct GNUNET_TIME_Absolute reassembly_timeout;
+
+  /**
+   * Average delay of all acks in @e extra_acks and @e frag_uuid.
+   * Should be reset to zero when @e num_acks is set to 0.
+   */
+  struct GNUNET_TIME_Relative avg_ack_delay;
+
+  /**
+   * Time we received the last fragment.  @e avg_ack_delay must be
+   * incremented by now - @e last_frag multiplied by @e num_acks.
+   */
+  struct GNUNET_TIME_Absolute last_frag;
+
+  /**
+   * Bitfield of up to 64 additional fragments following @e frag_uuid
+   * to be acknowledged in the next cumulative ACK.
+   */
+  uint64_t extra_acks;
+  
+  /**
+   * Unique ID of the lowest fragment UUID to be acknowledged in the
+   * next cumulative ACK.  Only valid if @e num_acks > 0.
+   */
+  uint32_t frag_uuid;
+
+  /**
+   * Number of ACKs we have accumulated so far.  Reset to 0
+   * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
+   */
+  unsigned int num_acks;
+  
+  /**
+   * How big is the message we are reassembling in total?
+   */
+  uint16_t msg_size;
+
+  /**
+   * How many bytes of the message are still missing?  Defragmentation
+   * is complete when @e msg_missing == 0.
+   */
+  uint16_t msg_missing;
+
+  /* Followed by @e msg_size bytes of the (partially) defragmented original 
message */
+
+  /* Followed by @e bitfield data */
+};
+
+
 /**
  * A neighbour that at least one communicator is connected to.
  */
@@ -737,6 +865,25 @@ struct Neighbour
    */
   struct GNUNET_PeerIdentity pid;
 
+  /**
+   * Map with `struct ReassemblyContext` structs for fragments under
+   * reassembly. May be NULL if we currently have no fragments from
+   * this @e pid (lazy initialization).
+   */ 
+  struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
+
+  /**
+   * Heap with `struct ReassemblyContext` structs for fragments under
+   * reassembly. May be NULL if we currently have no fragments from
+   * this @e pid (lazy initialization).
+   */ 
+  struct GNUNET_CONTAINER_Heap *reassembly_heap;
+
+  /**
+   * Task to free old entries from the @e reassembly_heap and @e 
reassembly_map.
+   */
+  struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
+  
   /**
    * Head of list of messages pending for this neighbour.
    */
@@ -1177,6 +1324,11 @@ static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
 
+/**
+ * Task to free expired ephemerals.
+ */
+static struct GNUNET_SCHEDULER_Task *ephemeral_task;
+
 /**
  * Our connection to ATS for allocation and bootstrapping.
  */
@@ -1363,6 +1515,76 @@ client_connect_cb (void *cls,
 }
 
 
+/**
+ * Free @a rc, first unlinking it from its heap node and its neighbour's map.
+ *
+ * @param rc data structure to free; must not be used by the caller afterwards
+ */
+static void
+free_reassembly_context (struct ReassemblyContext *rc)
+{
+  struct Neighbour *n = rc->neighbour;
+
+  GNUNET_assert (rc ==
+                GNUNET_CONTAINER_heap_remove_node (rc->hn));
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
+                                                       &rc->msg_uuid,
+                                                       rc));
+  GNUNET_free (rc);
+}
+
+
+/**
+ * Free expired reassembly contexts of a neighbour; re-arm only if some remain.
+ *
+ * @param cls a `struct Neighbour`
+ */
+static void
+reassembly_cleanup_task (void *cls)
+{
+  struct Neighbour *n = cls;
+  struct ReassemblyContext *rc;
+
+  n->reassembly_timeout_task = NULL;
+  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
+  {
+    if (0 == GNUNET_TIME_absolute_get_remaining 
(rc->reassembly_timeout).rel_value_us)
+    {
+      free_reassembly_context (rc);
+      continue; /* look at the new heap root */
+    }
+    GNUNET_assert (NULL == n->reassembly_timeout_task);
+    n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at 
(rc->reassembly_timeout,
+                                                         
&reassembly_cleanup_task,
+                                                         n);
+    return;
+  }
+}
+
+
+/**
+ * Map iterator callback that passes @a value to #free_reassembly_context().
+ *
+ * @param cls NULL (unused)
+ * @param key unused
+ * @param value a `struct ReassemblyContext` to free
+ * @return #GNUNET_OK (continue iteration)
+ */
+static int
+free_reassembly_cb (void *cls,
+                   const struct GNUNET_ShortHashCode *key,
+                   void *value)
+{
+  struct ReassemblyContext *rc = value;
+  (void) cls;
+  (void) key;
+  
+  free_reassembly_context (rc);
+  return GNUNET_OK;
+}
+
+
 /**
  * Release memory used by @a neighbour.
  *
@@ -1378,6 +1600,18 @@ free_neighbour (struct Neighbour *neighbour)
                                                       neighbour));
   if (NULL != neighbour->timeout_task)
     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
+  if (NULL != neighbour->reassembly_map)
+  {
+    GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
+                                           &free_reassembly_cb,
+                                           NULL);
+    GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
+    neighbour->reassembly_map = NULL;
+    GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
+    neighbour->reassembly_heap = NULL;
+  }
+  if (NULL != neighbour->reassembly_timeout_task)
+    GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
   GNUNET_free (neighbour);
 }
 
@@ -2054,11 +2288,147 @@ static int
 check_communicator_backchannel (void *cls,
                                const struct 
GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
 {
-  // FIXME: check encapsulated message
-  // FIXME: check 0-termination of communcator at target
+  const struct GNUNET_MessageHeader *inbox;
+  const char *is;
+  uint16_t msize;
+  uint16_t isize;
+
+  msize = ntohs (cb->header.size) - sizeof (*cb); /* bytes after fixed header */
+  if (UINT16_MAX - msize <
+      sizeof (struct TransportBackchannelEncapsulationMessage) +
+      sizeof (struct TransportBackchannelRequestPayload) ) /* no room to box */
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
+  isize = ntohs (inbox->size);
+  if (isize >= msize)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  is = (const char *) inbox;
+  is += isize;
+  msize -= isize; /* remainder must be a 0-terminated string */
+  GNUNET_assert (msize > 0);
+  if ('\0' != is[msize-1])
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
   return GNUNET_OK;
 }
 
+
+/**
+ * Free expired ephemeral keys; re-arm #ephemeral_task if unexpired ones remain.
+ *
+ * @param cls NULL
+ */
+static void
+expire_ephemerals (void *cls)
+{
+  struct EphemeralCacheEntry *ece;
+
+  (void) cls;
+  ephemeral_task = NULL;
+  while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
+  {
+    if (0 == GNUNET_TIME_absolute_get_remaining 
(ece->ephemeral_validity).rel_value_us)
+    {
+      free_ephemeral (ece);
+      continue; /* look at the new heap root */
+    }
+    ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
+                                             &expire_ephemerals,
+                                             NULL);
+    return;
+  }
+}
+
+
+/**
+ * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
+ * one, sign it (covering validity, target and key), cache it and return it.
+ *
+ * @param pid peer to look up ephemeral for
+ * @param[out] private_key set to the private key
+ * @param[out] ephemeral_key set to the key
+ * @param[out] ephemeral_sender_sig set to the signature
+ * @param[out] ephemeral_validity set to the validity expiration time
+ */
+static void
+lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
+                 struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
+                 struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
+                 struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
+                 struct GNUNET_TIME_Absolute *ephemeral_validity)
+{
+  struct EphemeralCacheEntry *ece;
+  struct EphemeralConfirmation ec;
+  
+  ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, pid);
+  if ( (NULL != ece) &&
+       (0 == GNUNET_TIME_absolute_get_remaining 
(ece->ephemeral_validity).rel_value_us) )
+  {
+    free_ephemeral (ece);
+    ece = NULL;
+  }
+  if (NULL == ece)
+  {
+    ece = GNUNET_new (struct EphemeralCacheEntry);
+    ece->target = *pid;
+    ece->ephemeral_validity = GNUNET_TIME_absolute_add 
(GNUNET_TIME_absolute_get_monotonic (GST_cfg),
+                                                       EPHEMERAL_VALIDITY);
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
+    GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
+                                       &ece->ephemeral_key);
+    ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+    ec.purpose.size = htonl (sizeof (ec));
+    ec.ephemeral_validity = GNUNET_TIME_absolute_hton (ece->ephemeral_validity);
+    ec.target = *pid;
+    ec.ephemeral_key = ece->ephemeral_key;
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                            &ec.purpose,
+                                            &ece->sender_sig));
+    ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
+                                           ece,
+                                           
ece->ephemeral_validity.abs_value_us);
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
+                                                     &ece->target,
+                                                     ece,
+                                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    if (NULL == ephemeral_task)
+      ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
+                                               &expire_ephemerals,
+                                               NULL);
+  }
+  *private_key = ece->private_key;
+  *ephemeral_key = ece->ephemeral_key;
+  *ephemeral_sender_sig = ece->sender_sig;
+  *ephemeral_validity = ece->ephemeral_validity;
+}
+
+
+/**
+ * We need to transmit @a hdr to @a target.  If necessary, this may
+ * involve DV routing or even broadcasting and fragmentation.
+ *
+ * @param target peer to receive @a hdr
+ * @param hdr header of the message to route; freed by this function
+ */
+static void
+route_message (const struct GNUNET_PeerIdentity *target,
+              struct GNUNET_MessageHeader *hdr)
+{
+  // FIXME: send hdr to target, free hdr (possibly using DV, possibly 
broadcasting)
+  GNUNET_free (hdr);
+}
+
   
 /**
  * Communicator requests backchannel transmission.  Process the request.
@@ -2071,11 +2441,48 @@ handle_communicator_backchannel (void *cls,
                                 const struct 
GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
 {
   struct TransportClient *tc = cls;
-
-  // FIXME: determine path (possibly DV)! to target peer
-  // FIXME: encapsulate message, encrypt message!
-  // FIXME: possibly fragment message
-  // FIXME: possibly DV-route message!
+  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
+  struct GNUNET_TIME_Absolute ephemeral_validity;
+  struct TransportBackchannelEncapsulationMessage *enc;
+  struct TransportBackchannelRequestPayload ppay;
+  char *mpos;
+  uint16_t msize;
+  
+  /* encapsulate and encrypt message */
+  msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct 
TransportBackchannelRequestPayload);
+  enc = GNUNET_malloc (sizeof (*enc) + msize);
+  enc->header.type = htons 
(GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
+  enc->header.size = htons (sizeof (*enc) + msize);
+  enc->target = cb->pid;
+  lookup_ephemeral (&cb->pid,
+                   &private_key,
+                   &enc->ephemeral_key,
+                   &ppay.sender_sig,
+                   &ephemeral_validity);
+  // FIXME: setup 'iv'
+#if FIXME
+  dh_key_derive (&private_key,
+                &cb->pid,
+                &enc->iv,
+                &key);
+#endif
+  ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
+  ppay.monotonic_time = GNUNET_TIME_absolute_hton 
(GNUNET_TIME_absolute_get_monotonic (GST_cfg));
+  mpos = (char *) &enc[1];
+#if FIXME
+  encrypt (key,
+          &ppay,
+          &mpos,
+          sizeof (ppay));
+  encrypt (key,
+          &cb[1],
+          &mpos,
+          ntohs (cb->header.size) - sizeof (*cb));
+  hmac (key,
+       &enc->hmac);
+#endif
+  route_message (&cb->pid,
+                &enc->header);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -2260,9 +2667,27 @@ struct CommunicatorMessageContext
    * Additional information for flow control and about the sender.
    */
   struct GNUNET_TRANSPORT_IncomingMessage im;
+
+  /**
+   * Number of hops the message has travelled (if DV-routed).
+   * FIXME: make use of this in ACK handling!
+   */
+  uint16_t total_hops;
 };
 
 
+/**
+ * Given an inbound message @a msg from a communicator @a cmc,
+ * demultiplex it based on the type calling the right handler.
+ *
+ * @param cmc context for demultiplexing
+ * @param msg message to demultiplex
+ */ 
+static void
+demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
+                     const struct GNUNET_MessageHeader *msg);
+
+
 /**
  * Send ACK to communicator (if requested) and free @a cmc.
  *
@@ -2385,9 +2810,89 @@ handle_fragment_box (void *cls,
                     const struct TransportFragmentBox *fb)
 {
   struct CommunicatorMessageContext *cmc = cls;
+  struct Neighbour *n;
+  struct ReassemblyContext *rc;
+  const struct GNUNET_MessageHeader *msg;
+  uint16_t msize;
+
+  n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+                                        &cmc->im.sender);
+  if (NULL == n)
+  {
+    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+       
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
+  if (NULL == n->reassembly_map)
+  {
+    n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
+                                                              GNUNET_YES);
+    n->reassembly_heap = GNUNET_CONTAINER_heap_create 
(GNUNET_CONTAINER_HEAP_ORDER_MIN);
+    n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed 
(REASSEMBLY_EXPIRATION,
+                                                              
&reassembly_cleanup_task,
+                                                              n);
+  }
+  msize = ntohs (fb->msg_size);
+  rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
+                                          &fb->msg_uuid);
+  if (NULL == rc)
+  {
+    rc = GNUNET_malloc (sizeof (*rc) +
+                       msize + /* reassembly payload buffer */
+                       (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
+    rc->msg_uuid = fb->msg_uuid;
+    rc->neighbour = n;
+    rc->msg_size = msize;
+    rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute 
(REASSEMBLY_EXPIRATION);
+    rc->last_frag = GNUNET_TIME_absolute_get ();
+    rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
+                                          rc,
+                                          rc->reassembly_timeout.abs_value_us);
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
+                                                      &rc->msg_uuid,
+                                                      rc,
+                                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size);
+    rc->msg_missing = rc->msg_size;
+  }
+  if (msize != rc->msg_size)
+  {
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    return;
+  }
   
-  // FIXME: do work!
-  finish_cmc_handling (cmc);
+  // FIXME: do work: reassemble
+
+  /* is reassembly complete? */
+  if (0 != rc->msg_missing)
+  {
+    /* FIXME: possibly send ACK! */
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* reassembly is complete, verify result */
+  msg = (const struct GNUNET_MessageHeader *) &rc[1];
+  if (ntohs (msg->size) != rc->msg_size)
+  {
+    GNUNET_break (0);
+    free_reassembly_context (rc);
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* successful reassembly */
+  /* FIXME: definitively send ACK! */
+  demultiplex_with_cmc (cmc,
+                       msg);
+  /* FIXME: really free here? Might be bad if fragments are still
+     en-route and we forget that we finished this reassembly immediately!
+     -> keep around until timeout? 
+     -> shorten timeout based on ACK? */
+  free_reassembly_context (rc);
 }
 
 
@@ -2436,11 +2941,27 @@ handle_reliability_box (void *cls,
 {
   struct CommunicatorMessageContext *cmc = cls;
   const struct GNUNET_MessageHeader *inbox = (const struct 
GNUNET_MessageHeader *) &rb[1];
-  
-  // FIXME: send back reliability ACK (possibly conditional)
-  /* forward encapsulated message to CORE */
-  handle_raw_message (cmc,
-                     inbox);
+
+  if (0 == ntohl (rb->ack_countdown))
+  {
+    struct TransportReliabilityAckMessage *ack;
+
+    /* FIXME: implement cumulative ACKs and ack_countdown,
+       then setting the avg_ack_delay field below: */
+    ack = GNUNET_malloc (sizeof (*ack) + 
+                        sizeof (struct GNUNET_ShortHashCode));
+    ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+    ack->header.size = htons (sizeof (*ack) + 
+                             sizeof (struct GNUNET_ShortHashCode));
+    memcpy (&ack[1],
+           &rb->msg_uuid,
+           sizeof (struct GNUNET_ShortHashCode));
+    route_message (&cmc->im.sender,
+                  &ack->header);
+  }
+  /* continue with inner message */
+  demultiplex_with_cmc (cmc,
+                       inbox);
 }
 
 
@@ -2496,7 +3017,16 @@ handle_backchannel_encapsulation (void *cls,
 {
   struct CommunicatorMessageContext *cmc = cls;
 
-  // FIMXE: test if it is for me, if not, try to forward to target (DV routes!)
+  if (0 != memcmp (&be->target,
+                  &GST_my_identity,
+                  sizeof (struct GNUNET_PeerIdentity)))
+  {
+    /* not for me, try to route to target */
+    route_message (&be->target,
+                  GNUNET_copy_message (&be->header));
+    finish_cmc_handling (cmc);
+    return;
+  }
   // FIXME: compute shared secret
   // FIXME: check HMAC
   // FIXME: decrypt payload
@@ -2616,10 +3146,25 @@ handle_dv_box (void *cls,
               const struct TransportDVBox *dvb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  
-  // FIXME: are we the target? Then unbox and handle message.
-  // FIXME: if we are not the target, shorten path and forward along.
-  finish_cmc_handling (cmc);
+  uint16_t size = ntohs (dvb->header.size);
+  uint16_t num_hops = ntohs (dvb->num_hops);
+  const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity 
*) &dvb[1];
+  const struct GNUNET_MessageHeader *inbox = (const struct 
GNUNET_MessageHeader *) &hops[num_hops];
+
+  if (num_hops > 0)
+  {
+    // FIXME: if we are not the target, shorten path and forward along.
+    // Try from the _end_ of hops array if we know the given
+    // neighbour (shortening the path!). 
+    // NOTE: increment total_hops!
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* We are the target. Unbox and handle message. */
+  cmc->im.sender = dvb->origin;
+  cmc->total_hops = ntohs (dvb->total_hops); 
+  demultiplex_with_cmc (cmc,
+                       inbox);
 }
 
 
@@ -2657,6 +3202,25 @@ handle_incoming_msg (void *cls,
 {
   struct TransportClient *tc = cls;
   struct CommunicatorMessageContext *cmc = GNUNET_new (struct 
CommunicatorMessageContext);
+
+  cmc->tc = tc;
+  cmc->im = *im;
+  demultiplex_with_cmc (cmc,
+                       (const struct GNUNET_MessageHeader *) &im[1]);
+}
+
+
+/**
+ * Given an inbound message @a msg from a communicator @a cmc,
+ * demultiplex it based on the type calling the right handler.
+ *
+ * @param cmc context for demultiplexing
+ * @param msg message to demultiplex
+ */ 
+static void
+demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
+                     const struct GNUNET_MessageHeader *msg)
+{
   struct GNUNET_MQ_MessageHandler handlers[] = {
     GNUNET_MQ_hd_var_size (fragment_box,
                           GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
@@ -2690,14 +3254,12 @@ handle_incoming_msg (void *cls,
   };
   int ret;
 
-  cmc->tc = tc;
-  cmc->im = *im;
   ret = GNUNET_MQ_handle_message (handlers,
-                                 (const struct GNUNET_MessageHeader *) &im[1]);
+                                 msg);
   if (GNUNET_SYSERR == ret)
   {
     GNUNET_break (0);
-    GNUNET_SERVICE_client_drop (tc->client);
+    GNUNET_SERVICE_client_drop (cmc->tc->client);
     GNUNET_free (cmc);
     return;
   }
@@ -2705,7 +3267,7 @@ handle_incoming_msg (void *cls,
   {
     /* unencapsulated 'raw' message */
     handle_raw_message (&cmc,
-                       (const struct GNUNET_MessageHeader *) &im[1]);
+                       msg);
   }
 }
 
@@ -3731,6 +4293,11 @@ do_shutdown (void *cls)
 {
   (void) cls;
 
+  if (NULL != ephemeral_task)
+  {
+    GNUNET_SCHEDULER_cancel (ephemeral_task);
+    ephemeral_task = NULL;
+  }
   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
                                         &free_neighbour_cb,
                                         NULL);

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]