[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r20362 - gnunet/src/stream
From: gnunet
Subject: [GNUnet-SVN] r20362 - gnunet/src/stream
Date: Thu, 8 Mar 2012 08:45:52 +0100
Author: harsha
Date: 2012-03-08 08:45:52 +0100 (Thu, 08 Mar 2012)
New Revision: 20362
Modified:
gnunet/src/stream/stream_api.c
gnunet/src/stream/test_stream_local.c
Log:
Data message retransmissions
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-03-08 00:14:55 UTC (rev 20361)
+++ gnunet/src/stream/stream_api.c 2012-03-08 07:45:52 UTC (rev 20362)
@@ -157,7 +157,6 @@
*/
struct GNUNET_STREAM_Socket
{
-
/**
* The peer identity of the peer at the other end of the stream
*/
@@ -254,6 +253,11 @@
GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
/**
+ * Task identifier for retransmission task after timeout
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+
+ /**
* The state of the protocol associated with this socket
*/
enum State state;
@@ -372,12 +376,27 @@
struct GNUNET_STREAM_DataMessage *messages[64];
/**
+ * The write continuation callback
+ */
+ GNUNET_STREAM_CompletionContinuation write_cont;
+
+ /**
+ * Write continuation closure
+ */
+ void *write_cont_cls;
+
+ /**
* The bitmap of this IOHandle; Corresponding bit for a message is set when
* it has been acknowledged by the receiver
*/
GNUNET_STREAM_AckBitmap ack_bitmap;
/**
+ * Number of bytes in this write handle
+ */
+ size_t size;
+
+ /**
* Number of packets sent before waiting for an ack
*
* FIXME: Do we need this?
@@ -406,7 +425,7 @@
/**
* Default value in seconds for various timeouts
*/
-static unsigned int default_timeout = 300;
+static unsigned int default_timeout = 10;
/**
@@ -494,10 +513,16 @@
{
struct MessageQueue *queue_entity;
+ GNUNET_assert
+ ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
+ && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %d and size %d\n",
+ "%s: Queueing message of type %d and size %d\n",
+ GNUNET_i2s (&socket->our_id),
ntohs (message->header.type),
ntohs (message->header.size));
+ GNUNET_assert (NULL != message);
queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
queue_entity->message = message;
queue_entity->finish_cb = finish_cb;
@@ -522,6 +547,31 @@
/**
+ * Copies a message and queues it for sending using the mesh connection of
+ * given socket
+ *
+ * @param socket the socket whose mesh connection is used
+ * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
+ */
+static void
+copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ SendFinishCallback finish_cb,
+ void *finish_cb_cls)
+{
+ struct GNUNET_STREAM_MessageHeader *msg_copy;
+ uint16_t size;
+
+ size = ntohs (message->header.size);
+ msg_copy = GNUNET_malloc (size);
+ memcpy (msg_copy, message, size);
+ queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
+}
+
+
+/**
* Callback function for sending ack message
*
* @param cls closure the ACK message created in ack_task
@@ -547,8 +597,39 @@
return size;
}
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receive_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket);
/**
+ * Task for retransmitting data messages if they aren't ACK before their ack
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+retransmission_timeout_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ return;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->our_id));
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ write_data (socket);
+}
+
+
+/**
* Task for sending ACK message
*
* @param cls the socket
@@ -674,10 +755,17 @@
if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
packet))
{
- queue_message (socket,
- &io_handle->messages[packet]->header,
- NULL,
- NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send",
+ "queue\n",
+ GNUNET_i2s (&socket->our_id),
+ (unsigned int)
+ io_handle->messages[packet]->sequence_number);
+
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
}
}
packet = ack_packet + 1;
@@ -685,13 +773,25 @@
while ( (NULL != io_handle->messages[packet]) &&
(socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
{
- socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
- queue_message (socket,
- &io_handle->messages[packet]->header,
- &write_data_finish_cb,
- io_handle);
+ socket->receive_window_available -=
+ ntohs (io_handle->messages[packet]->header.header.size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send",
+ "queue\n",
+ GNUNET_i2s (&socket->our_id),
+ (unsigned int)
+ io_handle->messages[packet]->sequence_number);
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ &write_data_finish_cb,
+ io_handle);
packet++;
}
+
+ GNUNET_SCHEDULER_add_delayed
+ (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
+ &retransmission_timeout_task,
+ socket);
}
@@ -865,7 +965,8 @@
if ( relative_sequence_number > 64)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Ignoring received message with sequence number %d",
+ "%s: Ignoring received message with sequence number %u\n",
+ GNUNET_i2s (&socket->our_id),
ntohl (msg->sequence_number));
return GNUNET_YES;
}
@@ -1032,33 +1133,44 @@
const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
struct GNUNET_STREAM_HelloAckMessage *reply;
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
+ ntohs (message->type));
+ ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received HELLO_ACK from %s\n",
GNUNET_i2s (&socket->our_id),
GNUNET_i2s (sender));
- ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
+
GNUNET_assert (socket->tunnel == tunnel);
switch (socket->state)
{
case STATE_HELLO_WAIT:
- socket->read_sequence_number = ntohl (ack_msg->sequence_number);
- socket->receive_window_available = ntohl (ack_msg->receive_window_size);
- /* Get the random sequence number */
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.size =
- htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
- reply->sequence_number = htonl (socket->write_sequence_number);
- reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
- queue_message (socket,
- &reply->header,
- &set_state_established,
- NULL);
- return GNUNET_OK;
+ socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read sequence number %u\n",
+ GNUNET_i2s (&socket->our_id),
+ (unsigned int) socket->read_sequence_number);
+ socket->receive_window_available = ntohl (ack_msg->receive_window_size);
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Generated write sequence number %u\n",
+ GNUNET_i2s (&socket->our_id),
+ (unsigned int) socket->write_sequence_number);
+ reply =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
+ reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
+ queue_message (socket,
+ &reply->header,
+ &set_state_established,
+ NULL);
+ return GNUNET_OK;
case STATE_ESTABLISHED:
case STATE_RECEIVE_CLOSE_WAIT:
// call statistics (# ACKs ignored++)
@@ -1359,9 +1471,13 @@
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
struct GNUNET_STREAM_HelloAckMessage *reply;
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
+ ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received HELLO from %s\n", GNUNET_i2s(sender));
+ "%s: Received HELLO from %s\n",
+ GNUNET_i2s (&socket->our_id),
+ GNUNET_i2s(sender));
/* Catch possible protocol breaks */
GNUNET_break_op (0 == memcmp (&socket->other_peer,
@@ -1373,6 +1489,10 @@
/* Get the random sequence number */
socket->write_sequence_number =
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Generated write sequence number %u\n",
+ GNUNET_i2s (&socket->our_id),
+ (unsigned int) socket->write_sequence_number);
reply =
GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
reply->header.header.size =
@@ -1419,11 +1539,17 @@
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
const struct GNUNET_STREAM_HelloAckMessage *ack_message;
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
+ ntohs (message->type));
+ GNUNET_assert (socket->tunnel == tunnel);
ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
- GNUNET_assert (socket->tunnel == tunnel);
if (STATE_HELLO_WAIT == socket->state)
{
socket->read_sequence_number = ntohl (ack_message->sequence_number);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read sequence number %u\n",
+ GNUNET_i2s (&socket->our_id),
+ (unsigned int) socket->read_sequence_number);
socket->receive_window_available =
ntohl (ack_message->receive_window_size);
/* Attain ESTABLISHED state */
@@ -1644,20 +1770,76 @@
const struct GNUNET_STREAM_AckMessage *ack,
const struct GNUNET_ATS_Information*atsi)
{
+ unsigned int packet;
+ int need_retransmission;
+
switch (socket->state)
{
case (STATE_ESTABLISHED):
if (NULL == socket->write_handle)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received DATA ACK when write_handle is NULL\n");
+ "Received DATA_ACK when write_handle is NULL\n");
return GNUNET_OK;
}
-
+
+ if (!((socket->write_sequence_number
+ - htonl (ack->base_sequence_number)) < 64))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA_ACK with unexpected base sequence",
+ "number\n",
+ GNUNET_i2s (&socket->our_id));
+ return GNUNET_OK;
+ }
+ /* FIXME: include the case when write_handle is cancelled - ignore the
+ acks */
+
+ /* Cancel the retransmission task */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ }
+
socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
socket->receive_window_available =
ntohl (ack->receive_window_remaining);
- write_data (socket);
+
+ /* Check if we have received all acknowledgements */
+ need_retransmission = GNUNET_NO;
+ for (packet=0; packet < 64; packet++)
+ {
+ if (NULL == socket->write_handle->messages[packet]) break;
+ if (GNUNET_YES != ackbitmap_is_bit_set
+ (&socket->write_handle->ack_bitmap,packet))
+ {
+ need_retransmission = GNUNET_YES;
+ break;
+ }
+ }
+ if (GNUNET_YES == need_retransmission)
+ {
+ write_data (socket);
+ }
+ else /* We have to call the write continuation callback now */
+ {
+
+ /* Free the packets */
+ for (packet=0; packet < 64; packet++)
+ {
+ GNUNET_free_non_null (socket->write_handle->messages[packet]);
+ }
+ if (NULL != socket->write_handle->write_cont)
+ socket->write_handle->write_cont
+ (socket->write_handle->write_cont_cls,
+ socket->status,
+ socket->write_handle->size);
+ /* We are done with the write handle - Freeing it */
+ GNUNET_free (socket->write_handle);
+ socket->write_handle = NULL;
+ }
break;
default:
break;
@@ -1806,7 +1988,9 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Target peer %s connected\n", GNUNET_i2s (peer));
+ "%s: Target peer %s connected\n",
+ GNUNET_i2s (&socket->our_id),
+ GNUNET_i2s (peer));
/* Set state to INIT */
socket->state = STATE_INIT;
@@ -1987,7 +2171,7 @@
10, /* QUEUE size as parameter? */
socket, /* cls */
NULL, /* No inbound tunnel handler */
- &tunnel_cleaner,
+ &tunnel_cleaner, /* FIXME: not required? */
client_message_handlers,
&app_port); /* We don't get inbound tunnels */
if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
@@ -2164,6 +2348,7 @@
uint32_t payload_size;
struct GNUNET_STREAM_DataMessage *data_msg;
const void *sweep;
+ struct GNUNET_TIME_Relative ack_deadline;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
@@ -2188,7 +2373,13 @@
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+ io_handle->write_cont = write_cont;
+ io_handle->write_cont_cls = write_cont_cls;
+ io_handle->size = size;
sweep = data;
+ /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
+ determined from RTT */
+ ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
/* Divide the given buffer into packets for sending */
for (packet=0; packet < num_needed_packets; packet++)
{
@@ -2213,9 +2404,8 @@
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
- io_handle->messages[packet]->ack_deadline =
- GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5));
+ io_handle->messages[packet]->ack_deadline =
+ GNUNET_TIME_relative_hton (ack_deadline);
data_msg = io_handle->messages[packet];
/* Copy data from given buffer to the packet */
memcpy (&data_msg[1],
@@ -2227,10 +2417,10 @@
socket->write_handle = io_handle;
write_data (socket);
- return io_handle;
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s() END\n", __func__);
+
+ return io_handle;
}
Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c 2012-03-08 00:14:55 UTC (rev 20361)
+++ gnunet/src/stream/test_stream_local.c 2012-03-08 07:45:52 UTC (rev 20362)
@@ -380,7 +380,7 @@
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer group is now read \n");
+ "Peer group is now ready\n");
GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r20362 - gnunet/src/stream,
gnunet <=