[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25675 - in gnunet/src: consensus include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25675 - in gnunet/src: consensus include |
Date: |
Thu, 3 Jan 2013 01:43:57 +0100 |
Author: dold
Date: 2013-01-03 01:43:57 +0100 (Thu, 03 Jan 2013)
New Revision: 25675
Modified:
gnunet/src/consensus/Makefile.am
gnunet/src/consensus/consensus.h
gnunet/src/consensus/consensus_api.c
gnunet/src/consensus/gnunet-consensus-start-peers.c
gnunet/src/consensus/gnunet-consensus.c
gnunet/src/consensus/gnunet-service-consensus.c
gnunet/src/consensus/test_consensus_api.c
gnunet/src/include/gnunet_applications.h
gnunet/src/include/gnunet_consensus_service.h
gnunet/src/include/gnunet_protocols.h
Log:
implemented the modified consensus api, started implementing p2p protocol for
consensus
Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am 2013-01-02 16:03:25 UTC (rev 25674)
+++ gnunet/src/consensus/Makefile.am 2013-01-03 00:43:57 UTC (rev 25675)
@@ -57,6 +57,7 @@
gnunet_service_consensus_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/mesh/libgnunetmesh.la \
$(GN_LIBINTL)
libgnunetconsensus_la_SOURCES = \
Modified: gnunet/src/consensus/consensus.h
===================================================================
--- gnunet/src/consensus/consensus.h 2013-01-02 16:03:25 UTC (rev 25674)
+++ gnunet/src/consensus/consensus.h 2013-01-03 00:43:57 UTC (rev 25675)
@@ -52,7 +52,15 @@
*/
struct GNUNET_MessageHeader header;
+ /**
+ * Timeout for conclude
+ */
struct GNUNET_TIME_RelativeNBO timeout;
+
+ /**
+ * Minimum group size required for a consensus group.
+ */
+ uint32_t min_group_size;
};
@@ -102,6 +110,7 @@
*/
uint8_t keep;
+ /* FIXME: add message hash? */
};
GNUNET_NETWORK_STRUCT_END
Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c 2013-01-02 16:03:25 UTC (rev
25674)
+++ gnunet/src/consensus/consensus_api.c 2013-01-03 00:43:57 UTC (rev
25675)
@@ -33,14 +33,43 @@
#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
-struct ElementAck
+/**
+ * Actions that can be queued.
+ */
+struct QueuedMessage
{
- struct ElementAck *next;
- struct ElementAck *prev;
- int keep;
- struct GNUNET_CONSENSUS_Element *element;
+ /**
+ * Queued messages are stored in a doubly linked list.
+ */
+ struct QueuedMessage *next;
+
+ /**
+ * Queued messages are stored in a doubly linked list.
+ */
+ struct QueuedMessage *prev;
+
+ /**
+ * The actual queued message.
+ */
+ struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Size of the message in msg.
+ */
+ size_t size;
+
+ /**
+ * Will be called after transmit, if not NULL
+ */
+ GNUNET_CONSENSUS_InsertDoneCallback idc;
+
+ /**
+ * Closure for idc
+ */
+ void *idc_cls;
};
+
/**
* Handle for the service.
*/
@@ -52,14 +81,14 @@
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Socket (if available).
+ * Client connected to the consensus service, may be NULL if not connected.
*/
struct GNUNET_CLIENT_Connection *client;
/**
* Callback for new elements. Not called for elements added locally.
*/
- GNUNET_CONSENSUS_NewElementCallback new_element_cb;
+ GNUNET_CONSENSUS_ElementCallback new_element_cb;
/**
* Closure for new_element_cb
@@ -67,7 +96,7 @@
void *new_element_cls;
/**
- * Session identifier for the consensus session.
+ * The (local) session identifier for the consensus session.
*/
struct GNUNET_HashCode session_id;
@@ -77,9 +106,9 @@
int num_peers;
/**
- * Peer identities of peers in the consensus. Optionally includes the local
peer.
+ * Peer identities of peers participating in the consensus, includes the
local peer.
*/
- struct GNUNET_PeerIdentity *peers;
+ struct GNUNET_PeerIdentity **peers;
/**
* Currently active transmit request.
@@ -92,22 +121,11 @@
int joined;
/**
- * Called when the current insertion operation finishes.
- * NULL if there is no insert operation active.
- */
- GNUNET_CONSENSUS_InsertDoneCallback idc;
-
- /**
* Closure for the insert done callback.
*/
void *idc_cls;
/**
- * An element that was requested to be inserted.
- */
- struct GNUNET_CONSENSUS_Element *insert_element;
-
- /**
* Called when the conclude operation finishes or fails.
*/
GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -122,103 +140,92 @@
*/
struct GNUNET_TIME_Absolute conclude_deadline;
- struct ElementAck *ack_head;
- struct ElementAck *ack_tail;
+ unsigned int conclude_min_size;
- /**
- * Set to GNUNET_YES if the begin message has been transmitted to the service
- */
- int begin_sent;
-
- /**
- * Set to GNUNET_YES it the begin message should be transmitted to the
service
- */
- int begin_requested;
+ struct QueuedMessage *messages_head;
+ struct QueuedMessage *messages_tail;
};
-static size_t
-transmit_ack (void *cls, size_t size, void *buf);
-static size_t
-transmit_insert (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_begin (void *cls, size_t size, void *buf);
-
-
/**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
*/
static void
-ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
-{
- if ((NULL == consensus->th) && (NULL != consensus->ack_head))
- {
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct
GNUNET_CONSENSUS_AckMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_ack,
consensus);
- }
-}
+schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
/**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Function called to notify a client about the connection
+ * being ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
*/
-static void
-ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
+static size_t transmit_queued (void *cls, size_t size,
+ void *buf)
{
- if ((NULL == consensus->th) && (NULL != consensus->insert_element))
+ struct GNUNET_CONSENSUS_Handle *consensus;
+ struct QueuedMessage *qmsg;
+ size_t ret_size;
+
+ printf("transmitting queued\n");
+
+ consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
+ qmsg = consensus->messages_head;
+ GNUNET_CONTAINER_DLL_remove (consensus->messages_head,
consensus->messages_tail, qmsg);
+ GNUNET_assert (qmsg);
+
+ if (NULL == buf)
{
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct
GNUNET_CONSENSUS_ElementMessage) +
-
consensus->insert_element->size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_insert,
consensus);
+ if (NULL != qmsg->idc)
+ {
+ qmsg->idc (qmsg->idc_cls, GNUNET_YES);
+ }
}
-}
-
-/**
- * Call notify_transmit_ready for ack if necessary and possible.
- */
-static void
-ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
-{
- if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
+ memcpy (buf, qmsg->msg, qmsg->size);
+ ret_size = qmsg->size;
+ if (NULL != qmsg->idc)
{
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct
GNUNET_CONSENSUS_ConcludeMessage),
-
GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
- GNUNET_NO, &transmit_conclude,
consensus);
+ qmsg->idc (qmsg->idc_cls, GNUNET_YES);
}
+ GNUNET_free (qmsg->msg);
+ GNUNET_free (qmsg);
+
+ schedule_transmit (consensus);
+
+ return ret_size;
}
/**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
*/
static void
-ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
+schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
{
- if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
- (GNUNET_NO == consensus->begin_sent))
+ if (NULL != consensus->th)
+ return;
+
+ if (NULL != consensus->messages_head)
{
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct
GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_begin,
consensus);
+ LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
+ GNUNET_CLIENT_notify_transmit_ready (consensus->client,
consensus->messages_head->size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_NO, &transmit_queued,
consensus);
}
}
+
/**
* Called when the server has sent is a new element
*
@@ -226,11 +233,12 @@
* @param msg element message
*/
static void
-handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_CONSENSUS_ElementMessage *msg)
{
struct GNUNET_CONSENSUS_Element element;
- struct ElementAck *ack;
+ struct GNUNET_CONSENSUS_AckMessage *ack_msg;
+ struct QueuedMessage *queued_msg;
int ret;
element.type = msg->element_type;
@@ -238,11 +246,15 @@
element.data = &msg[1];
ret = consensus->new_element_cb (consensus->new_element_cls, &element);
- ack = GNUNET_malloc (sizeof (struct ElementAck));
- ack->keep = ret;
- GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head,
consensus->ack_tail,ack);
- ntr_ack (consensus);
+ queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct
GNUNET_CONSENSUS_AckMessage));
+ queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1];
+
+ ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
+ ack_msg->keep = ret;
+
+ GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head,
consensus->messages_tail,
+ queued_msg);
}
@@ -254,13 +266,12 @@
* @param msg conclude done message
*/
static void
-handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
{
GNUNET_assert (NULL != consensus->conclude_cb);
- consensus->conclude_cb(consensus->conclude_cls,
- msg->num_peers,
- (struct GNUNET_PeerIdentity *) &msg[1]);
+ consensus->conclude_cb (consensus->conclude_cls,
+ 0, NULL);
consensus->conclude_cb = NULL;
}
@@ -287,12 +298,6 @@
GNUNET_CLIENT_disconnect (consensus->client);
consensus->client = NULL;
consensus->new_element_cb (NULL, NULL);
- if (NULL != consensus->idc)
- {
- consensus->idc(consensus->idc_cls, GNUNET_NO);
- consensus->idc = NULL;
- consensus->idc_cls = NULL;
- }
return;
}
@@ -305,15 +310,12 @@
handle_conclude_done (consensus, (struct
GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
break;
default:
- LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by
service, ignoring");
+ GNUNET_break (0);
}
GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
GNUNET_TIME_UNIT_FOREVER_REL);
}
-
-
-
/**
* Function called to notify a client about the connection
* begin ready to queue more data. "buf" will be
@@ -326,99 +328,6 @@
* @return number of bytes written to buf
*/
static size_t
-transmit_ack (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_AckMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
-
- consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
-
- GNUNET_assert (NULL != consensus->ack_head);
-
- msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
- msg->keep = consensus->ack_head->keep;
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
- msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
-
- consensus->ack_head = consensus->ack_head->next;
-
- consensus->th = NULL;
-
- ntr_insert (consensus);
- ntr_ack (consensus);
- ntr_conclude (consensus);
-
- return sizeof (struct GNUNET_CONSENSUS_AckMessage);
-}
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_insert (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_ElementMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- GNUNET_CONSENSUS_InsertDoneCallback idc;
- int msize;
- void *idc_cls;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
-
- GNUNET_assert (NULL != consensus->insert_element);
-
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
- consensus->insert_element->size;
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- msg->header.size = htons (msize);
- memcpy (&msg[1],
- consensus->insert_element->data,
- consensus->insert_element->size);
-
- consensus->insert_element = NULL;
-
- idc = consensus->idc;
- consensus->idc = NULL;
- idc_cls = consensus->idc_cls;
- consensus->idc_cls = NULL;
- idc (idc_cls, GNUNET_YES);
-
-
- ntr_ack (consensus);
- ntr_insert (consensus);
- ntr_conclude (consensus);
-
- return msize;
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
transmit_join (void *cls, size_t size, void *buf)
{
struct GNUNET_CONSENSUS_JoinMessage *msg;
@@ -427,7 +336,7 @@
GNUNET_assert (NULL != buf);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
+ LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n");
consensus = cls;
consensus->th = NULL;
@@ -447,9 +356,7 @@
consensus->peers,
consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
- ntr_insert (consensus);
- ntr_begin (consensus);
- ntr_conclude (consensus);
+ schedule_transmit (consensus);
GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
GNUNET_TIME_UNIT_FOREVER_REL);
@@ -457,88 +364,11 @@
return msize;
}
-
/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_ConcludeMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
- msg->header.size = htons (msize);
- msg->timeout =
- GNUNET_TIME_relative_hton
(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
-
- ntr_ack (consensus);
-
- return msize;
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls the consensus handle
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_begin (void *cls, size_t size, void *buf)
-{
- struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_MessageHeader);
-
- msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
- msg->size = htons (msize);
-
- ntr_ack (consensus);
- ntr_insert (consensus);
- ntr_conclude (consensus);
-
- return msize;
-}
-
-
-/**
* Create a consensus session.
*
- * @param cfg
- * @param num_peers
+ * @param cfg configuration to use for connecting to the consensus service
+ * @param num_peers number of peers in the peers array
* @param peers array of peers participating in this consensus session
* Inclusion of the local peer is optional.
* @param session_id session identifier
@@ -553,7 +383,7 @@
unsigned int num_peers,
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
- GNUNET_CONSENSUS_NewElementCallback new_element_cb,
+ GNUNET_CONSENSUS_ElementCallback new_element_cb,
void *new_element_cls)
{
struct GNUNET_CONSENSUS_Handle *consensus;
@@ -567,17 +397,10 @@
consensus->session_id = *session_id;
if (0 == num_peers)
- {
consensus->peers = NULL;
- }
else if (num_peers > 0)
- {
- consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct
GNUNET_PeerIdentity));
- }
- else
- {
- GNUNET_break (0);
- }
+ consensus->peers =
+ GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
@@ -615,45 +438,37 @@
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls)
{
- GNUNET_assert (NULL == consensus->idc);
- GNUNET_assert (NULL == consensus->insert_element);
- GNUNET_assert (NULL == consensus->conclude_cb);
+ struct QueuedMessage *qmsg;
+ struct GNUNET_CONSENSUS_ElementMessage *element_msg;
+ size_t element_msg_size;
- consensus->idc = idc;
- consensus->idc_cls = idc_cls;
- consensus->insert_element = GNUNET_memdup(element, sizeof (struct
GNUNET_CONSENSUS_Element) + element->size);
+ LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size);
- if (consensus->joined == 0)
- {
- return;
- }
+ element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
+ element->size);
- ntr_insert (consensus);
-}
+ element_msg = GNUNET_malloc (element_msg_size);
+ element_msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
+ element_msg->header.size = htons (element_msg_size);
+ memcpy (&element_msg[1], element->data, element->size);
+ qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+ qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
+ qmsg->size = element_msg_size;
+ qmsg->idc = idc;
+ qmsg->idc_cls = idc_cls;
-/**
- * Begin reconciling elements with other peers.
- *
- * @param consensus handle for the consensus session
- */
-void
-GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
-{
- GNUNET_assert (NULL == consensus->idc);
- GNUNET_assert (NULL == consensus->insert_element);
- GNUNET_assert (GNUNET_NO == consensus->begin_requested);
- GNUNET_assert (GNUNET_NO == consensus->begin_sent);
+ GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head,
consensus->messages_tail, qmsg);
- consensus->begin_requested = GNUNET_YES;
-
- ntr_begin (consensus);
+ schedule_transmit (consensus);
}
/**
- * We are finished inserting new elements into the consensus;
+ * We are done with inserting new elements into the consensus;
* try to conclude the consensus within a given time window.
+ * After conclude has been called, no further elements may be
+ * inserted by the client.
*
* @param consensus consensus session
* @param timeout timeout after which the conculde callback
@@ -664,20 +479,32 @@
void
GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_TIME_Relative timeout,
+ unsigned int min_group_size_in_consensus,
GNUNET_CONSENSUS_ConcludeCallback conclude,
void *conclude_cls)
{
+ struct QueuedMessage *qmsg;
+ struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
+
GNUNET_assert (NULL != conclude);
GNUNET_assert (NULL == consensus->conclude_cb);
consensus->conclude_cls = conclude_cls;
consensus->conclude_cb = conclude;
- consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
+ conclude_msg = GNUNET_malloc (sizeof (struct
GNUNET_CONSENSUS_ConcludeMessage));
+ conclude_msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
+ conclude_msg->header.size = htons (sizeof (struct
GNUNET_CONSENSUS_ConcludeMessage));
+ conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
+ conclude_msg->min_group_size = min_group_size_in_consensus;
- /* if transmitting the conclude message is not possible right now,
transmit_join
- * or transmit_ack will handle it */
- ntr_conclude (consensus);
+ qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+ qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
+ qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
+
+ GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head,
consensus->messages_tail, qmsg);
+
+ schedule_transmit (consensus);
}
Modified: gnunet/src/consensus/gnunet-consensus-start-peers.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-02 16:03:25 UTC
(rev 25674)
+++ gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-03 00:43:57 UTC
(rev 25675)
@@ -147,6 +147,9 @@
NULL,
test_master,
NULL);
+
+
+ printf("hello there!\n");
}
Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c 2013-01-02 16:03:25 UTC (rev
25674)
+++ gnunet/src/consensus/gnunet-consensus.c 2013-01-03 00:43:57 UTC (rev
25675)
@@ -62,10 +62,10 @@
*/
static void
conclude_cb (void *cls,
- unsigned int num_peers_in_consensus,
- const struct GNUNET_PeerIdentity *peers_in_consensus)
+ unsigned int consensus_group_count,
+ const struct GNUNET_CONSENSUS_Group *groups)
{
- printf("reached conclusion with %d peers\n", num_peers_in_consensus);
+ printf("reached conclusion\n");
GNUNET_SCHEDULER_shutdown ();
}
@@ -111,7 +111,7 @@
if (feof (stdin))
{
printf ("concluding ...\n");
- GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL,
conclude_cb, NULL);
+ GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0,
conclude_cb, NULL);
}
return;
}
@@ -144,6 +144,12 @@
cb (void *cls,
struct GNUNET_CONSENSUS_Element *element)
{
+ if (NULL == element)
+ {
+ printf("error receiving from consensus\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_NO;
+ }
printf("got element\n");
return GNUNET_YES;
}
@@ -178,10 +184,12 @@
if (NULL == session_id_str)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing
-s/--session-id)\n");
return;
}
+ GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid);
+
for (count = 0; NULL != args[count]; count++);
if (0 != count)
@@ -213,9 +221,6 @@
&sid,
&cb, NULL);
- GNUNET_CONSENSUS_begin (consensus);
-
-
stdin_fh = GNUNET_DISK_get_handle_from_native (stdin);
stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
stdin_fh,
&stdin_cb, NULL);
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2013-01-02 16:03:25 UTC
(rev 25674)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2013-01-03 00:43:57 UTC
(rev 25675)
@@ -16,12 +16,19 @@
along with GNUnet; see the file COPYING. If not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
+*/
+
+
+/**
+ * @file consensus/gnunet-service-consensus.c
+ * @brief
+ * @author Florian Dold
*/
-
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
#include "gnunet_util_lib.h"
#include "gnunet_consensus_service.h"
#include "gnunet_core_service.h"
@@ -57,6 +64,24 @@
};
+/*
+ * A peer that is also in a consensus session.
+ * Note that 'this' peer is not in the list.
+ */
+struct ConsensusPeer
+{
+ struct GNUNET_PeerIdentity *peer_id;
+
+ /**
+ * Incoming tunnel from the peer.
+ */
+ struct GNUNET_MESH_Tunnel *incoming_tunnel;
+
+ struct InvertibleBloomFilter *last_ibf;
+
+};
+
+
/**
* A consensus session consists of one local client and the remote authorities.
*/
@@ -84,18 +109,14 @@
struct GNUNET_HashCode *global_id;
/**
- * Corresponding server handle.
+ * Local client in this consensus session.
+ * There is only one client per consensus session.
*/
struct GNUNET_SERVER_Client *client;
/**
- * Client wants to receive and send updates.
- */
- int begin;
-
- /**
* Values in the consensus set of this session,
- * all of them either have been sent or approved by the client.
+ * all of them either have been sent by or approved by the client.
*/
struct GNUNET_CONTAINER_MultiHashMap *values;
@@ -110,12 +131,12 @@
struct PendingElement *transmit_pending_tail;
/**
- * Elements that have not been sent to the client yet.
+ * Elements that have not been approved (or rejected) by the client yet.
*/
struct PendingElement *approval_pending_head;
/**
- * Elements that have not been sent to the client yet.
+ * Elements that have not been approved (or rejected) by the client yet.
*/
struct PendingElement *approval_pending_tail;
@@ -136,9 +157,47 @@
int conclude_sent;
/**
+ * Minimum number of peers to form a consensus group
+ */
+ int conclude_group_min;
+
+ /**
+ * Current round of the conclusion
+ */
+ int current_round;
+
+ /**
+ * Soft deadline for conclude.
+ * Speed up the consensus at the cost of consensus quality as
+ * the time approaches or crosses the deadline.
+ */
+ struct GNUNET_TIME_Absolute conclude_deadline;
+
+ /**
* Number of other peers in the consensus
*/
- int num_peers;
+ unsigned int num_peers;
+
+ /**
+ * Other peers in the consensus, array of ConsensusPeer
+ */
+ struct ConsensusPeer *peers;
+
+ /**
+ * Tunnel for broadcasting to all other authorities
+ */
+ struct GNUNET_MESH_Tunnel *broadcast_tunnel;
+
+ /**
+ * Time limit for one round of pairwise exchange.
+ * FIXME: should not actually be a constant
+ */
+ struct GNUNET_TIME_Relative round_time;
+
+ /**
+ * Task identifier for the round timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
};
@@ -167,10 +226,21 @@
*/
static struct GNUNET_PeerIdentity *my_peer;
+/**
+ * Handle to the mesh service.
+ */
+static struct GNUNET_MESH_Handle *mesh;
+
+/**
+ * Handle to the core service. Only used during service startup, will be NULL
after that.
+ */
+static struct GNUNET_CORE_Handle *core;
+
static void
disconnect_client (struct GNUNET_SERVER_Client *client)
{
- /* FIXME */
+ GNUNET_SERVER_client_disconnect (client);
+ /* FIXME: free data structures that this client owns */
}
static void
@@ -185,7 +255,6 @@
*dst = *local_id;
for (i = 0; i < num_peers; ++i)
{
- /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get
by without tmp */
GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
*dst = tmp;
GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
@@ -255,7 +324,7 @@
if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent
== GNUNET_NO))
{
- /* just the conclude message with no other authorities in the dummy */
+ /* FIXME */
msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
session->th =
GNUNET_SERVER_notify_transmit_ready (session->client, msize,
@@ -274,6 +343,39 @@
/**
+ * Method called whenever a peer has disconnected from the tunnel.
+ * Implementations of this callback must NOT call
+ * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
+ * to run in some other task later. However, calling
+ * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
+ *
+ * @param cls closure
+ * @param peer peer identity the tunnel stopped working with
+ */
+static void
+disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ /* FIXME: how do we handle this */
+}
+
+
+/**
+ * Method called whenever a peer has connected to the tunnel.
+ *
+ * @param cls closure
+ * @param peer peer identity the tunnel was created to, NULL on timeout
+ * @param atsi performance data for the connection
+ */
+static void
+connect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ /* not much we can do here, now we know the other peer has been added to our
broadcast tunnel */
+}
+
+
+/**
* Called when a client wants to join a consensus session.
*
* @param cls unused
@@ -288,18 +390,24 @@
struct GNUNET_HashCode global_id;
const struct GNUNET_CONSENSUS_JoinMessage *msg;
struct ConsensusSession *session;
+ unsigned int i;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s
(&msg->session_id));
+
compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity
*) &m[1], msg->num_peers);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s
(&global_id));
+
session = sessions_head;
while (NULL != session)
{
if (client == session->client)
{
+
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
disconnect_client (client);
return;
@@ -310,6 +418,7 @@
disconnect_client (client);
return;
}
+ session = session->next;
}
GNUNET_SERVER_client_keep (client);
@@ -320,11 +429,40 @@
session->global_id = GNUNET_memdup (&global_id, sizeof (struct
GNUNET_HashCode));
session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
session->client = client;
+ /* FIXME: should not be a constant, but chosen adaptively */
+ session->round_time = GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 5);
- GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+ session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session,
connect_handler, disconnect_handler, session);
+ session->num_peers = 0;
+
+ /* count the peers that are not the local peer */
+ for (i = 0; i < msg->num_peers; i++)
+ {
+ struct GNUNET_PeerIdentity *peers;
+ peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
+ session->num_peers++;
+ }
+
+ session->peers = GNUNET_malloc (session->num_peers * sizeof (struct
ConsensusPeer));
+
+ /* copy the peer identities and add peers to broadcast tunnel */
+ for (i = 0; i < msg->num_peers; i++)
+ {
+ struct GNUNET_PeerIdentity *peers;
+ peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
+ {
+ *session->peers->peer_id = peers[i];
+ GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel,
&peers[i]);
+ }
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
+ GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -381,40 +519,28 @@
/**
- * Called when a client wants to begin
+ * Do one round of the conclusion.
+ * Start by broadcasting the set difference estimator (IBF strata).
+ *
*/
void
-client_begin (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+conclude_do_round (struct ConsensusSession *session)
{
- struct ConsensusSession *session;
+ /* FIXME */
+}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n");
- session = sessions_head;
- while (NULL != session)
- {
- if (session->client == client)
- break;
- }
-
- if (NULL == session)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but
client is not in any session\n");
- GNUNET_SERVER_client_disconnect (client);
- return;
- }
-
- session->begin = GNUNET_YES;
-
- send_next (session);
-
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+/**
+ * Cancel the current round if necessary, decide to run another round or
+ * terminate.
+ */
+void
+conclude_round_done (struct ConsensusSession *session)
+{
+ /* FIXME */
}
-
/**
* Called when a client performs the conclude operation.
*/
@@ -425,6 +551,8 @@
{
struct ConsensusSession *session;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
+
session = sessions_head;
while ((session != NULL) && (session->client != client))
{
@@ -432,12 +560,25 @@
}
if (NULL == session)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
GNUNET_SERVER_client_disconnect (client);
return;
}
+
+ if (GNUNET_YES == session->conclude_requested)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude
twice\n");
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+
session->conclude_requested = GNUNET_YES;
+
+ conclude_do_round (session);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
send_next (session);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -462,10 +603,8 @@
disconnect_core (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GNUNET_CORE_Handle *core;
- core = (struct GNUNET_CORE_Handle *) cls;
GNUNET_CORE_disconnect (core);
-
+ core = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
}
@@ -478,8 +617,6 @@
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
{&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
- {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN,
- sizeof (struct GNUNET_MessageHeader)},
{&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
{&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
@@ -489,37 +626,213 @@
GNUNET_SERVER_add_handlers (srv, handlers);
my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
+ /* core can't be disconnected directly in the core startup callback,
schedule a task to do it! */
GNUNET_SCHEDULER_add_now (&disconnect_core, core);
GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
}
+
/**
- * Process consensus requests.
+ * Method called whenever another peer has added us to a tunnel
+ * the other peer initiated.
+ * Only called (once) upon reception of data with a message type which was
+ * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
+ * causes the tunnel to be ignored and no further notifications are sent about
+ * the same tunnel.
*
* @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ * (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *initiator,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ /* there's nothing we can do here, as we don't have the global consensus id
yet */
+ return NULL;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed. Should clean up
+ * any associated state. This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ * with the tunnel is stored
+ */
+static void
+cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
+{
+ /* FIXME: what to do here? */
+}
+
+
+
+/**
+ * Called to clean up, after a shutdown has been requested.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ /* mesh requires all the tunnels to be destroyed manually */
+ while (NULL != sessions_head)
+ {
+ struct ConsensusSession *session;
+ session = sessions_head;
+ GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
+ sessions_head = sessions_head->next;
+ GNUNET_free (session);
+ }
+
+ if (NULL != mesh)
+ {
+ GNUNET_MESH_disconnect (mesh);
+ mesh = NULL;
+ }
+ if (NULL != core)
+ {
+ GNUNET_CORE_disconnect (core);
+ core = NULL;
+ }
+}
+
+
+
+/**
+ * Mesh handler invoked whenever a P2P delta estimate message (strata
+ * estimator) is received. Currently a stub that only returns GNUNET_OK.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+p2p_delta_estimate (void *cls,
+ struct GNUNET_MESH_Tunnel * tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ /* FIXME */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Mesh handler invoked whenever a P2P difference digest message (IBF of a
+ * peer's elements) is received. Currently a stub that only returns GNUNET_OK.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+p2p_difference_digest (void *cls,
+ struct GNUNET_MESH_Tunnel * tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ /* FIXME */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Mesh handler invoked whenever a P2P elements-and-requests message is
+ * received. Currently a stub that only returns GNUNET_OK.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+p2p_elements_and_requests (void *cls,
+ struct GNUNET_MESH_Tunnel * tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ /* FIXME */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Start processing consensus requests.
+ *
+ * @param cls closure
* @param server the initialized server
* @param c configuration to use
*/
static void
run (void *cls, struct GNUNET_SERVER_Handle *server, const struct
GNUNET_CONFIGURATION_Handle *c)
{
- struct GNUNET_CORE_Handle *my_core;
static const struct GNUNET_CORE_MessageHandler handlers[] = {
{NULL, 0, 0}
};
+ static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
+ {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
+ {p2p_difference_digest,
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
+ {p2p_elements_and_requests,
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
+ {NULL, 0, 0}
+ };
+ static const GNUNET_MESH_ApplicationType app_types[] = {
+ GNUNET_APPLICATION_TYPE_CONSENSUS,
+ GNUNET_APPLICATION_TYPE_END
+ };
- GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
+ GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
cfg = c;
srv = server;
- my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL,
GNUNET_NO, NULL, GNUNET_NO, handlers);
- GNUNET_assert (NULL != my_core);
+
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
+
+ mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers,
app_types);
+ GNUNET_assert (NULL != mesh);
+
+ /* we have to wait for the core_startup callback before proceeding with the
consensus service startup */
+ core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL,
GNUNET_NO, NULL, GNUNET_NO, handlers);
+ GNUNET_assert (NULL != core);
}
/**
- * The main function for the statistics service.
+ * The main function for the consensus service.
*
* @param argc number of arguments from the command line
* @param argv command line arguments
@@ -528,7 +841,8 @@
int
main (int argc, char *const *argv)
{
- return (GNUNET_OK ==
- GNUNET_SERVICE_run (argc, argv, "consensus",
GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
+ int ret;
+ ret = GNUNET_SERVICE_run (argc, argv, "consensus",
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
+ return (GNUNET_OK == ret) ? 0 : 1;
}
Modified: gnunet/src/consensus/test_consensus_api.c
===================================================================
--- gnunet/src/consensus/test_consensus_api.c 2013-01-02 16:03:25 UTC (rev
25674)
+++ gnunet/src/consensus/test_consensus_api.c 2013-01-03 00:43:57 UTC (rev
25675)
@@ -27,15 +27,10 @@
#include "gnunet_testing_lib.h"
-static struct GNUNET_CONSENSUS_Handle *consensus1;
-static struct GNUNET_CONSENSUS_Handle *consensus2;
+static struct GNUNET_CONSENSUS_Handle *consensus;
-static int concluded1;
-static int concluded2;
+static int insert;
-static int insert1;
-static int insert2;
-
static struct GNUNET_HashCode session_id;
@@ -48,20 +43,12 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n");
}
-static void
+static int
on_new_element (void *cls,
struct GNUNET_CONSENSUS_Element *element)
{
- struct GNUNET_CONSENSUS_Handle *consensus;
-
- GNUNET_assert (NULL != element);
-
- consensus = *(struct GNUNET_CONSENSUS_Handle **) cls;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
-
- GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL,
&conclude_done, consensus);
-
+ GNUNET_assert (0);
+ return GNUNET_YES;
}
static void
@@ -71,7 +58,6 @@
}
-
static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -89,11 +75,9 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n");
GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
- consensus1 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id,
on_new_element, &consensus1);
+ consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id,
on_new_element, &consensus);
+ GNUNET_assert (consensus != NULL);
/*
- consensus2 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id,
on_new_element, &consensus2);
- GNUNET_assert (consensus1 != NULL);
- GNUNET_assert (consensus2 != NULL);
GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1);
GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2);
*/
Modified: gnunet/src/include/gnunet_applications.h
===================================================================
--- gnunet/src/include/gnunet_applications.h 2013-01-02 16:03:25 UTC (rev
25674)
+++ gnunet/src/include/gnunet_applications.h 2013-01-03 00:43:57 UTC (rev
25675)
@@ -71,7 +71,12 @@
*/
#define GNUNET_APPLICATION_TYPE_EXIT_REGEX_PREFIX "GNUNET-VPN-VER-0001-"
+/**
+ * Consensus.
+ */
+#define GNUNET_APPLICATION_TYPE_CONSENSUS 18
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/include/gnunet_consensus_service.h
===================================================================
--- gnunet/src/include/gnunet_consensus_service.h 2013-01-02 16:03:25 UTC
(rev 25674)
+++ gnunet/src/include/gnunet_consensus_service.h 2013-01-03 00:43:57 UTC
(rev 25675)
@@ -76,8 +76,8 @@
* @return GNUNET_OK if the valid is well-formed and should be added to the
consensus,
* GNUNET_SYSERR if the element should be ignored and not be propagated
*/
-typedef int (*GNUNET_CONSENSUS_NewElementCallback) (void *cls,
- struct
GNUNET_CONSENSUS_Element *element);
+typedef int (*GNUNET_CONSENSUS_ElementCallback) (void *cls,
+ struct
GNUNET_CONSENSUS_Element *element);
@@ -105,10 +105,10 @@
*/
struct GNUNET_CONSENSUS_Handle *
GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
- unsigned int num_peers,
- const struct GNUNET_PeerIdentity *peers,
+ unsigned int num_peers,
+ const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
- GNUNET_CONSENSUS_NewElementCallback new_element_cb,
+ GNUNET_CONSENSUS_ElementCallback new_element_cb,
void *new_element_cls);
@@ -122,7 +122,7 @@
* the insertion and thus the consensus failed for good
*/
typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls,
- int success);
+ int success);
/**
@@ -138,9 +138,9 @@
*/
void
GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
- const struct GNUNET_CONSENSUS_Element *element,
- GNUNET_CONSENSUS_InsertDoneCallback idc,
- void *idc_cls);
+ const struct GNUNET_CONSENSUS_Element *element,
+ GNUNET_CONSENSUS_InsertDoneCallback idc,
+ void *idc_cls);
/**
@@ -168,9 +168,9 @@
*/
struct GNUNET_CONSENSUS_DeltaRequest *
GNUNET_CONSENSUS_get_delta (struct GNUNET_CONSENSUS_Handle *consensus,
- uint32_t group_id,
- GNUNET_CONSENSUS_NewElementCallback
remove_element_cb,
- void *remove_element_cb_cls);
+ uint32_t group_id,
+ GNUNET_CONSENSUS_ElementCallback remove_element_cb,
+ void *remove_element_cb_cls);
void
@@ -184,7 +184,7 @@
uint64_t total_elements_in_group;
const struct GNUNET_PeerIdentity **members;
};
-
+
/**
* Called when a conclusion was successful.
@@ -193,21 +193,9 @@
* @param num_peers_in_consensus
* @param peers_in_consensus
*/
-typedef void (*GNUNET_CONSENSUS_NewConcludeCallback) (void *cls,
- unsigned int
consensus_group_count,
- const struct
GNUNET_CONSENSUS_Group *groups);
-
-
-/**
- * Called when a conclusion was successful.
- *
- * @param cls
- * @param num_peers_in_consensus
- * @param peers_in_consensus
- */
typedef void (*GNUNET_CONSENSUS_ConcludeCallback) (void *cls,
- unsigned int
num_peers_in_consensus,
- const struct
GNUNET_PeerIdentity *peers_in_consensus);
+ unsigned int
consensus_group_count,
+ const struct
GNUNET_CONSENSUS_Group *groups);
/**
@@ -222,10 +210,10 @@
*/
void
GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_TIME_Relative timeout,
- // unsigned int
min_group_size_in_consensus,
- GNUNET_CONSENSUS_ConcludeCallback conclude,
- void *conclude_cls);
+ struct GNUNET_TIME_Relative timeout,
+ unsigned int min_group_size_in_consensus,
+ GNUNET_CONSENSUS_ConcludeCallback conclude,
+ void *conclude_cls);
/**
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2013-01-02 16:03:25 UTC (rev
25674)
+++ gnunet/src/include/gnunet_protocols.h 2013-01-03 00:43:57 UTC (rev
25675)
@@ -1657,36 +1657,31 @@
*/
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE 525
+
+/* message types 526-539 reserved for consensus client/service messages */
+
+
+
/**
* Sent by client to service, telling whether a received element should
* be accepted and propagated further or not.
*/
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK 527
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK 540
/**
- * Update another peer's consensus set with new elements.
- */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS 528
-
-/**
- * Request elements (by their hash) from another peer.
- */
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_REQUEST_ELEMENTS 529
-
-/**
* Strata estimator.
*/
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_STRATA 530
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE 541
/**
* IBF containing all elements of a peer.
*/
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_IBF 531
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST 542
/**
- * Request reconcilliation with another peer.
+ * Elements, and requests for further elements
*/
-#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_RECONCILE 532
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS 543
/**
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25675 - in gnunet/src: consensus include,
gnunet <=