gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26981 - in gnunet/src: include set


From: gnunet
Subject: [GNUnet-SVN] r26981 - in gnunet/src: include set
Date: Wed, 24 Apr 2013 13:48:32 +0200

Author: dold
Date: 2013-04-24 13:48:31 +0200 (Wed, 24 Apr 2013)
New Revision: 26981

Modified:
   gnunet/src/include/gnunet_applications.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_set_service.h
   gnunet/src/set/Makefile.am
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/gnunet-set.c
   gnunet/src/set/mq.c
   gnunet/src/set/mq.h
   gnunet/src/set/set.conf.in
   gnunet/src/set/set.h
   gnunet/src/set/set_api.c
   gnunet/src/set/strata_estimator.c
   gnunet/src/set/strata_estimator.h
   gnunet/src/set/test_set_api.c
Log:
started implementing union operation for set


Modified: gnunet/src/include/gnunet_applications.h
===================================================================
--- gnunet/src/include/gnunet_applications.h    2013-04-23 15:55:13 UTC (rev 
26980)
+++ gnunet/src/include/gnunet_applications.h    2013-04-24 11:48:31 UTC (rev 
26981)
@@ -77,6 +77,12 @@
 #define GNUNET_APPLICATION_TYPE_CONSENSUS 18
 
 
+/**
+ * Set. Used for two-peer set operations implemented using stream.
+ */
+#define GNUNET_APPLICATION_TYPE_SET 19
+
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2013-04-23 15:55:13 UTC (rev 
26980)
+++ gnunet/src/include/gnunet_protocols.h       2013-04-24 11:48:31 UTC (rev 
26981)
@@ -1807,11 +1807,42 @@
 #define GNUNET_MESSAGE_TYPE_SET_REQUEST 578
 
 /**
- * Evaluate a set operation
+ * Create a new set.
  */
 #define GNUNET_MESSAGE_TYPE_SET_CREATE 579
 
+/**
+ * Request a set operation from another peer.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580
 
+/**
+ * Strata estimator.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581
+
+/**
+ * Invertible bloom filter.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582
+
+/**
+ * Actual set elements.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583
+
+/**
+ * Requests for the elements with the given hashes.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584
+
+/**
+ * Operation is done.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585
+
+
+
 
/*******************************************************************************
  * TESTBED LOGGER message types
  
******************************************************************************/

Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h     2013-04-23 15:55:13 UTC (rev 
26980)
+++ gnunet/src/include/gnunet_set_service.h     2013-04-24 11:48:31 UTC (rev 
26981)
@@ -63,11 +63,6 @@
 
 
 /**
- * Opaque handle to a listen operation.
- */
-struct GNUNET_SET_ListenHandle;
-
-/**
  * The operation that a set set supports.
  */
 enum GNUNET_SET_OperationType
@@ -135,10 +130,12 @@
    * Number of bytes in the buffer pointed to by data.
    */
   uint16_t size;
+
   /**
    * Application-specific element type.
    */
   uint16_t type;
+
   /**
    * Actual data of the element
    */
@@ -153,6 +150,7 @@
  */
 typedef void (*GNUNET_SET_Continuation) (void *cls);
 
+
 /**
  * Callback for set operation results. Called for each element
  * in the result set.
@@ -161,10 +159,9 @@
  * @param element a result element, only valid if status is 
GNUNET_SET_STATUS_OK
  * @param status see enum GNUNET_SET_Status
  */
-typedef void
-(*GNUNET_SET_ResultIterator) (void *cls,
-                              struct GNUNET_SET_Element *element,
-                              enum GNUNET_SET_Status status);
+typedef void (*GNUNET_SET_ResultIterator) (void *cls,
+                                           struct GNUNET_SET_Element *element,
+                                           enum GNUNET_SET_Status status);
 
 
 /**
@@ -201,7 +198,7 @@
  * @return a handle to the set
  */
 struct GNUNET_SET_Handle *
-GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
                    enum GNUNET_SET_OperationType op);
 
 
@@ -270,8 +267,6 @@
                      void *result_cls);
 
 
-
-
 /**
  * Wait for set operation requests for the given application id
  * 
@@ -285,7 +280,7 @@
  * @return a handle that can be used to cancel the listen operation
  */
 struct GNUNET_SET_ListenHandle *
-GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
                    enum GNUNET_SET_OperationType op_type,
                    const struct GNUNET_HashCode *app_id,
                    GNUNET_SET_ListenCallback listen_cb,

Modified: gnunet/src/set/Makefile.am
===================================================================
--- gnunet/src/set/Makefile.am  2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/Makefile.am  2013-04-24 11:48:31 UTC (rev 26981)
@@ -29,6 +29,7 @@
 gnunet_set_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/set/libgnunetset.la \
+  $(top_builddir)/src/stream/libgnunetstream.la \
   $(top_builddir)/src/testbed/libgnunettestbed.la \
   $(GN_LIBINTL)
 gnunet_set_DEPENDENCIES = \
@@ -36,6 +37,8 @@
 
 gnunet_service_set_SOURCES = \
  gnunet-service-set.c \
+ gnunet-service-set_union.c \
+ mq.c \
  ibf.c \
  strata_estimator.c
 gnunet_service_set_LDADD = \
@@ -44,12 +47,16 @@
   $(top_builddir)/src/stream/libgnunetstream.la \
   $(top_builddir)/src/mesh/libgnunetmesh.la \
   $(GN_LIBINTL)
+# hack for mq.c, see automake Objects ‘created with both libtool and without’
+# remove once GNUNET_MQ is in util/
+gnunet_service_set_CFLAGS = $(AM_CFLAGS)
 
 libgnunetset_la_SOURCES = \
   set_api.c \
   mq.c
 libgnunetset_la_LIBADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/stream/libgnunetstream.la \
   $(LTLIBINTL)
 libgnunetset_la_LDFLAGS = \
   $(GN_LIB_LDFLAGS)
@@ -67,6 +74,8 @@
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/testing/libgnunettesting.la \
   $(top_builddir)/src/set/libgnunetset.la
+test_set_api_DEPENDENCIES = \
+  libgnunetset.la
 
 EXTRA_DIST = \
   test_set.conf

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/gnunet-service-set.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -24,34 +24,197 @@
  * @author Florian Dold
  */
 
-#include "platform.h"
-#include "gnunet_common.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
 
-struct Set
-{
+#include "gnunet-service-set.h"
+#include "set_protocol.h"
 
-};
 
-struct Listener
-{
+/**
+ * Configuration of our local peer.
+ */
+const struct GNUNET_CONFIGURATION_Handle *configuration;
 
-};
+/**
+ * Socket listening for other peers via stream.
+ */
+static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
 
-/*
-static struct Listener *sets_head;
-static struct Listener *sets_tail;
+/**
+ * Sets are held in a doubly linked list.
+ */
+static struct Set *sets_head;
 
+/**
+ * Sets are held in a doubly linked list.
+ */
+static struct Set *sets_tail;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
 static struct Listener *listeners_head;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
 static struct Listener *listeners_tail;
-*/
 
+/**
+ * Incoming sockets from remote peers are
+ * held in a doubly linked list.
+ */
+static struct Incoming *incoming_head;
 
 /**
+ * Incoming sockets from remote peers are
+ * held in a doubly linked list.
+ */
+static struct Incoming *incoming_tail;
+
+/**
+ * Counter for allocating unique request IDs for clients.
+ */
+static uint32_t request_id = 1;
+
+
+/**
+ * Disconnect a client and free all resources
+ * that the client allocated (e.g. Sets or Listeners)
+ *
+ * @param client the client to disconnect
+ */
+void
+client_disconnect (struct GNUNET_SERVER_Client *client)
+{
+  /* FIXME: clean up any data structures belonging to the client */
+  GNUNET_SERVER_client_disconnect (client);
+}
+
+
+/**
+ * Get set that is owned by the client, if any.
+ *
+ * @param client client to look for
+ * @return set that the client owns, NULL if the client
+ *         does not own a set
+ */
+static struct Set *
+get_set (struct GNUNET_SERVER_Client *client)
+{
+  struct Set *set;
+  for (set = sets_head; NULL != set; set = set->next)
+    if (set->client == client)
+      return set;
+  return NULL;
+}
+
+
+/**
+ * Get the listener associated to a client, if any.
+ *
+ * @param client the client
+ * @return listener associated with the client, NULL
+ *         if there isn't any
+ */
+static struct Listener *
+get_listener (struct GNUNET_SERVER_Client *client)
+{
+  struct Listener *listener;
+  for (listener = listeners_head; NULL != listener; listener = listener->next)
+    if (listener->client == client)
+      return listener;
+  return NULL;
+}
+
+/**
+ * Get the incoming socket associated with the given id
+ *
+ * @param id id to look for
+ * @return the incoming socket associated with the id,
+ *         or NULL if there is none
+ */
+static struct Incoming *
+get_incoming (uint32_t id)
+{
+  struct Incoming *incoming;
+  for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
+    if (incoming->request_id == id)
+      return incoming;
+  return NULL;
+}
+
+static void
+destroy_incoming (struct Incoming *incoming)
+{
+  if (NULL != incoming->mq)
+  {
+    GNUNET_MQ_destroy (incoming->mq);
+    incoming->mq = NULL;
+  }
+  if (NULL != incoming->socket)
+  {
+    GNUNET_STREAM_close (incoming->socket);
+    incoming->socket = NULL;
+  }
+  GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
+  GNUNET_free (incoming);
+}
+
+
+/**
+ * Handle a request for a set operation from
+ * another peer.
+ *
+ * @param cls the incoming socket
+ * @param mh the message
+ */
+static void
+handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+  struct Incoming *incoming = cls;
+  const struct OperationRequestMessage *msg = (const struct 
OperationRequestMessage *) mh;
+  struct GNUNET_MQ_Message *mqm;
+  struct RequestMessage *cmsg;
+  struct Listener *listener;
+  const struct GNUNET_MessageHeader *context_msg;
+
+  if (ntohs (mh->size) < sizeof *msg)
+  {
+    GNUNET_break (0);
+    destroy_incoming (incoming);
+    return;
+  }
+  else if (ntohs (mh->size) == sizeof *msg)
+  {
+    context_msg = NULL;
+  }
+  else
+  {
+    context_msg = &msg[1].header;
+    if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
+    {
+      /* size of context message is invalid */
+      GNUNET_break (0);
+      destroy_incoming (incoming);
+      return;
+    }
+  }
+
+  for (listener = listeners_head; listener != NULL; listener = listener->next)
+  {
+    if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
+         (htons (msg->operation) != listener->operation) )
+      continue;
+    mqm = GNUNET_MQ_msg_concat (cmsg, context_msg, 
GNUNET_MESSAGE_TYPE_SET_REQUEST);
+    incoming->request_id = request_id++;
+    cmsg->request_id = htonl (incoming->request_id);
+    GNUNET_MQ_send (listener->client_mq, mqm);
+    return;
+  }
+}
+
+
+/**
  * Called when a client wants to create a new set.
  *
  * @param cls unused
@@ -63,11 +226,307 @@
                       struct GNUNET_SERVER_Client *client,
                       const struct GNUNET_MessageHeader *m)
 {
+  struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
+  struct Set *set;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n");
+
+  if (NULL != get_set (client))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
+
+  set = GNUNET_new (struct Set);
+
+  switch (ntohs (msg->operation))
+  {
+    case GNUNET_SET_OPERATION_INTERSECTION:
+      /* FIXME: cfuchs */
+      GNUNET_assert (0);
+      break;
+    case GNUNET_SET_OPERATION_UNION:
+      set = union_set_create ();
+      break;
+    default:
+      GNUNET_free (set);
+      GNUNET_break (0);
+      GNUNET_SERVER_client_disconnect (client);
+      return;
+  }
+
+  set->client = client;
+  set->client_mq = GNUNET_MQ_queue_for_server_client (client);
+  GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Called when a client wants to listen for incoming set operation requests.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_listen (void *cls,
+                      struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *m)
+{
+  struct ListenMessage *msg = (struct ListenMessage *) m;
+  struct Listener *listener;
   
+  if (NULL != get_listener (client))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
+  
+  listener = GNUNET_new (struct Listener);
+  listener->app_id = msg->app_id;
+  listener->operation = msg->operation;
+  GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
 }
 
 
 /**
+ * Called when a client wants to add an element to its set.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_add (void *cls,
+                      struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *m)
+{
+  struct Set *set;
+
+  set = get_set (client);
+  if (NULL == set)
+  {
+    GNUNET_break (0);
+    client_disconnect (client);
+    return;
+  }
+  switch (set->operation)
+  {
+    case GNUNET_SET_OPERATION_UNION:
+      union_add (set, (struct ElementMessage *) m);
+      break;
+    case GNUNET_SET_OPERATION_INTERSECTION:
+      /* FIXME: cfuchs */
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
+  }
+}
+
+
+/**
+ * Called when a client wants to evaluate a set operation with another peer.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_evaluate (void *cls,
+                        struct GNUNET_SERVER_Client *client,
+                        const struct GNUNET_MessageHeader *m)
+{
+  struct Set *set;
+  struct EvaluateMessage *msg = (struct EvaluateMessage *) m;
+  struct EvaluateOperation *eo;
+
+  set = get_set (client);
+
+  if (NULL == set)
+  {
+    GNUNET_break (0);
+    client_disconnect (client);
+    return;
+  }
+
+  eo = GNUNET_new (struct EvaluateOperation);
+  eo->peer = msg->peer;
+  eo->app_id = msg->app_id;
+  eo->request_id = msg->request_id;
+  eo->context_msg = GNUNET_copy_message (&msg[1].header);
+  eo->set = set;
+
+  switch (set->operation)
+  {
+    case GNUNET_SET_OPERATION_INTERSECTION:
+      /* FIXME: cfuchs */
+      break;
+    case GNUNET_SET_OPERATION_UNION:
+      union_evaluate (eo);
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
+  }
+}
+
+
+/**
+ * Handle a cancel request from a client.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param m the cancel message
+ */
+static void
+handle_client_cancel (void *cls,
+                      struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *m)
+{
+  /* FIXME: implement */
+}
+
+
+/**
+ * Handle an ack from a client.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param m the message
+ */
+static void
+handle_client_ack (void *cls,
+                   struct GNUNET_SERVER_Client *client,
+                   const struct GNUNET_MessageHeader *m)
+{
+  /* FIXME: implement */
+}
+
+
+/**
+ * Handle a request from the client to accept
+ * a set operation.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param m the message
+ */
+static void
+handle_client_accept (void *cls,
+                      struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *m)
+{
+  struct AcceptMessage *msg = (struct AcceptMessage *) m;
+  struct Set *set;
+  struct Incoming *incoming;
+  struct EvaluateOperation *eo;
+
+  set = get_set (client);
+
+  if (NULL == set)
+  {
+    GNUNET_break (0);
+    client_disconnect (client);
+    return;
+  }
+
+  incoming = get_incoming (ntohl (msg->request_id));
+
+  if ( (NULL == incoming) ||
+       (incoming->operation != set->operation) )
+  {
+    GNUNET_break (0);
+    client_disconnect (client);
+    return;
+  }
+
+  eo = GNUNET_new (struct EvaluateOperation);
+  eo->peer = incoming->peer;
+  eo->app_id = incoming->app_id;
+  eo->request_id = msg->request_id;
+  eo->set = set;
+
+  switch (set->operation)
+  {
+    case GNUNET_SET_OPERATION_INTERSECTION:
+      /* FIXME: cfuchs*/
+      GNUNET_assert (0);
+      break;
+    case GNUNET_SET_OPERATION_UNION:
+      union_accept (eo, incoming);
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
+  }
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other 
peers
+ * or upon a binding error, which happens when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream; NULL on binding error
+ * @param initiator the identity of the peer who wants to establish a stream
+ *            with us; NULL on binding error
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ *             stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+                  struct GNUNET_STREAM_Socket *socket,
+                  const struct GNUNET_PeerIdentity *initiator)
+{
+  struct Incoming *incoming;
+  static const struct GNUNET_MQ_Handler handlers[] = {
+    {handle_p2p_operation_request, 
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
+    GNUNET_MQ_HANDLERS_END
+  };
+
+  if (NULL == socket)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+
+  incoming = GNUNET_new (struct Incoming);
+  incoming->peer = *initiator;
+  incoming->socket = socket;
+  incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, 
handlers, incoming);
+  /* FIXME: timeout for peers that only connect but don't send anything */
+  GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Called to clean up, after a shutdown has been requested.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+shutdown_task (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (NULL != stream_listen_socket)
+  {
+    GNUNET_STREAM_listen_close (stream_listen_socket);
+    stream_listen_socket = NULL;
+  } 
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+}
+
+
+/**
  * Function called by the service's run
  * method to run service-specific setup code.
  *
@@ -77,15 +536,24 @@
  */
 static void
 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct 
GNUNET_CONFIGURATION_Handle *cfg)
-
 {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
     {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
+    {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
+    {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
+    {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
+    {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
+    {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
+    {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
     {NULL, NULL, 0, 0}
   };
 
-
+  configuration = cfg;
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 
NULL);
   GNUNET_SERVER_add_handlers (server, server_handlers);
+  stream_listen_socket = GNUNET_STREAM_listen (cfg, 
GNUNET_APPLICATION_TYPE_SET,
+                                               &stream_listen_cb, NULL,
+                                               GNUNET_STREAM_OPTION_END);
 }
 
 

Modified: gnunet/src/set/gnunet-set.c
===================================================================
--- gnunet/src/set/gnunet-set.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/gnunet-set.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -27,8 +27,25 @@
 #include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_testbed_service.h"
+#include "gnunet_set_service.h"
 
 
+static struct GNUNET_HashCode app_id;
+static struct GNUNET_SET_Handle *set1;
+static struct GNUNET_SET_Handle *set2;
+static struct GNUNET_SET_ListenHandle *listen_handle;
+
+
+static void
+listen_cb (void *cls,
+           const struct GNUNET_PeerIdentity *other_peer,
+           const struct GNUNET_MessageHeader *context_msg,
+           struct GNUNET_SET_Request *request)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
+}
+
+
 /**
  * Main function that will be run.
  *
@@ -40,10 +57,15 @@
 static void
 run (void *cls, char *const *args,
      const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *
-     cfg)
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  /* FIXME */
+  static const char* app_str = "gnunet-set";
+  GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
+
+  set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &app_id,
+                                     listen_cb, NULL);
 }
 
 
@@ -56,7 +78,7 @@
   };
   GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set",
                      "help",
-                     options, &run, NULL, GNUNET_YES);
+                     options, &run, NULL, GNUNET_NO);
   return 0;
 }
 

Modified: gnunet/src/set/mq.c
===================================================================
--- gnunet/src/set/mq.c 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/mq.c 2013-04-24 11:48:31 UTC (rev 26981)
@@ -26,13 +26,181 @@
 
 #include "mq.h"
 
+/**
+ * Signature of functions implementing the
+ * sending part of a message queue
+ *
+ * @param q the message queue
+ * @param m the message
+ */
+typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct 
GNUNET_MQ_Message *m);
 
+
+typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
+
+
+/**
+ * Collection of the state necessary to read and write gnunet messages 
+ * to a stream socket. Should be used as closure for stream_data_processor.
+ */
+struct MessageStreamState
+{
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  struct MessageQueue *mq;
+  struct GNUNET_STREAM_Socket *socket;
+  struct GNUNET_STREAM_ReadHandle *rh;
+  struct GNUNET_STREAM_WriteHandle *wh;
+};
+
+
+struct ServerClientSocketState
+{
+  struct GNUNET_SERVER_Client *client;
+  struct GNUNET_SERVER_TransmitHandle* th;
+};
+
+
+struct ClientConnectionState
+{
+  struct GNUNET_CLIENT_Connection *connection;
+  struct GNUNET_CLIENT_TransmitHandle *th;
+};
+
+
+struct GNUNET_MQ_MessageQueue
+{
+  /**
+   * Handlers array, or NULL if the queue should not receive messages
+   */
+  const struct GNUNET_MQ_Handler *handlers;
+
+  /**
+   * Closure for the handler callbacks
+   */
+  void *handlers_cls;
+
+  /**
+   * Actual implementation of message sending,
+   * called when a message is added
+   */
+  SendImpl send_impl;
+
+  /**
+   * Implementation-dependent queue destruction function
+   */
+  DestroyImpl destroy_impl;
+
+  /**
+   * Implementation-specific state
+   */
+  void *impl_state;
+
+  /**
+   * Linked list of messages pending to be sent
+   */
+  struct GNUNET_MQ_Message *msg_head;
+
+  /**
+   * Linked list of messages pending to be sent
+   */
+  struct GNUNET_MQ_Message *msg_tail;
+
+  /**
+   * Message that is currently scheduled to be
+   * sent. Not the head of the message queue, as the implementation
+   * needs to know if sending has been already scheduled or not.
+   */
+  struct GNUNET_MQ_Message *current_msg;
+
+  /**
+   * Map of associations, lazily allocated
+   */
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+
+  /**
+   * Next id that should be used for the assoc_map,
+   * initialized lazily to a random value together with
+   * assoc_map
+   */
+  uint32_t assoc_id;
+};
+
+
 struct GNUNET_MQ_Message
 {
+  /**
+   * Messages are stored in a linked list
+   */
+  struct GNUNET_MQ_Message *next;
+
+  /**
+   * Messages are stored in a linked list
+   */
+  struct GNUNET_MQ_Message *prev;
+
+  /**
+   * Actual allocated message header,
+   * usually points to the end of the containing GNUNET_MQ_Message
+   */
   struct GNUNET_MessageHeader *mh;
+
+  /**
+   * Queue the message is queued in, NULL if message is not queued.
+   */
+  struct GNUNET_MQ_MessageQueue *parent_queue;
+
+  /**
+   * Called after the message was sent irrevocably
+   */
+  GNUNET_MQ_NotifyCallback sent_cb;
+
+  /**
+   * Closure for sent_cb
+   */
+  void *sent_cls;
 };
 
 
+/**
+ * Call the right callback for a message received
+ * by a queue
+ */
+static void
+dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct 
GNUNET_MessageHeader *mh)
+{
+  const struct GNUNET_MQ_Handler *handler;
+
+  handler = mq->handlers;
+  if (NULL == handler)
+    return;
+  for (; NULL != handler->cb; handler++)
+    if (handler->type == ntohs (mh->type))
+      handler->cb (mq->handlers_cls, mh);
+}
+
+
+void
+GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
+{
+  GNUNET_assert (NULL == mqm->parent_queue);
+  GNUNET_free (mqm);
+}
+
+
+/**
+ * Send a message with the given message queue.
+ * May only be called once per message.
+ * 
+ * @param mq message queue
+ * @param mqm the message to send.
+ */
+void
+GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message 
*mqm)
+{
+  mq->send_impl (mq, mqm);
+}
+
+
 struct GNUNET_MQ_Message *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t 
type)
 {
@@ -40,8 +208,422 @@
   mqm = GNUNET_malloc (sizeof *mqm + size);
   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
   mqm->mh->size = htons (size);
-  mqm->mh->type = htons(type);
+  mqm->mh->type = htons (type);
   if (NULL != mhp)
     *mhp = mqm->mh;
   return mqm;
 }
+
+
+/**
+ * Create a new message for MQ, by concatenating another message
+ * after a message of the specified type.
+ *
+ * @param mhp where to store the header of the allocated message,
+ *            must not be NULL
+ * @param base_size size of the message without the concatenated part
+ * @param m message to append after the first base_size bytes,
+ *          can be NULL, in which case nothing is appended
+ * @param type type of the message, will be set in the allocated message
+ * @return the allocated MQ message, NULL if the combined size
+ *         does not fit into the 16-bit size field
+ */
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type)
+{
+  struct GNUNET_MQ_Message *mq;
+  uint16_t m_size;
+
+  GNUNET_assert (NULL != mhp);
+  if (NULL == m)
+    return GNUNET_MQ_msg_ (mhp, base_size, type);
+  /* convert first, then compare: the size field is in network byte order */
+  m_size = ntohs (m->size);
+  GNUNET_assert (m_size >= sizeof (struct GNUNET_MessageHeader));
+  /* check for overflow; the sum is computed as int, so it has to be
+     truncated to 16 bits explicitly to detect the wrap-around */
+  if ((uint16_t) (base_size + m_size) < base_size)
+    return NULL;
+  mq = GNUNET_MQ_msg_ (mhp, base_size + m_size, type);
+  memcpy (((char *) *mhp) + base_size, m, m_size);
+  return mq;
+}
+
+
+/**
+ * Write completion callback for queues on top of a stream socket.
+ * Finishes the message that was being written (calls its sent
+ * callback and frees it), then starts writing the next message
+ * from the head of the queue, if there is one.
+ *
+ * Also called directly by stream_socket_send_impl to start
+ * transmission on an idle queue.
+ *
+ * @param cls the message queue
+ * @param status the status of the stream; asserted to be GNUNET_STREAM_OK
+ * @param size the number of bytes written (not used)
+ */
+static void 
+stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct MessageStreamState *mss = (struct MessageStreamState *) 
mq->impl_state;
+  struct GNUNET_MQ_Message *mqm;
+
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+  
+  /* call cb for message we finished sending */
+  mqm = mq->current_msg;
+  if (NULL != mqm)
+  {
+    if (NULL != mqm->sent_cb)
+      mqm->sent_cb (mqm->sent_cls);
+    GNUNET_free (mqm);
+  }
+
+  mss->wh = NULL;
+
+  /* the head of the queue becomes the message in flight;
+     current_msg is reset to NULL if the queue is empty */
+  mqm = mq->msg_head;
+  mq->current_msg = mqm;
+  if (NULL == mqm)
+    return;
+  GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
+  mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+                                 GNUNET_TIME_UNIT_FOREVER_REL, 
stream_write_queued, cls);
+  GNUNET_assert (NULL != mss->wh);
+}
+
+
+/**
+ * Send implementation for queues on top of a stream socket.
+ * The message is appended to the queue; if no write is in
+ * progress, transmission is started right away.
+ *
+ * @param mq message queue
+ * @param mqm the message to send
+ */
+static void
+stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
+{
+  /* always enqueue: stream_write_queued takes the next message
+     from the head of the queue, so a message that was never
+     inserted would be lost (and leaked) */
+  GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+  if (NULL != mq->current_msg)
+    return;
+  /* nothing in flight, start writing */
+  stream_write_queued (mq, GNUNET_STREAM_OK, 0);
+}
+
+
+/**
+ * Called by the message stream tokenizer for every complete
+ * message; passes the message on to the handlers of the queue.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls the message queue
+ * @param client identification of the client (not used)
+ * @param message the actual message, asserted to be non-NULL
+ *
+ * @return always GNUNET_OK
+ */
+static int
+stream_mst_callback (void *cls, void *client, const struct 
GNUNET_MessageHeader *message)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+
+  GNUNET_assert (NULL != message);
+  dispatch_message (mq, message);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Called whenever data is available from the stream;
+ * feeds the data into the message stream tokenizer of the queue.
+ *
+ * @param cls the message queue
+ * @param status the status of the stream; asserted to be GNUNET_STREAM_OK
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data
+ * @return number of bytes processed from 'data'; always 'size',
+ *         since all data is handed to the tokenizer
+ */
+static size_t
+stream_data_processor (void *cls,
+                       enum GNUNET_STREAM_Status status,
+                       const void *data,
+                       size_t size)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct MessageStreamState *mss;
+  int ret;
+  mss = (struct MessageStreamState *) mq->impl_state;
+
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+  ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, 
GNUNET_NO);
+  GNUNET_assert (GNUNET_OK == ret);
+  /* we always read all data */
+  return size;
+}
+
+
+/**
+ * Create a message queue for a GNUNET_STREAM_Socket.
+ * If handlers are specified, a tokenizer is created and
+ * reading from the stream socket is started.
+ *
+ * @param socket the stream socket
+ * @param handlers handlers for receiving messages, can be NULL
+ * @param cls closure for the handlers
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
+                                   const struct GNUNET_MQ_Handler *handlers,
+                                   void *cls)
+{
+  struct GNUNET_MQ_MessageQueue *mq;
+  struct MessageStreamState *mss;
+
+  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+  mss = GNUNET_new (struct MessageStreamState);
+  mss->socket = socket;
+  mq->impl_state = mss;
+  mq->send_impl = stream_socket_send_impl;
+  mq->handlers = handlers;
+  mq->handlers_cls = cls;
+  /* without handlers nobody could consume received messages,
+     so the queue is send-only in that case */
+  if (NULL != handlers)
+  {
+    mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
+    mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
+                                  stream_data_processor, mq);
+  }
+  return mq;
+}
+
+
+/**
+ * Copy the message currently being sent into the transmit buffer
+ * of the server and schedule transmission of the next queued
+ * message, if any.
+ *
+ * @param cls the message queue
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_queued (void *cls, size_t size,
+                 void *buf)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct GNUNET_MQ_Message *mqm = mq->current_msg;
+  struct ServerClientSocketState *state = mq->impl_state;
+  size_t msg_size;
+
+  mq->current_msg = NULL;
+  GNUNET_assert (NULL != mqm);
+  GNUNET_assert (NULL != buf);
+  msg_size = ntohs (mqm->mh->size);
+  GNUNET_assert (size >= msg_size);
+  memcpy (buf, mqm->mh, msg_size);
+  /* the message can not be canceled anymore, notify the sender
+     (same as the stream implementation does) */
+  if (NULL != mqm->sent_cb)
+    mqm->sent_cb (mqm->sent_cls);
+  GNUNET_free (mqm);
+  state->th = NULL;
+  if (NULL != mq->msg_head)
+  {
+    mq->current_msg = mq->msg_head;
+    GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
+    /* request room for the next message, not for the one just copied */
+    state->th =
+        GNUNET_SERVER_notify_transmit_ready (state->client,
+                                             ntohs (mq->current_msg->mh->size),
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             &transmit_queued, mq);
+  }
+  return msg_size;
+}
+
+
+/**
+ * Send implementation for queues on top of a GNUNET_SERVER_Client.
+ * If a transmission is already pending, the message is queued;
+ * otherwise it becomes the current message and transmission
+ * is requested from the server.
+ *
+ * @param mq message queue
+ * @param mqm the message to send
+ */
+static void
+server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
+{
+  struct ServerClientSocketState *state = mq->impl_state;
+  int msize;
+
+  GNUNET_assert (NULL != state);
+
+  if (NULL != state->th)
+  {
+    GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+    return;
+  }
+  GNUNET_assert (NULL == mq->current_msg);
+  /* take the size from the message being sent; mqm is not in the
+     queue, so msg_head is NULL or refers to a different message */
+  msize = ntohs (mqm->mh->size);
+  mq->current_msg = mqm;
+  state->th =
+      GNUNET_SERVER_notify_transmit_ready (state->client, msize,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           &transmit_queued, mq);
+}
+
+
+/**
+ * Create a message queue for a GNUNET_SERVER_Client.
+ * The queue can only be used for sending, no handlers are installed.
+ *
+ * @param client the client
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
+{
+  struct GNUNET_MQ_MessageQueue *mq;
+  struct ServerClientSocketState *scss;
+
+  GNUNET_assert (NULL != client);
+
+  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+  scss = GNUNET_new (struct ServerClientSocketState);
+  /* the send implementation transmits to state->client */
+  scss->client = client;
+  mq->impl_state = scss;
+  mq->send_impl = server_client_send_impl;
+  return mq;
+}
+
+
+/**
+ * Copy the message currently being sent into the transmit buffer
+ * of the client connection and schedule transmission of the next
+ * queued message, if any.
+ *
+ * @param cls the message queue
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+connection_client_transmit_queued (void *cls, size_t size,
+                 void *buf)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct GNUNET_MQ_Message *mqm = mq->current_msg;
+  struct ClientConnectionState *state = mq->impl_state;
+  size_t msg_size;
+
+  mq->current_msg = NULL;
+  GNUNET_assert (NULL != mqm);
+  GNUNET_assert (NULL != buf);
+  msg_size = ntohs (mqm->mh->size);
+  GNUNET_assert (size >= msg_size);
+  memcpy (buf, mqm->mh, msg_size);
+  /* the message can not be canceled anymore, notify the sender
+     (same as the stream implementation does) */
+  if (NULL != mqm->sent_cb)
+    mqm->sent_cb (mqm->sent_cls);
+  GNUNET_free (mqm);
+  state->th = NULL;
+  if (NULL != mq->msg_head)
+  {
+    mq->current_msg = mq->msg_head;
+    GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
+    /* request room for the next message, not for the one just copied */
+    state->th =
+        GNUNET_CLIENT_notify_transmit_ready (state->connection,
+                                             ntohs (mq->current_msg->mh->size),
+                                             GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
+                                             &connection_client_transmit_queued, mq);
+  }
+  return msg_size;
+}
+
+
+/**
+ * Send implementation for queues on top of a GNUNET_CLIENT_Connection.
+ * If a transmission is already pending, the message is queued;
+ * otherwise it becomes the current message and transmission
+ * is requested from the client connection.
+ *
+ * @param mq message queue
+ * @param mqm the message to send
+ */
+static void
+connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct 
GNUNET_MQ_Message *mqm)
+{
+  struct ClientConnectionState *state = mq->impl_state;
+  int msize;
+
+  GNUNET_assert (NULL != state);
+
+  /* a pending transmit handle means another message is in flight */
+  if (NULL != state->th)
+  {
+    GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+    return;
+  }
+  GNUNET_assert (NULL == mq->current_msg);
+  mq->current_msg = mqm;
+  msize = ntohs (mqm->mh->size);
+  state->th = 
+      GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
+                                           GNUNET_TIME_UNIT_FOREVER_REL, 
GNUNET_NO,
+                                           &connection_client_transmit_queued, 
mq);
+}
+
+
+/**
+ * Called when we receive a message from the service;
+ * requests the next message and passes the received one
+ * on to the handlers of the queue.
+ *
+ * @param cls the message queue
+ * @param msg message received; asserted to be non-NULL,
+ *        timeouts and fatal errors are not handled yet
+ */
+static void
+handle_client_message (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct ClientConnectionState *state = mq->impl_state;
+
+  GNUNET_assert (NULL != msg);
+
+  /* GNUNET_CLIENT_receive only delivers a single message, so we
+     have to ask for the next one; do it before running the
+     handlers, otherwise only the first message would ever arrive */
+  GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
+
+  dispatch_message (mq, msg);
+}
+
+
+/**
+ * Create a message queue for a GNUNET_CLIENT_Connection.
+ * If handlers are specified, receive messages from the connection.
+ *
+ * @param connection the client connection, must not be NULL
+ * @param handlers handlers for receiving messages, can be NULL
+ * @param cls closure for the handlers
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection 
*connection,
+                                       const struct GNUNET_MQ_Handler 
*handlers,
+                                       void *cls)
+{
+  struct GNUNET_MQ_MessageQueue *mq;
+  struct ClientConnectionState *state;
+
+  GNUNET_assert (NULL != connection);
+
+  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+  mq->handlers = handlers;
+  mq->handlers_cls = cls;
+  state = GNUNET_new (struct ClientConnectionState);
+  state->connection = connection;
+  mq->impl_state = state;
+  mq->send_impl = connection_client_send_impl;
+
+  /* only start receiving if somebody can handle the messages */
+  if (NULL != handlers)
+  {
+    GNUNET_CLIENT_receive (connection, handle_client_message, mq,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+  }
+
+  return mq;
+}
+
+
+
+/**
+ * Replace the handlers and the handler closure of a message queue.
+ *
+ * @param mq message queue
+ * @param new_handlers new handlers
+ * @param cls new closure for the handlers
+ */
+void
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
+                            const struct GNUNET_MQ_Handler *new_handlers,
+                            void *cls)
+{
+  mq->handlers = new_handlers;
+  mq->handlers_cls = cls;
+}
+
+
+
+/**
+ * Associate the assoc_data in mq with a unique request id.
+ * The association map is created on first use.
+ *
+ * @param mq message queue, id will be unique for the queue
+ * @param mqm message to associate (currently not used)
+ * @param assoc_data data to associate
+ * @return the request id the data has been stored under
+ */
+uint32_t
+GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
+                     struct GNUNET_MQ_Message *mqm,
+                     void *assoc_data)
+{
+  uint32_t id;
+
+  if (NULL == mq->assoc_map)
+    mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
+  /* ids are handed out from a per-queue counter */
+  id = mq->assoc_id++;
+  GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  return id;
+}
+
+
+
+/**
+ * Get the data associated with a request id in a queue.
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we are interested in
+ * @return the result of the map lookup, NULL if the queue
+ *         has no association map yet
+ */
+void *
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+{
+  if (NULL == mq->assoc_map)
+    return NULL;
+  return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
+}
+
+
+/**
+ * Remove the association for a request id.
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we want to remove
+ * @return the data that was looked up for the id, NULL if
+ *         the queue has no association map yet
+ */
+void *
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+{
+  void *val;
+
+  if (NULL == mq->assoc_map)
+    return NULL;
+  /* look the value up first, it is passed to the remove call
+     and returned to the caller */
+  val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
+  GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
+  return val;
+}
+
+
+/**
+ * Set the callback that is stored as sent_cb of the message.
+ * A previously set callback is overwritten.
+ *
+ * @param mqm message to set the notify callback for
+ * @param cb the notify callback
+ * @param cls closure for the callback
+ */
+void
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
+                       GNUNET_MQ_NotifyCallback cb,
+                       void *cls)
+{
+  mqm->sent_cb = cb;
+  mqm->sent_cls = cls;
+}
+
+
+/**
+ * Destroy the message queue.
+ * Only the queue struct itself is freed here.
+ *
+ * @param mq message queue to destroy
+ */
+void
+GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
+{
+  /* FIXME: destroy all pending messages in the queue */
+  /* NOTE(review): impl_state and assoc_map are not released
+     here either; confirm whether they are freed elsewhere */
+  GNUNET_free (mq);
+}
+

Modified: gnunet/src/set/mq.h
===================================================================
--- gnunet/src/set/mq.h 2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/mq.h 2013-04-24 11:48:31 UTC (rev 26981)
@@ -30,23 +30,109 @@
 #include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_connection_lib.h"
+#include "gnunet_server_lib.h"
+#include "gnunet_stream_lib.h"
 
 
-#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_(((void) 
mvar->header, (struct GNUNET_MessageHeader**) &mvar), (esize) + sizeof *mvar, 
type)
+/**
+ * Allocate a GNUNET_MQ_Message, with extra space allocated after the space 
needed
+ * by the message struct.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mvar variable to store the allocated message in;
+ *             must have a header field
+ * @param esize extra space to allocate after the message
+ * @param type type of the message
+ * @return the MQ message
+ */
+#define GNUNET_MQ_msg_extra(mvar, esize, type) 
GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) 
&(mvar)), (esize) + sizeof *(mvar), (type))
 
+/**
+ * Allocate a GNUNET_MQ_Message.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mvar variable to store the allocated message in;
+ *             must have a header field
+ * @param type type of the message
+ * @return the MQ message
+ */
 #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
 
-#define GNUNET_MQ_msg_raw(type) GNUNET_MQ_msg_ (NULL, sizeof (struct 
GNUNET_MessageHeader), type)
+/**
+ * Allocate a GNUNET_MQ_Message, and concatenate another message
+ * after the space needed by the message struct.
+ *
+ * @param mvar variable to store the allocated message in;
+ *             must have a header field
+ * @param mc message to concatenate, can be NULL
+ * @param t type of the message
+ * @return the MQ message, NULL if mc is too large to be concatenated
+ */
+#define GNUNET_MQ_msg_concat(mvar, mc, t) GNUNET_MQ_msg_concat_((((void) (mvar)->header), (struct GNUNET_MessageHeader **) &(mvar)), \
+      sizeof *(mvar), (struct GNUNET_MessageHeader *) (mc), (t))
 
+
+/**
+ * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
+ * The allocated message will already have the type and size field set.
+ * The message header itself is not handed back to the caller.
+ *
+ * @param type type of the message
+ * @return the MQ message
+ */
+#define GNUNET_MQ_msg_header(type) GNUNET_MQ_msg_ (NULL, sizeof (struct 
GNUNET_MessageHeader), type)
+
+
+/**
+ * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space.
+ * The allocated message will already have the type and size field set.
+ *
+ * @param mh pointer variable (struct GNUNET_MessageHeader *) to store
+ *           the allocated message header in
+ * @param esize extra space to allocate after the message header
+ * @param type type of the message
+ * @return the MQ message
+ */
+#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&(mh), (esize) + sizeof (struct GNUNET_MessageHeader), (type))
+
+
+/**
+ * End-marker for the handlers array
+ */
 #define GNUNET_MQ_HANDLERS_END {NULL, 0}
 
+/**
+ * Opaque handle to a message queue
+ */
 struct GNUNET_MQ_MessageQueue;
 
+/**
+ * Opaque handle to an allocated message
+ */
 struct GNUNET_MQ_Message;
 
+/**
+ * Called when a message has been received.
+ *
+ * @param cls closure
+ * @param msg the received message
+ */
+typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct 
GNUNET_MessageHeader *msg);
+
+
+/**
+ * Message handler for a specific message type.
+ */
 struct GNUNET_MQ_Handler
 {
-  void *cb;
+  /**
+   * Callback, called every time a new message of 
+   * the specified type has been received.
+   */
+  GNUNET_MQ_MessageCallback cb;
+
+  /**
+   * Type of the message we are interested in
+   */
   uint16_t type;
 };
 
@@ -63,12 +149,34 @@
  * @param mhp message header to store the allocated message header in, can be 
NULL
  * @param size size of the message to allocate
  * @param type type of the message, will be set in the allocated message
- * @param return the allocated MQ message
+ * @return the allocated MQ message
  */
 struct GNUNET_MQ_Message *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t 
type);
 
+
 /**
+ * Create a new message for MQ, by concatenating another message
+ * after a message of the specified type.
+ *
+ * @return the allocated MQ message
+ */
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, 
struct GNUNET_MessageHeader *m, uint16_t type);
+
+
+/**
+ * Discard the message queue message, free all
+ * allocated resources. Must be called in the event
+ * that a message is created but should not actually be sent.
+ *
+ * @param mqm the message to discard
+ */
+void
+GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
+
+
+/**
  * Send a message with the give message queue.
  * May only be called once per message.
  * 
@@ -78,6 +186,7 @@
 void
 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message 
*mqm);
 
+
 /**
  * Cancel sending the message. Message must have been sent with GNUNET_MQ_send 
before.
  * May not be called after the notify sent callback has been called
@@ -100,33 +209,106 @@
                      struct GNUNET_MQ_Message *mqm,
                      void *assoc_data);
 
+/**
+ * Get the data associated with a request id in a queue
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we are interested in
+ * @return the associated data
+ */
 void *
 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
 
+
+/**
+ * Remove the association for a request id
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we want to remove
+ * @return the associated data
+ */
 void *
 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t 
request_id);
 
 
 
+/**
+ * Create a message queue for a GNUNET_CLIENT_Connection.
+ * If handlers are specified, receive messages from the connection.
+ *
+ * @param connection the client connection
+ * @param handlers handlers for receiving messages
+ * @param cls closure for the handlers
+ * @return the message queue
+ */
 struct GNUNET_MQ_MessageQueue *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection 
*connection,
                                        const struct GNUNET_MQ_Handler 
*handlers,
                                        void *cls);
 
 
+/**
+ * Create a message queue for a GNUNET_SERVER_Client.
+ *
+ * @param client the client
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
+
+
+
+/**
+ * Create a message queue for a GNUNET_STREAM_Socket.
+ * If handlers are specified, receive messages from the stream socket.
+ *
+ * @param socket the stream socket
+ * @param handlers handlers for receiving messages
+ * @param cls closure for the handlers
+ * @return the message queue
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
+                                   const struct GNUNET_MQ_Handler *handlers,
+                                   void *cls);
+
+/**
+ * Replace the handlers of a message queue with new handlers.
+ * Takes effect immediately, even for messages that have already been
+ * received, but for which the handler has not been called yet.
+ *
+ * @param mq message queue
+ * @param new_handlers new handlers
+ * @param cls new closure for the handlers
+ */
 void
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
+                            const struct GNUNET_MQ_Handler *new_handlers,
+                            void *cls);
+
+
+
+/**
+ * Call a callback once the message has been sent, that is, the message
+ * can not be canceled anymore.
+ * There can be only one notify sent callback per message.
+ *
+ * @param mqm message to call the notify callback for
+ * @param cb the notify callback
+ * @param cls closure for the callback
+ */
+void
 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
                        GNUNET_MQ_NotifyCallback cb,
                        void *cls);
 
 
+/**
+ * Destroy the message queue.
+ *
+ * @param mq message queue to destroy
+ */
 void
-GNUNET_MQ_notify_timeout (struct GNUNET_MQ_Message *mqm,
-                          GNUNET_MQ_NotifyCallback cb,
-                          void *cls);
-
-
-void
 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
 
 #endif

Modified: gnunet/src/set/set.conf.in
===================================================================
--- gnunet/src/set/set.conf.in  2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/set.conf.in  2013-04-24 11:48:31 UTC (rev 26981)
@@ -0,0 +1,11 @@
+[set]
+AUTOSTART = NO
+# PORT = 2106
+HOSTNAME = localhost
+HOME = $SERVICEHOME
+BINARY = gnunet-service-set
+ACCEPT_FROM = 127.0.0.1;
+ACCEPT_FROM6 = ::1;
+UNIXPATH = /tmp/gnunet-service-set.sock
+UNIX_MATCH_UID = YES
+UNIX_MATCH_GID = YES

Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h        2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/set.h        2013-04-24 11:48:31 UTC (rev 26981)
@@ -20,12 +20,13 @@
 
 /**
  * @author Florian Dold
- * @file consensus/consensus.h
- * @brief
+ * @file set/set.h
+ * @brief messages used for the set api
  */
 #ifndef SET_H
 #define SET_H
 
+#include "platform.h"
 #include "gnunet_common.h"
 
 
@@ -68,12 +69,6 @@
    * Operation type, values of enum GNUNET_SET_OperationType
    */
   uint16_t operation GNUNET_PACKED;
-
-  /**
-   * Operation type, values of enum GNUNET_SET_OperationType
-   */
-  uint16_t op GNUNET_PACKED;
-  
 };
 
 
@@ -88,6 +83,12 @@
    * request id of the request we want to accept
    */
   uint32_t request_id GNUNET_PACKED;
+
+  /**
+   * Zero if the client has rejected the request,
+   * non-zero if it has accepted it
+   */
+  uint32_t accepted GNUNET_PACKED;
 };
 
 
@@ -119,8 +120,14 @@
    */
   struct GNUNET_MessageHeader header;
 
-  struct GNUNET_PeerIdentity other_peer;
+  /**
+   * Peer to evaluate the operation with
+   */
+  struct GNUNET_PeerIdentity peer;
 
+  /**
+   * Application id
+   */
   struct GNUNET_HashCode app_id;
 
   /**
@@ -144,8 +151,15 @@
    */
   uint32_t request_id GNUNET_PACKED;
 
+  /**
+   * Was the evaluation successful?
+   */
   uint16_t result_status GNUNET_PACKED;
 
+  /**
+   * Type of the element attached to the message,
+   * if any.
+   */
   uint16_t element_type GNUNET_PACKED;
 
   /* rest: the actual element */
@@ -179,6 +193,7 @@
   uint32_t request_id GNUNET_PACKED;
 };
 
+
 GNUNET_NETWORK_STRUCT_END
 
 #endif

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/set_api.c    2013-04-24 11:48:31 UTC (rev 26981)
@@ -93,7 +93,7 @@
   if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
   {
     struct GNUNET_MQ_Message *mqm;
-    mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_ACK);
+    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
     GNUNET_MQ_send (set->mq, mqm);
   }
 
@@ -136,7 +136,15 @@
   req->request_id = ntohl (msg->request_id);
   lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
   if (GNUNET_NO == req->accepted)
+  {
+    struct GNUNET_MQ_Message *mqm;
+    struct AcceptMessage *amsg;
+    
+    mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+    amsg->request_id = msg->request_id;
+    GNUNET_MQ_send (lh->mq, mqm);
     GNUNET_free (req);
+  }
 }
 
 
@@ -152,7 +160,7 @@
  * @return a handle to the set
  */
 struct GNUNET_SET_Handle *
-GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
                    enum GNUNET_SET_OperationType op)
 {
   struct GNUNET_SET_Handle *set;
@@ -165,6 +173,7 @@
 
   set = GNUNET_new (struct GNUNET_SET_Handle);
   set->client = GNUNET_CLIENT_connect ("set", cfg);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n");
   GNUNET_assert (NULL != set->client);
   set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, 
set);
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
@@ -295,7 +304,7 @@
 
   mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), 
GNUNET_MESSAGE_TYPE_SET_EVALUATE);
   msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh));
-  msg->other_peer = *other_peer;
+  msg->peer = *other_peer;
   msg->app_id = *app_id;
   memcpy (&msg[1], context_msg, htons (context_msg->size));
   oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, 
operation_timeout_task, oh);
@@ -318,7 +327,7 @@
  * @return a handle that can be used to cancel the listen operation
  */
 struct GNUNET_SET_ListenHandle *
-GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
                    enum GNUNET_SET_OperationType operation,
                    const struct GNUNET_HashCode *app_id,
                    GNUNET_SET_ListenCallback listen_cb,
@@ -396,9 +405,11 @@
 
   mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
   msg->request_id = htonl (request->request_id);
-  oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, 
operation_timeout_task, oh);
+  msg->accepted = 1;
   GNUNET_MQ_send (set->mq, mqm);
 
+  oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, 
operation_timeout_task, oh);
+
   return oh;
 }
 
@@ -416,7 +427,7 @@
 
   h_assoc = GNUNET_MQ_assoc_remove (h->set->mq, h->request_id);
   GNUNET_assert (h_assoc == h);
-  mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_CANCEL);
+  mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
   GNUNET_MQ_send (h->set->mq, mqm);
   GNUNET_free (h);
 }

Modified: gnunet/src/set/strata_estimator.c
===================================================================
--- gnunet/src/set/strata_estimator.c   2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/strata_estimator.c   2013-04-24 11:48:31 UTC (rev 26981)
@@ -53,15 +53,15 @@
 
 
 void
-strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode 
*key)
+strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key)
 {
-  uint32_t v;
+  uint64_t v;
   int i;
-  v = key->bits[0];
+  v = key.key_val;
   /* count trailing '1'-bits of v */
   for (i = 0; v & 1; v>>=1, i++)
     /* empty */;
-  ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
+  ibf_insert (se->strata[i], key);
 }
 
 

Modified: gnunet/src/set/strata_estimator.h
===================================================================
--- gnunet/src/set/strata_estimator.h   2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/strata_estimator.h   2013-04-24 11:48:31 UTC (rev 26981)
@@ -66,7 +66,7 @@
 
 
 void
-strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode 
*key);
+strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key);
 
 
 void

Modified: gnunet/src/set/test_set_api.c
===================================================================
--- gnunet/src/set/test_set_api.c       2013-04-23 15:55:13 UTC (rev 26980)
+++ gnunet/src/set/test_set_api.c       2013-04-24 11:48:31 UTC (rev 26981)
@@ -44,8 +44,8 @@
   struct GNUNET_SET_Handle *set1;
   struct GNUNET_SET_Handle *set2;
 
-  set1 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION);
-  set2 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION);
+  set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
 }
 
 int




reply via email to

[Prev in Thread] Current Thread [Next in Thread]