gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20362 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r20362 - gnunet/src/stream
Date: Thu, 8 Mar 2012 08:45:52 +0100

Author: harsha
Date: 2012-03-08 08:45:52 +0100 (Thu, 08 Mar 2012)
New Revision: 20362

Modified:
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/test_stream_local.c
Log:
Data message retransmissions

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-03-08 00:14:55 UTC (rev 20361)
+++ gnunet/src/stream/stream_api.c      2012-03-08 07:45:52 UTC (rev 20362)
@@ -157,7 +157,6 @@
  */
 struct GNUNET_STREAM_Socket
 {
-
   /**
    * The peer identity of the peer at the other end of the stream
    */
@@ -254,6 +253,11 @@
   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
 
   /**
+   * Task identifier for retransmission task after timeout
+   */
+  GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+
+  /**
    * The state of the protocol associated with this socket
    */
   enum State state;
@@ -372,12 +376,27 @@
   struct GNUNET_STREAM_DataMessage *messages[64];
 
   /**
+   * The write continuation callback
+   */
+  GNUNET_STREAM_CompletionContinuation write_cont;
+
+  /**
+   * Write continuation closure
+   */
+  void *write_cont_cls;
+
+  /**
    * The bitmap of this IOHandle; Corresponding bit for a message is set when
    * it has been acknowledged by the receiver
    */
   GNUNET_STREAM_AckBitmap ack_bitmap;
 
   /**
+   * Number of bytes in this write handle
+   */
+  size_t size;
+
+  /**
    * Number of packets sent before waiting for an ack
    *
    * FIXME: Do we need this?
@@ -406,7 +425,7 @@
 /**
  * Default value in seconds for various timeouts
  */
-static unsigned int default_timeout = 300;
+static unsigned int default_timeout = 10;
 
 
 /**
@@ -494,10 +513,16 @@
 {
   struct MessageQueue *queue_entity;
 
+  GNUNET_assert 
+    ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
+     && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Queueing message of type %d and size %d\n",
+              "%s: Queueing message of type %d and size %d\n",
+              GNUNET_i2s (&socket->our_id),
               ntohs (message->header.type),
               ntohs (message->header.size));
+  GNUNET_assert (NULL != message);
   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
   queue_entity->message = message;
   queue_entity->finish_cb = finish_cb;
@@ -522,6 +547,31 @@
 
 
 /**
+ * Copies a message and queues it for sending using the mesh connection of
+ * given socket 
+ *
+ * @param socket the socket whose mesh connection is used
+ * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
+ */
+static void
+copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
+                        const struct GNUNET_STREAM_MessageHeader *message,
+                        SendFinishCallback finish_cb,
+                        void *finish_cb_cls)
+{
+  struct GNUNET_STREAM_MessageHeader *msg_copy;
+  uint16_t size;
+  
+  size = ntohs (message->header.size);
+  msg_copy = GNUNET_malloc (size);
+  memcpy (msg_copy, message, size);
+  queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
+}
+
+
+/**
  * Callback function for sending ack message
  *
  * @param cls closure the ACK message created in ack_task
@@ -547,8 +597,39 @@
   return size;
 }
 
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receive_window_size
+ *
+ * @param socket the socket to use
+ */
+static void 
+write_data (struct GNUNET_STREAM_Socket *socket);
 
 /**
+ * Task for retransmitting data messages if they aren't ACK before their ack
+ * deadline 
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+retransmission_timeout_task (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+  
+  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->our_id));
+  socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  write_data (socket);
+}
+
+
+/**
  * Task for sending ACK message
  *
  * @param cls the socket
@@ -674,10 +755,17 @@
       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
                                              packet))
         {
-          queue_message (socket,
-                         &io_handle->messages[packet]->header,
-                         NULL,
-                         NULL);
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "%s: Placing DATA message with sequence %u in send",
+                      "queue\n",
+                      GNUNET_i2s (&socket->our_id),
+                      (unsigned int) 
+                      io_handle->messages[packet]->sequence_number);
+
+          copy_and_queue_message (socket,
+                                  &io_handle->messages[packet]->header,
+                                  NULL,
+                                  NULL);
         }
     }
   packet = ack_packet + 1;
@@ -685,13 +773,25 @@
   while ( (NULL != io_handle->messages[packet]) &&
          (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
     {
-      socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
-      queue_message (socket,
-                     &io_handle->messages[packet]->header,
-                     &write_data_finish_cb,
-                     io_handle);
+      socket->receive_window_available -= 
+        ntohs (io_handle->messages[packet]->header.header.size);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: Placing DATA message with sequence %u in send",
+                  "queue\n",
+                  GNUNET_i2s (&socket->our_id),
+                  (unsigned int) 
+                  io_handle->messages[packet]->sequence_number);
+      copy_and_queue_message (socket,
+                              &io_handle->messages[packet]->header,
+                              &write_data_finish_cb,
+                              io_handle);
       packet++;
     }
+
+  GNUNET_SCHEDULER_add_delayed
+    (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
+     &retransmission_timeout_task,
+     socket);                                
 }
 
 
@@ -865,7 +965,8 @@
       if ( relative_sequence_number > 64)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Ignoring received message with sequence number %d",
+                      "%s: Ignoring received message with sequence number %u\n",
+                      GNUNET_i2s (&socket->our_id),
                       ntohl (msg->sequence_number));
           return GNUNET_YES;
         }
@@ -1032,33 +1133,44 @@
   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == 
+                 ntohs (message->type));
+  ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Received HELLO_ACK from %s\n",
               GNUNET_i2s (&socket->our_id),
               GNUNET_i2s (sender));
-  ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
+
   GNUNET_assert (socket->tunnel == tunnel);
   switch (socket->state)
   {
   case STATE_HELLO_WAIT:
-      socket->read_sequence_number = ntohl (ack_msg->sequence_number);
-      socket->receive_window_available = ntohl (ack_msg->receive_window_size);
-      /* Get the random sequence number */
-      socket->write_sequence_number = 
-        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-      reply = 
-        GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
-      reply->header.header.size = 
-        htons (sizeof (struct GNUNET_STREAM_MessageHeader));
-      reply->header.header.type = 
-        htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
-      reply->sequence_number = htonl (socket->write_sequence_number);
-      reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
-      queue_message (socket, 
-                     &reply->header, 
-                     &set_state_established, 
-                     NULL);      
-      return GNUNET_OK;
+    socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%s: Read sequence number %u\n",
+                GNUNET_i2s (&socket->our_id),
+                (unsigned int) socket->read_sequence_number);
+    socket->receive_window_available = ntohl (ack_msg->receive_window_size);
+    /* Get the random sequence number */
+    socket->write_sequence_number = 
+      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: Generated write sequence number %u\n",
+                  GNUNET_i2s (&socket->our_id),
+                  (unsigned int) socket->write_sequence_number);
+    reply = 
+      GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+    reply->header.header.size = 
+      htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+    reply->header.header.type = 
+      htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+    reply->sequence_number = htonl (socket->write_sequence_number);
+    reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
+    queue_message (socket, 
+                   &reply->header, 
+                   &set_state_established, 
+                   NULL);      
+    return GNUNET_OK;
   case STATE_ESTABLISHED:
   case STATE_RECEIVE_CLOSE_WAIT:
     // call statistics (# ACKs ignored++)
@@ -1359,9 +1471,13 @@
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
+                 ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received HELLO from %s\n", GNUNET_i2s(sender));
+              "%s: Received HELLO from %s\n", 
+              GNUNET_i2s (&socket->our_id),
+              GNUNET_i2s(sender));
 
   /* Catch possible protocol breaks */
   GNUNET_break_op (0 == memcmp (&socket->other_peer, 
@@ -1373,6 +1489,10 @@
       /* Get the random sequence number */
       socket->write_sequence_number = 
         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: Generated write sequence number %u\n",
+                  GNUNET_i2s (&socket->our_id),
+                  (unsigned int) socket->write_sequence_number);
       reply = 
         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
       reply->header.header.size = 
@@ -1419,11 +1539,17 @@
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
 
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
+                 ntohs (message->type));
+  GNUNET_assert (socket->tunnel == tunnel);
   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
-  GNUNET_assert (socket->tunnel == tunnel);
   if (STATE_HELLO_WAIT == socket->state)
     {
       socket->read_sequence_number = ntohl (ack_message->sequence_number);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: Read sequence number %u\n",
+                  GNUNET_i2s (&socket->our_id),
+                  (unsigned int) socket->read_sequence_number);
       socket->receive_window_available = 
         ntohl (ack_message->receive_window_size);
       /* Attain ESTABLISHED state */
@@ -1644,20 +1770,76 @@
            const struct GNUNET_STREAM_AckMessage *ack,
            const struct GNUNET_ATS_Information*atsi)
 {
+  unsigned int packet;
+  int need_retransmission;
+
   switch (socket->state)
     {
     case (STATE_ESTABLISHED):
       if (NULL == socket->write_handle)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Received DATA ACK when write_handle is NULL\n");
+                      "Received DATA_ACK when write_handle is NULL\n");
           return GNUNET_OK;
         }
-
+      
+      if (!((socket->write_sequence_number 
+             - htonl (ack->base_sequence_number)) < 64))
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "%s: Received DATA_ACK with unexpected base sequence",
+                      "number\n",
+                      GNUNET_i2s (&socket->our_id));
+          return GNUNET_OK;
+        }
+      /* FIXME: include the case when write_handle is cancelled - ignore the 
+         acks */
+      
+      /* Cancel the retransmission task */
+      if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+        {
+          GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+          socket->retransmission_timeout_task_id = 
+            GNUNET_SCHEDULER_NO_TASK;
+        }
+         
       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
       socket->receive_window_available = 
         ntohl (ack->receive_window_remaining);
-      write_data (socket);
+
+      /* Check if we have received all acknowledgements */
+      need_retransmission = GNUNET_NO;
+      for (packet=0; packet < 64; packet++)
+        {
+          if (NULL == socket->write_handle->messages[packet]) break;
+          if (GNUNET_YES != ackbitmap_is_bit_set 
+              (&socket->write_handle->ack_bitmap,packet))
+            {
+              need_retransmission = GNUNET_YES;
+              break;
+            }
+        }
+      if (GNUNET_YES == need_retransmission)
+        {
+          write_data (socket);
+        }
+      else      /* We have to call the write continuation callback now */
+        {
+
+          /* Free the packets */
+          for (packet=0; packet < 64; packet++)
+            {
+              GNUNET_free_non_null (socket->write_handle->messages[packet]);
+            }
+          if (NULL != socket->write_handle->write_cont)
+            socket->write_handle->write_cont
+              (socket->write_handle->write_cont_cls,
+               socket->status,
+               socket->write_handle->size);
+          /* We are done with the write handle - Freeing it */
+          GNUNET_free (socket->write_handle);
+          socket->write_handle = NULL;
+        }
       break;
     default:
       break;
@@ -1806,7 +1988,9 @@
     }
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Target peer %s connected\n", GNUNET_i2s (peer));
+              "%s: Target peer %s connected\n", 
+              GNUNET_i2s (&socket->our_id),
+              GNUNET_i2s (peer));
   
   /* Set state to INIT */
   socket->state = STATE_INIT;
@@ -1987,7 +2171,7 @@
                                       10,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
-                                      &tunnel_cleaner,
+                                      &tunnel_cleaner, /* FIXME: not required? */
                                       client_message_handlers,
                                       &app_port); /* We don't get inbound tunnels */
   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
@@ -2164,6 +2348,7 @@
   uint32_t payload_size;
   struct GNUNET_STREAM_DataMessage *data_msg;
   const void *sweep;
+  struct GNUNET_TIME_Relative ack_deadline;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s\n", __func__);
@@ -2188,7 +2373,13 @@
     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+  io_handle->write_cont = write_cont;
+  io_handle->write_cont_cls = write_cont_cls;
+  io_handle->size = size;
   sweep = data;
+  /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
+     determined from RTT */
+  ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
   /* Divide the given buffer into packets for sending */
   for (packet=0; packet < num_needed_packets; packet++)
     {
@@ -2213,9 +2404,8 @@
 
       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
          determined from RTT */
-      io_handle->messages[packet]->ack_deadline = 
-        GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
-                                   (GNUNET_TIME_UNIT_SECONDS, 5));
+      io_handle->messages[packet]->ack_deadline =
+        GNUNET_TIME_relative_hton (ack_deadline);
       data_msg = io_handle->messages[packet];
       /* Copy data from given buffer to the packet */
       memcpy (&data_msg[1],
@@ -2227,10 +2417,10 @@
   socket->write_handle = io_handle;
   write_data (socket);
 
-  return io_handle;
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s() END\n", __func__);
+
+  return io_handle;
 }
 
 

Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c       2012-03-08 00:14:55 UTC (rev 20361)
+++ gnunet/src/stream/test_stream_local.c       2012-03-08 07:45:52 UTC (rev 20362)
@@ -380,7 +380,7 @@
       return;
     }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer group is now read \n");
+              "Peer group is now ready\n");
   
   GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
   




reply via email to

[Prev in Thread] Current Thread [Next in Thread]