[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r23748 - gnunet/src/stream
From: gnunet
Subject: [GNUnet-SVN] r23748 - gnunet/src/stream
Date: Tue, 11 Sep 2012 14:55:53 +0200
Author: harsha
Date: 2012-09-11 14:55:53 +0200 (Tue, 11 Sep 2012)
New Revision: 23748
Modified:
gnunet/src/stream/stream_api.c
Log:
stream speedup fixes
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-09-11 10:36:25 UTC (rev 23747)
+++ gnunet/src/stream/stream_api.c 2012-09-11 12:55:53 UTC (rev 23748)
@@ -1088,6 +1088,7 @@
const struct GNUNET_ATS_Information*atsi)
{
const void *payload;
+ struct GNUNET_TIME_Relative ack_deadline_rel;
uint32_t bytes_needed;
uint32_t relative_offset;
uint32_t relative_sequence_number;
@@ -1099,23 +1100,19 @@
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
-
- if (0 != memcmp (sender,
- &socket->other_peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received DATA from non-confirming peer\n",
- GNUNET_i2s (&socket->other_peer));
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_YES;
}
-
switch (socket->state)
{
case STATE_ESTABLISHED:
case STATE_TRANSMIT_CLOSED:
- case STATE_TRANSMIT_CLOSE_WAIT:
-
+ case STATE_TRANSMIT_CLOSE_WAIT:
/* check if the message's sequence number is in the range we are
expecting */
relative_sequence_number =
@@ -1136,8 +1133,7 @@
socket);
}
return GNUNET_YES;
- }
-
+ }
/* Check if we have already seen this message */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
relative_sequence_number))
@@ -1151,20 +1147,14 @@
{
socket->ack_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
+ (msg->ack_deadline), &ack_task, socket);
}
return GNUNET_YES;
}
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (msg->sequence_number),
- ntohs (msg->header.header.size),
- GNUNET_i2s (&socket->other_peer));
-
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
/* Check if we have to allocate the buffer */
size -= sizeof (struct GNUNET_STREAM_DataMessage);
relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1181,54 +1171,67 @@
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Cannot accommodate packet %d as buffer is full\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (msg->sequence_number));
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
return GNUNET_YES;
}
}
-
/* Copy Data to buffer */
payload = &msg[1];
GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
- memcpy (socket->receive_buffer + relative_offset,
- payload,
- size);
+ memcpy (socket->receive_buffer + relative_offset, payload, size);
socket->receive_buffer_boundaries[relative_sequence_number] =
- relative_offset + size;
-
+ relative_offset + size;
/* Modify the ACK bitmap */
- ackbitmap_modify_bit (&socket->ack_bitmap,
- relative_sequence_number,
- GNUNET_YES);
-
+ ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+ GNUNET_YES);
/* Start ACK sending task if one is not already present */
+ ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
{
+ ack_deadline_rel =
+ GNUNET_TIME_relative_min (ack_deadline_rel,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300));
socket->ack_task_id =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
}
-
+ else
+ {
+ struct GNUNET_TIME_Relative ack_time_past;
+ struct GNUNET_TIME_Relative ack_time_remaining;
+ struct GNUNET_TIME_Relative ack_time_min;
+ ack_time_past =
+ GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+ ack_time_remaining = GNUNET_TIME_relative_subtract
+ (socket->ack_time_deadline, ack_time_past);
+ ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+ ack_deadline_rel);
+ if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+ sizeof (struct GNUNET_TIME_Relative)))
+ {
+ ack_deadline_rel = ack_time_min;
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+ &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
+ }
if ((NULL != socket->read_handle) /* A read handle is waiting */
/* There is no current read task */
&& (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
/* We have the first packet */
- && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
- 0)))
+ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Scheduling read processor\n",
- GNUNET_i2s (&socket->other_peer));
-
- socket->read_task_id =
- GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
- }
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
+ }
break;
-
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received data message when it cannot be handled\n",
@@ -1261,11 +1264,8 @@
{
struct GNUNET_STREAM_Socket *socket = cls;
- return handle_data (socket,
- tunnel,
- sender,
- (const struct GNUNET_STREAM_DataMessage *) message,
- atsi);
+ return handle_data (socket, tunnel, sender,
+ (const struct GNUNET_STREAM_DataMessage *) message, atsi);
}
@@ -1529,9 +1529,8 @@
const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
struct GNUNET_STREAM_HelloAckMessage *reply;
- if (0 != memcmp (sender,
- &socket->other_peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received HELLO_ACK from non-confirming peer\n",
@@ -1539,11 +1538,8 @@
return GNUNET_YES;
}
ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
GNUNET_assert (socket->tunnel == tunnel);
switch (socket->state)
{
@@ -1555,7 +1551,7 @@
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
reply = generate_hello_ack (socket, GNUNET_YES);
- queue_message (socket, &reply->header, &set_state_established,
+ queue_message (socket, &reply->header, &set_state_established,
NULL, GNUNET_NO);
return GNUNET_OK;
case STATE_ESTABLISHED:
@@ -1568,8 +1564,7 @@
default:
LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n",
GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer),
- socket->state);
+ GNUNET_i2s (&socket->other_peer), socket->state);
socket->state = STATE_CLOSED; // introduce STATE_ERROR?
return GNUNET_SYSERR;
}
@@ -1626,7 +1621,6 @@
{
case STATE_ESTABLISHED:
socket->state = STATE_RECEIVE_CLOSED;
-
/* Send TRANSMIT_CLOSE_ACK */
reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
reply->header.type =
@@ -1634,7 +1628,6 @@
reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
queue_message (socket, reply, NULL, NULL, GNUNET_NO);
break;
-
default:
/* FIXME: Call statistics? */
break;
@@ -1703,7 +1696,6 @@
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
switch (operation)
{
case SHUT_RDWR:
@@ -1714,15 +1706,11 @@
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received CLOSE_ACK when shutdown handle is not for "
- "SHUT_RDWR\n",
- GNUNET_i2s (&socket->other_peer));
+ "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_CLOSED;
break;
default:
@@ -1732,7 +1720,6 @@
return GNUNET_OK;
}
break;
-
case SHUT_RD:
switch (socket->state)
{
@@ -1741,15 +1728,11 @@
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
- "is not for SHUT_RD\n",
- GNUNET_i2s (&socket->other_peer));
+ "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received RECEIVE_CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_RECEIVE_CLOSED;
break;
default:
@@ -1758,7 +1741,6 @@
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
break;
case SHUT_WR:
switch (socket->state)
@@ -1772,25 +1754,20 @@
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_TRANSMIT_CLOSED;
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
- GNUNET_i2s (&socket->other_peer));
-
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
break;
default:
GNUNET_assert (0);
}
-
if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
shutdown_handle->completion_cb(shutdown_handle->completion_cls,
operation);
@@ -1800,7 +1777,7 @@
GNUNET_SCHEDULER_cancel
(shutdown_handle->close_msg_retransmission_task_id);
shutdown_handle->close_msg_retransmission_task_id =
- GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_free (shutdown_handle); /* Free shutdown handle */
socket->shutdown_handle = NULL;
@@ -3338,14 +3315,11 @@
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
-
- /* Return NULL if there is already a write request pending */
if (NULL != socket->write_handle)
{
GNUNET_break (0);
return NULL;
}
-
switch (socket->state)
{
case STATE_TRANSMIT_CLOSED:
@@ -3371,7 +3345,6 @@
case STATE_RECEIVE_CLOSE_WAIT:
break;
}
-
if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size;
num_needed_packets =
@@ -3408,25 +3381,24 @@
io_handle->messages[packet]->sequence_number =
htonl (socket->write_sequence_number++);
io_handle->messages[packet]->offset = htonl (socket->write_offset);
-
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
io_handle->messages[packet]->ack_deadline =
GNUNET_TIME_relative_hton (ack_deadline);
data_msg = io_handle->messages[packet];
/* Copy data from given buffer to the packet */
- memcpy (&data_msg[1],
- sweep,
- payload_size);
+ memcpy (&data_msg[1], sweep, payload_size);
sweep += payload_size;
socket->write_offset += payload_size;
}
+ /* ack the last data message. FIXME: remove when we figure out how to do this
+ using RTT */
+ io_handle->messages[num_needed_packets - 1]->ack_deadline =
+ GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
socket->write_handle = io_handle;
write_data (socket);
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s() END\n", __func__);
-
return io_handle;
}
[Prev in Thread] Current Thread [Next in Thread]
- [GNUnet-SVN] r23748 - gnunet/src/stream, gnunet <=