gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r23748 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r23748 - gnunet/src/stream
Date: Tue, 11 Sep 2012 14:55:53 +0200

Author: harsha
Date: 2012-09-11 14:55:53 +0200 (Tue, 11 Sep 2012)
New Revision: 23748

Modified:
   gnunet/src/stream/stream_api.c
Log:
stream speedup fixes

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-09-11 10:36:25 UTC (rev 23747)
+++ gnunet/src/stream/stream_api.c      2012-09-11 12:55:53 UTC (rev 23748)
@@ -1088,6 +1088,7 @@
              const struct GNUNET_ATS_Information*atsi)
 {
   const void *payload;
+  struct GNUNET_TIME_Relative ack_deadline_rel;
   uint32_t bytes_needed;
   uint32_t relative_offset;
   uint32_t relative_sequence_number;
@@ -1099,23 +1100,19 @@
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-
-  if (0 != memcmp (sender,
-                   &socket->other_peer,
-                   sizeof (struct GNUNET_PeerIdentity)))
+  if (0 != memcmp (sender, &socket->other_peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received DATA from non-confirming peer\n",
-         GNUNET_i2s (&socket->other_peer));
+        "%s: Received DATA from non-confirming peer\n",
+        GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
-
   switch (socket->state)
   {
   case STATE_ESTABLISHED:
   case STATE_TRANSMIT_CLOSED:
-  case STATE_TRANSMIT_CLOSE_WAIT:
-      
+  case STATE_TRANSMIT_CLOSE_WAIT:      
     /* check if the message's sequence number is in the range we are
        expecting */
     relative_sequence_number = 
@@ -1136,8 +1133,7 @@
                                         socket);
       }
       return GNUNET_YES;
-    }
-      
+    }      
     /* Check if we have already seen this message */
     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                             relative_sequence_number))
@@ -1151,20 +1147,14 @@
       {
         socket->ack_task_id = 
           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
-                                        (msg->ack_deadline),
-                                        &ack_task,
-                                        socket);
+                                        (msg->ack_deadline), &ack_task, 
socket);
       }
       return GNUNET_YES;
     }
-
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
-         GNUNET_i2s (&socket->other_peer),
-         ntohl (msg->sequence_number),
-         ntohs (msg->header.header.size),
-         GNUNET_i2s (&socket->other_peer));
-      
+         GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+         ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
     /* Check if we have to allocate the buffer */
     size -= sizeof (struct GNUNET_STREAM_DataMessage);
     relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1181,54 +1171,67 @@
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Cannot accommodate packet %d as buffer is full\n",
-             GNUNET_i2s (&socket->other_peer),
-             ntohl (msg->sequence_number));
+             GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
         return GNUNET_YES;
       }
     }
-      
     /* Copy Data to buffer */
     payload = &msg[1];
     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
-    memcpy (socket->receive_buffer + relative_offset,
-            payload,
-            size);
+    memcpy (socket->receive_buffer + relative_offset, payload, size);
     socket->receive_buffer_boundaries[relative_sequence_number] = 
-      relative_offset + size;
-      
+       relative_offset + size;
     /* Modify the ACK bitmap */
-    ackbitmap_modify_bit (&socket->ack_bitmap,
-                          relative_sequence_number,
-                          GNUNET_YES);
-
+    ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+                         GNUNET_YES);
     /* Start ACK sending task if one is not already present */
+    ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
     {
+      ack_deadline_rel = 
+         GNUNET_TIME_relative_min (ack_deadline_rel,
+                                   GNUNET_TIME_relative_multiply
+                                   (GNUNET_TIME_UNIT_SECONDS, 300));
       socket->ack_task_id = 
-        GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
-                                      (msg->ack_deadline),
-                                      &ack_task,
-                                      socket);
+         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
+                                       (msg->ack_deadline), &ack_task, socket);
+      socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+      socket->ack_time_deadline = ack_deadline_rel;
     }
-
+    else
+    {
+      struct GNUNET_TIME_Relative ack_time_past;
+      struct GNUNET_TIME_Relative ack_time_remaining;
+      struct GNUNET_TIME_Relative ack_time_min;
+      ack_time_past = 
+         GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+      ack_time_remaining = GNUNET_TIME_relative_subtract
+         (socket->ack_time_deadline, ack_time_past);
+      ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+                                              ack_deadline_rel);
+      if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+                     sizeof (struct GNUNET_TIME_Relative)))
+      {
+       ack_deadline_rel = ack_time_min;
+       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+       socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+                                                           &ack_task, socket);
+       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+       socket->ack_time_deadline = ack_deadline_rel;
+      }
+    }
     if ((NULL != socket->read_handle) /* A read handle is waiting */
         /* There is no current read task */
         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
         /* We have the first packet */
-        && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
-                                               0)))
+        && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Scheduling read processor\n",
-           GNUNET_i2s (&socket->other_peer));
-          
-      socket->read_task_id = 
-        GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                  socket);
-    }
-      
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", 
+          GNUNET_i2s (&socket->other_peer));          
+      socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+                                                      socket);
+    }      
     break;
-
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received data message when it cannot be handled\n",
@@ -1261,11 +1264,8 @@
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
-  return handle_data (socket, 
-                      tunnel, 
-                      sender, 
-                      (const struct GNUNET_STREAM_DataMessage *) message, 
-                      atsi);
+  return handle_data (socket, tunnel, sender, 
+                      (const struct GNUNET_STREAM_DataMessage *) message, 
atsi);
 }
 
 
@@ -1529,9 +1529,8 @@
   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
-  if (0 != memcmp (sender,
-                   &socket->other_peer,
-                   sizeof (struct GNUNET_PeerIdentity)))
+  if (0 != memcmp (sender, &socket->other_peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received HELLO_ACK from non-confirming peer\n",
@@ -1539,11 +1538,8 @@
     return GNUNET_YES;
   }
   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Received HELLO_ACK from %s\n",
-       GNUNET_i2s (&socket->other_peer),
-       GNUNET_i2s (&socket->other_peer));
-
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+       GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
   GNUNET_assert (socket->tunnel == tunnel);
   switch (socket->state)
   {
@@ -1555,7 +1551,7 @@
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
     reply = generate_hello_ack (socket, GNUNET_YES);
-    queue_message (socket, &reply->header, &set_state_established, 
+    queue_message (socket, &reply->header, &set_state_established,
                    NULL, GNUNET_NO);    
     return GNUNET_OK;
   case STATE_ESTABLISHED:
@@ -1568,8 +1564,7 @@
   default:
     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
                GNUNET_i2s (&socket->other_peer),
-               GNUNET_i2s (&socket->other_peer),
-               socket->state);
+              GNUNET_i2s (&socket->other_peer), socket->state);
     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
     return GNUNET_SYSERR;
   }
@@ -1626,7 +1621,6 @@
   {
   case STATE_ESTABLISHED:
     socket->state = STATE_RECEIVE_CLOSED;
-
     /* Send TRANSMIT_CLOSE_ACK */
     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
     reply->header.type = 
@@ -1634,7 +1628,6 @@
     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
     break;
-
   default:
     /* FIXME: Call statistics? */
     break;
@@ -1703,7 +1696,6 @@
          GNUNET_i2s (&socket->other_peer));
     return GNUNET_OK;
   }
-
   switch (operation)
   {
   case SHUT_RDWR:
@@ -1714,15 +1706,11 @@
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Received CLOSE_ACK when shutdown handle is not for "
-             "SHUT_RDWR\n",
-             GNUNET_i2s (&socket->other_peer));
+             "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_CLOSED;
       break;
     default:
@@ -1732,7 +1720,6 @@
       return GNUNET_OK;
     }
     break;
-
   case SHUT_RD:
     switch (socket->state)
     {
@@ -1741,15 +1728,11 @@
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
-             "is not for SHUT_RD\n",
-             GNUNET_i2s (&socket->other_peer));
+             "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received RECEIVE_CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_RECEIVE_CLOSED;
       break;
     default:
@@ -1758,7 +1741,6 @@
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-
     break;
   case SHUT_WR:
     switch (socket->state)
@@ -1772,25 +1754,20 @@
              GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from 
%s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_TRANSMIT_CLOSED;
       break;
     default:
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
-           GNUNET_i2s (&socket->other_peer));
-          
+           GNUNET_i2s (&socket->other_peer));          
       return GNUNET_OK;
     }
     break;
   default:
     GNUNET_assert (0);
   }
-
   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
                                    operation);
@@ -1800,7 +1777,7 @@
     GNUNET_SCHEDULER_cancel
       (shutdown_handle->close_msg_retransmission_task_id);
     shutdown_handle->close_msg_retransmission_task_id =
-      GNUNET_SCHEDULER_NO_TASK;
+       GNUNET_SCHEDULER_NO_TASK;
   }
   GNUNET_free (shutdown_handle); /* Free shutdown handle */
   socket->shutdown_handle = NULL;
@@ -3338,14 +3315,11 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
-
-  /* Return NULL if there is already a write request pending */
   if (NULL != socket->write_handle)
   {
     GNUNET_break (0);
     return NULL;
   }
-
   switch (socket->state)
   {
   case STATE_TRANSMIT_CLOSED:
@@ -3371,7 +3345,6 @@
   case STATE_RECEIVE_CLOSE_WAIT:
     break;
   }
-
   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
   num_needed_packets =
@@ -3408,25 +3381,24 @@
     io_handle->messages[packet]->sequence_number =
       htonl (socket->write_sequence_number++);
     io_handle->messages[packet]->offset = htonl (socket->write_offset);
-
     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
        determined from RTT */
     io_handle->messages[packet]->ack_deadline =
       GNUNET_TIME_relative_hton (ack_deadline);
     data_msg = io_handle->messages[packet];
     /* Copy data from given buffer to the packet */
-    memcpy (&data_msg[1],
-            sweep,
-            payload_size);
+    memcpy (&data_msg[1], sweep, payload_size);
     sweep += payload_size;
     socket->write_offset += payload_size;
   }
+  /* ack the last data message. FIXME: remove when we figure out how to do this
+     using RTT */
+  io_handle->messages[num_needed_packets - 1]->ack_deadline = 
+      GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
   socket->write_handle = io_handle;
   write_data (socket);
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s() END\n", __func__);
-
   return io_handle;
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]