[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r37460 - in gnunet/src: dv include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r37460 - in gnunet/src: dv include |
Date: |
Tue, 5 Jul 2016 16:17:51 +0200 |
Author: grothoff
Date: 2016-07-05 16:17:50 +0200 (Tue, 05 Jul 2016)
New Revision: 37460
Modified:
gnunet/src/dv/dv.h
gnunet/src/dv/dv_api.c
gnunet/src/dv/gnunet-service-dv.c
gnunet/src/dv/plugin_transport_dv.c
gnunet/src/include/gnunet_dv_service.h
Log:
-reworking DV to use new MQ API
Modified: gnunet/src/dv/dv.h
===================================================================
--- gnunet/src/dv/dv.h 2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/dv.h 2016-07-05 14:17:50 UTC (rev 37460)
@@ -126,9 +126,9 @@
struct GNUNET_MessageHeader header;
/**
- * Unique ID for this message, for confirm callback, must never be zero.
+ * Reserved for alignment. 0.
*/
- uint32_t uid GNUNET_PACKED;
+ uint32_t reserved GNUNET_PACKED;
/**
* The (actual) target of the message
@@ -139,31 +139,6 @@
/**
- * Message from service to DV plugin, saying that a
- * SEND request was handled.
- */
-struct GNUNET_DV_AckMessage
-{
- /**
- * Type: #GNUNET_MESSAGE_TYPE_DV_SEND_ACK or
- * #GNUNET_MESSAGE_TYPE_DV_SEND_NACK.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Which message is being acknowledged?
- */
- uint32_t uid GNUNET_PACKED;
-
- /**
- * The (actual) target of the message
- */
- struct GNUNET_PeerIdentity target;
-
-};
-
-
-/**
* Message from service to DV plugin, saying that our
* distance to another peer changed.
*/
Modified: gnunet/src/dv/dv_api.c
===================================================================
--- gnunet/src/dv/dv_api.c 2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/dv_api.c 2016-07-05 14:17:50 UTC (rev 37460)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2009--2013 GNUnet e.V.
+ Copyright (C) 2009--2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -37,60 +37,6 @@
/**
* Information we track for each peer.
*/
-struct ConnectedPeer;
-
-
-/**
- * Handle for a send operation.
- */
-struct GNUNET_DV_TransmitHandle
-{
- /**
- * Kept in a DLL.
- */
- struct GNUNET_DV_TransmitHandle *next;
-
- /**
- * Kept in a DLL.
- */
- struct GNUNET_DV_TransmitHandle *prev;
-
- /**
- * Handle to the service.
- */
- struct GNUNET_DV_ServiceHandle *sh;
-
- /**
- * Function to call upon completion.
- */
- GNUNET_DV_MessageSentCallback cb;
-
- /**
- * Closure for @a cb.
- */
- void *cb_cls;
-
- /**
- * The actual message (allocated at the end of this struct).
- */
- const struct GNUNET_MessageHeader *msg;
-
- /**
- * Destination for the message.
- */
- struct ConnectedPeer *target;
-
- /**
- * UID of our message, if any.
- */
- uint32_t uid;
-
-};
-
-
-/**
- * Information we track for each peer.
- */
struct ConnectedPeer
{
@@ -99,22 +45,6 @@
*/
struct GNUNET_PeerIdentity pid;
- /**
- * Head of DLL of transmission handles where we need
- * to invoke a continuation when we are informed about
- * successful transmission. The respective request
- * has already been sent to the DV service.
- */
- struct GNUNET_DV_TransmitHandle *head;
-
- /**
- * Tail of DLL of transmission handles where we need
- * to invoke a continuation when we are informed about
- * successful transmission. The respective request
- * has already been sent to the DV service.
- */
- struct GNUNET_DV_TransmitHandle *tail;
-
};
@@ -127,14 +57,9 @@
/**
* Connection to DV service.
*/
- struct GNUNET_CLIENT_Connection *client;
+ struct GNUNET_MQ_Handle *mq;
/**
- * Active request for transmission to DV service.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
* Our configuration.
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -165,26 +90,11 @@
GNUNET_DV_MessageReceivedCallback message_cb;
/**
- * Head of messages to transmit.
- */
- struct GNUNET_DV_TransmitHandle *th_head;
-
- /**
- * Tail of messages to transmit.
- */
- struct GNUNET_DV_TransmitHandle *th_tail;
-
- /**
* Information tracked per connected peer. Maps peer
* identities to `struct ConnectedPeer` entries.
*/
struct GNUNET_CONTAINER_MultiPeerMap *peers;
- /**
- * Current unique ID
- */
- uint32_t uid_gen;
-
};
@@ -198,120 +108,160 @@
/**
- * Start sending messages from our queue to the service.
+ * We got disconnected from the service and thus all of the
+ * connections need to be torn down.
*
- * @param sh service handle
+ * @param cls the `struct GNUNET_DV_ServiceHandle`
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
*/
-static void
-start_transmit (struct GNUNET_DV_ServiceHandle *sh);
+static int
+cleanup_send_cb (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
+{
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer = value;
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+ key,
+ peer));
+ sh->disconnect_cb (sh->cls,
+ key);
+ GNUNET_free (peer);
+ return GNUNET_OK;
+}
+
/**
- * Gives a message from our queue to the DV service.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
*
- * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`)
- * @param size how many bytes can we send
- * @param buf where to copy the message to send
- * @return how many bytes we copied to @a buf
+ * @param cls the handle to the DV API
+ * @param cm the message that was received
*/
-static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+static void
+handle_connect (void *cls,
+ const struct GNUNET_DV_ConnectMessage *cm)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- char *cbuf = buf;
- struct GNUNET_DV_TransmitHandle *th;
- size_t ret;
- size_t tsize;
+ struct ConnectedPeer *peer;
- sh->th = NULL;
- if (NULL == buf)
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &cm->peer);
+ if (NULL != peer)
{
+ GNUNET_break (0);
reconnect (sh);
- return 0;
+ return;
}
- ret = 0;
- while ( (NULL != (th = sh->th_head)) &&
- (size - ret >= (tsize = ntohs (th->msg->size)) ))
+ peer = GNUNET_new (struct ConnectedPeer);
+ peer->pid = cm->peer;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (sh->peers,
+ &peer->pid,
+ peer,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ sh->connect_cb (sh->cls,
+ &cm->peer,
+ ntohl (cm->distance),
+ (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
+}
+
+
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param dm the message that was received
+ */
+static void
+handle_disconnect (void *cls,
+ const struct GNUNET_DV_DisconnectMessage *dm)
+{
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer;
+
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &dm->peer);
+ if (NULL == peer)
{
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- th);
- memcpy (&cbuf[ret], th->msg, tsize);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Passing %u bytes of type %u to DV service\n",
- tsize,
- ntohs (th->msg->type));
- th->msg = NULL;
- ret += tsize;
- if (NULL != th->cb)
- {
- GNUNET_CONTAINER_DLL_insert_tail (th->target->head,
- th->target->tail,
- th);
- }
- else
- {
- GNUNET_free (th);
- }
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
}
- if (NULL != sh->th_head)
- start_transmit (sh);
- return ret;
+ cleanup_send_cb (sh,
+ &dm->peer,
+ peer);
}
/**
- * Start sending messages from our queue to the service.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
*
- * @param sh service handle
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
*/
static void
-start_transmit (struct GNUNET_DV_ServiceHandle *sh)
+handle_distance_update (void *cls,
+ const struct GNUNET_DV_DistanceUpdateMessage *dum)
{
- if (NULL != sh->th)
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer;
+
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &dum->peer);
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
return;
- if (NULL == sh->th_head)
- return;
- sh->th =
- GNUNET_CLIENT_notify_transmit_ready (sh->client,
- ntohs (sh->th_head->msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &transmit_pending, sh);
+ }
+ sh->distance_cb (sh->cls,
+ &dum->peer,
+ ntohl (dum->distance),
+ (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
}
/**
- * We got disconnected from the service and thus all of the
- * pending send callbacks will never be confirmed. Clean up.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
*
- * @param cls the 'struct GNUNET_DV_ServiceHandle'
- * @param key a peer identity
- * @param value a `struct ConnectedPeer` to clean up
- * @return #GNUNET_OK (continue to iterate)
+ * @param cls the handle to the DV API
+ * @param rm the message that was received
*/
static int
-cleanup_send_cb (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+check_received (void *cls,
+ const struct GNUNET_DV_ReceivedMessage *rm)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- struct ConnectedPeer *peer = value;
- struct GNUNET_DV_TransmitHandle *th;
+ const struct GNUNET_MessageHeader *payload;
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (sh->peers,
- key,
- peer));
- sh->disconnect_cb (sh->cls,
- key);
- while (NULL != (th = peer->head))
+ if (NULL ==
+ GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &rm->sender))
{
- GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
- th->cb (th->cb_cls);
- GNUNET_free (th);
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
}
- GNUNET_free (peer);
+ if (ntohs (rm->header.size) - sizeof (struct GNUNET_DV_ReceivedMessage) <
+ sizeof (*payload))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ payload = (const struct GNUNET_MessageHeader *) &rm[1];
+ if (ntohs (rm->header.size) !=
+ sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
@@ -321,211 +271,38 @@
* Parse it out and give it to the plugin.
*
* @param cls the handle to the DV API
- * @param msg the message that was received
+ * @param rm the message that was received
*/
static void
-handle_message_receipt (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_received (void *cls,
+ const struct GNUNET_DV_ReceivedMessage *rm)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- const struct GNUNET_DV_ConnectMessage *cm;
- const struct GNUNET_DV_DistanceUpdateMessage *dum;
- const struct GNUNET_DV_DisconnectMessage *dm;
- const struct GNUNET_DV_ReceivedMessage *rm;
const struct GNUNET_MessageHeader *payload;
- const struct GNUNET_DV_AckMessage *ack;
- struct GNUNET_DV_TransmitHandle *th;
- struct GNUNET_DV_TransmitHandle *tn;
- struct ConnectedPeer *peer;
- if (NULL == msg)
- {
- /* Connection closed */
- reconnect (sh);
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u with %u bytes from DV service\n",
- (unsigned int) ntohs (msg->type),
- (unsigned int) ntohs (msg->size));
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_DV_CONNECT:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- cm = (const struct GNUNET_DV_ConnectMessage *) msg;
- peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &cm->peer);
- if (NULL != peer)
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- peer = GNUNET_new (struct ConnectedPeer);
- peer->pid = cm->peer;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (sh->peers,
- &peer->pid,
- peer,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- sh->connect_cb (sh->cls,
- &cm->peer,
- ntohl (cm->distance),
- (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
- break;
- case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
- sh->distance_cb (sh->cls,
- &dum->peer,
- ntohl (dum->distance),
- (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
- break;
- case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
- peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &dm->peer);
- if (NULL == peer)
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- tn = sh->th_head;
- while (NULL != (th = tn))
- {
- tn = th->next;
- if (peer == th->target)
- {
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- th);
- th->cb (th->cb_cls);
- GNUNET_free (th);
- }
- }
- cleanup_send_cb (sh, &dm->peer, peer);
- break;
- case GNUNET_MESSAGE_TYPE_DV_RECV:
- if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof
(struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
- payload = (const struct GNUNET_MessageHeader *) &rm[1];
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs
(payload->size))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- if (NULL ==
- GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &rm->sender))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- sh->message_cb (sh->cls,
- &rm->sender,
- ntohl (rm->distance),
- payload);
- break;
- case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
- case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- ack = (const struct GNUNET_DV_AckMessage *) msg;
- peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &ack->target);
- if (NULL == peer)
- break; /* this happens, just ignore */
- for (th = peer->head; NULL != th; th = th->next)
- {
- if (th->uid != ntohl (ack->uid))
- continue;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Matched ACK for message to peer %s\n",
- GNUNET_i2s (&ack->target));
- GNUNET_CONTAINER_DLL_remove (peer->head,
- peer->tail,
- th);
- th->cb (th->cb_cls);
- GNUNET_free (th);
- break;
- }
- break;
- default:
- reconnect (sh);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message, continuing receive loop for %p\n",
- sh->client);
- GNUNET_CLIENT_receive (sh->client,
- &handle_message_receipt, sh,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ payload = (const struct GNUNET_MessageHeader *) &rm[1];
+ sh->message_cb (sh->cls,
+ &rm->sender,
+ ntohl (rm->distance),
+ payload);
}
/**
- * Transmit the start message to the DV service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param cls the `struct GNUNET_DV_ServiceHandle *`
- * @param size number of bytes available in buf
- * @param buf where to copy the message
- * @return number of bytes written to buf
+ * @param cls closure with the `struct GNUNET_DV_ServiceHandle *`
+ * @param error error code
*/
-static size_t
-transmit_start (void *cls,
- size_t size,
- void *buf)
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- struct GNUNET_MessageHeader start_message;
- sh->th = NULL;
- if (NULL == buf)
- {
- GNUNET_break (0);
- reconnect (sh);
- return 0;
- }
- GNUNET_assert (size >= sizeof (start_message));
- start_message.size = htons (sizeof (struct GNUNET_MessageHeader));
- start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START);
- memcpy (buf, &start_message, sizeof (start_message));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting START request, starting receive loop for %p\n",
- sh->client);
- GNUNET_CLIENT_receive (sh->client,
- &handle_message_receipt, sh,
- GNUNET_TIME_UNIT_FOREVER_REL);
- start_transmit (sh);
- return sizeof (start_message);
+ reconnect (sh);
}
@@ -537,36 +314,52 @@
static void
reconnect (struct GNUNET_DV_ServiceHandle *sh)
{
- if (NULL != sh->th)
+ GNUNET_MQ_hd_fixed_size (connect,
+ GNUNET_MESSAGE_TYPE_DV_CONNECT,
+ struct GNUNET_DV_ConnectMessage);
+ GNUNET_MQ_hd_fixed_size (disconnect,
+ GNUNET_MESSAGE_TYPE_DV_DISCONNECT,
+ struct GNUNET_DV_DisconnectMessage);
+ GNUNET_MQ_hd_fixed_size (distance_update,
+ GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED,
+ struct GNUNET_DV_DistanceUpdateMessage);
+ GNUNET_MQ_hd_var_size (received,
+ GNUNET_MESSAGE_TYPE_DV_RECV,
+ struct GNUNET_DV_ReceivedMessage);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_connect_handler (sh),
+ make_disconnect_handler (sh),
+ make_distance_update_handler (sh),
+ make_received_handler (sh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MessageHeader *sm;
+ struct GNUNET_MQ_Envelope *env;
+
+ if (NULL != sh->mq)
{
- GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
- sh->th = NULL;
+ GNUNET_MQ_destroy (sh->mq);
+ sh->mq = NULL;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Disconnecting from DV service at %p\n",
- sh->client);
- if (NULL != sh->client)
- {
- GNUNET_CLIENT_disconnect (sh->client);
- sh->client = NULL;
- }
GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
&cleanup_send_cb,
sh);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connecting to DV service\n");
- sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
- if (NULL == sh->client)
+ sh->mq = GNUNET_CLIENT_connecT (sh->cfg,
+ "dv",
+ handlers,
+ &mq_error_handler,
+ sh);
+ if (NULL == sh->mq)
{
GNUNET_break (0);
return;
}
- sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
- sizeof (struct
GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_start,
- sh);
+ env = GNUNET_MQ_msg (sm,
+ GNUNET_MESSAGE_TYPE_DV_START);
+ GNUNET_MQ_send (sh->mq,
+ env);
}
@@ -598,7 +391,8 @@
sh->distance_cb = distance_cb;
sh->disconnect_cb = disconnect_cb;
sh->message_cb = message_cb;
- sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
+ sh->peers = GNUNET_CONTAINER_multipeermap_create (128,
+ GNUNET_YES);
reconnect (sh);
return sh;
}
@@ -612,27 +406,13 @@
void
GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
{
- struct GNUNET_DV_TransmitHandle *pos;
-
if (NULL == sh)
return;
- if (NULL != sh->th)
+ if (NULL != sh->mq)
{
- GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
- sh->th = NULL;
+ GNUNET_MQ_destroy (sh->mq);
+ sh->mq = NULL;
}
- while (NULL != (pos = sh->th_head))
- {
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- pos);
- GNUNET_free (pos);
- }
- if (NULL != sh->client)
- {
- GNUNET_CLIENT_disconnect (sh->client);
- sh->client = NULL;
- }
GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
&cleanup_send_cb,
sh);
@@ -647,88 +427,41 @@
* @param sh service handle
* @param target intended recpient
* @param msg message payload
- * @param cb function to invoke when done
- * @param cb_cls closure for @a cb
- * @return handle to cancel the operation
*/
-struct GNUNET_DV_TransmitHandle *
+void
GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_DV_MessageSentCallback cb,
- void *cb_cls)
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_DV_TransmitHandle *th;
struct GNUNET_DV_SendMessage *sm;
struct ConnectedPeer *peer;
+ struct GNUNET_MQ_Envelope *env;
- if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >=
GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ if (ntohs (msg->size) + sizeof (*sm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
GNUNET_break (0);
- return NULL;
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to send %u bytes of type %u to %s via %p\n",
+ "Asked to send %u bytes of type %u to %s\n",
(unsigned int) ntohs (msg->size),
(unsigned int) ntohs (msg->type),
- GNUNET_i2s (target),
- sh->client);
+ GNUNET_i2s (target));
peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
target);
if (NULL == peer)
{
GNUNET_break (0);
- return NULL;
+ return;
}
- th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
- sizeof (struct GNUNET_DV_SendMessage) +
- ntohs (msg->size));
- th->sh = sh;
- th->target = peer;
- th->cb = cb;
- th->cb_cls = cb_cls;
- th->msg = (const struct GNUNET_MessageHeader *) &th[1];
- sm = (struct GNUNET_DV_SendMessage *) &th[1];
- sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
- sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
- ntohs (msg->size));
- if (0 == sh->uid_gen)
- sh->uid_gen = 1;
- th->uid = sh->uid_gen;
- sm->uid = htonl (sh->uid_gen++);
- /* use memcpy here as 'target' may not be sufficiently aligned */
- memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
- memcpy (&sm[1], msg, ntohs (msg->size));
- GNUNET_CONTAINER_DLL_insert_tail (sh->th_head,
- sh->th_tail,
- th);
- start_transmit (sh);
- return th;
+ GNUNET_assert (NULL != sh->mq);
+ env = GNUNET_MQ_msg_nested_mh (sm,
+ GNUNET_MESSAGE_TYPE_DV_SEND,
+ msg);
+ sm->target = *target;
+ GNUNET_MQ_send (sh->mq,
+ env);
}
-/**
- * Abort send operation (naturally, the message may have
- * already been transmitted; this only stops the 'cb'
- * from being called again).
- *
- * @param th send operation to cancel
- */
-void
-GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
-{
- struct GNUNET_DV_ServiceHandle *sh = th->sh;
-
- if (NULL == th->msg)
- GNUNET_CONTAINER_DLL_remove (th->target->head,
- th->target->tail,
- th);
- else
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- th);
- GNUNET_free (th);
-}
-
-
/* end of dv_api.c */
Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c 2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/gnunet-service-dv.c 2016-07-05 14:17:50 UTC (rev 37460)
@@ -149,11 +149,6 @@
*/
struct GNUNET_PeerIdentity next_target;
- /**
- * Unique ID of the message.
- */
- uint32_t uid;
-
};
@@ -480,33 +475,6 @@
/**
- * Give an (N)ACK message to the plugin, we transmitted a message for it.
- *
- * @param target peer that received the message
- * @param uid plugin-chosen UID for the message
- * @param nack #GNUNET_NO to send ACK, #GNUNET_YES to send NACK
- */
-static void
-send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
- uint32_t uid,
- int nack)
-{
- struct GNUNET_DV_AckMessage ack_msg;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivering ACK for message to peer `%s'\n",
- GNUNET_i2s (target));
- ack_msg.header.size = htons (sizeof (ack_msg));
- ack_msg.header.type = htons ((GNUNET_YES == nack)
- ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK
- : GNUNET_MESSAGE_TYPE_DV_SEND_ACK);
- ack_msg.uid = htonl (uid);
- ack_msg.target = *target;
- send_control_to_plugin (&ack_msg.header);
-}
-
-
-/**
* Send a DISTANCE_CHANGED message to the plugin.
*
* @param peer peer with a changed distance
@@ -613,16 +581,6 @@
dn->pm_tail,
pending);
memcpy (&cbuf[off], pending->msg, msize);
- if (0 != pending->uid)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Acking transmission of %u bytes to %s with plugin\n",
- (unsigned int) msize,
- GNUNET_i2s (&pending->next_target));
- send_ack_to_plugin (&pending->next_target,
- pending->uid,
- GNUNET_NO);
- }
GNUNET_free (pending);
off += msize;
}
@@ -649,7 +607,6 @@
*
* @param target where to send the message
* @param distance distance to the @a sender
- * @param uid unique ID for the message
* @param sender original sender of the message
* @param actual_target ultimate recipient for the message
* @param payload payload of the message
@@ -657,7 +614,6 @@
static void
forward_payload (struct DirectNeighbor *target,
uint32_t distance,
- uint32_t uid,
const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_PeerIdentity *actual_target,
const struct GNUNET_MessageHeader *payload)
@@ -667,7 +623,6 @@
size_t msize;
if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
- (0 == uid) &&
(0 != memcmp (sender,
&my_identity,
sizeof (struct GNUNET_PeerIdentity))) )
@@ -686,7 +641,6 @@
}
pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
pm->next_target = target->peer;
- pm->uid = uid;
pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
rm = (struct RouteMessage *) &pm[1];
rm->header.size = htons ((uint16_t) msize);
@@ -1888,7 +1842,6 @@
GNUNET_i2s (&neighbor->peer));
forward_payload (neighbor,
distance + 1,
- 0,
&rm->sender,
&rm->target,
payload);
@@ -1920,7 +1873,6 @@
return;
}
msg = (const struct GNUNET_DV_SendMessage *) message;
- GNUNET_break (0 != ntohl (msg->uid));
payload = (const struct GNUNET_MessageHeader *) &msg[1];
if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs
(payload->size))
{
@@ -1940,7 +1892,6 @@
GNUNET_STATISTICS_update (stats,
"# local messages discarded (no route)",
1, GNUNET_NO);
- send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
@@ -1952,7 +1903,6 @@
forward_payload (route->next_hop,
0 /* first hop, distance is zero */,
- htonl (msg->uid),
&my_identity,
&msg->target,
payload);
Modified: gnunet/src/dv/plugin_transport_dv.c
===================================================================
--- gnunet/src/dv/plugin_transport_dv.c 2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/plugin_transport_dv.c 2016-07-05 14:17:50 UTC (rev 37460)
@@ -45,51 +45,6 @@
/**
- * An active request for transmission via DV.
- */
-struct PendingRequest
-{
-
- /**
- * This is a DLL.
- */
- struct PendingRequest *next;
-
- /**
- * This is a DLL.
- */
- struct PendingRequest *prev;
-
- /**
- * Continuation function to call once the transmission buffer
- * has again space available. NULL if there is no
- * continuation to call.
- */
- GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
-
- /**
- * Closure for @e transmit_cont.
- */
- void *transmit_cont_cls;
-
- /**
- * Transmission handle from DV client library.
- */
- struct GNUNET_DV_TransmitHandle *th;
-
- /**
- * Session of this request.
- */
- struct GNUNET_ATS_Session *session;
-
- /**
- * Number of bytes to transmit.
- */
- size_t size;
-};
-
-
-/**
* Session handle for connections.
*/
struct GNUNET_ATS_Session
@@ -100,16 +55,6 @@
struct Plugin *plugin;
/**
- * Head of pending requests.
- */
- struct PendingRequest *pr_head;
-
- /**
- * Tail of pending requests.
- */
- struct PendingRequest *pr_tail;
-
- /**
* Address we use for the other peer.
*/
struct GNUNET_HELLO_Address *address;
@@ -449,7 +394,6 @@
free_session (struct GNUNET_ATS_Session *session)
{
struct Plugin *plugin = session->plugin;
- struct PendingRequest *pr;
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
@@ -470,20 +414,6 @@
session);
session->active = GNUNET_NO;
}
- while (NULL != (pr = session->pr_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- GNUNET_DV_send_cancel (pr->th);
- pr->th = NULL;
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_SYSERR,
- pr->size, 0);
- GNUNET_free (pr);
- }
GNUNET_HELLO_address_free (session->address);
GNUNET_free (session);
}
@@ -515,31 +445,6 @@
/**
- * Function called once the delivery of a message has been successful.
- * Clean up the pending request, and call continuations.
- *
- * @param cls closure
- */
-static void
-send_finished (void *cls)
-{
- struct PendingRequest *pr = cls;
- struct GNUNET_ATS_Session *session = pr->session;
-
- pr->th = NULL;
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_OK,
- pr->size, 0);
- GNUNET_free (pr);
-}
-
-
-/**
* Function that can be used by the transport service to transmit
* a message using the plugin.
*
@@ -565,10 +470,10 @@
size_t msgbuf_size,
unsigned int priority,
struct GNUNET_TIME_Relative timeout,
- GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
+ GNUNET_TRANSPORT_TransmitContinuation cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
- struct PendingRequest *pr;
const struct GNUNET_MessageHeader *msg;
struct GNUNET_MessageHeader *box;
@@ -585,20 +490,13 @@
memcpy (&box[1], msgbuf, msgbuf_size);
msg = box;
}
- pr = GNUNET_new (struct PendingRequest);
- pr->transmit_cont = cont;
- pr->transmit_cont_cls = cont_cls;
- pr->session = session;
- pr->size = msgbuf_size;
- GNUNET_CONTAINER_DLL_insert_tail (session->pr_head,
- session->pr_tail,
- pr);
-
- pr->th = GNUNET_DV_send (plugin->dvh,
- &session->sender,
- msg,
- &send_finished,
- pr);
+ GNUNET_DV_send (plugin->dvh,
+ &session->sender,
+ msg);
+ cont (cont_cls,
+ &session->sender,
+ GNUNET_OK,
+ msgbuf_size, 0);
GNUNET_free_non_null (box);
return 0; /* DV */
}
@@ -618,26 +516,11 @@
{
struct Plugin *plugin = cls;
struct GNUNET_ATS_Session *session;
- struct PendingRequest *pr;
session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions,
target);
if (NULL == session)
return; /* nothing to do */
- while (NULL != (pr = session->pr_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- GNUNET_DV_send_cancel (pr->th);
- pr->th = NULL;
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_SYSERR,
- pr->size, 0);
- GNUNET_free (pr);
- }
session->active = GNUNET_NO;
}
@@ -655,22 +538,6 @@
dv_plugin_disconnect_session (void *cls,
struct GNUNET_ATS_Session *session)
{
- struct PendingRequest *pr;
-
- while (NULL != (pr = session->pr_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- GNUNET_DV_send_cancel (pr->th);
- pr->th = NULL;
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_SYSERR,
- pr->size, 0);
- GNUNET_free (pr);
- }
session->active = GNUNET_NO;
return GNUNET_OK;
}
@@ -691,9 +558,11 @@
* @param asc_cls closure for @a asc
*/
static void
-dv_plugin_address_pretty_printer (void *cls, const char *type,
+dv_plugin_address_pretty_printer (void *cls,
+ const char *type,
const void *addr,
- size_t addrlen, int numeric,
+ size_t addrlen,
+ int numeric,
struct GNUNET_TIME_Relative timeout,
GNUNET_TRANSPORT_AddressStringCallback asc,
void *asc_cls)
Modified: gnunet/src/include/gnunet_dv_service.h
===================================================================
--- gnunet/src/include/gnunet_dv_service.h 2016-07-05 12:41:49 UTC (rev
37459)
+++ gnunet/src/include/gnunet_dv_service.h 2016-07-05 14:17:50 UTC (rev
37460)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2013 GNUnet e.V.
+ Copyright (C) 2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -153,29 +153,14 @@
* @param sh service handle
* @param target intended recpient
* @param msg message payload
- * @param cb function to invoke when done
- * @param cb_cls closure for 'cb'
* @return handle to cancel the operation
*/
-struct GNUNET_DV_TransmitHandle *
+void
GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_DV_MessageSentCallback cb,
- void *cb_cls);
+ const struct GNUNET_MessageHeader *msg);
-/**
- * Abort send operation (naturally, the message may have
- * already been transmitted; this only stops the 'cb'
- * from being called again).
- *
- * @param th send operation to cancel
- */
-void
-GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th);
-
-
#endif
/** @} */ /* end of group */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r37460 - in gnunet/src: dv include,
gnunet <=