gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28626 - gnunet/src/mesh


From: gnunet
Subject: [GNUnet-SVN] r28626 - gnunet/src/mesh
Date: Wed, 14 Aug 2013 19:21:18 +0200

Author: bartpolot
Date: 2013-08-14 19:21:18 +0200 (Wed, 14 Aug 2013)
New Revision: 28626

Modified:
   gnunet/src/mesh/gnunet-service-mesh-enc.c
Log:
- rewrite flow control towards clients


Modified: gnunet/src/mesh/gnunet-service-mesh-enc.c
===================================================================
--- gnunet/src/mesh/gnunet-service-mesh-enc.c   2013-08-14 15:37:25 UTC (rev 28625)
+++ gnunet/src/mesh/gnunet-service-mesh-enc.c   2013-08-14 17:21:18 UTC (rev 28626)
@@ -240,6 +240,11 @@
   unsigned int queue_max;
 
   /**
+   * Next ID to use.
+   */
+  uint32_t next_pid;
+
+  /**
    * ID of the last packet sent towards the peer.
    */
   uint32_t last_pid_sent;
@@ -755,6 +760,11 @@
      * ID of the client, mainly for debug messages
      */
   unsigned int id;
+
+    /**
+     * Is this client prevented from sending more data? (We "owe" him an ACK).
+     */
+  int blocked;
 };
 
 
@@ -1451,22 +1461,30 @@
  * 
  * @param ch Channel on which to send the ACK.
  * @param c Client to whom send the ACK.
- * @param is_fwd Set to GNUNET_YES for FWD ACK (dest->owner)
+ * @param fwd Set to GNUNET_YES for FWD ACK (dest->owner)
  */
 static void
 send_local_ack (struct MeshChannel *ch,
                 struct MeshClient *c,
-                int is_fwd)
+                int fwd)
 {
   struct GNUNET_MESH_LocalAck msg;
 
+  if (NULL == c
+      || ( fwd && (0 == ch->lid_root || c != ch->root))
+      || (!fwd && (0 == ch->lid_dest || c != ch->dest)) )
+  {
+    GNUNET_break (0);
+    return;
+  }
   msg.header.size = htons (sizeof (msg));
   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
-  msg.channel_id = htonl (is_fwd ? ch->lid_root : ch->lid_dest);
+  msg.channel_id = htonl (fwd ? ch->lid_root : ch->lid_dest);
   GNUNET_SERVER_notification_context_unicast (nc,
                                               c->handle,
                                               &msg.header,
                                               GNUNET_NO);
+  c->blocked = GNUNET_NO;
 }
 
 
@@ -1633,6 +1651,7 @@
       return;
     }
     msg->ttl = htonl (ttl - 1);
+    msg->pid = htonl (fwd ? c->fwd_fc.next_pid++ : c->bck_fc.next_pid++);
   }
 
   queue_add (data,
@@ -1659,6 +1678,7 @@
                               int fwd)
 {
   struct MeshConnection *c;
+  struct MeshFlowControl *fc;
   uint16_t type;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send on Tunnel %s\n",
@@ -1669,14 +1689,18 @@
     GNUNET_break (GNUNET_YES == t->destroy);
     return;
   }
+  fc = fwd ? &c->fwd_fc : &c->bck_fc;
   type = ntohs (msg->header.type);
   switch (type)
   {
     case GNUNET_MESSAGE_TYPE_MESH_FWD:
     case GNUNET_MESSAGE_TYPE_MESH_BCK:
+    case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
+    case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
       msg->cid = htonl (c->id);
       msg->tid = t->id;
       msg->ttl = default_ttl;
+      msg->pid = fc->next_pid++;
       break;
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "unkown type %s\n",
@@ -2295,6 +2319,25 @@
 
 
 /**
+ * Is this peer the first one on the connection?
+ *
+ * @param c Connection.
+ * @param fwd Is this about fwd traffic?
+ *
+ * @return GNUNET_YES if origin, GNUNET_NO if relay/terminal.
+ */
+static int
+connection_is_origin (struct MeshConnection *c, int fwd)
+{
+  if (!fwd && c->own_pos == c->path->length - 1)
+    return GNUNET_YES;
+  if (fwd && c->own_pos == 0)
+    return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
  * Is this peer the last one on the connection?
  *
  * @param c Connection.
@@ -3445,12 +3488,34 @@
 }
 
 
+/**
+ * Send an ACK to a client if needed.
+ *
+ * @param ch Channel this is regarding.
+ * @param fwd Is this about fwd traffic? (ACK goes the opposite direction).
+ */
+static void
+channel_send_client_ack (struct MeshChannel *ch, int fwd)
+{
+  struct MeshClient *c;
 
+  /* Client to receive the ACK (fwd indicates traffic to be ACK'd) */
+  c = fwd ? ch->root : ch->dest;
+
+  if (GNUNET_YES == c->blocked)
+    send_local_ack (ch, c, fwd);
+}
+
+
 /**
  * Send ACK on one or more connections due to buffer space to the client.
+ *
+ * @param ch Channel which has some free buffer space.
+ * @param buffer Buffer space.
+ * @param fwd Is this in the FWD direction?
  */
 static void
-channel_send_ack (struct MeshChannel *ch, uint32_t buffer, int fwd)
+channel_send_connection_ack (struct MeshChannel *ch, uint32_t buffer, int fwd)
 {
   struct MeshTunnel2 *t = ch->t;
   struct MeshConnection *c;
@@ -3748,6 +3813,7 @@
 static void
 fc_init (struct MeshFlowControl *fc)
 {
+  fc->next_pid = 0;
   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
   fc->last_pid_recv = (uint32_t) -1;
   fc->last_ack_sent = (uint32_t) 0;
@@ -4111,7 +4177,7 @@
               peer2s (c->t->peer),
               c->id);
 
-  if (connection_is_terminal (c, GNUNET_NO)) /* If local, leave. */
+  if (connection_is_origin (c, GNUNET_YES)) /* If local, leave. */
     return;
 
   connection_destroy (c);
@@ -4140,7 +4206,7 @@
               peer2s (c->t->peer),
               c->id);
 
-  if (connection_is_terminal (c, GNUNET_YES)) /* If local, leave. */
+  if (connection_is_origin (c, GNUNET_NO)) /* If local, leave. */
     return;
 
   connection_destroy (c);
@@ -4172,7 +4238,7 @@
   if (GNUNET_SCHEDULER_NO_TASK != *ti)
     GNUNET_SCHEDULER_cancel (*ti);
 
-  if (connection_is_terminal (c, !fwd)) /* Endpoint */
+  if (connection_is_origin (c, fwd)) /* Endpoint */
   {
     f  = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
@@ -4371,14 +4437,14 @@
       break;
     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   path create\n");
-      if (connection_is_terminal (c, GNUNET_NO))
+      if (connection_is_origin (c, GNUNET_YES))
         data_size = send_core_connection_create (queue->c, size, buf);
       else
         data_size = send_core_data_raw (queue->cls, size, buf);
       break;
     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   path ack\n");
-      if (connection_is_terminal (c, GNUNET_YES))
+      if (connection_is_origin (c, GNUNET_NO))
         data_size = send_core_connection_ack (queue->c, size, buf);
       else
         data_size = send_core_data_raw (queue->cls, size, buf);
@@ -4398,8 +4464,6 @@
                 GNUNET_MESH_DEBUG_M2S (queue->type));
     data_size = 0;
   }
-  /* Free queue, but cls was freed by send_core_* */
-  queue_destroy (queue, GNUNET_NO);
 
   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
   switch (type)
@@ -4408,12 +4472,18 @@
     case GNUNET_MESSAGE_TYPE_MESH_BCK:
       pid = ntohl ( ((struct GNUNET_MESH_Encrypted *) buf)->pid );
       fc->last_pid_sent = pid;
-      connection_send_ack (c, fwd);
+      if (NULL != queue->ch)
+        channel_send_client_ack (queue->ch, fwd);
+      else
+        connection_send_ack (c, fwd);
       break;
     default:
       break;
   }
 
+  /* Free queue, but cls was freed by send_core_* */
+  queue_destroy (queue, GNUNET_NO);
+
   /* If more data in queue, send next */
   queue = peer_get_first_message (peer);
   if (NULL != queue)
@@ -4511,6 +4581,7 @@
     priority = 50;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "fc %p\n", fc);
   if (fc->queue_n >= fc->queue_max && 0 == priority)
   {
     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
@@ -4522,6 +4593,8 @@
     return; /* Drop this message */
   }
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", fc->last_pid_sent);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ack %u\n", fc->last_ack_recv);
   if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv) &&
       GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
   {
@@ -5304,6 +5377,7 @@
     return GNUNET_OK;
   }
 
+  /* Message not for us: forward to next hop */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
   ttl = ntohl (msg->ttl);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
@@ -5317,7 +5391,6 @@
   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
 
   send_prebuilt_message_connection (&msg->header, c, NULL, fwd);
-  connection_send_ack (c, fwd);
 
   return GNUNET_OK;
 }
@@ -5992,6 +6065,7 @@
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  by client %u\n", c->id);
+  c->blocked = GNUNET_YES;
 
   msg = (struct GNUNET_MESH_LocalData *) message;
 
@@ -6140,7 +6214,7 @@
 
   rel->client_ready = GNUNET_YES;
   channel_send_client_buffered_data (ch, c, rel);
-  channel_send_ack (ch, 64 - rel->n_recv, fwd);
+  channel_send_connection_ack (ch, 64 - rel->n_recv, fwd);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]