[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r26981 - in gnunet/src: include set
From: |
gnunet |
Subject: |
[GNUnet-SVN] r26981 - in gnunet/src: include set |
Date: |
Wed, 24 Apr 2013 13:48:32 +0200 |
Author: dold
Date: 2013-04-24 13:48:31 +0200 (Wed, 24 Apr 2013)
New Revision: 26981
Modified:
gnunet/src/include/gnunet_applications.h
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_set_service.h
gnunet/src/set/Makefile.am
gnunet/src/set/gnunet-service-set.c
gnunet/src/set/gnunet-set.c
gnunet/src/set/mq.c
gnunet/src/set/mq.h
gnunet/src/set/set.conf.in
gnunet/src/set/set.h
gnunet/src/set/set_api.c
gnunet/src/set/strata_estimator.c
gnunet/src/set/strata_estimator.h
gnunet/src/set/test_set_api.c
Log:
started implementing union operation for set
Modified: gnunet/src/include/gnunet_applications.h
===================================================================
--- gnunet/src/include/gnunet_applications.h 2013-04-23 15:55:13 UTC (rev
26980)
+++ gnunet/src/include/gnunet_applications.h 2013-04-24 11:48:31 UTC (rev
26981)
@@ -77,6 +77,12 @@
#define GNUNET_APPLICATION_TYPE_CONSENSUS 18
+/**
+ * Set. Used for two-peer set operations implemented using stream.
+ */
+#define GNUNET_APPLICATION_TYPE_SET 19
+
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2013-04-23 15:55:13 UTC (rev
26980)
+++ gnunet/src/include/gnunet_protocols.h 2013-04-24 11:48:31 UTC (rev
26981)
@@ -1807,11 +1807,42 @@
#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578
/**
- * Evaluate a set operation
+ * Evaluate a set operation.
*/
#define GNUNET_MESSAGE_TYPE_SET_CREATE 579
+/**
+ * Evaluate a set operation.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580
+/**
+ * Strata estimator.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581
+
+/**
+ * Invertible bloom filter.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582
+
+/**
+ * Actual set elements.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583
+
+/**
+ * Requests for the elements with the given hashes.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584
+
+/**
+ * Operation is done.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585
+
+
+
/*******************************************************************************
* TESTBED LOGGER message types
******************************************************************************/
Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h 2013-04-23 15:55:13 UTC (rev
26980)
+++ gnunet/src/include/gnunet_set_service.h 2013-04-24 11:48:31 UTC (rev
26981)
@@ -63,11 +63,6 @@
/**
- * Opaque handle to a listen operation.
- */
-struct GNUNET_SET_ListenHandle;
-
-/**
* The operation that a set set supports.
*/
enum GNUNET_SET_OperationType
@@ -135,10 +130,12 @@
* Number of bytes in the buffer pointed to by data.
*/
uint16_t size;
+
/**
* Application-specific element type.
*/
uint16_t type;
+
/**
* Actual data of the element
*/
@@ -153,6 +150,7 @@
*/
typedef void (*GNUNET_SET_Continuation) (void *cls);
+
/**
* Callback for set operation results. Called for each element
* in the result set.
@@ -161,10 +159,9 @@
* @param element a result element, only valid if status is
GNUNET_SET_STATUS_OK
* @param status see enum GNUNET_SET_Status
*/
-typedef void
-(*GNUNET_SET_ResultIterator) (void *cls,
- struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status);
+typedef void (*GNUNET_SET_ResultIterator) (void *cls,
+ struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status);
/**
@@ -201,7 +198,7 @@
* @return a handle to the set
*/
struct GNUNET_SET_Handle *
-GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op);
@@ -270,8 +267,6 @@
void *result_cls);
-
-
/**
* Wait for set operation requests for the given application id
*
@@ -285,7 +280,7 @@
* @return a handle that can be used to cancel the listen operation
*/
struct GNUNET_SET_ListenHandle *
-GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op_type,
const struct GNUNET_HashCode *app_id,
GNUNET_SET_ListenCallback listen_cb,
Modified: gnunet/src/set/Makefile.am
===================================================================
--- gnunet/src/set/Makefile.am 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/Makefile.am 2013-04-24 11:48:31 UTC (rev 26981)
@@ -29,6 +29,7 @@
gnunet_set_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/set/libgnunetset.la \
+ $(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/testbed/libgnunettestbed.la \
$(GN_LIBINTL)
gnunet_set_DEPENDENCIES = \
@@ -36,6 +37,8 @@
gnunet_service_set_SOURCES = \
gnunet-service-set.c \
+ gnunet-service-set_union.c \
+ mq.c \
ibf.c \
strata_estimator.c
gnunet_service_set_LDADD = \
@@ -44,12 +47,16 @@
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/mesh/libgnunetmesh.la \
$(GN_LIBINTL)
+# hack for mq.c, see automake Objects ‘created with both libtool and without’
+# remove once GNUNET_MQ is in util/
+gnunet_service_set_CFLAGS = $(AM_CFLAGS)
libgnunetset_la_SOURCES = \
set_api.c \
mq.c
libgnunetset_la_LIBADD = \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/stream/libgnunetstream.la \
$(LTLIBINTL)
libgnunetset_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
@@ -67,6 +74,8 @@
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/testing/libgnunettesting.la \
$(top_builddir)/src/set/libgnunetset.la
+test_set_api_DEPENDENCIES = \
+ libgnunetset.la
EXTRA_DIST = \
test_set.conf
Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/gnunet-service-set.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -24,34 +24,197 @@
* @author Florian Dold
*/
-#include "platform.h"
-#include "gnunet_common.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
-struct Set
-{
+#include "gnunet-service-set.h"
+#include "set_protocol.h"
-};
-struct Listener
-{
+/**
+ * Configuration of our local peer.
+ */
+const struct GNUNET_CONFIGURATION_Handle *configuration;
-};
+/**
+ * Socket listening for other peers via stream.
+ */
+static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
-/*
-static struct Listener *sets_head;
-static struct Listener *sets_tail;
+/**
+ * Sets are held in a doubly linked list.
+ */
+static struct Set *sets_head;
+/**
+ * Sets are held in a doubly linked list.
+ */
+static struct Set *sets_tail;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
static struct Listener *listeners_head;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
static struct Listener *listeners_tail;
-*/
+/**
+ * Incoming sockets from remote peers are
+ * held in a doubly linked list.
+ */
+static struct Incoming *incoming_head;
/**
+ * Incoming sockets from remote peers are
+ * held in a doubly linked list.
+ */
+static struct Incoming *incoming_tail;
+
+/**
+ * Counter for allocating unique request IDs for clients.
+ */
+static uint32_t request_id = 1;
+
+
+/**
+ * Disconnect a client and free all resources
+ * that the client allocated (e.g. Sets or Listeners)
+ *
+ * @param client the client to disconnect
+ */
+void
+client_disconnect (struct GNUNET_SERVER_Client *client)
+{
+ /* FIXME: clean up any data structures belonging to the client */
+ GNUNET_SERVER_client_disconnect (client);
+}
+
+
+/**
+ * Get set that is owned by the client, if any.
+ *
+ * @param client client to look for
+ * @return set that the client owns, NULL if the client
+ * does not own a set
+ */
+static struct Set *
+get_set (struct GNUNET_SERVER_Client *client)
+{
+ struct Set *set;
+ for (set = sets_head; NULL != set; set = set->next)
+ if (set->client == client)
+ return set;
+ return NULL;
+}
+
+
+/**
+ * Get the listener associated to a client, if any.
+ *
+ * @param client the client
+ * @return listener associated with the client, NULL
+ * if there isn't any
+ */
+static struct Listener *
+get_listener (struct GNUNET_SERVER_Client *client)
+{
+ struct Listener *listener;
+ for (listener = listeners_head; NULL != listener; listener = listener->next)
+ if (listener->client == client)
+ return listener;
+ return NULL;
+}
+
+/**
+ * Get the incoming socket associated with the given id
+ *
+ * @param id id to look for
+ * @return the incoming socket associated with the id,
+ * or NULL if there is none
+ */
+static struct Incoming *
+get_incoming (uint32_t id)
+{
+ struct Incoming *incoming;
+ for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
+ if (incoming->request_id == id)
+ return incoming;
+ return NULL;
+}
+
+static void
+destroy_incoming (struct Incoming *incoming)
+{
+ if (NULL != incoming->mq)
+ {
+ GNUNET_MQ_destroy (incoming->mq);
+ incoming->mq = NULL;
+ }
+ if (NULL != incoming->socket)
+ {
+ GNUNET_STREAM_close (incoming->socket);
+ incoming->socket = NULL;
+ }
+ GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
+ GNUNET_free (incoming);
+}
+
+
+/**
+ * Handle a request for a set operation for
+ * another peer.
+ *
+ * @param cls the incoming socket
+ * @param mh the message
+ */
+static void
+handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+ struct Incoming *incoming = cls;
+ const struct OperationRequestMessage *msg = (const struct
OperationRequestMessage *) mh;
+ struct GNUNET_MQ_Message *mqm;
+ struct RequestMessage *cmsg;
+ struct Listener *listener;
+ const struct GNUNET_MessageHeader *context_msg;
+
+ if (ntohs (mh->size) < sizeof *msg)
+ {
+ GNUNET_break (0);
+ destroy_incoming (incoming);
+ return;
+ }
+ else if (ntohs (mh->size) == sizeof *msg)
+ {
+ context_msg = NULL;
+ }
+ else
+ {
+ context_msg = &msg[1].header;
+ if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
+ {
+ /* size of context message is invalid */
+ GNUNET_break (0);
+ destroy_incoming (incoming);
+ return;
+ }
+ }
+
+ for (listener = listeners_head; listener != NULL; listener = listener->next)
+ {
+ if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
+ (htons (msg->operation) != listener->operation) )
+ continue;
+ mqm = GNUNET_MQ_msg_concat (cmsg, context_msg,
GNUNET_MESSAGE_TYPE_SET_REQUEST);
+ incoming->request_id = request_id++;
+ cmsg->request_id = htonl (incoming->request_id);
+ GNUNET_MQ_send (listener->client_mq, mqm);
+ return;
+ }
+}
+
+
+/**
* Called when a client wants to create a new set.
*
* @param cls unused
@@ -63,11 +226,307 @@
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
+ struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
+ struct Set *set;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n");
+
+ if (NULL != get_set (client))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+
+ set = GNUNET_new (struct Set);
+
+ switch (ntohs (msg->operation))
+ {
+ case GNUNET_SET_OPERATION_INTERSECTION:
+ /* FIXME: cfuchs */
+ GNUNET_assert (0);
+ break;
+ case GNUNET_SET_OPERATION_UNION:
+ set = union_set_create ();
+ break;
+ default:
+ GNUNET_free (set);
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+
+ set->client = client;
+ set->client_mq = GNUNET_MQ_queue_for_server_client (client);
+ GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Called when a client wants to create a new set.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_listen (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct ListenMessage *msg = (struct ListenMessage *) m;
+ struct Listener *listener;
+ if (NULL != get_listener (client))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+
+ listener = GNUNET_new (struct Listener);
+ listener->app_id = msg->app_id;
+ listener->operation = msg->operation;
+ GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
}
/**
+ * Called when a client wants to create a new set.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_add (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Set *set;
+
+ set = get_set (client);
+ if (NULL == set)
+ {
+ GNUNET_break (0);
+ client_disconnect (client);
+ return;
+ }
+ switch (set->operation)
+ {
+ case GNUNET_SET_OPERATION_UNION:
+ union_add (set, (struct ElementMessage *) m);
+ break;
+ case GNUNET_SET_OPERATION_INTERSECTION:
+ /* FIXME: cfuchs */
+ break;
+ default:
+ GNUNET_assert (0);
+ break;
+ }
+}
+
+
+/**
+ * Called when a client wants to evaluate a set operation with another peer.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_evaluate (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Set *set;
+ struct EvaluateMessage *msg = (struct EvaluateMessage *) m;
+ struct EvaluateOperation *eo;
+
+ set = get_set (client);
+
+ if (NULL == set)
+ {
+ GNUNET_break (0);
+ client_disconnect (client);
+ return;
+ }
+
+ eo = GNUNET_new (struct EvaluateOperation);
+ eo->peer = msg->peer;
+ eo->app_id = msg->app_id;
+ eo->request_id = msg->request_id;
+ eo->context_msg = GNUNET_copy_message (&msg[1].header);
+ eo->set = set;
+
+ switch (set->operation)
+ {
+ case GNUNET_SET_OPERATION_INTERSECTION:
+ /* FIXME: cfuchs */
+ break;
+ case GNUNET_SET_OPERATION_UNION:
+ union_evaluate (eo);
+ break;
+ default:
+ GNUNET_assert (0);
+ break;
+ }
+}
+
+
+/**
+ * Handle a cancel request from a client.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param m the cancel message
+ */
+static void
+handle_client_cancel (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ /* FIXME: implement */
+}
+
+
+/**
+ * Handle an ack from a client.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param m the message
+ */
+static void
+handle_client_ack (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ /* FIXME: implement */
+}
+
+
+/**
+ * Handle a request from the client to accept
+ * a set operation.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param m the message
+ */
+static void
+handle_client_accept (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct AcceptMessage *msg = (struct AcceptMessage *) m;
+ struct Set *set;
+ struct Incoming *incoming;
+ struct EvaluateOperation *eo;
+
+ set = get_set (client);
+
+ if (NULL == set)
+ {
+ GNUNET_break (0);
+ client_disconnect (client);
+ return;
+ }
+
+ incoming = get_incoming (ntohl (msg->request_id));
+
+ if ( (NULL == incoming) ||
+ (incoming->operation != set->operation) )
+ {
+ GNUNET_break (0);
+ client_disconnect (client);
+ return;
+ }
+
+ eo = GNUNET_new (struct EvaluateOperation);
+ eo->peer = incoming->peer;
+ eo->app_id = incoming->app_id;
+ eo->request_id = msg->request_id;
+ eo->set = set;
+
+ switch (set->operation)
+ {
+ case GNUNET_SET_OPERATION_INTERSECTION:
+ /* FIXME: cfuchs*/
+ GNUNET_assert (0);
+ break;
+ case GNUNET_SET_OPERATION_UNION:
+ union_accept (eo, incoming);
+ break;
+ default:
+ GNUNET_assert (0);
+ break;
+ }
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other
peers
+ * or upon binding error which happen when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream; NULL on binding error
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us; NULL on binding error
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ struct Incoming *incoming;
+ static const struct GNUNET_MQ_Handler handlers[] = {
+ {handle_p2p_operation_request,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
+ GNUNET_MQ_HANDLERS_END
+ };
+
+ if (NULL == socket)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+
+ incoming = GNUNET_new (struct Incoming);
+ incoming->peer = *initiator;
+ incoming->socket = socket;
+ incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket,
handlers, incoming);
+ /* FIXME: timeout for peers that only connect but don't send anything */
+ GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Called to clean up, after a shutdown has been requested.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != stream_listen_socket)
+ {
+ GNUNET_STREAM_listen_close (stream_listen_socket);
+ stream_listen_socket = NULL;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+}
+
+
+/**
* Function called by the service's run
* method to run service-specific setup code.
*
@@ -77,15 +536,24 @@
*/
static void
run (void *cls, struct GNUNET_SERVER_Handle *server, const struct
GNUNET_CONFIGURATION_Handle *cfg)
-
{
static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
{handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
+ {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
+ {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
+ {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
+ {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
+ {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
+ {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
{NULL, NULL, 0, 0}
};
-
+ configuration = cfg;
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
GNUNET_SERVER_add_handlers (server, server_handlers);
+ stream_listen_socket = GNUNET_STREAM_listen (cfg,
GNUNET_APPLICATION_TYPE_SET,
+ &stream_listen_cb, NULL,
+ GNUNET_STREAM_OPTION_END);
}
Modified: gnunet/src/set/gnunet-set.c
===================================================================
--- gnunet/src/set/gnunet-set.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/gnunet-set.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -27,8 +27,25 @@
#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#include "gnunet_testbed_service.h"
+#include "gnunet_set_service.h"
+static struct GNUNET_HashCode app_id;
+static struct GNUNET_SET_Handle *set1;
+static struct GNUNET_SET_Handle *set2;
+static struct GNUNET_SET_ListenHandle *listen_handle;
+
+
+static void
+listen_cb (void *cls,
+ const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_MessageHeader *context_msg,
+ struct GNUNET_SET_Request *request)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
+}
+
+
/**
* Main function that will be run.
*
@@ -40,10 +57,15 @@
static void
run (void *cls, char *const *args,
const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *
- cfg)
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
- /* FIXME */
+ static const char* app_str = "gnunet-set";
+ GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
+
+ set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &app_id,
+ listen_cb, NULL);
}
@@ -56,7 +78,7 @@
};
GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set",
"help",
- options, &run, NULL, GNUNET_YES);
+ options, &run, NULL, GNUNET_NO);
return 0;
}
Modified: gnunet/src/set/mq.c
===================================================================
--- gnunet/src/set/mq.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/mq.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -26,13 +26,181 @@
#include "mq.h"
+/**
+ * Signature of functions implementing the
+ * sending part of a message queue
+ *
+ * @param q the message queue
+ * @param m the message
+ */
+typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct
GNUNET_MQ_Message *m);
+
+typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
+
+
+/**
+ * Collection of the state necessary to read and write gnunet messages
+ * to a stream socket. Should be used as closure for stream_data_processor.
+ */
+struct MessageStreamState
+{
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+ struct MessageQueue *mq;
+ struct GNUNET_STREAM_Socket *socket;
+ struct GNUNET_STREAM_ReadHandle *rh;
+ struct GNUNET_STREAM_WriteHandle *wh;
+};
+
+
+struct ServerClientSocketState
+{
+ struct GNUNET_SERVER_Client *client;
+ struct GNUNET_SERVER_TransmitHandle* th;
+};
+
+
+struct ClientConnectionState
+{
+ struct GNUNET_CLIENT_Connection *connection;
+ struct GNUNET_CLIENT_TransmitHandle *th;
+};
+
+
+struct GNUNET_MQ_MessageQueue
+{
+ /**
+ * Handlers array, or NULL if the queue should not receive messages
+ */
+ const struct GNUNET_MQ_Handler *handlers;
+
+ /**
+ * Closure for the handler callbacks
+ */
+ void *handlers_cls;
+
+ /**
+ * Actual implementation of message sending,
+ * called when a message is added
+ */
+ SendImpl send_impl;
+
+ /**
+ * Implementation-dependent queue destruction function
+ */
+ DestroyImpl destroy_impl;
+
+ /**
+ * Implementation-specific state
+ */
+ void *impl_state;
+
+ /**
+ * Linked list of messages pending to be sent
+ */
+ struct GNUNET_MQ_Message *msg_head;
+
+ /**
+ * Linked list of messages pending to be sent
+ */
+ struct GNUNET_MQ_Message *msg_tail;
+
+ /**
+ * Message that is currently scheduled to be
+ * sent. Not the head of the message queue, as the implementation
+ * needs to know if sending has been already scheduled or not.
+ */
+ struct GNUNET_MQ_Message *current_msg;
+
+ /**
+ * Map of associations, lazily allocated
+ */
+ struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+
+ /**
+ * Next id that should be used for the assoc_map,
+ * initialized lazily to a random value together with
+ * assoc_map
+ */
+ uint32_t assoc_id;
+};
+
+
struct GNUNET_MQ_Message
{
+ /**
+ * Messages are stored in a linked list
+ */
+ struct GNUNET_MQ_Message *next;
+
+ /**
+ * Messages are stored in a linked list
+ */
+ struct GNUNET_MQ_Message *prev;
+
+ /**
+ * Actual allocated message header,
+ * usually points to the end of the containing GNUNET_MQ_Message
+ */
struct GNUNET_MessageHeader *mh;
+
+ /**
+ * Queue the message is queued in, NULL if message is not queued.
+ */
+ struct GNUNET_MQ_MessageQueue *parent_queue;
+
+ /**
+ * Called after the message was sent irrevokably
+ */
+ GNUNET_MQ_NotifyCallback sent_cb;
+
+ /**
+ * Closure for send_cb
+ */
+ void *sent_cls;
};
+/**
+ * Call the right callback for a message received
+ * by a queue
+ */
+static void
+dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct
GNUNET_MessageHeader *mh)
+{
+ const struct GNUNET_MQ_Handler *handler;
+
+ handler = mq->handlers;
+ if (NULL == handler)
+ return;
+ for (; NULL != handler->cb; handler++)
+ if (handler->type == ntohs (mh->type))
+ handler->cb (mq->handlers_cls, mh);
+}
+
+
+void
+GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
+{
+ GNUNET_assert (NULL == mqm->parent_queue);
+ GNUNET_free (mqm);
+}
+
+
+/**
+ * Send a message with the give message queue.
+ * May only be called once per message.
+ *
+ * @param mq message queue
+ * @param mqm the message to send.
+ */
+void
+GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message
*mqm)
+{
+ mq->send_impl (mq, mqm);
+}
+
+
struct GNUNET_MQ_Message *
GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t
type)
{
@@ -40,8 +208,422 @@
mqm = GNUNET_malloc (sizeof *mqm + size);
mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
mqm->mh->size = htons (size);
- mqm->mh->type = htons(type);
+ mqm->mh->type = htons (type);
if (NULL != mhp)
*mhp = mqm->mh;
return mqm;
}
+
+
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size,
struct GNUNET_MessageHeader *m, uint16_t type)
+{
+ struct GNUNET_MQ_Message *mq;
+
+ GNUNET_assert (NULL != mhp);
+ if (NULL == m)
+ return GNUNET_MQ_msg_ (mhp, base_size, type);
+ GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader)));
+ /* check for overflow */
+ if (base_size + ntohs (m->size) <= base_size)
+ return NULL;
+ mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type);
+ memcpy (((void *) *mhp) + base_size, m, ntohs (m->size));
+ return mq;
+}
+
+
+/**
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_STREAM_OK if writing to stream was completed successfully;
+ * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ * (this doesn't mean that the data is never sent, the receiver may
+ * have read the data but its ACKs may have been lost);
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ * be processed.
+ * @param size the number of bytes written
+ */
+static void
+stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct MessageStreamState *mss = (struct MessageStreamState *)
mq->impl_state;
+ struct GNUNET_MQ_Message *mqm;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+
+ /* call cb for message we finished sending */
+ mqm = mq->current_msg;
+ if (NULL != mqm)
+ {
+ if (NULL != mqm->sent_cb)
+ mqm->sent_cb (mqm->sent_cls);
+ GNUNET_free (mqm);
+ }
+
+ mss->wh = NULL;
+
+ mqm = mq->msg_head;
+ mq->current_msg = mqm;
+ if (NULL == mqm)
+ return;
+ GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
+ mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
stream_write_queued, cls);
+ GNUNET_assert (NULL != mss->wh);
+}
+
+
+static void
+stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct
GNUNET_MQ_Message *mqm)
+{
+ if (NULL != mq->current_msg)
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+ return;
+ }
+ stream_write_queued (mq, GNUNET_STREAM_OK, 0);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+stream_mst_callback (void *cls, void *client, const struct
GNUNET_MessageHeader *message)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+
+ GNUNET_assert (NULL != message);
+ dispatch_message (mq, message);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on
timeout
+ * @return number of bytes of processed from 'data' (any data remaining should
be
+ * given to the next time the read processor is called).
+ */
+static size_t
+stream_data_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct MessageStreamState *mss;
+ int ret;
+ mss = (struct MessageStreamState *) mq->impl_state;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO,
GNUNET_NO);
+ GNUNET_assert (GNUNET_OK == ret);
+ /* we always read all data */
+ return size;
+}
+
+
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_MQ_Handler *handlers,
+ void *cls)
+{
+ struct GNUNET_MQ_MessageQueue *mq;
+ struct MessageStreamState *mss;
+
+ mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+ mss = GNUNET_new (struct MessageStreamState);
+ mss->socket = socket;
+ mq->impl_state = mss;
+ mq->send_impl = stream_socket_send_impl;
+ mq->handlers = handlers;
+ mq->handlers_cls = cls;
+ if (NULL != handlers)
+ {
+ mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
+ mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
+ stream_data_processor, mq);
+ }
+ return mq;
+}
+
+
+/**
+ * Transmit a queued message to the session's client.
+ *
+ * @param cls consensus session
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_queued (void *cls, size_t size,
+ void *buf)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct GNUNET_MQ_Message *mqm = mq->current_msg;
+ struct ServerClientSocketState *state = mq->impl_state;
+ size_t msg_size;
+
+ mq->current_msg = NULL;
+ GNUNET_assert (NULL != mqm);
+ GNUNET_assert (NULL != buf);
+ msg_size = ntohs (mqm->mh->size);
+ GNUNET_assert (size >= msg_size);
+ memcpy (buf, mqm->mh, msg_size);
+ GNUNET_free (mqm);
+ state->th = NULL;
+ if (NULL != mq->msg_head)
+ {
+ mq->current_msg = mq->msg_head;
+ GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
+ state->th =
+ GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_queued, mq);
+ }
+ return msg_size;
+}
+
+
+static void
+server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct
GNUNET_MQ_Message *mqm)
+{
+ struct ServerClientSocketState *state = mq->impl_state;
+ int msize;
+
+ GNUNET_assert (NULL != state);
+
+ if (NULL != state->th)
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+ return;
+ }
+ GNUNET_assert (NULL == mq->current_msg);
+ msize = ntohs (mq->msg_head->mh->size);
+ mq->current_msg = mqm;
+ state->th =
+ GNUNET_SERVER_notify_transmit_ready (state->client, msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_queued, mq);
+}
+
+
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
+{
+ struct GNUNET_MQ_MessageQueue *mq;
+ struct ServerClientSocketState *scss;
+
+ mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+ scss = GNUNET_new (struct ServerClientSocketState);
+ mq->impl_state = scss;
+ mq->send_impl = server_client_send_impl;
+ return mq;
+}
+
+
+/**
+ * Transmit a queued message to the session's client.
+ *
+ * @param cls consensus session
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+connection_client_transmit_queued (void *cls, size_t size,
+ void *buf)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct GNUNET_MQ_Message *mqm = mq->current_msg;
+ struct ClientConnectionState *state = mq->impl_state;
+ size_t msg_size;
+
+ mq->current_msg = NULL;
+ GNUNET_assert (NULL != mqm);
+ GNUNET_assert (NULL != buf);
+ msg_size = ntohs (mqm->mh->size);
+ GNUNET_assert (size >= msg_size);
+ memcpy (buf, mqm->mh, msg_size);
+ GNUNET_free (mqm);
+ state->th = NULL;
+ if (NULL != mq->msg_head)
+ {
+ mq->current_msg = mq->msg_head;
+ GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
+ state->th =
+ GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_NO,
+
&connection_client_transmit_queued, mq);
+ }
+ return msg_size;
+}
+
+
+static void
+connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct
GNUNET_MQ_Message *mqm)
+{
+ struct ClientConnectionState *state = mq->impl_state;
+ int msize;
+
+ GNUNET_assert (NULL != state);
+
+ if (NULL != state->th)
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+ return;
+ }
+ GNUNET_assert (NULL == mq->current_msg);
+ mq->current_msg = mqm;
+ msize = ntohs (mqm->mh->size);
+ state->th =
+ GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_NO,
+ &connection_client_transmit_queued,
mq);
+}
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+handle_client_message (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+
+ GNUNET_assert (NULL != msg);
+
+ dispatch_message (mq, msg);
+}
+
+
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection
*connection,
+ const struct GNUNET_MQ_Handler
*handlers,
+ void *cls)
+{
+ struct GNUNET_MQ_MessageQueue *mq;
+ struct ClientConnectionState *state;
+
+ GNUNET_assert (NULL != connection);
+
+ mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+ mq->handlers = handlers;
+ mq->handlers_cls = cls;
+ state = GNUNET_new (struct ClientConnectionState);
+ state->connection = connection;
+ mq->impl_state = state;
+ mq->send_impl = connection_client_send_impl;
+
+ if (NULL != handlers)
+ {
+ GNUNET_CLIENT_receive (connection, handle_client_message, mq,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+
+ return mq;
+}
+
+
+
+void
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
+ const struct GNUNET_MQ_Handler *new_handlers,
+ void *cls)
+{
+ mq->handlers = new_handlers;
+ mq->handlers_cls = cls;
+}
+
+
+
+/**
+ * Associate the assoc_data in mq with a unique request id.
+ *
+ * @param mq message queue, id will be unique for the queue
+ * @param mqm message to associate
+ * @param data to associate
+ */
+uint32_t
+GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
+ struct GNUNET_MQ_Message *mqm,
+ void *assoc_data)
+{
+ uint32_t id;
+
+ if (NULL == mq->assoc_map)
+ mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
+ id = mq->assoc_id++;
+ GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ return id;
+}
+
+
+
+void *
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+{
+ if (NULL == mq->assoc_map)
+ return NULL;
+ return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
+}
+
+
+void *
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+{
+ void *val;
+
+ if (NULL == mq->assoc_map)
+ return NULL;
+ val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
+ GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
+ return val;
+}
+
+
+void
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
+ GNUNET_MQ_NotifyCallback cb,
+ void *cls)
+{
+ mqm->sent_cb = cb;
+ mqm->sent_cls = cls;
+}
+
+
+void
+GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
+{
+ /* FIXME: destroy all pending messages in the queue */
+ GNUNET_free (mq);
+}
+
Modified: gnunet/src/set/mq.h
===================================================================
--- gnunet/src/set/mq.h 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/mq.h 2013-04-24 11:48:31 UTC (rev 26981)
@@ -30,23 +30,109 @@
#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#include "gnunet_connection_lib.h"
+#include "gnunet_server_lib.h"
+#include "gnunet_stream_lib.h"
-#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_(((void)
mvar->header, (struct GNUNET_MessageHeader**) &mvar), (esize) + sizeof *mvar,
type)
+/**
+ * Allocate a GNUNET_MQ_Message, with extra space allocated after the space
needed
+ * by the message struct.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mvar variable to store the allocated message in;
+ * must have a header field
+ * @param esize extra space to allocate after the message
+ * @param type type of the message
+ * @return the MQ message
+ */
+#define GNUNET_MQ_msg_extra(mvar, esize, type)
GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**)
&(mvar)), (esize) + sizeof *(mvar), (type))
+/**
+ * Allocate a GNUNET_MQ_Message.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mvar variable to store the allocated message in;
+ * must have a header field
+ * @param type type of the message
+ * @return the MQ message
+ */
#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
-#define GNUNET_MQ_msg_raw(type) GNUNET_MQ_msg_ (NULL, sizeof (struct
GNUNET_MessageHeader), type)
+/**
+ * Allocate a GNUNET_MQ_Message, and concatenate another message
+ * after the space needed by the message struct.
+ *
+ * @param mvar variable to store the allocated message in;
+ * must have a header field
+ * @param mc message to concatenate, can be NULL
+ * @param type type of the message
+ * @return the MQ message, NULL if mc is to large to be concatenated
+ */
+#define GNUNET_MQ_msg_concat(mvar, mc, t) GNUNET_MQ_msg_concat_(((void)
mvar->header, (struct GNUNET_MessageHeader **) &(mvar)), \
+ sizeof *mvar, (struct GNUNET_MessageHeader *) mc, t)
+
+/**
+ * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mvar variable to store the allocated message in;
+ * must have a header field
+ * @param type type of the message
+ */
+#define GNUNET_MQ_msg_header(type) GNUNET_MQ_msg_ (NULL, sizeof (struct
GNUNET_MessageHeader), type)
+
+
+/**
+ * Allocate a GNUNET_MQ_Message, where the message only consists of a header
and extra space.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mvar variable to store the allocated message in;
+ * must have a header field
+ * @param esize extra space to allocate after the message header
+ * @param type type of the message
+ */
+#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh,
sizeof (struct GNUNET_MessageHeader), type)
+
+
+/**
+ * End-marker for the handlers array
+ */
#define GNUNET_MQ_HANDLERS_END {NULL, 0}
+/**
+ * Opaque handle to a message queue
+ */
struct GNUNET_MQ_MessageQueue;
+/**
+ * Opaque handle to an allocated message
+ */
struct GNUNET_MQ_Message;
+/**
+ * Called when a message has been received.
+ *
+ * @param cls closure
+ * @param msg the received message
+ */
+typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct
GNUNET_MessageHeader *msg);
+
+
+/**
+ * Message handler for a specific message type.
+ */
struct GNUNET_MQ_Handler
{
- void *cb;
+ /**
+ * Callback, called every time a new message of
+ * the specified type has been receied.
+ */
+ GNUNET_MQ_MessageCallback cb;
+
+ /**
+ * Type of the message we are interested in
+ */
uint16_t type;
};
@@ -63,12 +149,34 @@
* @param mhp message header to store the allocated message header in, can be
NULL
* @param size size of the message to allocate
* @param type type of the message, will be set in the allocated message
- * @param return the allocated MQ message
+ * @return the allocated MQ message
*/
struct GNUNET_MQ_Message *
GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t
type);
+
/**
+ * Create a new message for MQ, by concatenating another message
+ * after a message of the specified type.
+ *
+ * @retrn the allocated MQ message
+ */
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size,
struct GNUNET_MessageHeader *m, uint16_t type);
+
+
+/**
+ * Discard the message queue message, free all
+ * allocated resources. Must be called in the event
+ * that a message is created but should not actually be sent.
+ *
+ * @param mqm the message to discard
+ */
+void
+GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
+
+
+/**
* Send a message with the give message queue.
* May only be called once per message.
*
@@ -78,6 +186,7 @@
void
GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message
*mqm);
+
/**
* Cancel sending the message. Message must have been sent with GNUNET_MQ_send
before.
* May not be called after the notify sent callback has been called
@@ -100,33 +209,106 @@
struct GNUNET_MQ_Message *mqm,
void *assoc_data);
+/**
+ * Get the data associated with a request id in a queue
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we are interested in
+ * @return the associated data
+ */
void *
GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
+
+/**
+ * Remove the association for a request id
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we want to remove
+ * @return the associated data
+ */
void *
GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t
request_id);
+/**
+ * Create a message queue for a GNUNET_CLIENT_Connection.
+ * If handlers are specfied, receive messages from the connection.
+ *
+ * @param connection the client connection
+ * @param handlers handlers for receiving messages
+ * @param cls closure for the handlers
+ * @return the message queue
+ */
struct GNUNET_MQ_MessageQueue *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection
*connection,
const struct GNUNET_MQ_Handler
*handlers,
void *cls);
+/**
+ * Create a message queue for a GNUNET_STREAM_Socket.
+ *
+ * @param param client the client
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
+
+
+
+/**
+ * Create a message queue for a GNUNET_STREAM_Socket.
+ * If handlers are specfied, receive messages from the stream socket.
+ *
+ * @param socket the stream socket
+ * @param handlers handlers for receiving messages
+ * @param cls closure for the handlers
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_MQ_Handler *handlers,
+ void *cls);
+
+/**
+ * Replace the handlers of a message queue with new handlers.
+ * Takes effect immediately, even for messages that already have been
received, but for
+ * with the handler has not been called.
+ *
+ * @param mq message queue
+ * @param new_handlers new handlers
+ * @param cls new closure for the handlers
+ */
void
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
+ const struct GNUNET_MQ_Handler *new_handlers,
+ void *cls);
+
+
+
+/**
+ * Call a callback once the message has been sent, that is, the message
+ * can not be canceled anymore.
+ * There can be only one notify sent callback per message.
+ *
+ * @param mqm message to call the notify callback for
+ * @param cb the notify callback
+ * @param cls closure for the callback
+ */
+void
GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
GNUNET_MQ_NotifyCallback cb,
void *cls);
+/**
+ * Destroy the message queue.
+ *
+ * @param mq message queue to destroy
+ */
void
-GNUNET_MQ_notify_timeout (struct GNUNET_MQ_Message *mqm,
- GNUNET_MQ_NotifyCallback cb,
- void *cls);
-
-
-void
GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
#endif
Modified: gnunet/src/set/set.conf.in
===================================================================
--- gnunet/src/set/set.conf.in 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/set.conf.in 2013-04-24 11:48:31 UTC (rev 26981)
@@ -0,0 +1,11 @@
+[set]
+AUTOSTART = NO
+# PORT = 2106
+HOSTNAME = localhost
+HOME = $SERVICEHOME
+BINARY = gnunet-service-set
+ACCEPT_FROM = 127.0.0.1;
+ACCEPT_FROM6 = ::1;
+UNIXPATH = /tmp/gnunet-service-set.sock
+UNIX_MATCH_UID = YES
+UNIX_MATCH_GID = YES
Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/set.h 2013-04-24 11:48:31 UTC (rev 26981)
@@ -20,12 +20,13 @@
/**
* @author Florian Dold
- * @file consensus/consensus.h
- * @brief
+ * @file set/set.h
+ * @brief messages used for the set api
*/
#ifndef SET_H
#define SET_H
+#include "platform.h"
#include "gnunet_common.h"
@@ -68,12 +69,6 @@
* Operation type, values of enum GNUNET_SET_OperationType
*/
uint16_t operation GNUNET_PACKED;
-
- /**
- * Operation type, values of enum GNUNET_SET_OperationType
- */
- uint16_t op GNUNET_PACKED;
-
};
@@ -88,6 +83,12 @@
* request id of the request we want to accept
*/
uint32_t request_id GNUNET_PACKED;
+
+ /**
+ * Zero if the client has rejected the request,
+ * non-zero if it has accepted it
+ */
+ uint32_t accepted GNUNET_PACKED;
};
@@ -119,8 +120,14 @@
*/
struct GNUNET_MessageHeader header;
- struct GNUNET_PeerIdentity other_peer;
+ /**
+ * Peer to evaluate the operation with
+ */
+ struct GNUNET_PeerIdentity peer;
+ /**
+ * Application id
+ */
struct GNUNET_HashCode app_id;
/**
@@ -144,8 +151,15 @@
*/
uint32_t request_id GNUNET_PACKED;
+ /**
+ * Was the evaluation successful?
+ */
uint16_t result_status GNUNET_PACKED;
+ /**
+ * Type of the element attachted to the message,
+ * if any.
+ */
uint16_t element_type GNUNET_PACKED;
/* rest: the actual element */
@@ -179,6 +193,7 @@
uint32_t request_id GNUNET_PACKED;
};
+
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/set_api.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -93,7 +93,7 @@
if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
{
struct GNUNET_MQ_Message *mqm;
- mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_ACK);
+ mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
GNUNET_MQ_send (set->mq, mqm);
}
@@ -136,7 +136,15 @@
req->request_id = ntohl (msg->request_id);
lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
if (GNUNET_NO == req->accepted)
+ {
+ struct GNUNET_MQ_Message *mqm;
+ struct AcceptMessage *amsg;
+
+ mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+ amsg->request_id = msg->request_id;
+ GNUNET_MQ_send (lh->mq, mqm);
GNUNET_free (req);
+ }
}
@@ -152,7 +160,7 @@
* @return a handle to the set
*/
struct GNUNET_SET_Handle *
-GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op)
{
struct GNUNET_SET_Handle *set;
@@ -165,6 +173,7 @@
set = GNUNET_new (struct GNUNET_SET_Handle);
set->client = GNUNET_CLIENT_connect ("set", cfg);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n");
GNUNET_assert (NULL != set->client);
set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
set);
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
@@ -295,7 +304,7 @@
mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size),
GNUNET_MESSAGE_TYPE_SET_EVALUATE);
msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh));
- msg->other_peer = *other_peer;
+ msg->peer = *other_peer;
msg->app_id = *app_id;
memcpy (&msg[1], context_msg, htons (context_msg->size));
oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
operation_timeout_task, oh);
@@ -318,7 +327,7 @@
* @return a handle that can be used to cancel the listen operation
*/
struct GNUNET_SET_ListenHandle *
-GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType operation,
const struct GNUNET_HashCode *app_id,
GNUNET_SET_ListenCallback listen_cb,
@@ -396,9 +405,11 @@
mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
msg->request_id = htonl (request->request_id);
- oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
operation_timeout_task, oh);
+ msg->accepted = 1;
GNUNET_MQ_send (set->mq, mqm);
+ oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
operation_timeout_task, oh);
+
return oh;
}
@@ -416,7 +427,7 @@
h_assoc = GNUNET_MQ_assoc_remove (h->set->mq, h->request_id);
GNUNET_assert (h_assoc == h);
- mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_CANCEL);
+ mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
GNUNET_MQ_send (h->set->mq, mqm);
GNUNET_free (h);
}
Modified: gnunet/src/set/strata_estimator.c
===================================================================
--- gnunet/src/set/strata_estimator.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/strata_estimator.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -53,15 +53,15 @@
void
-strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode
*key)
+strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key)
{
- uint32_t v;
+ uint64_t v;
int i;
- v = key->bits[0];
+ v = key.key_val;
/* count trailing '1'-bits of v */
for (i = 0; v & 1; v>>=1, i++)
/* empty */;
- ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
+ ibf_insert (se->strata[i], key);
}
Modified: gnunet/src/set/strata_estimator.h
===================================================================
--- gnunet/src/set/strata_estimator.h 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/strata_estimator.h 2013-04-24 11:48:31 UTC (rev 26981)
@@ -66,7 +66,7 @@
void
-strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode
*key);
+strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key);
void
Modified: gnunet/src/set/test_set_api.c
===================================================================
--- gnunet/src/set/test_set_api.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/test_set_api.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -44,8 +44,8 @@
struct GNUNET_SET_Handle *set1;
struct GNUNET_SET_Handle *set2;
- set1 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION);
- set2 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION);
+ set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
}
int
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r26981 - in gnunet/src: include set,
gnunet <=