[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r26430 - gnunet/src/dv
From: |
gnunet |
Subject: |
[GNUnet-SVN] r26430 - gnunet/src/dv |
Date: |
Thu, 14 Mar 2013 17:03:30 +0100 |
Author: grothoff
Date: 2013-03-14 17:03:30 +0100 (Thu, 14 Mar 2013)
New Revision: 26430
Modified:
gnunet/src/dv/dv.h
gnunet/src/dv/dv_api.c
gnunet/src/dv/gnunet-service-dv.c
Log:
-generate and process ACKs
Modified: gnunet/src/dv/dv.h
===================================================================
--- gnunet/src/dv/dv.h 2013-03-14 15:42:14 UTC (rev 26429)
+++ gnunet/src/dv/dv.h 2013-03-14 16:03:30 UTC (rev 26430)
@@ -121,7 +121,7 @@
struct GNUNET_MessageHeader header;
/**
- * Unique ID for this message, for confirm callback.
+ * Unique ID for this message, for confirm callback, must never be zero.
*/
uint32_t uid GNUNET_PACKED;
Modified: gnunet/src/dv/dv_api.c
===================================================================
--- gnunet/src/dv/dv_api.c 2013-03-14 15:42:14 UTC (rev 26429)
+++ gnunet/src/dv/dv_api.c 2013-03-14 16:03:30 UTC (rev 26430)
@@ -74,6 +74,11 @@
*/
struct GNUNET_PeerIdentity target;
+ /**
+ * UID of our message, if any.
+ */
+ uint32_t uid;
+
};
@@ -184,10 +189,17 @@
th);
memcpy (&cbuf[ret], th->msg, tsize);
ret += tsize;
- (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
- &th->target.hashPubKey,
- th,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ if (NULL != th->cb)
+ {
+ (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
+ &th->target.hashPubKey,
+ th,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ }
+ else
+ {
+ GNUNET_free (th);
+ }
}
return ret;
}
@@ -215,6 +227,54 @@
/**
+ * Closure for 'process_ack'.
+ */
+struct AckContext
+{
+ /**
+ * The ACK message.
+ */
+ const struct GNUNET_DV_AckMessage *ack;
+
+ /**
+ * Our service handle.
+ */
+ struct GNUNET_DV_ServiceHandle *sh;
+};
+
+
+/**
+ * We got an ACK. Check if it matches the given transmit handle, and if
+ * so call the continuation.
+ *
+ * @param cls the 'struct AckContext'
+ * @param key peer identity
+ * @param value the 'struct GNUNET_DV_TransmitHandle'
+ * @return GNUNET_OK if the ACK did not match (continue to iterate)
+ */
+static int
+process_ack (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct AckContext *ctx = cls;
+ struct GNUNET_DV_TransmitHandle *th = value;
+
+ if (th->uid != ntohl (ctx->ack->uid))
+ return GNUNET_OK;
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (ctx->sh->send_callbacks,
+ key,
+ th));
+ /* FIXME: should distinguish between success and failure here... */
+ th->cb (th->cb_cls,
+ GNUNET_OK);
+ GNUNET_free (th);
+ return GNUNET_NO;
+}
+
+
+/**
* Handles a message sent from the DV service to us.
* Parse it out and give it to the plugin.
*
@@ -230,7 +290,9 @@
const struct GNUNET_DV_DisconnectMessage *dm;
const struct GNUNET_DV_ReceivedMessage *rm;
const struct GNUNET_MessageHeader *payload;
-
+ const struct GNUNET_DV_AckMessage *ack;
+ struct AckContext ctx;
+
if (NULL == msg)
{
/* Connection closed */
@@ -282,6 +344,21 @@
ntohl (rm->distance),
payload);
break;
+ case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ ack = (const struct GNUNET_DV_AckMessage *) msg;
+ ctx.ack = ack;
+ ctx.sh = sh;
+ GNUNET_CONTAINER_multihashmap_get_multiple (sh->send_callbacks,
+ &ack->target.hashPubKey,
+ &process_ack,
+ &ctx);
+ return;
default:
reconnect (sh);
break;
@@ -495,6 +572,10 @@
sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
ntohs (msg->size));
+ if (0 == sh->uid_gen)
+ sh->uid_gen = 1;
+ th->uid = sh->uid_gen;
+ sm->uid = htonl (sh->uid_gen++);
/* use memcpy here as 'target' may not be sufficiently aligned */
memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
memcpy (&sm[1], msg, ntohs (msg->size));
Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c 2013-03-14 15:42:14 UTC (rev 26429)
+++ gnunet/src/dv/gnunet-service-dv.c 2013-03-14 16:03:30 UTC (rev 26430)
@@ -28,9 +28,6 @@
* @author Nathan Evans
*
* TODO:
- * - even _local_ flow control (send ACK only after core took our message) is
- * not implemented, but should be (easy fix, but needs adjustments to data
- * structures)
* - distance updates are not properly communicate to US by core,
* and conversely we don't give distance updates properly to the plugin yet
* - we send 'ACK' even if a message was dropped due to no route (may
@@ -420,12 +417,12 @@
* Forward a message from another peer to the plugin.
*
* @param message the message to send to the plugin
- * @param distant_neighbor the original sender of the message
+ * @param origin the original sender of the message
* @param distnace distance to the original sender of the message
*/
static void
send_data_to_plugin (const struct GNUNET_MessageHeader *message,
- const struct GNUNET_PeerIdentity *distant_neighbor,
+ const struct GNUNET_PeerIdentity *origin,
uint32_t distance)
{
struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -443,7 +440,7 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Delivering message from peer `%s'\n",
- GNUNET_i2s (distant_neighbor));
+ GNUNET_i2s (origin));
size = sizeof (struct GNUNET_DV_ReceivedMessage) +
ntohs (message->size);
if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
@@ -456,7 +453,7 @@
received_msg->header.size = htons (size);
received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
received_msg->distance = htonl (distance);
- received_msg->sender = *distant_neighbor;
+ received_msg->sender = *origin;
memcpy (&received_msg[1], message, ntohs (message->size));
GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head,
plugin_pending_tail,
@@ -611,8 +608,9 @@
dn->pm_tail,
pending);
memcpy (&cbuf[off], pending->msg, msize);
- send_ack_to_plugin (&pending->ultimate_target,
- pending->uid);
+ if (0 != pending->uid)
+ send_ack_to_plugin (&pending->ultimate_target,
+ pending->uid);
GNUNET_free (pending);
off += msize;
}
@@ -633,6 +631,8 @@
* Forward the given payload to the given target.
*
* @param target where to send the message
+ * @param uid unique ID for the message
+ * @param ultimate_target ultimate recipient for the message
* @param distance expected (remaining) distance to the target
* @param sender original sender of the message
* @param payload payload of the message
@@ -640,7 +640,9 @@
static void
forward_payload (struct DirectNeighbor *target,
uint32_t distance,
+ uint32_t uid,
const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_PeerIdentity *ultimate_target,
const struct GNUNET_MessageHeader *payload)
{
struct PendingMessage *pm;
@@ -651,7 +653,10 @@
(0 != memcmp (sender,
&my_identity,
sizeof (struct GNUNET_PeerIdentity))) )
+ {
+ GNUNET_break (0 == uid);
return;
+ }
msize = sizeof (struct RouteMessage) + ntohs (payload->size);
if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
@@ -659,6 +664,8 @@
return;
}
pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ pm->ultimate_target = *ultimate_target;
+ pm->uid = uid;
pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
rm = (struct RouteMessage *) &pm[1];
rm->header.size = htons ((uint16_t) msize);
@@ -1271,6 +1278,8 @@
}
forward_payload (route->next_hop,
ntohl (route->target.distance),
+ 0,
+ &rm->target,
&rm->sender,
payload);
return GNUNET_OK;
@@ -1300,6 +1309,7 @@
return;
}
msg = (const struct GNUNET_DV_SendMessage *) message;
+ GNUNET_break (0 != ntohl (msg->uid));
payload = (const struct GNUNET_MessageHeader *) &msg[1];
if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs
(payload->size))
{
@@ -1316,14 +1326,14 @@
GNUNET_STATISTICS_update (stats,
"# local messages discarded (no route)",
1, GNUNET_NO);
- send_ack_to_plugin (&msg->target, htonl (msg->uid));
+ send_ack_to_plugin (&msg->target, ntohl (msg->uid));
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
- // FIXME: flow control (send ACK only once message has left the queue...)
- send_ack_to_plugin (&msg->target, htonl (msg->uid));
forward_payload (route->next_hop,
ntohl (route->target.distance),
+ htonl (msg->uid),
+ &msg->target,
&my_identity,
payload);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r26430 - gnunet/src/dv,
gnunet <=