[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r19729 - gnunet/src/stream
From: |
gnunet |
Subject: |
[GNUnet-SVN] r19729 - gnunet/src/stream |
Date: |
Wed, 8 Feb 2012 11:51:55 +0100 |
Author: harsha
Date: 2012-02-08 11:51:55 +0100 (Wed, 08 Feb 2012)
New Revision: 19729
Modified:
gnunet/src/stream/stream_api.c
gnunet/src/stream/stream_protocol.h
Log:
-added socket write handle
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-02-08 07:08:28 UTC (rev 19728)
+++ gnunet/src/stream/stream_api.c 2012-02-08 10:51:55 UTC (rev 19729)
@@ -25,9 +25,13 @@
*/
#include "platform.h"
#include "gnunet_common.h"
+#include "gnunet_crypto_lib.h"
#include "gnunet_stream_lib.h"
#include "stream_protocol.h"
+
+#define MAX_PACKET_SIZE 64000
+
/**
* states in the Protocol
*/
@@ -190,6 +194,26 @@
* The number of previous timeouts
*/
unsigned int retries;
+
+ /**
+ * The write IO_handle associated with this socket
+ */
+ struct GNUNET_STREAM_IOHandle *write_handle;
+
+ /**
+ * The read IO_handle associated with this socket
+ */
+ struct GNUNET_STREAM_IOHandle *read_handle;
+
+ /**
+ * Write sequence number. Start at random upon reaching ESTABLISHED state
+ */
+ uint32_t write_sequence_number;
+
+ /**
+ * Read sequence number. This number's value is determined during handshake
+ */
+ uint32_t read_sequence_number;
};
@@ -218,7 +242,24 @@
* The call back closure
*/
void *listen_cb_cls;
+};
+
+/**
+ * The IO Handle
+ */
+struct GNUNET_STREAM_IOHandle
+{
+ /**
+ * The packet_buffers associated with this Handle
+ */
+ struct GNUNET_STREAM_DataMessage *messages[64];
+
+ /**
+ * The bitmap of this IOHandle; Corresponding bit for a message is set when
+ * it has been acknowledged by the receiver
+ */
+ GNUNET_STREAM_AckBitmap bitmap;
};
@@ -339,9 +380,40 @@
}
+/**
+ * Function to modify a bit in GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap the bitmap to modify
+ * @param bit the bit number to modify
+ * @param GNUNET_YES to on, GNUNET_NO to off
+ */
+static void
+AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ uint8_t bit,
+ uint8_t value)
+{
+ uint64_t val;
+ val = value;
+ *bitmap = *bitmap | (val << bit);
+}
+
/**
+ * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
+ *
+ * @param bit the bit number to check
+ * @return GNUNET_YES if the bit is set; GNUNET_NO if not
+ */
+static uint8_t
+AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ uint8_t bit)
+{
+ return (*bitmap & (0x0000000000000001 << bit)) >> bit;
+}
+
+
+/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
* @param cls the socket (set from GNUNET_MESH_connect)
@@ -391,6 +463,7 @@
set_state_established (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
socket->state = STATE_ESTABLISHED;
}
@@ -406,6 +479,7 @@
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_assert (STATE_INIT == socket->state);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
socket->state = STATE_HELLO_WAIT;
}
@@ -431,19 +505,26 @@
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
- struct GNUNET_STREAM_MessageHeader *reply;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
+ ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_HELLO_WAIT == socket->state)
{
+ socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff);
reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.size =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.type =
+ reply->header.header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
queue_message (socket,
- reply,
+ (struct GNUNET_STREAM_MessageHeader *) reply,
&set_state_established,
NULL);
}
@@ -690,19 +771,23 @@
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- struct GNUNET_STREAM_MessageHeader *reply;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_INIT == socket->state)
{
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff);
reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.size =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.type =
+ reply->header.header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
queue_message (socket,
- reply,
+ (struct GNUNET_STREAM_MessageHeader *)reply,
&set_state_hello_wait,
NULL);
}
@@ -738,11 +823,13 @@
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- struct GNUNET_STREAM_MessageHeader *reply;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_message;
+ ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_HELLO_WAIT == socket->state)
{
+ socket->read_sequence_number = ntohs (ack_message->sequence_number);
socket->state = STATE_ESTABLISHED;
}
else
@@ -1024,7 +1111,7 @@
{&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
sizeof (struct GNUNET_STREAM_AckMessage) },
{&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
- sizeof (struct GNUNET_STREAM_MessageHeader)},
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
{&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
@@ -1054,7 +1141,7 @@
{&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
- sizeof (struct GNUNET_STREAM_MessageHeader)},
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
{&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
@@ -1228,6 +1315,7 @@
if (NULL != socket->transmit_handle)
{
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
}
/* Clear existing message queue */
@@ -1242,12 +1330,14 @@
if (NULL != socket->tunnel)
{
GNUNET_MESH_tunnel_destroy (socket->tunnel);
+ socket->tunnel = NULL;
}
/* Close mesh connection */
if (NULL != socket->mesh)
{
GNUNET_MESH_disconnect (socket->mesh);
+ socket->mesh = NULL;
}
GNUNET_free (socket);
}
@@ -1309,8 +1399,7 @@
void *tunnel_ctx)
{
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
- struct MessageQueue *head;
-
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer %s has terminated connection abruptly\n",
GNUNET_i2s (&socket->other_peer));
@@ -1376,3 +1465,80 @@
GNUNET_free (lsocket);
}
+
+
+/**
+ * Tries to write the given data to the stream
+ *
+ * @param socket the socket representing a stream
+ * @param data the data buffer from where the data is written into the stream
+ * @param size the number of bytes to be written from the data buffer
+ * @param timeout the timeout period
+ * @param write_cont the function to call upon writing some bytes into the
stream
+ * @param write_cont_cls the closure
+ * @return handle to cancel the operation
+ */
+struct GNUNET_STREAM_IOHandle *
+GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
+ const void *data,
+ size_t size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_STREAM_CompletionContinuation write_cont,
+ void *write_cont_cls)
+{
+ unsigned int num_needed_packets;
+ unsigned int packet;
+ struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_DataMessage *data_msg;
+ size_t max_payload_size;
+ size_t packet_size;
+ const void *sweep;
+
+ /* There is already a write request pending */
+ if (NULL != socket->write_handle) return NULL;
+ if (!(STATE_ESTABLISHED == socket->state
+ || STATE_RECEIVE_CLOSE_WAIT == socket->state
+ || STATE_RECEIVE_CLOSED == socket->state))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Attempting to write on a closed/not-yet-established
stream\n");
+ return NULL;
+ }
+
+ max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+ num_needed_packets = ceil (size / max_payload_size);
+ if (64 < num_needed_packets)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Given buffer cannot be accommodated in 64 packets\n");
+ num_needed_packets = 64;
+ }
+
+ io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
+ sweep = data;
+ /* Divide the given area into packets for sending */
+ for (packet=0; packet < num_needed_packets; packet++)
+ {
+ if ((packet + 1) * max_payload_size < size)
+ {
+ packet_size = MAX_PACKET_SIZE;
+ }
+ else
+ {
+ packet_size = size - packet * max_payload_size
+ + sizeof (struct GNUNET_STREAM_DataMessage);
+ }
+ io_handle->messages[packet] = GNUNET_malloc (packet_size);
+ io_handle->messages[packet]->header.header.size = htons (packet_size);
+ io_handle->messages[packet]->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+ data_msg = io_handle->messages[packet];
+ memcpy (&data_msg[1],
+ sweep,
+ packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
+ sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
+ }
+
+ return io_handle;
+}
Modified: gnunet/src/stream/stream_protocol.h
===================================================================
--- gnunet/src/stream/stream_protocol.h 2012-02-08 07:08:28 UTC (rev 19728)
+++ gnunet/src/stream/stream_protocol.h 2012-02-08 10:51:55 UTC (rev 19729)
@@ -42,8 +42,7 @@
/**
* The stream message header
- *
- * The message can be of Data, Acknowledgement or both
+ * All messages of STREAM should commonly have this as header
*/
struct GNUNET_STREAM_MessageHeader
{
@@ -134,6 +133,22 @@
uint32_t receive_window_remaining GNUNET_PACKED;
};
+
+struct GNUNET_STREAM_HelloAckMessage
+{
+ /**
+ * The stream message header
+ */
+ struct GNUNET_STREAM_MessageHeader header;
+
+ /**
+ * The selected sequence number. Following data tranmissions from the sender
+ * start with this sequence
+ */
+ uint32_t sequence_number;
+
+};
+
GNUNET_NETWORK_STRUCT_END
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r19729 - gnunet/src/stream,
gnunet <=