gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25675 - in gnunet/src: consensus include


From: gnunet
Subject: [GNUnet-SVN] r25675 - in gnunet/src: consensus include
Date: Thu, 3 Jan 2013 01:43:57 +0100

Author: dold
Date: 2013-01-03 01:43:57 +0100 (Thu, 03 Jan 2013)
New Revision: 25675

Modified:
   gnunet/src/consensus/Makefile.am
   gnunet/src/consensus/consensus.h
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/gnunet-consensus-start-peers.c
   gnunet/src/consensus/gnunet-consensus.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/test_consensus_api.c
   gnunet/src/include/gnunet_applications.h
   gnunet/src/include/gnunet_consensus_service.h
   gnunet/src/include/gnunet_protocols.h
Log:
implemented the modified consensus api, started implementing p2p protocol for 
consensus


Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am    2013-01-02 16:03:25 UTC (rev 25674)
+++ gnunet/src/consensus/Makefile.am    2013-01-03 00:43:57 UTC (rev 25675)
@@ -57,6 +57,7 @@
 gnunet_service_consensus_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/core/libgnunetcore.la \
+  $(top_builddir)/src/mesh/libgnunetmesh.la \
   $(GN_LIBINTL)
 
 libgnunetconsensus_la_SOURCES = \

Modified: gnunet/src/consensus/consensus.h
===================================================================
--- gnunet/src/consensus/consensus.h    2013-01-02 16:03:25 UTC (rev 25674)
+++ gnunet/src/consensus/consensus.h    2013-01-03 00:43:57 UTC (rev 25675)
@@ -52,7 +52,15 @@
    */
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Timeout for conclude
+   */
   struct GNUNET_TIME_RelativeNBO timeout;
+
+  /**
+   * Minimum group size required for a consensus group.
+   */
+  uint32_t min_group_size;
 };
 
 
@@ -102,6 +110,7 @@
    */
   uint8_t keep;
 
+  /* FIXME: add message hash? */
 };
 
 GNUNET_NETWORK_STRUCT_END

Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-01-02 16:03:25 UTC (rev 
25674)
+++ gnunet/src/consensus/consensus_api.c        2013-01-03 00:43:57 UTC (rev 
25675)
@@ -33,14 +33,43 @@
 
 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
 
-struct ElementAck
+/**
+ * Actions that can be queued.
+ */
+struct QueuedMessage
 {
-  struct ElementAck *next;
-  struct ElementAck *prev;
-  int keep;
-  struct GNUNET_CONSENSUS_Element *element;
+  /**
+   * Queued messages are stored in a doubly linked list.
+   */
+  struct QueuedMessage *next;
+
+  /**
+   * Queued messages are stored in a doubly linked list.
+   */
+  struct QueuedMessage *prev;
+
+  /**
+   * The actual queued message.
+   */
+  struct GNUNET_MessageHeader *msg;
+
+  /**
+   * Size of the message in msg.
+   */
+  size_t size;
+
+  /**
+   * Will be called after transmit, if not NULL
+   */
+  GNUNET_CONSENSUS_InsertDoneCallback idc;
+
+  /**
+   * Closure for idc
+   */
+  void *idc_cls;
 };
 
+
 /**
  * Handle for the service.
  */
@@ -52,14 +81,14 @@
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Client connected to the consensus service, may be NULL if not connected.
    */
   struct GNUNET_CLIENT_Connection *client;
 
   /**
    * Callback for new elements. Not called for elements added locally.
    */
-  GNUNET_CONSENSUS_NewElementCallback new_element_cb;
+  GNUNET_CONSENSUS_ElementCallback new_element_cb;
 
   /**
    * Closure for new_element_cb
@@ -67,7 +96,7 @@
   void *new_element_cls;
 
   /**
-   * Session identifier for the consensus session.
+   * The (local) session identifier for the consensus session.
    */
   struct GNUNET_HashCode session_id;
 
@@ -77,9 +106,9 @@
   int num_peers;
 
   /**
-   * Peer identities of peers in the consensus. Optionally includes the local 
peer.
+   * Peer identities of peers participating in the consensus, includes the 
local peer.
    */
-  struct GNUNET_PeerIdentity *peers;
+  struct GNUNET_PeerIdentity **peers;
 
   /**
    * Currently active transmit request.
@@ -92,22 +121,11 @@
   int joined;
 
   /**
-   * Called when the current insertion operation finishes.
-   * NULL if there is no insert operation active.
-   */
-  GNUNET_CONSENSUS_InsertDoneCallback idc;
-
-  /**
    * Closure for the insert done callback.
    */
   void *idc_cls;
 
   /**
-   * An element that was requested to be inserted.
-   */
-  struct GNUNET_CONSENSUS_Element *insert_element;
-
-  /**
    * Called when the conclude operation finishes or fails.
    */
   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -122,103 +140,92 @@
    */
   struct GNUNET_TIME_Absolute conclude_deadline;
 
-  struct ElementAck *ack_head;
-  struct ElementAck *ack_tail;
+  unsigned int conclude_min_size;
 
-  /**
-   * Set to GNUNET_YES if the begin message has been transmitted to the service
-   */
-  int begin_sent;
-
-  /**
-   * Set to GNUNET_YES it the begin message should be transmitted to the 
service
-   */
-  int begin_requested;
+  struct QueuedMessage *messages_head;
+  struct QueuedMessage *messages_tail;
 };
 
 
-static size_t
-transmit_ack (void *cls, size_t size, void *buf);
 
-static size_t
-transmit_insert (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_begin (void *cls, size_t size, void *buf);
-
-
 /**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
  */
 static void
-ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  if ((NULL == consensus->th) && (NULL != consensus->ack_head))
-  {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct 
GNUNET_CONSENSUS_AckMessage),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_ack, 
consensus);
-  }
-}
+schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
 
 
 /**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
  */
-static void
-ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
+static size_t transmit_queued (void *cls, size_t size,
+                               void *buf)
 {
-  if ((NULL == consensus->th) && (NULL != consensus->insert_element))
+  struct GNUNET_CONSENSUS_Handle *consensus;
+  struct QueuedMessage *qmsg;
+  size_t ret_size;
+
+  printf("transmitting queued\n");
+
+  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
+  qmsg = consensus->messages_head;
+  GNUNET_CONTAINER_DLL_remove (consensus->messages_head, 
consensus->messages_tail, qmsg);
+  GNUNET_assert (qmsg);
+
+  if (NULL == buf)
   {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct 
GNUNET_CONSENSUS_ElementMessage) + 
-                                                
consensus->insert_element->size,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_insert, 
consensus);
+    if (NULL != qmsg->idc)
+    {
+      qmsg->idc (qmsg->idc_cls, GNUNET_YES);
+    }
   }
-}
 
-
-/**
- * Call notify_transmit_ready for ack if necessary and possible.
- */
-static void
-ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
+  memcpy (buf, qmsg->msg, qmsg->size);
+  ret_size = qmsg->size;
+  if (NULL != qmsg->idc)
   {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct 
GNUNET_CONSENSUS_ConcludeMessage),
-                                             
GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
-                                             GNUNET_NO, &transmit_conclude, 
consensus);
+    qmsg->idc (qmsg->idc_cls, GNUNET_YES);
   }
+  GNUNET_free (qmsg->msg);
+  GNUNET_free (qmsg);
+
+  schedule_transmit (consensus);
+
+  return ret_size;
 }
 
 
 /**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
  */
 static void
-ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
+schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
 {
-  if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
-      (GNUNET_NO == consensus->begin_sent))
+  if (NULL != consensus->th)
+    return;
+
+  if (NULL != consensus->messages_head)
   {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct 
GNUNET_MessageHeader),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_begin, 
consensus);
+    LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
+    GNUNET_CLIENT_notify_transmit_ready (consensus->client, 
consensus->messages_head->size, 
+                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                         GNUNET_NO, &transmit_queued, 
consensus);
   }
 }
 
+
 /**
  * Called when the server has sent is a new element
  * 
@@ -226,11 +233,12 @@
  * @param msg element message
  */
 static void
-handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
                    struct GNUNET_CONSENSUS_ElementMessage *msg)
 {
   struct GNUNET_CONSENSUS_Element element;
-  struct ElementAck *ack;
+  struct GNUNET_CONSENSUS_AckMessage *ack_msg;
+  struct QueuedMessage *queued_msg;
   int ret;
 
   element.type = msg->element_type;
@@ -238,11 +246,15 @@
   element.data = &msg[1];
 
   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
-  ack = GNUNET_malloc (sizeof (struct ElementAck));
-  ack->keep = ret;
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, 
consensus->ack_tail,ack);
 
-  ntr_ack (consensus);
+  queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct 
GNUNET_CONSENSUS_AckMessage));
+  queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1];
+
+  ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
+  ack_msg->keep = ret;
+
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail,
+                                    queued_msg);
 }
 
 
@@ -254,13 +266,12 @@
  * @param msg conclude done message
  */
 static void
-handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
 {
   GNUNET_assert (NULL != consensus->conclude_cb);
-  consensus->conclude_cb(consensus->conclude_cls,
-                         msg->num_peers,
-                         (struct GNUNET_PeerIdentity *) &msg[1]);
+  consensus->conclude_cb (consensus->conclude_cls,
+                         0, NULL);
   consensus->conclude_cb = NULL;
 }
 
@@ -287,12 +298,6 @@
     GNUNET_CLIENT_disconnect (consensus->client);
     consensus->client = NULL;
     consensus->new_element_cb (NULL, NULL);
-    if (NULL != consensus->idc)
-    {
-      consensus->idc(consensus->idc_cls, GNUNET_NO);
-      consensus->idc = NULL;
-      consensus->idc_cls = NULL;
-    }
     return;
   }
 
@@ -305,15 +310,12 @@
       handle_conclude_done (consensus, (struct 
GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
       break;
     default:
-      LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by 
service, ignoring");
+      GNUNET_break (0);
   }
   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
                          GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
-
-
-
 /**
  * Function called to notify a client about the connection
  * begin ready to queue more data.  "buf" will be
@@ -326,99 +328,6 @@
  * @return number of bytes written to buf
  */
 static size_t
-transmit_ack (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_AckMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-
-  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
-
-  GNUNET_assert (NULL != consensus->ack_head);
-
-  msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
-  msg->keep = consensus->ack_head->keep;
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
-  msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
-
-  consensus->ack_head = consensus->ack_head->next;
-
-  consensus->th = NULL;
-
-  ntr_insert (consensus);
-  ntr_ack (consensus);
-  ntr_conclude (consensus);
-
-  return sizeof (struct GNUNET_CONSENSUS_AckMessage);
-}
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_insert (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_ElementMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  GNUNET_CONSENSUS_InsertDoneCallback idc;
-  int msize;
-  void *idc_cls;
-
-  GNUNET_assert (NULL != buf);
-
-  consensus = cls;
-
-  GNUNET_assert (NULL != consensus->insert_element);
-
-  consensus->th = NULL;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
-      consensus->insert_element->size;
-
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
-  msg->header.size = htons (msize);
-  memcpy (&msg[1],
-          consensus->insert_element->data,
-          consensus->insert_element->size);
-
-  consensus->insert_element = NULL;
-
-  idc = consensus->idc;
-  consensus->idc = NULL;
-  idc_cls = consensus->idc_cls;
-  consensus->idc_cls = NULL;
-  idc (idc_cls, GNUNET_YES);
-
-
-  ntr_ack (consensus);
-  ntr_insert (consensus);
-  ntr_conclude (consensus);
-
-  return msize;
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
 transmit_join (void *cls, size_t size, void *buf)
 {
   struct GNUNET_CONSENSUS_JoinMessage *msg;
@@ -427,7 +336,7 @@
 
   GNUNET_assert (NULL != buf);
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
+  LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n");
 
   consensus = cls;
   consensus->th = NULL;
@@ -447,9 +356,7 @@
            consensus->peers,
            consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
-  ntr_insert (consensus);
-  ntr_begin (consensus);
-  ntr_conclude (consensus);
+  schedule_transmit (consensus);
 
   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
                          GNUNET_TIME_UNIT_FOREVER_REL);
@@ -457,88 +364,11 @@
   return msize;
 }
 
-
 /**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_ConcludeMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  int msize;
-
-  GNUNET_assert (NULL != buf);
-
-  consensus = cls;
-  consensus->th = NULL;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
-
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
-  msg->header.size = htons (msize);
-  msg->timeout =
-      GNUNET_TIME_relative_hton 
(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
-
-  ntr_ack (consensus);
-
-  return msize;
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls the consensus handle
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_begin (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_MessageHeader *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  int msize;
-
-  GNUNET_assert (NULL != buf);
-
-  consensus = cls;
-  consensus->th = NULL;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_MessageHeader);
-
-  msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
-  msg->size = htons (msize);
-
-  ntr_ack (consensus);
-  ntr_insert (consensus);
-  ntr_conclude (consensus);
-
-  return msize;
-}
-
-
-/**
  * Create a consensus session.
  *
- * @param cfg
- * @param num_peers
+ * @param cfg configuration to use for connecting to the consensus service
+ * @param num_peers number of peers in the peers array
  * @param peers array of peers participating in this consensus session
  *              Inclusion of the local peer is optional.
  * @param session_id session identifier
@@ -553,7 +383,7 @@
                         unsigned int num_peers,
                         const struct GNUNET_PeerIdentity *peers,
                          const struct GNUNET_HashCode *session_id,
-                         GNUNET_CONSENSUS_NewElementCallback new_element_cb,
+                         GNUNET_CONSENSUS_ElementCallback new_element_cb,
                          void *new_element_cls)
 {
   struct GNUNET_CONSENSUS_Handle *consensus;
@@ -567,17 +397,10 @@
   consensus->session_id = *session_id;
 
   if (0 == num_peers)
-  {
     consensus->peers = NULL;
-  }
   else if (num_peers > 0)
-  {
-    consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct 
GNUNET_PeerIdentity));
-  }
-  else
-  {
-    GNUNET_break (0);
-  }
+    consensus->peers =
+        GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
 
@@ -615,45 +438,37 @@
                         GNUNET_CONSENSUS_InsertDoneCallback idc,
                         void *idc_cls)
 {
-  GNUNET_assert (NULL == consensus->idc);
-  GNUNET_assert (NULL == consensus->insert_element);
-  GNUNET_assert (NULL == consensus->conclude_cb);
+  struct QueuedMessage *qmsg;
+  struct GNUNET_CONSENSUS_ElementMessage *element_msg;
+  size_t element_msg_size;
 
-  consensus->idc = idc;
-  consensus->idc_cls = idc_cls;
-  consensus->insert_element = GNUNET_memdup(element, sizeof (struct 
GNUNET_CONSENSUS_Element) + element->size);
+  LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size);
 
-  if (consensus->joined == 0)
-  {
-    return;
-  }
+  element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
+                               element->size);
 
-  ntr_insert (consensus);
-}
+  element_msg = GNUNET_malloc (element_msg_size);
+  element_msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
+  element_msg->header.size = htons (element_msg_size);
+  memcpy (&element_msg[1], element->data, element->size);
 
+  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+  qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
+  qmsg->size = element_msg_size;
+  qmsg->idc = idc;
+  qmsg->idc_cls = idc_cls;
 
-/**
- * Begin reconciling elements with other peers.
- *
- * @param consensus handle for the consensus session
- */
-void
-GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  GNUNET_assert (NULL == consensus->idc);
-  GNUNET_assert (NULL == consensus->insert_element);
-  GNUNET_assert (GNUNET_NO == consensus->begin_requested);
-  GNUNET_assert (GNUNET_NO == consensus->begin_sent);
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qmsg);
 
-  consensus->begin_requested = GNUNET_YES;
-
-  ntr_begin (consensus);
+  schedule_transmit (consensus);
 }
 
 
 /**
- * We are finished inserting new elements into the consensus;
+ * We are done with inserting new elements into the consensus;
  * try to conclude the consensus within a given time window.
+ * After conclude has been called, no further elements may be
+ * inserted by the client.
  *
  * @param consensus consensus session
  * @param timeout timeout after which the conculde callback
@@ -664,20 +479,32 @@
 void
 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
                           struct GNUNET_TIME_Relative timeout,
+                          unsigned int min_group_size_in_consensus,
                           GNUNET_CONSENSUS_ConcludeCallback conclude,
                           void *conclude_cls)
 {
+  struct QueuedMessage *qmsg;
+  struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
+
   GNUNET_assert (NULL != conclude);
   GNUNET_assert (NULL == consensus->conclude_cb);
 
   consensus->conclude_cls = conclude_cls;
   consensus->conclude_cb = conclude;
-  consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
 
+  conclude_msg = GNUNET_malloc (sizeof (struct 
GNUNET_CONSENSUS_ConcludeMessage));
+  conclude_msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
+  conclude_msg->header.size = htons (sizeof (struct 
GNUNET_CONSENSUS_ConcludeMessage));
+  conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
+  conclude_msg->min_group_size = min_group_size_in_consensus;
 
-  /* if transmitting the conclude message is not possible right now, 
transmit_join
-   * or transmit_ack will handle it */
-  ntr_conclude (consensus);
+  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+  qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
+  qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
+
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qmsg);
+
+  schedule_transmit (consensus);
 }
 
 

Modified: gnunet/src/consensus/gnunet-consensus-start-peers.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-02 16:03:25 UTC 
(rev 25674)
+++ gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-03 00:43:57 UTC 
(rev 25675)
@@ -147,6 +147,9 @@
                                   NULL,
                                   test_master,
                                   NULL);
+
+
+  printf("hello there!\n");
 }
 
 

Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c     2013-01-02 16:03:25 UTC (rev 
25674)
+++ gnunet/src/consensus/gnunet-consensus.c     2013-01-03 00:43:57 UTC (rev 
25675)
@@ -62,10 +62,10 @@
  */
 static void
 conclude_cb (void *cls, 
-             unsigned int num_peers_in_consensus,
-             const struct GNUNET_PeerIdentity *peers_in_consensus)
+             unsigned int consensus_group_count,
+             const struct GNUNET_CONSENSUS_Group *groups)
 {
-  printf("reached conclusion with %d peers\n", num_peers_in_consensus);
+  printf("reached conclusion\n");
   GNUNET_SCHEDULER_shutdown ();
 }
 
@@ -111,7 +111,7 @@
     if (feof (stdin))
     {
       printf ("concluding ...\n");
-      GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 
conclude_cb, NULL);
+      GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0, 
conclude_cb, NULL);
     }
     return;
   }
@@ -144,6 +144,12 @@
 cb (void *cls,
     struct GNUNET_CONSENSUS_Element *element)
 {
+  if (NULL == element)
+  {
+    printf("error receiving from consensus\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return GNUNET_NO;
+  }
   printf("got element\n");
   return GNUNET_YES;
 }
@@ -178,10 +184,12 @@
 
   if (NULL == session_id_str)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing 
-s/--session-id)\n");
     return;
   }
 
+  GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid);
+
   for (count = 0; NULL != args[count]; count++);
  
   if (0 != count)
@@ -213,9 +221,6 @@
                                &sid,
                                &cb, NULL);
 
-  GNUNET_CONSENSUS_begin (consensus);
-
-
   stdin_fh = GNUNET_DISK_get_handle_from_native (stdin);
   stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, 
stdin_fh,
                                         &stdin_cb, NULL);

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-01-02 16:03:25 UTC 
(rev 25674)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-01-03 00:43:57 UTC 
(rev 25675)
@@ -16,12 +16,19 @@
       along with GNUnet; see the file COPYING.  If not, write to the
       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       Boston, MA 02111-1307, USA.
+*/
+
+
+/**
+ * @file consensus/gnunet-service-consensus.c
+ * @brief 
+ * @author Florian Dold
  */
 
-
 #include "platform.h"
 #include "gnunet_common.h"
 #include "gnunet_protocols.h"
+#include "gnunet_applications.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_consensus_service.h"
 #include "gnunet_core_service.h"
@@ -57,6 +64,24 @@
 };
 
 
+/*
+ * A peer that is also in a consensus session.
+ * Note that 'this' peer is not in the list.
+ */
+struct ConsensusPeer
+{
+  struct GNUNET_PeerIdentity *peer_id;
+
+  /**
+   * Incoming tunnel from the peer.
+   */
+  struct GNUNET_MESH_Tunnel *incoming_tunnel;
+
+  struct InvertibleBloomFilter *last_ibf;
+
+};
+
+
 /**
  * A consensus session consists of one local client and the remote authorities.
  */
@@ -84,18 +109,14 @@
   struct GNUNET_HashCode *global_id;
 
   /**
-   * Corresponding server handle.
+   * Local client in this consensus session.
+   * There is only one client per consensus session.
    */
   struct GNUNET_SERVER_Client *client;
 
   /**
-   * Client wants to receive and send updates.
-   */
-  int begin;
-
-  /**
    * Values in the consensus set of this session,
-   * all of them either have been sent or approved by the client.
+   * all of them either have been sent by or approved by the client.
    */
   struct GNUNET_CONTAINER_MultiHashMap *values;
 
@@ -110,12 +131,12 @@
   struct PendingElement *transmit_pending_tail;
 
   /**
-   * Elements that have not been sent to the client yet.
+   * Elements that have not been approved (or rejected) by the client yet.
    */
   struct PendingElement *approval_pending_head;
 
   /**
-   * Elements that have not been sent to the client yet.
+   * Elements that have not been approved (or rejected) by the client yet.
    */
   struct PendingElement *approval_pending_tail;
 
@@ -136,9 +157,47 @@
   int conclude_sent;
 
   /**
+   * Minimum number of peers to form a consensus group
+   */
+  int conclude_group_min;
+
+  /**
+   * Current round of the conclusion
+   */
+  int current_round;
+
+  /**
+   * Soft deadline for conclude.
+   * Speed up the speed of the consensus at the cost of consensus quality, as
+   * the time approached or crosses the deadline.
+   */
+  struct GNUNET_TIME_Absolute conclude_deadline;
+
+  /**
    * Number of other peers in the consensus
    */
-  int num_peers;
+  unsigned int num_peers;
+
+  /**
+   * Other peers in the consensus, array of ConsensusPeer
+   */
+  struct ConsensusPeer *peers;
+
+  /**
+   * Tunnel for broadcasting to all other authorities
+   */
+  struct GNUNET_MESH_Tunnel *broadcast_tunnel;
+
+  /**
+   * Time limit for one round of pairwise exchange.
+   * FIXME: should not actually be a constant
+   */
+  struct GNUNET_TIME_Relative round_time;
+
+  /**
+   * Task identifier for the round timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 };
 
 
@@ -167,10 +226,21 @@
  */
 static struct GNUNET_PeerIdentity *my_peer;
 
+/**
+ * Handle to the mesh service.
+ */
+static struct GNUNET_MESH_Handle *mesh;
+
+/**
+ * Handle to the core service. Only used during service startup, will be NULL 
after that.
+ */
+static struct GNUNET_CORE_Handle *core;
+
 static void
 disconnect_client (struct GNUNET_SERVER_Client *client)
 {
-  /* FIXME */
+  GNUNET_SERVER_client_disconnect (client);
+  /* FIXME: free data structures that this client owns */
 }
 
 static void
@@ -185,7 +255,6 @@
   *dst = *local_id;
   for (i = 0; i < num_peers; ++i)
   {
-    /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get 
by without tmp */
     GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
     *dst = tmp;
     GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
@@ -255,7 +324,7 @@
 
   if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent 
== GNUNET_NO))
   {
-    /* just the conclude message with no other authorities in the dummy */
+    /* FIXME */
     msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
     session->th =
         GNUNET_SERVER_notify_transmit_ready (session->client, msize,
@@ -274,6 +343,39 @@
 
 
 /**
+ * Method called whenever a peer has disconnected from the tunnel.
+ * Implementations of this callback must NOT call
+ * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
+ * to run in some other task later.  However, calling 
+ * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
+ *
+ * @param cls closure
+ * @param peer peer identity the tunnel stopped working with
+ */
+static void
+disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  /* FIXME: how do we handle this */
+}
+
+
+/**
+ * Method called whenever a peer has connected to the tunnel.
+ *
+ * @param cls closure
+ * @param peer peer identity the tunnel was created to, NULL on timeout
+ * @param atsi performance data for the connection
+ */
+static void
+connect_handler (void *cls,
+                 const struct GNUNET_PeerIdentity *peer,
+                 const struct GNUNET_ATS_Information *atsi)
+{
+  /* not much we can do here, now we know the other peer has been added to our 
broadcast tunnel */
+}
+
+
+/**
  * Called when a client wants to join a consensus session.
  *
  * @param cls unused
@@ -288,18 +390,24 @@
   struct GNUNET_HashCode global_id;
   const struct GNUNET_CONSENSUS_JoinMessage *msg;
   struct ConsensusSession *session;
+  unsigned int i;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
 
   msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s 
(&msg->session_id));
+
   compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity 
*) &m[1], msg->num_peers);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s 
(&global_id));
+
   session = sessions_head;
   while (NULL != session)
   {
     if (client == session->client)
     {
+
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
       disconnect_client (client);
       return;
@@ -310,6 +418,7 @@
       disconnect_client (client);
       return;
     }
+    session = session->next;
   }
 
   GNUNET_SERVER_client_keep (client);
@@ -320,11 +429,40 @@
   session->global_id = GNUNET_memdup (&global_id, sizeof (struct 
GNUNET_HashCode));
   session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
   session->client = client;
+  /* FIXME: should not be a constant, but chosen adaptively */
+  session->round_time = GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5);
 
-  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+  session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, 
connect_handler, disconnect_handler, session);
 
+  session->num_peers = 0;
+
+  /* count the peers that are not the local peer */
+  for (i = 0; i < msg->num_peers; i++)
+  {
+    struct GNUNET_PeerIdentity *peers;
+    peers = (struct GNUNET_PeerIdentity *) &msg[1];
+    if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
+      session->num_peers++;
+  }
+
+  session->peers = GNUNET_malloc (session->num_peers * sizeof (struct 
ConsensusPeer));
+
+  /* copy the peer identities and add peers to broadcast tunnel */
+  for (i = 0; i < msg->num_peers; i++)
+  {
+    struct GNUNET_PeerIdentity *peers;
+    peers = (struct GNUNET_PeerIdentity *) &msg[1];
+    if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      *session->peers->peer_id = peers[i];
+      GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, 
&peers[i]);
+    }
+  }
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
 
+  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -381,40 +519,28 @@
 
 
 /**
- * Called when a client wants to begin
+ * Do one round of the conclusion.
+ * Start by broadcasting the set difference estimator (IBF strata).
+ *
  */
 void
-client_begin (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *message)
+conclude_do_round (struct ConsensusSession *session)
 {
-  struct ConsensusSession *session;
+  /* FIXME */
+}
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n");
 
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-      break;
-  }
-
-  if (NULL == session)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but 
client is not in any session\n");
-    GNUNET_SERVER_client_disconnect (client);
-    return;
-  }
-
-  session->begin = GNUNET_YES;
-
-  send_next (session);
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+/**
+ * Cancel the current round if necessary, decide to run another round or
+ * terminate.
+ */
+void
+conclude_round_done (struct ConsensusSession *session)
+{
+  /* FIXME */
 }
 
 
-
 /**
  * Called when a client performs the conclude operation.
  */
@@ -425,6 +551,8 @@
 {
   struct ConsensusSession *session;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
+
   session = sessions_head;
   while ((session != NULL) && (session->client != client))
   {
@@ -432,12 +560,25 @@
   }
   if (NULL == session)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
+
+  if (GNUNET_YES == session->conclude_requested)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude 
twice\n");
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
+
   session->conclude_requested = GNUNET_YES;
+
+  conclude_do_round (session);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
   send_next (session);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
@@ -462,10 +603,8 @@
 disconnect_core (void *cls,
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_CORE_Handle *core;
-  core = (struct GNUNET_CORE_Handle *) cls;
   GNUNET_CORE_disconnect (core);
-
+  core = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
 }
 
@@ -478,8 +617,6 @@
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
-    {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN,
-        sizeof (struct GNUNET_MessageHeader)},
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
@@ -489,37 +626,213 @@
 
   GNUNET_SERVER_add_handlers (srv, handlers);
   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
+  /* core can't be disconnected directly in the core startup callback, 
schedule a task to do it! */
   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
 }
 
 
+
 /**
- * Process consensus requests.
+ * Method called whenever another peer has added us to a tunnel
+ * the other peer initiated.
+ * Only called (once) upon reception of data with a message type which was
+ * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
+ * causes te tunnel to be ignored and no further notifications are sent about
+ * the same tunnel.
  *
  * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ *         (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel (void *cls,
+            struct GNUNET_MESH_Tunnel *tunnel,
+            const struct GNUNET_PeerIdentity *initiator,
+            const struct GNUNET_ATS_Information *atsi)
+{
+  /* there's nothing we can do here, as we don't have the global consensus id 
yet */
+  return NULL;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed.  Should clean up
+ * any associated state.  This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ *                   with the tunnel is stored
+ */
+static void
+cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
+{
+  /* FIXME: what to do here? */
+}
+
+
+
+/**
+ * Called to clean up, after a shutdown has been requested.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+shutdown_task (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  /* mesh requires all the tunnels to be destroyed manually */
+  while (NULL != sessions_head)
+  {
+    struct ConsensusSession *session;
+    session = sessions_head;
+    GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
+    sessions_head = sessions_head->next;
+    GNUNET_free (session);
+  }
+
+  if (NULL != mesh)
+  {
+    GNUNET_MESH_disconnect (mesh);
+    mesh = NULL;
+  }
+  if (NULL != core)
+  {
+    GNUNET_CORE_disconnect (core);
+    core = NULL;
+  }
+}
+
+
+
+/**
+ * Functions with this signature are called whenever a message is
+ * received.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+p2p_delta_estimate (void *cls,
+                    struct GNUNET_MESH_Tunnel * tunnel,
+                    void **tunnel_ctx,
+                    const struct GNUNET_PeerIdentity *sender,
+                    const struct GNUNET_MessageHeader *message,
+                    const struct GNUNET_ATS_Information *atsi)
+{
+  /* FIXME */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions with this signature are called whenever a message is
+ * received.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+p2p_difference_digest (void *cls,
+                       struct GNUNET_MESH_Tunnel * tunnel,
+                       void **tunnel_ctx,
+                       const struct GNUNET_PeerIdentity *sender,
+                       const struct GNUNET_MessageHeader *message,
+                       const struct GNUNET_ATS_Information *atsi)
+{
+  /* FIXME */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions with this signature are called whenever a message is
+ * received.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+p2p_elements_and_requests (void *cls,
+                           struct GNUNET_MESH_Tunnel * tunnel,
+                           void **tunnel_ctx,
+                           const struct GNUNET_PeerIdentity *sender,
+                           const struct GNUNET_MessageHeader *message,
+                           const struct GNUNET_ATS_Information *atsi)
+{
+  /* FIXME */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Start processing consensus requests.
+ *
+ * @param cls closure
  * @param server the initialized server
  * @param c configuration to use
  */
 static void
 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct 
GNUNET_CONFIGURATION_Handle *c)
 {
-  struct GNUNET_CORE_Handle *my_core;
   static const struct GNUNET_CORE_MessageHandler handlers[] = {
     {NULL, 0, 0}
   };
+  static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
+    {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
+    {p2p_difference_digest, 
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
+    {p2p_elements_and_requests, 
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
+    {NULL, 0, 0}
+  };
+  static const GNUNET_MESH_ApplicationType app_types[] = { 
+    GNUNET_APPLICATION_TYPE_CONSENSUS,
+    GNUNET_APPLICATION_TYPE_END
+  };
 
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus  running\n");
+  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
 
   cfg = c;
   srv = server;
-  my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, 
GNUNET_NO, NULL, GNUNET_NO, handlers);
-  GNUNET_assert (NULL != my_core);
+
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 
NULL);
+
+  mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, 
app_types);
+  GNUNET_assert (NULL != mesh);
+
+  /* we have to wait for the core_startup callback before proceeding with the 
consensus service startup */
+  core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, 
GNUNET_NO, NULL, GNUNET_NO, handlers);
+  GNUNET_assert (NULL != core);
 }
 
 
 /**
- * The main function for the statistics service.
+ * The main function for the consensus service.
  *
  * @param argc number of arguments from the command line
  * @param argv command line arguments
@@ -528,7 +841,8 @@
 int
 main (int argc, char *const *argv)
 {
-  return (GNUNET_OK ==
-          GNUNET_SERVICE_run (argc, argv, "consensus", 
GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
+  int ret;
+  ret = GNUNET_SERVICE_run (argc, argv, "consensus", 
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
+  return (GNUNET_OK == ret) ? 0 : 1;
 }
 

Modified: gnunet/src/consensus/test_consensus_api.c
===================================================================
--- gnunet/src/consensus/test_consensus_api.c   2013-01-02 16:03:25 UTC (rev 
25674)
+++ gnunet/src/consensus/test_consensus_api.c   2013-01-03 00:43:57 UTC (rev 
25675)
@@ -27,15 +27,10 @@
 #include "gnunet_testing_lib.h"
 
 
-static struct GNUNET_CONSENSUS_Handle *consensus1;
-static struct GNUNET_CONSENSUS_Handle *consensus2;
+static struct GNUNET_CONSENSUS_Handle *consensus;
 
-static int concluded1;
-static int concluded2;
+static int insert;
 
-static int insert1;
-static int insert2;
-
 static struct GNUNET_HashCode session_id;
 
 
@@ -48,20 +43,12 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n");
 }
 
-static void
+static int
 on_new_element (void *cls,
                 struct GNUNET_CONSENSUS_Element *element)
 {
-  struct GNUNET_CONSENSUS_Handle *consensus;
-
-  GNUNET_assert (NULL != element);
-
-  consensus = *(struct GNUNET_CONSENSUS_Handle **) cls;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
-
-  GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 
&conclude_done, consensus);
-
+  GNUNET_assert (0);
+  return GNUNET_YES;
 }
 
 static void
@@ -71,7 +58,6 @@
 }
 
 
-
 static void
 run (void *cls, 
      const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -89,11 +75,9 @@
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n");
 
   GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
-  consensus1 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, 
on_new_element, &consensus1);
+  consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, 
on_new_element, &consensus);
+  GNUNET_assert (consensus != NULL);
   /*
-  consensus2 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, 
on_new_element, &consensus2);
-  GNUNET_assert (consensus1 != NULL);
-  GNUNET_assert (consensus2 != NULL);
   GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1);
   GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2);
   */

Modified: gnunet/src/include/gnunet_applications.h
===================================================================
--- gnunet/src/include/gnunet_applications.h    2013-01-02 16:03:25 UTC (rev 
25674)
+++ gnunet/src/include/gnunet_applications.h    2013-01-03 00:43:57 UTC (rev 
25675)
@@ -71,7 +71,12 @@
  */
 #define GNUNET_APPLICATION_TYPE_EXIT_REGEX_PREFIX "GNUNET-VPN-VER-0001-"
 
+/**
+ * Consensus.
+ */
+#define GNUNET_APPLICATION_TYPE_CONSENSUS 18
 
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: gnunet/src/include/gnunet_consensus_service.h
===================================================================
--- gnunet/src/include/gnunet_consensus_service.h       2013-01-02 16:03:25 UTC 
(rev 25674)
+++ gnunet/src/include/gnunet_consensus_service.h       2013-01-03 00:43:57 UTC 
(rev 25675)
@@ -76,8 +76,8 @@
  * @return GNUNET_OK if the valid is well-formed and should be added to the 
consensus,
  *         GNUNET_SYSERR if the element should be ignored and not be propagated
  */
-typedef int (*GNUNET_CONSENSUS_NewElementCallback) (void *cls,
-                                                   struct 
GNUNET_CONSENSUS_Element *element);
+typedef int (*GNUNET_CONSENSUS_ElementCallback) (void *cls,
+                                                    struct 
GNUNET_CONSENSUS_Element *element);
 
 
 
@@ -105,10 +105,10 @@
  */
 struct GNUNET_CONSENSUS_Handle *
 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                        unsigned int num_peers,
-                        const struct GNUNET_PeerIdentity *peers,
+                         unsigned int num_peers,
+                         const struct GNUNET_PeerIdentity *peers,
                          const struct GNUNET_HashCode *session_id,
-                         GNUNET_CONSENSUS_NewElementCallback new_element_cb,
+                         GNUNET_CONSENSUS_ElementCallback new_element_cb,
                          void *new_element_cls);
 
 
@@ -122,7 +122,7 @@
  *        the insertion and thus the consensus failed for good
  */
 typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls,
-                                                    int success);
+                                                     int success);
 
 
 /**
@@ -138,9 +138,9 @@
  */
 void
 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
-                        const struct GNUNET_CONSENSUS_Element *element,
-                        GNUNET_CONSENSUS_InsertDoneCallback idc,
-                        void *idc_cls);
+                         const struct GNUNET_CONSENSUS_Element *element,
+                         GNUNET_CONSENSUS_InsertDoneCallback idc,
+                         void *idc_cls);
 
 
 /**
@@ -168,9 +168,9 @@
  */
 struct GNUNET_CONSENSUS_DeltaRequest *
 GNUNET_CONSENSUS_get_delta (struct GNUNET_CONSENSUS_Handle *consensus,
-                           uint32_t group_id,
-                           GNUNET_CONSENSUS_NewElementCallback 
remove_element_cb,
-                           void *remove_element_cb_cls);
+                            uint32_t group_id,
+                            GNUNET_CONSENSUS_ElementCallback remove_element_cb,
+                            void *remove_element_cb_cls);
 
 
 void
@@ -184,7 +184,7 @@
   uint64_t total_elements_in_group;
   const struct GNUNET_PeerIdentity **members;
 };
-                                      
+                                       
 
 /**
  * Called when a conclusion was successful.
@@ -193,21 +193,9 @@
  * @param num_peers_in_consensus
  * @param peers_in_consensus
  */
-typedef void (*GNUNET_CONSENSUS_NewConcludeCallback) (void *cls, 
-                                                     unsigned int 
consensus_group_count,
-                                                     const struct 
GNUNET_CONSENSUS_Group *groups);
-
-
-/**
- * Called when a conclusion was successful.
- *
- * @param cls
- * @param num_peers_in_consensus
- * @param peers_in_consensus
- */
 typedef void (*GNUNET_CONSENSUS_ConcludeCallback) (void *cls, 
-                                                  unsigned int 
num_peers_in_consensus,
-                                                  const struct 
GNUNET_PeerIdentity *peers_in_consensus);
+                                                   unsigned int 
consensus_group_count,
+                                                   const struct 
GNUNET_CONSENSUS_Group *groups);
 
 
 /**
@@ -222,10 +210,10 @@
  */
 void
 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
-                          struct GNUNET_TIME_Relative timeout,
-                          //                      unsigned int 
min_group_size_in_consensus,
-                          GNUNET_CONSENSUS_ConcludeCallback conclude,
-                          void *conclude_cls);
+                           struct GNUNET_TIME_Relative timeout,
+                           unsigned int min_group_size_in_consensus,
+                           GNUNET_CONSENSUS_ConcludeCallback conclude,
+                           void *conclude_cls);
 
 
 /**

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2013-01-02 16:03:25 UTC (rev 
25674)
+++ gnunet/src/include/gnunet_protocols.h       2013-01-03 00:43:57 UTC (rev 
25675)
@@ -1657,36 +1657,31 @@
  */
 #define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE 525
 
+
+/* message types 526-539 reserved for consensus client/service messages */
+
+
+
 /**
  * Sent by client to service, telling whether a received element should
  * be accepted and propagated further or not.
  */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK 527
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK 540
 
 /**
- * Update another peer's consensus set with new elements.
- */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS 528
-
-/**
- * Request elements (by their hash) from another peer.
- */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_REQUEST_ELEMENTS 529
-
-/**
  * Strata estimator.
  */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_STRATA 530
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE 541
 
 /**
  * IBF containing all elements of a peer.
  */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_IBF 531
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST 542
 
 /**
- * Request reconcilliation with another peer.
+ * Elements, and requests for further elements
  */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_RECONCILE 532
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS 543
 
 
 /**




reply via email to

[Prev in Thread] Current Thread [Next in Thread]