gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r19729 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r19729 - gnunet/src/stream
Date: Wed, 8 Feb 2012 11:51:55 +0100

Author: harsha
Date: 2012-02-08 11:51:55 +0100 (Wed, 08 Feb 2012)
New Revision: 19729

Modified:
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/stream_protocol.h
Log:
-added socket write handle

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-02-08 07:08:28 UTC (rev 19728)
+++ gnunet/src/stream/stream_api.c      2012-02-08 10:51:55 UTC (rev 19729)
@@ -25,9 +25,13 @@
  */
 #include "platform.h"
 #include "gnunet_common.h"
+#include "gnunet_crypto_lib.h"
 #include "gnunet_stream_lib.h"
 #include "stream_protocol.h"
 
+
+#define MAX_PACKET_SIZE 64000
+
 /**
  * states in the Protocol
  */
@@ -190,6 +194,26 @@
    * The number of previous timeouts
    */
   unsigned int retries;
+
+  /**
+   * The write IO_handle associated with this socket
+   */
+  struct GNUNET_STREAM_IOHandle *write_handle;
+
+  /**
+   * The read IO_handle associated with this socket
+   */
+  struct GNUNET_STREAM_IOHandle *read_handle;
+
+  /**
+   * Write sequence number. Start at random upon reaching ESTABLISHED state
+   */
+  uint32_t write_sequence_number;
+
+  /**
+   * Read sequence number. This number's value is determined during handshake
+   */
+  uint32_t read_sequence_number;
 };
 
 
@@ -218,7 +242,24 @@
    * The call back closure
    */
   void *listen_cb_cls;
+};
 
+
+/**
+ * The IO Handle; a socket refers to one for writing and one for reading
+ */
+struct GNUNET_STREAM_IOHandle
+{
+  /**
+   * The packet buffers associated with this handle (at most 64 messages)
+   */
+  struct GNUNET_STREAM_DataMessage *messages[64];
+
+  /**
+   * The bitmap of this IOHandle; the corresponding bit for a message is set
+   * when it has been acknowledged by the receiver
+   */
+  GNUNET_STREAM_AckBitmap bitmap;
 };
 
 
@@ -339,9 +380,40 @@
 }
 
 
+/**
+ * Set or clear a single bit in a GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap the bitmap to modify
+ * @param bit the bit number to modify; must be smaller than the bitmap width
+ * @param value GNUNET_YES to turn the bit on, GNUNET_NO to turn it off
+ */
+static void
+AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+                      uint8_t bit, uint8_t value)
+{
+  GNUNET_STREAM_AckBitmap mask;
 
+  GNUNET_assert (bit < 8 * sizeof (*bitmap));   /* avoid undefined shift */
+  mask = ((GNUNET_STREAM_AckBitmap) 1) << bit;
+  *bitmap = (GNUNET_NO == value) ? (*bitmap & ~mask) : (*bitmap | mask);
+}
 
+
 /**
+ * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
+ * @param bitmap the bitmap to inspect
+ * @param bit the bit number to check; must be smaller than the bitmap width
+ * @return GNUNET_YES if the bit is set; GNUNET_NO if not
+ */
+static uint8_t
+AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, uint8_t bit)
+{
+  GNUNET_assert (bit < 8 * sizeof (*bitmap));   /* avoid undefined shift */
+  return (0 != ((*bitmap >> bit) & 1)) ? GNUNET_YES : GNUNET_NO;
+}
+
+
+/**
  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
  *
  * @param cls the socket (set from GNUNET_MESH_connect)
@@ -391,6 +463,7 @@
 set_state_established (void *cls,
                        struct GNUNET_STREAM_Socket *socket)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
   socket->state = STATE_ESTABLISHED;
 }
 
@@ -406,6 +479,7 @@
                       struct GNUNET_STREAM_Socket *socket)
 {
   GNUNET_assert (STATE_INIT == socket->state);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
   socket->state = STATE_HELLO_WAIT;
 }
 
@@ -431,19 +505,26 @@
                          const struct GNUNET_ATS_Information*atsi)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
-  struct GNUNET_STREAM_MessageHeader *reply;
+  const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
+  struct GNUNET_STREAM_HelloAckMessage *reply;
 
+  ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
   GNUNET_assert (socket->tunnel == tunnel);
   if (STATE_HELLO_WAIT == socket->state)
     {
+      socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+      /* Get the random sequence number */
+      socket->write_sequence_number = 
+        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff);
       reply = 
-        GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
-      reply->header.size = 
+        GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+      reply->header.header.size = 
         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
-      reply->header.type = 
+      reply->header.header.type = 
         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+      reply->sequence_number = htonl (socket->write_sequence_number);
       queue_message (socket, 
-                     reply, 
+                     (struct GNUNET_STREAM_MessageHeader *) reply, 
                      &set_state_established, 
                      NULL);
     }
@@ -690,19 +771,23 @@
                      const struct GNUNET_ATS_Information*atsi)
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
-  struct GNUNET_STREAM_MessageHeader *reply;
+  struct GNUNET_STREAM_HelloAckMessage *reply;
 
   GNUNET_assert (socket->tunnel == tunnel);
   if (STATE_INIT == socket->state)
     {
+      /* Get the random sequence number */
+      socket->write_sequence_number = 
+        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff);
       reply = 
-        GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
-      reply->header.size = 
+        GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+      reply->header.header.size = 
         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
-      reply->header.type = 
+      reply->header.header.type = 
         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+      reply->sequence_number = htonl (socket->write_sequence_number);
       queue_message (socket, 
-                     reply, 
+                     (struct GNUNET_STREAM_MessageHeader *)reply,
                      &set_state_hello_wait, 
                      NULL);
     }
@@ -738,11 +823,13 @@
                          const struct GNUNET_ATS_Information*atsi)
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
-  struct GNUNET_STREAM_MessageHeader *reply;
+  const struct GNUNET_STREAM_HelloAckMessage *ack_message;
 
+  ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
   GNUNET_assert (socket->tunnel == tunnel);
   if (STATE_HELLO_WAIT == socket->state)
     {
+      socket->read_sequence_number = ntohs (ack_message->sequence_number);
       socket->state = STATE_ESTABLISHED;
     }
   else
@@ -1024,7 +1111,7 @@
   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
    sizeof (struct GNUNET_STREAM_AckMessage) },
   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
-   sizeof (struct GNUNET_STREAM_MessageHeader)},
+   sizeof (struct GNUNET_STREAM_HelloAckMessage)},
   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
    sizeof (struct GNUNET_STREAM_MessageHeader)},
   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
@@ -1054,7 +1141,7 @@
   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
    sizeof (struct GNUNET_STREAM_MessageHeader)},
   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
-   sizeof (struct GNUNET_STREAM_MessageHeader)},
+   sizeof (struct GNUNET_STREAM_HelloAckMessage)},
   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
    sizeof (struct GNUNET_STREAM_MessageHeader)},
   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
@@ -1228,6 +1315,7 @@
   if (NULL != socket->transmit_handle)
     {
       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+      socket->transmit_handle = NULL;
     }
 
   /* Clear existing message queue */
@@ -1242,12 +1330,14 @@
   if (NULL != socket->tunnel)
     {
       GNUNET_MESH_tunnel_destroy (socket->tunnel);
+      socket->tunnel = NULL;
     }
 
   /* Close mesh connection */
   if (NULL != socket->mesh)
     {
       GNUNET_MESH_disconnect (socket->mesh);
+      socket->mesh = NULL;
     }
   GNUNET_free (socket);
 }
@@ -1309,8 +1399,7 @@
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-  struct MessageQueue *head;
-
+  
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Peer %s has terminated connection abruptly\n",
               GNUNET_i2s (&socket->other_peer));
@@ -1376,3 +1465,80 @@
   
   GNUNET_free (lsocket);
 }
+
+
+/**
+ * Tries to write the given data to the stream. The data is split into at
+ * most 64 DATA messages; bytes beyond their capacity are not packetized.
+ *
+ * @param socket the socket representing a stream
+ * @param data the data buffer from where the data is written into the stream
+ * @param size the number of bytes to be written from the data buffer
+ * @param timeout the timeout period (not yet evaluated here)
+ * @param write_cont the function to call upon writing some bytes into the
+ *          stream (not yet invoked here)
+ * @param write_cont_cls the closure
+ * @return handle to cancel the operation, or NULL if the write was refused
+ */
+struct GNUNET_STREAM_IOHandle *
+GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
+                     const void *data,
+                     size_t size,
+                     struct GNUNET_TIME_Relative timeout,
+                     GNUNET_STREAM_CompletionContinuation write_cont,
+                     void *write_cont_cls)
+{
+  unsigned int num_needed_packets;
+  unsigned int packet;
+  struct GNUNET_STREAM_IOHandle *io_handle;
+  struct GNUNET_STREAM_DataMessage *data_msg;
+  size_t max_payload_size;
+  size_t payload_size;
+  const char *sweep;
+
+  /* There is already a write request pending */
+  if (NULL != socket->write_handle) return NULL;
+  if (!(STATE_ESTABLISHED == socket->state
+        || STATE_RECEIVE_CLOSE_WAIT == socket->state
+        || STATE_RECEIVE_CLOSED == socket->state))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Attempting to write on a closed/not-yet-established stream\n");
+      return NULL;
+    }
+
+  max_payload_size =
+    MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+  if (size > 64 * max_payload_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Given buffer cannot be accommodated in 64 packets\n");
+      size = 64 * max_payload_size;
+    }
+  /* Round up: a trailing partial payload still needs a packet of its own */
+  num_needed_packets = (size + max_payload_size - 1) / max_payload_size;
+
+  io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
+  sweep = data;
+  /* Divide the given area into packets for sending */
+  for (packet=0; packet < num_needed_packets; packet++)
+    {
+      /* Every packet carries a full payload except possibly the last one */
+      payload_size = size - packet * max_payload_size;
+      if (payload_size > max_payload_size)
+        {
+          payload_size = max_payload_size;
+        }
+      data_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_DataMessage)
+                                + payload_size);
+      data_msg->header.header.size =
+        htons (sizeof (struct GNUNET_STREAM_DataMessage) + payload_size);
+      data_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+      memcpy (&data_msg[1], sweep, payload_size);
+      sweep += payload_size;
+      io_handle->messages[packet] = data_msg;
+    }
+
+  /* NOTE(review): io_handle is not stored in socket->write_handle here */
+  return io_handle;
+}

Modified: gnunet/src/stream/stream_protocol.h
===================================================================
--- gnunet/src/stream/stream_protocol.h 2012-02-08 07:08:28 UTC (rev 19728)
+++ gnunet/src/stream/stream_protocol.h 2012-02-08 10:51:55 UTC (rev 19729)
@@ -42,8 +42,7 @@
 
 /**
  * The stream message header
- *
- * The message can be of Data, Acknowledgement or both
+ * All messages of STREAM should commonly have this as header
  */
 struct GNUNET_STREAM_MessageHeader
 {
@@ -134,6 +133,22 @@
   uint32_t receive_window_remaining GNUNET_PACKED;
 };
 
+
+/** HELLO_ACK message: carries the sender's initial sequence number */
+struct GNUNET_STREAM_HelloAckMessage
+{
+  /**
+   * The stream message header
+   */
+  struct GNUNET_STREAM_MessageHeader header;
+
+  /**
+   * The selected sequence number. Data transmissions from the sender
+   * start with this sequence number. Sent in network byte order.
+   */
+  uint32_t sequence_number GNUNET_PACKED;
+};
+
 GNUNET_NETWORK_STRUCT_END
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]