gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26430 - gnunet/src/dv


From: gnunet
Subject: [GNUnet-SVN] r26430 - gnunet/src/dv
Date: Thu, 14 Mar 2013 17:03:30 +0100

Author: grothoff
Date: 2013-03-14 17:03:30 +0100 (Thu, 14 Mar 2013)
New Revision: 26430

Modified:
   gnunet/src/dv/dv.h
   gnunet/src/dv/dv_api.c
   gnunet/src/dv/gnunet-service-dv.c
Log:
-generate and process ACKs

Modified: gnunet/src/dv/dv.h
===================================================================
--- gnunet/src/dv/dv.h  2013-03-14 15:42:14 UTC (rev 26429)
+++ gnunet/src/dv/dv.h  2013-03-14 16:03:30 UTC (rev 26430)
@@ -121,7 +121,7 @@
   struct GNUNET_MessageHeader header;
 
   /**
-   * Unique ID for this message, for confirm callback.
+   * Unique ID for this message, for confirm callback, must never be zero.
    */
   uint32_t uid GNUNET_PACKED;
 

Modified: gnunet/src/dv/dv_api.c
===================================================================
--- gnunet/src/dv/dv_api.c      2013-03-14 15:42:14 UTC (rev 26429)
+++ gnunet/src/dv/dv_api.c      2013-03-14 16:03:30 UTC (rev 26430)
@@ -74,6 +74,11 @@
    */
   struct GNUNET_PeerIdentity target;
 
+  /**
+   * UID of our message, if any.
+   */
+  uint32_t uid;
+  
 };
 
 
@@ -184,10 +189,17 @@
                                 th);
     memcpy (&cbuf[ret], th->msg, tsize);
     ret += tsize;
-    (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
-                                             &th->target.hashPubKey,
-                                             th,
-                                             
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+    if (NULL != th->cb)
+    {
+      (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
+                                               &th->target.hashPubKey,
+                                               th,
+                                               
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+    }
+    else
+    {
+      GNUNET_free (th);
+    }
   }
   return ret;
 }
@@ -215,6 +227,54 @@
 
 
 /**
+ * Closure for 'process_ack'.
+ */
+struct AckContext
+{
+  /**
+   * The ACK message.
+   */
+  const struct GNUNET_DV_AckMessage *ack;
+
+  /**
+   * Our service handle.
+   */
+  struct GNUNET_DV_ServiceHandle *sh;
+};
+
+
+/**
+ * We got an ACK.  Check if it matches the given transmit handle, and if
+ * so call the continuation.
+ *
+ * @param cls the 'struct AckContext'
+ * @param key peer identity
+ * @param value the 'struct GNUNET_DV_TransmitHandle'
+ * @return GNUNET_OK if the ACK did not match (continue to iterate)
+ */
+static int
+process_ack (void *cls,
+            const struct GNUNET_HashCode *key,
+            void *value)
+{
+  struct AckContext *ctx = cls;
+  struct GNUNET_DV_TransmitHandle *th = value;
+
+  if (th->uid != ntohl (ctx->ack->uid))
+    return GNUNET_OK;
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (ctx->sh->send_callbacks,
+                                                      key,
+                                                      th));
+  /* FIXME: should distinguish between success and failure here... */
+  th->cb (th->cb_cls,
+         GNUNET_OK);
+  GNUNET_free (th);
+  return GNUNET_NO;
+}
+
+
+/**
  * Handles a message sent from the DV service to us.
  * Parse it out and give it to the plugin.
  *
@@ -230,7 +290,9 @@
   const struct GNUNET_DV_DisconnectMessage *dm;
   const struct GNUNET_DV_ReceivedMessage *rm;
   const struct GNUNET_MessageHeader *payload;
-
+  const struct GNUNET_DV_AckMessage *ack;
+  struct AckContext ctx;
+  
   if (NULL == msg)
   {
     /* Connection closed */
@@ -282,6 +344,21 @@
                    ntohl (rm->distance),
                    payload);
     break;
+  case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    ack = (const struct GNUNET_DV_AckMessage *) msg;
+    ctx.ack = ack;
+    ctx.sh = sh;
+    GNUNET_CONTAINER_multihashmap_get_multiple (sh->send_callbacks,
+                                               &ack->target.hashPubKey,
+                                               &process_ack,
+                                               &ctx);
+    return;
   default:
     reconnect (sh);
     break;
@@ -495,6 +572,10 @@
   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
   sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + 
                           ntohs (msg->size));
+  if (0 == sh->uid_gen)
+    sh->uid_gen = 1;
+  th->uid = sh->uid_gen;
+  sm->uid = htonl (sh->uid_gen++);
   /* use memcpy here as 'target' may not be sufficiently aligned */
   memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
   memcpy (&sm[1], msg, ntohs (msg->size));

Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c   2013-03-14 15:42:14 UTC (rev 26429)
+++ gnunet/src/dv/gnunet-service-dv.c   2013-03-14 16:03:30 UTC (rev 26430)
@@ -28,9 +28,6 @@
  * @author Nathan Evans
  *
  * TODO:
- * - even _local_ flow control (send ACK only after core took our message) is
- *   not implemented, but should be (easy fix, but needs adjustments to data
- *   structures)
  * - distance updates are not properly communicate to US by core,
  *   and conversely we don't give distance updates properly to the plugin yet
  * - we send 'ACK' even if a message was dropped due to no route (may
@@ -420,12 +417,12 @@
  * Forward a message from another peer to the plugin.
  *
  * @param message the message to send to the plugin
- * @param distant_neighbor the original sender of the message
+ * @param origin the original sender of the message
  * @param distnace distance to the original sender of the message
  */
 static void
 send_data_to_plugin (const struct GNUNET_MessageHeader *message, 
-                    const struct GNUNET_PeerIdentity *distant_neighbor, 
+                    const struct GNUNET_PeerIdentity *origin,
                     uint32_t distance)
 {
   struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -443,7 +440,7 @@
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Delivering message from peer `%s'\n",
-              GNUNET_i2s (distant_neighbor));
+              GNUNET_i2s (origin));
   size = sizeof (struct GNUNET_DV_ReceivedMessage) + 
     ntohs (message->size);
   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
@@ -456,7 +453,7 @@
   received_msg->header.size = htons (size);
   received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
   received_msg->distance = htonl (distance);
-  received_msg->sender = *distant_neighbor;
+  received_msg->sender = *origin;
   memcpy (&received_msg[1], message, ntohs (message->size));
   GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, 
                                    plugin_pending_tail,
@@ -611,8 +608,9 @@
                                 dn->pm_tail,
                                  pending);
     memcpy (&cbuf[off], pending->msg, msize);
-    send_ack_to_plugin (&pending->ultimate_target,
-                       pending->uid);
+    if (0 != pending->uid) 
+      send_ack_to_plugin (&pending->ultimate_target,
+                         pending->uid);
     GNUNET_free (pending);
     off += msize;
   }
@@ -633,6 +631,8 @@
  * Forward the given payload to the given target.
  *
  * @param target where to send the message
+ * @param uid unique ID for the message
+ * @param ultimate_target ultimate recipient for the message
  * @param distance expected (remaining) distance to the target
  * @param sender original sender of the message
  * @param payload payload of the message
@@ -640,7 +640,9 @@
 static void
 forward_payload (struct DirectNeighbor *target,
                 uint32_t distance,
+                uint32_t uid,
                 const struct GNUNET_PeerIdentity *sender,
+                const struct GNUNET_PeerIdentity *ultimate_target,
                 const struct GNUNET_MessageHeader *payload)
 {
   struct PendingMessage *pm;
@@ -651,7 +653,10 @@
        (0 != memcmp (sender,
                     &my_identity,
                     sizeof (struct GNUNET_PeerIdentity))) )
+  {
+    GNUNET_break (0 == uid);
     return;
+  }
   msize = sizeof (struct RouteMessage) + ntohs (payload->size);
   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
   {
@@ -659,6 +664,8 @@
     return;
   }
   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+  pm->ultimate_target = *ultimate_target;
+  pm->uid = uid;
   pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
   rm = (struct RouteMessage *) &pm[1];
   rm->header.size = htons ((uint16_t) msize);
@@ -1271,6 +1278,8 @@
   }
   forward_payload (route->next_hop,
                   ntohl (route->target.distance),
+                  0,
+                  &rm->target,
                   &rm->sender,
                   payload);
   return GNUNET_OK;  
@@ -1300,6 +1309,7 @@
     return;
   }
   msg = (const struct GNUNET_DV_SendMessage *) message;
+  GNUNET_break (0 != ntohl (msg->uid));
   payload = (const struct GNUNET_MessageHeader *) &msg[1];
   if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs 
(payload->size))
   {
@@ -1316,14 +1326,14 @@
     GNUNET_STATISTICS_update (stats,
                              "# local messages discarded (no route)",
                              1, GNUNET_NO);
-    send_ack_to_plugin (&msg->target, htonl (msg->uid));
+    send_ack_to_plugin (&msg->target, ntohl (msg->uid));
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
     return;
   }
-  // FIXME: flow control (send ACK only once message has left the queue...)
-  send_ack_to_plugin (&msg->target, htonl (msg->uid));
   forward_payload (route->next_hop,
                   ntohl (route->target.distance),
+                  htonl (msg->uid),
+                  &msg->target,
                   &my_identity,
                   payload);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]