[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r26526 - in gnunet/src: consensus include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r26526 - in gnunet/src: consensus include |
Date: |
Thu, 21 Mar 2013 02:06:40 +0100 |
Author: dold
Date: 2013-03-21 02:06:40 +0100 (Thu, 21 Mar 2013)
New Revision: 26526
Added:
gnunet/src/consensus/consensus_flout.h
Modified:
gnunet/src/consensus/consensus_protocol.h
gnunet/src/consensus/gnunet-consensus-ibf.c
gnunet/src/consensus/gnunet-consensus.c
gnunet/src/consensus/gnunet-service-consensus.c
gnunet/src/consensus/ibf.c
gnunet/src/consensus/ibf.h
gnunet/src/consensus/test_consensus.conf
gnunet/src/include/gnunet_container_lib.h
Log:
fixed consensus multi-peer communication, memory leaks, various bugs
Added: gnunet/src/consensus/consensus_flout.h
===================================================================
--- gnunet/src/consensus/consensus_flout.h (rev 0)
+++ gnunet/src/consensus/consensus_flout.h 2013-03-21 01:06:40 UTC (rev
26526)
@@ -0,0 +1,60 @@
+/*
+ This file is part of GNUnet
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file consensus/consensus_flout.h
+ * @brief intentionally misbehave in certain ways for testing
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_CONSENSUS_FLOUT_H
+#define GNUNET_CONSENSUS_FLOUT_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0 /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_consensus_service.h"
+
+/**
+ * Testing hook (this header's purpose, per its @brief, is to
+ * "intentionally misbehave in certain ways for testing").
+ * NOTE(review): presumably stops the peer behind this handle from taking
+ * part in the protocol; the implementation is not visible here -- confirm.
+ *
+ * @param consensus consensus handle to manipulate
+ */
+void
+GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus);
+
+/**
+ * Testing hook.
+ * NOTE(review): presumably makes the session ignore the element with the
+ * given hash; the implementation is not visible here -- confirm.
+ * (This prototype used to be declared twice in a row; the redundant copy
+ * has been dropped.)
+ *
+ * @param consensus consensus handle to manipulate
+ * @param element_hash hash of the element concerned
+ */
+void
+GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus,
+                                            struct GNUNET_HashCode *element_hash);
+
+
+
+#if 0 /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif
Modified: gnunet/src/consensus/consensus_protocol.h
===================================================================
--- gnunet/src/consensus/consensus_protocol.h 2013-03-20 19:15:55 UTC (rev
26525)
+++ gnunet/src/consensus/consensus_protocol.h 2013-03-21 01:06:40 UTC (rev
26526)
@@ -76,12 +76,10 @@
struct GNUNET_HashCode global_id;
};
-struct ConsensusRoundHeader
+struct ConsensusRoundMessage
{
struct GNUNET_MessageHeader header;
uint8_t round;
- uint8_t exp_round;
- uint8_t exp_subround;
};
Modified: gnunet/src/consensus/gnunet-consensus-ibf.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-ibf.c 2013-03-20 19:15:55 UTC (rev
26525)
+++ gnunet/src/consensus/gnunet-consensus-ibf.c 2013-03-21 01:06:40 UTC (rev
26526)
@@ -19,7 +19,7 @@
*/
/**
- * @file consensus/gnunet-consensus-ibf
+ * @file consensus/gnunet-consensus-ibf.c
* @brief tool for reconciling data with invertible bloom filters
* @author Florian Dold
*/
Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c 2013-03-20 19:15:55 UTC (rev
26525)
+++ gnunet/src/consensus/gnunet-consensus.c 2013-03-21 01:06:40 UTC (rev
26526)
@@ -39,6 +39,8 @@
static struct GNUNET_CONSENSUS_Handle **consensus_handles;
+static struct GNUNET_TESTBED_Operation **testbed_operations;
+
static unsigned int num_connected_handles;
static struct GNUNET_TESTBED_Peer **peers;
@@ -49,7 +51,9 @@
static struct GNUNET_HashCode session_id;
+static unsigned int peers_done = 0;
+
/**
* Signature of the event handler function called by the
* respective event controller.
@@ -64,7 +68,6 @@
GNUNET_assert (0);
}
-
static void
destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx)
{
@@ -72,14 +75,21 @@
consensus = cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n");
GNUNET_CONSENSUS_destroy (consensus);
+ peers_done++;
+ if (peers_done == num_peers)
+ {
+ int i;
+ for (i = 0; i < num_peers; i++)
+ GNUNET_TESTBED_operation_done (testbed_operations[i]);
+ GNUNET_SCHEDULER_shutdown ();
+ }
}
/**
* Called when a conclusion was successful.
*
- * @param cls
- * @param group
+ * @param cls closure, the consensus handle
* @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if
not
*/
static void
@@ -255,8 +265,9 @@
num_retrieved_peer_ids++;
if (num_retrieved_peer_ids == num_peers)
for (i = 0; i < num_peers; i++)
- GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus",
connect_complete, &consensus_handles[i],
- connect_adapter, disconnect_adapter,
NULL);
+ testbed_operations[i] =
+ GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus",
connect_complete, &consensus_handles[i],
+ connect_adapter,
disconnect_adapter, NULL);
}
else
{
@@ -272,7 +283,6 @@
{
int i;
-
GNUNET_log_setup ("gnunet-consensus", "INFO", NULL);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
@@ -283,6 +293,7 @@
peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
consensus_handles = GNUNET_malloc (num_peers * sizeof (struct
ConsensusHandle *));
+ /* one testbed operation handle per peer, filled in by GNUNET_TESTBED_service_connect
+    and released with GNUNET_TESTBED_operation_done in destroy() */
+ testbed_operations = GNUNET_malloc (num_peers * sizeof (struct GNUNET_TESTBED_Operation *));
for (i = 0; i < num_peers; i++)
GNUNET_TESTBED_peer_get_information (peers[i],
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2013-03-20 19:15:55 UTC
(rev 26525)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2013-03-21 01:06:40 UTC
(rev 26526)
@@ -124,7 +124,43 @@
};
+/**
+ * Node of a singly-linked list of consensus elements.
+ * The head of such a list is what a session's 'values' hash map stores
+ * under an element hash.
+ */
+struct ElementList
+{
+ /**
+ * Next node of the list, NULL at the end.
+ */
+ struct ElementList *next;
+ /**
+ * The element itself; freed together with the node when the list
+ * is destroyed.
+ */
+ struct GNUNET_CONSENSUS_Element *element;
+ /**
+ * Heap-allocated copy of the element's hash; freed together with the
+ * node when the list is destroyed.
+ */
+ struct GNUNET_HashCode *element_hash;
+};
+
+
/**
+ * Describes the current round a consensus session is in.
+ */
+enum ConsensusRound
+{
+ /**
+ * Not started the protocol yet.
+ */
+ CONSENSUS_ROUND_BEGIN=0,
+ /**
+ * Distribution of elements with the exponential scheme.
+ */
+ CONSENSUS_ROUND_EXCHANGE,
+ /**
+ * Exchange which elements each peer has, but not the elements.
+ */
+ CONSENSUS_ROUND_INVENTORY,
+ /**
+ * Collect and distribute missing values.
+ */
+ CONSENSUS_ROUND_STOCK,
+ /**
+ * Consensus concluded.
+ */
+ CONSENSUS_ROUND_FINISH
+};
+
+
+/**
* Information about a peer that is in a consensus session.
*/
struct ConsensusPeerInformation
@@ -148,8 +184,6 @@
*/
int is_outgoing;
- int connected;
-
/**
* Did we receive/send a consensus hello?
*/
@@ -246,6 +280,15 @@
*/
int exp_subround_finished;
+ int inventory_synced;
+
+ /**
+ * Round this peer seems to be in, according to the last SE we got.
+ * Necessary to store this, as we sometimes need to respond to a request
from an
+ * older round, while we are already in the next round.
+ */
+ enum ConsensusRound apparent_round;
+
};
typedef void (*QueuedMessageCallback) (void *msg);
@@ -272,32 +315,6 @@
void *cls;
};
-/**
- * Describes the current round a consensus session is in.
- */
-enum ConsensusRound
-{
- /**
- * Not started the protocol yet.
- */
- CONSENSUS_ROUND_BEGIN=0,
- /**
- * Distribution of elements with the exponential scheme.
- */
- CONSENSUS_ROUND_EXCHANGE,
- /**
- * Exchange which elements each peer has, but not the elements.
- */
- CONSENSUS_ROUND_INVENTORY,
- /**
- * Collect and distribute missing values.
- */
- CONSENSUS_ROUND_STOCK,
- /**
- * Consensus concluded.
- */
- CONSENSUS_ROUND_FINISH
-};
struct StrataEstimator
{
@@ -342,7 +359,8 @@
/**
* Elements in the consensus set of this session,
* all of them either have been sent by or approved by the client.
- * Contains GNUNET_CONSENSUS_Element.
+ * Contains ElementList.
+ * Used as a unique-key hashmap.
*/
struct GNUNET_CONTAINER_MultiHashMap *values;
@@ -544,6 +562,8 @@
*
* @param cpi peer
* @param msg message we want to queue
+ * @param cb callback, called when the message is given to the stream
+ * @param cls closure for cb
*/
static void
queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct
GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
@@ -572,14 +592,14 @@
}
-
+/*
static void
clear_peer_messages (struct ConsensusPeerInformation *cpi)
{
- /* FIXME: deallocate */
cpi->messages_head = NULL;
cpi->messages_tail = NULL;
}
+*/
/**
@@ -592,8 +612,8 @@
* @return the estimated difference
*/
static int
-estimate_difference (struct StrataEstimator *se1,
- struct StrataEstimator *se2)
+estimate_difference (const struct StrataEstimator *se1,
+ const struct StrataEstimator *se2)
{
int i;
int count;
@@ -701,42 +721,36 @@
}
-/**
- * Iterator over hash map entries.
- * Queue elements to be sent to the peer in cls.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
- */
-static int
-send_element_iter (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+static void
+send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head)
{
- struct ConsensusPeerInformation *cpi;
struct GNUNET_CONSENSUS_Element *element;
struct GNUNET_MessageHeader *element_msg;
size_t msize;
- cpi = cls;
- element = value;
- msize = sizeof (struct GNUNET_MessageHeader) + element->size;
- element_msg = GNUNET_malloc (msize);
- element_msg->size = htons (msize);
- if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round)
- element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
- else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
- element_msg->type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
- else
- GNUNET_assert (0);
- GNUNET_assert (NULL != element->data);
- memcpy (&element_msg[1], element->data, element->size);
- queue_peer_message (cpi, element_msg);
- return GNUNET_YES;
+ while (NULL != head)
+ {
+ element = head->element;
+ msize = sizeof (struct GNUNET_MessageHeader) + element->size;
+ element_msg = GNUNET_malloc (msize);
+ element_msg->size = htons (msize);
+ switch (cpi->apparent_round)
+ {
+ case CONSENSUS_ROUND_STOCK:
+ case CONSENSUS_ROUND_EXCHANGE:
+ element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
+ break;
+ case CONSENSUS_ROUND_INVENTORY:
+ element_msg->type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
+ break;
+ default:
+ GNUNET_break (0);
+ }
+ GNUNET_assert (NULL != element->data);
+ memcpy (&element_msg[1], element->data, element->size);
+ queue_peer_message (cpi, element_msg);
+ head = head->next;
+ }
}
/**
@@ -755,8 +769,13 @@
void *value)
{
struct ConsensusPeerInformation *cpi;
+ struct ElementList *head;
+ struct IBF_Key ibf_key;
cpi = cls;
- ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key));
+ head = value;
+ ibf_key = ibf_key_from_hashcode (head->element_hash);
+ GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
+ ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
return GNUNET_YES;
}
@@ -764,7 +783,7 @@
* Create and populate an IBF for the specified peer,
* if it does not already exist.
*
- * @param peer to create the ibf for
+ * @param cpi peer to create the ibf for
*/
static void
prepare_ibf (struct ConsensusPeerInformation *cpi)
@@ -791,25 +810,71 @@
GNUNET_assert (0);
}
-static void
-fin_sent_cb (void *cls)
+
+static int
+exp_subround_finished (const struct ConsensusSession *session)
{
- struct ConsensusPeerInformation *cpi;
int not_finished;
- cpi = cls;
- cpi->exp_subround_finished = GNUNET_YES;
- /* the subround is only really over if *both* partners are done */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n",
cpi->session->local_peer_idx);
not_finished = 0;
- if ((cpi->session->partner_outgoing != NULL) &&
(cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
+ if ((session->partner_outgoing != NULL) &&
(session->partner_outgoing->exp_subround_finished == GNUNET_NO))
not_finished++;
- if ((cpi->session->partner_incoming != NULL) &&
(cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
+ if ((session->partner_incoming != NULL) &&
(session->partner_incoming->exp_subround_finished == GNUNET_NO))
not_finished++;
if (0 == not_finished)
- subround_over (cpi->session, NULL);
+ return GNUNET_YES;
+ return GNUNET_NO;
}
+/**
+ * Check whether enough peers have completed the inventory round.
+ *
+ * @param session the consensus session
+ * @return GNUNET_YES if the number of peers flagged as inventory-synced is
+ *         at least num_peers / 2 (integer division), GNUNET_NO otherwise
+ */
+static int
+inventory_round_finished (struct ConsensusSession *session)
+{
+  int synced_count = 0;
+  int idx;
+
+  for (idx = 0; idx < session->num_peers; idx++)
+  {
+    if (GNUNET_YES != session->info[idx].inventory_synced)
+      continue;
+    synced_count++;
+  }
+  return (synced_count >= (session->num_peers / 2)) ? GNUNET_YES : GNUNET_NO;
+}
+
+
+/**
+ * Callback queued together with a FIN message (see handle_p2p_synced);
+ * updates the per-peer and per-session round state once that FIN has
+ * been handed off.
+ *
+ * @param cls closure, the ConsensusPeerInformation of the peer the FIN
+ *        was sent to
+ */
+static void
+fin_sent_cb (void *cls)
+{
+ struct ConsensusPeerInformation *cpi;
+ cpi = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n",
cpi->session->local_peer_idx);
+ switch (cpi->session->current_round)
+ {
+ case CONSENSUS_ROUND_EXCHANGE:
+ case CONSENSUS_ROUND_STOCK:
+ /* the subround is only really over if *both* partners are done */
+ /* a FIN that belongs to a round other than the session's current one
+    must not mark the current subround as finished */
+ if (cpi->session->current_round != cpi->apparent_round)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the
past\n", cpi->session->local_peer_idx);
+ break;
+ }
+ cpi->exp_subround_finished = GNUNET_YES;
+ if (GNUNET_YES == exp_subround_finished (cpi->session))
+ subround_over (cpi->session, NULL);
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after
FIN sent\n", cpi->session->local_peer_idx);
+ break;
+ case CONSENSUS_ROUND_INVENTORY:
+ /* the peer is flagged as synced unconditionally; the round is only
+    ended if the peer's apparent round matches the current round */
+ cpi->inventory_synced = GNUNET_YES;
+ if (inventory_round_finished (cpi->session) &&
cpi->session->current_round == cpi->apparent_round)
+ round_over (cpi->session, NULL);
+ /* FIXME: maybe go to next round */
+ break;
+ default:
+ /* sending a FIN is not expected in any other round */
+ GNUNET_break (0);
+ }
+}
+
+
/**
* Gets called when the other peer wants us to inform that
* it has decoded our ibf and sent us all elements / requests
@@ -817,17 +882,23 @@
static int
handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct
GNUNET_MessageHeader *msg)
{
- struct GNUNET_MessageHeader *fin_msg;
+ struct ConsensusRoundMessage *fin_msg;
+
switch (cpi->session->current_round)
{
+ case CONSENSUS_ROUND_INVENTORY:
+ cpi->inventory_synced = GNUNET_YES;
+ /* fall through: a SYNC in the inventory round is answered with a FIN too */
+ case CONSENSUS_ROUND_STOCK:
case CONSENSUS_ROUND_EXCHANGE:
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
fin_msg = GNUNET_malloc (sizeof *fin_msg);
- fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
- fin_msg->size = htons (sizeof *fin_msg);
+ fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
+ fin_msg->header.size = htons (sizeof *fin_msg);
+ fin_msg->round = cpi->apparent_round;
/* the subround is over once we kicked off sending the fin msg */
/* FIXME: assert we are talking to the right peer! */
- queue_peer_message_with_cls (cpi, fin_msg, fin_sent_cb, cpi);
+ queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *)
fin_msg, fin_sent_cb, cpi);
+ /* FIXME: mark peer as synced */
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the
current round\n");
@@ -836,30 +907,40 @@
return GNUNET_YES;
}
+
/**
* The other peer wants us to inform that he sent us all the elements we
requested.
*/
static int
handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct
GNUNET_MessageHeader *msg)
{
+ struct ConsensusRoundMessage *round_msg;
+ round_msg = (struct ConsensusRoundMessage *) msg;
/* FIXME: only call subround_over if round is the current one! */
switch (cpi->session->current_round)
{
case CONSENSUS_ROUND_EXCHANGE:
- {
- int not_finished;
+ case CONSENSUS_ROUND_STOCK:
+ if (cpi->session->current_round != round_msg->round)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past
round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+ cpi->ibf_state = IBF_STATE_NONE;
+ cpi->ibf_bucket_counter = 0;
+ break;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
cpi->exp_subround_finished = GNUNET_YES;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
- /* the subround is only really over if *both* partners are done */
- not_finished = 0;
- if ((cpi->session->partner_outgoing != NULL) &&
(cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
- not_finished++;
- if ((cpi->session->partner_incoming != NULL) &&
(cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
- not_finished++;
- if (0 == not_finished)
+ if (GNUNET_YES == exp_subround_finished (cpi->session))
subround_over (cpi->session, NULL);
- }
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after
got FIN\n", cpi->session->local_peer_idx);
break;
+ case CONSENSUS_ROUND_INVENTORY:
+ cpi->inventory_synced = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+ if (inventory_round_finished (cpi->session))
+ round_over (cpi->session, NULL);
+ break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the
current round\n");
break;
@@ -884,7 +965,39 @@
return se;
}
+/**
+ * Free a strata estimator: destroys each of its STRATA_COUNT IBFs, then
+ * releases the strata array and the estimator structure itself.
+ *
+ * @param se strata estimator to destroy; must not be used afterwards
+ */
+static void
+strata_estimator_destroy (struct StrataEstimator *se)
+{
+  int stratum = 0;
+
+  while (stratum < STRATA_COUNT)
+  {
+    ibf_destroy (se->strata[stratum]);
+    stratum++;
+  }
+  GNUNET_free (se->strata);
+  GNUNET_free (se);
+}
+
+/**
+ * Decide whether a strata message does not match the round the session is
+ * currently in.  For the exchange and stock rounds, the exp-round and
+ * exp-subround carried by the message must match the session's as well.
+ *
+ * @param session the consensus session
+ * @param strata_msg strata message received from a peer
+ * @return GNUNET_YES on any mismatch, GNUNET_NO if the message matches
+ */
+static int
+is_premature_strata_message (const struct ConsensusSession *session,
+                             const struct StrataMessage *strata_msg)
+{
+  int needs_subround_match;
+
+  /* the round itself has to agree in every case */
+  if (strata_msg->round != session->current_round)
+    return GNUNET_YES;
+  needs_subround_match = (CONSENSUS_ROUND_STOCK == strata_msg->round) ||
+                         (CONSENSUS_ROUND_EXCHANGE == strata_msg->round);
+  if (needs_subround_match &&
+      ((strata_msg->exp_round != session->exp_round) ||
+       (strata_msg->exp_subround != session->exp_subround)))
+    return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
/**
* Called when a peer sends us its strata estimator.
* In response, we sent out IBF of appropriate size back.
@@ -900,32 +1013,28 @@
void *buf;
size_t size;
-
-
- switch (cpi->session->current_round)
+ if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) &&
(strata_msg->round == CONSENSUS_ROUND_INVENTORY))
{
- case CONSENSUS_ROUND_EXCHANGE:
- if ( (strata_msg->round != CONSENSUS_ROUND_EXCHANGE) ||
- (strata_msg->exp_round != cpi->session->exp_round) ||
- (strata_msg->exp_subround != cpi->session->exp_subround))
- {
- if (GNUNET_NO == cpi->replaying_strata_message)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE
from P%d, (%d,%d)\n",
- cpi->session->local_peer_idx, (int) (cpi -
cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
- cpi->premature_strata_message = (struct StrataMessage *)
GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
- }
- return GNUNET_YES;
- }
- break;
- default:
- GNUNET_assert (0);
- break;
+ /* we still have to handle this request appropriately */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we
are already further alog\n",
+ cpi->session->local_peer_idx, (int) (cpi -
cpi->session->info));
}
+ else if (is_premature_strata_message (cpi->session, strata_msg))
+ {
+ if (GNUNET_NO == cpi->replaying_strata_message)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from
P%d, (%d,%d)\n",
+ cpi->session->local_peer_idx, (int) (cpi -
cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
+ cpi->premature_strata_message = (struct StrataMessage *)
GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
+ }
+ return GNUNET_YES;
+ }
if (NULL == cpi->se)
cpi->se = strata_estimator_create ();
+ cpi->apparent_round = strata_msg->round;
+
size = ntohs (strata_msg->header.size);
buf = (void *) &strata_msg[1];
for (i = 0; i < STRATA_COUNT; i++)
@@ -937,25 +1046,32 @@
diff = estimate_difference (cpi->session->se, cpi->se);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d\n",
- cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
+ cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),
diff);
- if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) ||
- (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round))
+ switch (cpi->session->current_round)
{
- /* send IBF of the right size */
- cpi->ibf_order = 0;
- while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 <<
cpi->ibf_order) )
- cpi->ibf_order++;
- if (cpi->ibf_order > MAX_IBF_ORDER)
- cpi->ibf_order = MAX_IBF_ORDER;
- cpi->ibf_order += 1;
- /* create ibf if not already pre-computed */
- prepare_ibf (cpi);
- cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
- cpi->ibf_state = IBF_STATE_TRANSMITTING;
- cpi->ibf_bucket_counter = 0;
- send_ibf (cpi);
+ case CONSENSUS_ROUND_EXCHANGE:
+ case CONSENSUS_ROUND_INVENTORY:
+ case CONSENSUS_ROUND_STOCK:
+ /* send IBF of the right size */
+ cpi->ibf_order = 0;
+ while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 <<
cpi->ibf_order) )
+ cpi->ibf_order++;
+ if (cpi->ibf_order > MAX_IBF_ORDER)
+ cpi->ibf_order = MAX_IBF_ORDER;
+ cpi->ibf_order += 1;
+ /* create ibf if not already pre-computed */
+ prepare_ibf (cpi);
+ cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
+ cpi->ibf_state = IBF_STATE_TRANSMITTING;
+ cpi->ibf_bucket_counter = 0;
+ send_ibf (cpi);
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
+ cpi->session->local_peer_idx, (int) (cpi -
cpi->session->info));
+ break;
}
return GNUNET_YES;
}
@@ -973,7 +1089,7 @@
switch (cpi->ibf_state)
{
case IBF_STATE_NONE:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n",
digest->order);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
cpi->ibf_state = IBF_STATE_RECEIVING;
cpi->ibf_order = digest->order;
cpi->ibf_bucket_counter = 0;
@@ -984,7 +1100,8 @@
}
break;
case IBF_STATE_ANTICIPATE_DIFF:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order
%d\n", digest->order);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d
(probably out IBF did not decode)\n",
+ cpi->session->local_peer_idx, (int) (cpi -
cpi->session->info));
cpi->ibf_state = IBF_STATE_RECEIVING;
cpi->ibf_order = digest->order;
cpi->ibf_bucket_counter = 0;
@@ -997,18 +1114,16 @@
case IBF_STATE_RECEIVING:
break;
default:
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "received ibf unexpectedly in
state %d\n", cpi->ibf_state);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
return GNUNET_YES;
}
if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
return GNUNET_YES;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n",
num_buckets,
- cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
if (NULL == cpi->ibf)
cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
@@ -1041,6 +1156,17 @@
struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
size_t size;
+ switch (cpi->session->current_round)
+ {
+ case CONSENSUS_ROUND_STOCK:
+ /* FIXME: check if we really expect the element */
+ case CONSENSUS_ROUND_EXCHANGE:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element,
ignoring\n");
+ return GNUNET_YES;
+ }
+
size = ntohs (element_msg->size) - sizeof *element_msg;
element = GNUNET_malloc (size + sizeof *element);
@@ -1069,7 +1195,9 @@
/**
* Handle a request for elements.
- * Only allowed in exchange-rounds.
+ *
+ * @param cpi peer that is requesting the element
+ * @param msg the element request message
*/
static int
handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct
ElementRequest *msg)
@@ -1078,14 +1206,18 @@
struct IBF_Key *ibf_key;
unsigned int num;
+ /* element requests are allowed in every round */
+
num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u
elements\n", num);
ibf_key = (struct IBF_Key *) &msg[1];
while (num--)
{
+ struct ElementList *head;
ibf_hashcode_from_key (*ibf_key, &hashcode);
- GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values,
&hashcode, send_element_iter, cpi);
+ head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
+ send_elements (cpi, head);
ibf_key++;
}
return GNUNET_YES;
@@ -1162,11 +1294,13 @@
size_t msize;
int i;
+ cpi->apparent_round = cpi->session->current_round;
+ cpi->ibf_state = IBF_STATE_NONE;
+ cpi->ibf_bucket_counter = 0;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE to P%d\n",
- cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n",
+ cpi->session->local_peer_idx, cpi->session->current_round, (int)
(cpi - cpi->session->info));
-
msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE *
STRATA_IBF_BUCKETS);
strata_msg = GNUNET_malloc (msize);
@@ -1185,21 +1319,20 @@
queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
}
+
/**
- * Send an IBF of the order specified in cpi
+ * Send an IBF of the order specified in cpi.
*
* @param cpi the peer
*/
static void
send_ibf (struct ConsensusPeerInformation *cpi)
{
- int sent_buckets;
-
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
- sent_buckets = 0;
- while (sent_buckets < (1 << cpi->ibf_order))
+ cpi->ibf_bucket_counter = 0;
+ while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
{
int num_buckets;
void *buf;
@@ -1223,12 +1356,18 @@
queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
- sent_buckets += num_buckets;
+ cpi->ibf_bucket_counter += num_buckets;
}
+ cpi->ibf_bucket_counter = 0;
cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
}
+/**
+ * Decode the current diff ibf, and send elements/requests/reports.
+ *
+ * @param cpi partner peer
+ */
static void
decode (struct ConsensusPeerInformation *cpi)
{
@@ -1236,11 +1375,12 @@
struct GNUNET_HashCode hashcode;
int side;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n",
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
for (;;)
{
int res;
+
res = ibf_decode (cpi->ibf, &side, &key);
if (GNUNET_SYSERR == res)
{
@@ -1256,19 +1396,22 @@
}
if (GNUNET_NO == res)
{
- struct GNUNET_MessageHeader *msg;
+ struct ConsensusRoundMessage *msg;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values,
sending SYNC\n", cpi->session->local_peer_idx);
msg = GNUNET_malloc (sizeof *msg);
- msg->size = htons (sizeof *msg);
- msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
- queue_peer_message (cpi, msg);
+ msg->header.size = htons (sizeof *msg);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
+ msg->round = cpi->apparent_round;
+ queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
return;
}
if (-1 == side)
{
+ struct ElementList *head;
/* we have the element(s), send it to the other peer */
ibf_hashcode_from_key (key, &hashcode);
- GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values,
&hashcode, send_element_iter, cpi);
+ head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values,
&hashcode);
+ send_elements (cpi, head);
}
else
{
@@ -1278,18 +1421,19 @@
msize = (sizeof *msg) + sizeof (struct IBF_Key);
msg = GNUNET_malloc (msize);
- if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round)
+ switch (cpi->apparent_round)
{
- msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
+ case CONSENSUS_ROUND_STOCK:
+ /* FIXME: check if we really want to request the element */
+ case CONSENSUS_ROUND_EXCHANGE:
+ msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
+ break;
+ case CONSENSUS_ROUND_INVENTORY:
+ msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
+ break;
+ default:
+ GNUNET_assert (0);
}
- else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
- {
- msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
- }
- else
- {
- GNUNET_assert (0);
- }
msg->header.size = htons (msize);
p = (struct IBF_Key *) &msg[1];
*p = key;
@@ -1308,7 +1452,6 @@
* @param cls closure
* @param client identification of the client
* @param message the actual message
- *
* @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
*/
static int
@@ -1403,6 +1546,36 @@
/**
+ * Hash map iterator that frees one ElementList chain: for every node the
+ * element hash, the element and the node itself are released.
+ *
+ * @param cls closure, unused
+ * @param key current key code, unused
+ * @param value head of the ElementList stored under the key
+ * @return GNUNET_YES, so that iteration always continues
+ */
+static int
+destroy_element_list_iter (void *cls,
+                           const struct GNUNET_HashCode * key,
+                           void *value)
+{
+  struct ElementList *node;
+  struct ElementList *following;
+
+  for (node = value; NULL != node; node = following)
+  {
+    /* remember the successor before the node is released */
+    following = node->next;
+    GNUNET_free (node->element_hash);
+    GNUNET_free (node->element);
+    GNUNET_free (node);
+  }
+  return GNUNET_YES;
+}
+
+
+/**
* Destroy a session, free all resources associated with it.
*
* @param session the session to destroy
@@ -1410,9 +1583,80 @@
static void
destroy_session (struct ConsensusSession *session)
{
- /* FIXME: more stuff to free! */
+ int i;
+
GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
GNUNET_SERVER_client_drop (session->client);
+ session->client = NULL;
+ if (NULL != session->shuffle)
+ {
+ GNUNET_free (session->shuffle);
+ session->shuffle = NULL;
+ }
+ if (NULL != session->se)
+ {
+ strata_estimator_destroy (session->se);
+ session->se = NULL;
+ }
+ if (NULL != session->info)
+ {
+ for (i = 0; i < session->num_peers; i++)
+ {
+ struct ConsensusPeerInformation *cpi;
+ cpi = &session->info[i];
+ if ((NULL != cpi) && (NULL != cpi->socket))
+ {
+ if (NULL != cpi->rh)
+ {
+ GNUNET_STREAM_read_cancel (cpi->rh);
+ cpi->rh = NULL;
+ }
+ if (NULL != cpi->wh)
+ {
+ GNUNET_STREAM_write_cancel (cpi->wh);
+ cpi->wh = NULL;
+ }
+ GNUNET_STREAM_close (cpi->socket);
+ cpi->socket = NULL;
+ }
+ if (NULL != cpi->se)
+ {
+ strata_estimator_destroy (cpi->se);
+ cpi->se = NULL;
+ }
+ if (NULL != cpi->ibf)
+ {
+ ibf_destroy (cpi->ibf);
+ cpi->ibf = NULL;
+ }
+ if (NULL != cpi->mst)
+ {
+ GNUNET_SERVER_mst_destroy (cpi->mst);
+ cpi->mst = NULL;
+ }
+ }
+ GNUNET_free (session->info);
+ session->info = NULL;
+ }
+ if (NULL != session->ibfs)
+ {
+ for (i = 0; i <= MAX_IBF_ORDER; i++)
+ {
+ if (NULL != session->ibfs[i])
+ {
+ ibf_destroy (session->ibfs[i]);
+ session->ibfs[i] = NULL;
+ }
+ }
+ GNUNET_free (session->ibfs);
+ session->ibfs = NULL;
+ }
+ if (NULL != session->values)
+ {
+ GNUNET_CONTAINER_multihashmap_iterate (session->values,
destroy_element_list_iter, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (session->values);
+ session->values = NULL;
+ }
GNUNET_free (session);
}
@@ -1448,6 +1692,7 @@
* Thus, if the local id of two consensus sessions coincide, but are not
comprised of
* exactly the same peers, the global id will be different.
*
+ * @param session session to generate the global id for
* @param session_id local id of the consensus session
*/
static void
@@ -1511,9 +1756,9 @@
/**
- * Schedule transmitting the next queued message (if any) to a client.
+ * Schedule transmitting the next queued message (if any) to the inhabiting
client of a session.
*
- * @param cli the client to send the next message to
+ * @param session the consensus session
*/
static void
client_send_next (struct ConsensusSession *session)
@@ -1545,9 +1790,9 @@
* @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
*/
static int
-hash_cmp (const void *a, const void *b)
+hash_cmp (const void *h1, const void *h2)
{
- return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct
GNUNET_HashCode *) b);
+ return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct
GNUNET_HashCode *) h2);
}
@@ -1607,6 +1852,7 @@
cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
cpi->wh =
GNUNET_STREAM_write (socket, hello, sizeof *hello,
GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
+ GNUNET_free (hello);
cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
&session_stream_data_processor, cpi);
}
@@ -1821,12 +2067,52 @@
}
+/**
+ * Add an element to the session's values, chaining elements whose IBF hash
+ * is already present; only the first per hash updates the SE and the IBFs.
+ *
+ * @param session session that receives the element
+ * @param element element to add; stored by pointer, not copied
+ */
+static void
+insert_element (struct ConsensusSession *session,
+                struct GNUNET_CONSENSUS_Element *element)
+{
+  struct GNUNET_HashCode hash;
+  struct ElementList *head;
+  struct ElementList *el;
+  int i;
+
+  hash_for_ibf (element->data, element->size, &hash);
+  el = GNUNET_malloc (sizeof *el);
+  el->element = element;
+  el->next = NULL;
+  el->element_hash = GNUNET_memdup (&hash, sizeof hash);
+  head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
+  if (NULL != head)
+  {
+    /* hash already present: append the new node to the chain's tail */
+    while (NULL != head->next)
+      head = head->next;
+    head->next = el;
+    return;
+  }
+  /* first element with this hash: register it in the map, SE and IBFs */
+  GNUNET_CONTAINER_multihashmap_put (session->values, &hash, el,
+    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  strata_estimator_insert (session->se, &hash);
+  for (i = 0; i <= MAX_IBF_ORDER; i++)
+    if (NULL != session->ibfs[i])
+      ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
+}
+
+
/**
* Called when a client performs an insert operation.
*
* @param cls (unused)
* @param client client handle
- * @param message message sent by the client
+ * @param m message sent by the client
*/
void
client_insert (void *cls,
@@ -1836,7 +2122,6 @@
struct ConsensusSession *session;
struct GNUNET_CONSENSUS_ElementMessage *msg;
struct GNUNET_CONSENSUS_Element *element;
- struct GNUNET_HashCode hash;
int element_size;
session = sessions_head;
@@ -1865,13 +2150,8 @@
GNUNET_assert (NULL != element->data);
- hash_for_ibf (element->data, element_size, &hash);
+ insert_element (session, element);
- GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- strata_estimator_insert (session->se, &hash);
-
GNUNET_SERVER_receive_done (client, GNUNET_OK);
client_send_next (session);
@@ -1951,18 +2231,41 @@
{
GNUNET_assert (NULL == session->partner_outgoing);
session->partner_outgoing = &session->info[session->shuffle[arc]];
+ session->partner_outgoing->exp_subround_finished = GNUNET_NO;
}
if (arc == session->local_peer_idx)
{
GNUNET_assert (NULL == session->partner_incoming);
session->partner_incoming = &session->info[session->shuffle[i]];
+ session->partner_incoming->exp_subround_finished = GNUNET_NO;
}
}
}
+/**
+ * Replay the premature strata message stored for a peer, if there is one.
+ */
+static void
+replay_premature_message (struct ConsensusPeerInformation *cpi)
+{
+  struct StrataMessage *early = cpi->premature_strata_message;
+
+  if (NULL == early)
+    return;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
+  /* detach the message first; it is freed once the handler returns */
+  cpi->premature_strata_message = NULL;
+  cpi->replaying_strata_message = GNUNET_YES;
+  handle_p2p_strata (cpi, early);
+  cpi->replaying_strata_message = GNUNET_NO;
+  GNUNET_free (early);
+}
+
+
/**
* Do the next subround in the exp-scheme.
+ * This function can be invoked as a timeout task, or called manually (tc will
be NULL then).
*
* @param cls the session
* @param tc task context, for when this task is invoked by the scheduler,
@@ -1977,35 +2280,29 @@
/* don't kick off next subround if we're shutting down */
if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
-
session = cls;
-
-
+ /* don't send any messages from the last round */
+ /*
+ clear_peer_messages (session->partner_outgoing);
+ clear_peer_messages (session->partner_incoming);
for (i = 0; i < session->num_peers; i++)
clear_peer_messages (&session->info[i]);
-
+ */
+ /* cancel timeout */
if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
- {
GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
- }
session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
-
- if ((session->num_peers == 2) && (session->exp_round == 1))
+ /* check if we are done with the log phase, 2-peer consensus only does one
log round */
+ if ( (session->exp_round == NUM_EXP_ROUNDS) ||
+ ((session->num_peers == 2) && (session->exp_round == 1)))
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n",
session->local_peer_idx);
round_over (session, NULL);
return;
}
-
- if (session->exp_round == NUM_EXP_ROUNDS)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
- round_over (session, NULL);
- return;
- }
-
if (session->exp_round == 0)
{
+ /* initialize everything for the log-rounds */
session->exp_round = 1;
session->exp_subround = 0;
if (NULL == session->shuffle)
@@ -2015,6 +2312,7 @@
}
else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
{
+ /* subrounds done, start new log-round */
session->exp_round++;
session->exp_subround = 0;
shuffle (session);
@@ -2026,6 +2324,7 @@
find_partners (session);
+#ifdef GNUNET_EXTRA_LOGGING
{
int in;
int out;
@@ -2040,15 +2339,8 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d,
in: %d, out: %d\n", session->local_peer_idx,
session->exp_round, session->exp_subround, in, out);
}
+#endif /* GNUNET_EXTRA_LOGGING */
-
- if (NULL != session->partner_outgoing)
- {
- session->partner_outgoing->ibf_state = IBF_STATE_NONE;
- session->partner_outgoing->ibf_bucket_counter = 0;
- session->partner_outgoing->exp_subround_finished = GNUNET_NO;
- }
-
if (NULL != session->partner_incoming)
{
session->partner_incoming->ibf_state = IBF_STATE_NONE;
@@ -2056,24 +2348,15 @@
session->partner_incoming->ibf_bucket_counter = 0;
/* maybe there's an early strata estimator? */
- if (NULL != session->partner_incoming->premature_strata_message)
- {
- struct StrataMessage *sm;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
- sm = session->partner_incoming->premature_strata_message;
- session->partner_incoming->premature_strata_message = NULL;
-
- session->partner_incoming->replaying_strata_message = GNUNET_YES;
- handle_p2p_strata (session->partner_incoming, sm);
- session->partner_incoming->replaying_strata_message = GNUNET_NO;
-
- GNUNET_free (sm);
- }
+ replay_premature_message (session->partner_incoming);
}
if (NULL != session->partner_outgoing)
{
+ session->partner_outgoing->ibf_state = IBF_STATE_NONE;
+ session->partner_outgoing->ibf_bucket_counter = 0;
+ session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+
if (NULL == session->partner_outgoing->socket)
{
session->partner_outgoing->socket =
@@ -2092,7 +2375,6 @@
session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed
(GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
subround_over, session);
*/
-
}
static void
@@ -2110,31 +2392,63 @@
}
}
+/**
+ * Start the inventory round, contact all peers we are supposed to contact.
+ *
+ * @param session the current session
+ */
static void
start_inventory (struct ConsensusSession *session)
{
int i;
int last;
+ for (i = 0; i < session->num_peers; i++) /* reset per-peer IBF state and the outgoing flag before contacting anyone */
+ {
+ session->info[i].ibf_bucket_counter = 0;
+ session->info[i].ibf_state = IBF_STATE_NONE;
+ session->info[i].is_outgoing = GNUNET_NO;
+ }
+
last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) %
session->num_peers;
i = (session->local_peer_idx + 1) % session->num_peers;
while (i != last)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n",
session->local_peer_idx, i);
contact_peer_a2a (&session->info[i]);
+ session->info[i].is_outgoing = GNUNET_YES; /* mark the peers this peer contacts itself */
i = (i + 1) % session->num_peers;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n",
session->local_peer_idx, i);
}
// tie-breaker for even number of peers
if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all
(tie-breaker)\n", session->local_peer_idx, i); /* i == last here: the loop above only exits once i reaches last */
+ session->info[last].is_outgoing = GNUNET_YES;
contact_peer_a2a (&session->info[last]);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d
(tiebreaker)\n", session->local_peer_idx, last);
}
+
+ for (i = 0; i < session->num_peers; i++) /* replay stored premature messages for every entry not marked outgoing */
+ {
+ if (GNUNET_NO == session->info[i].is_outgoing)
+ replay_premature_message (&session->info[i]);
+ }
}
+/* Switch to the finish round and queue CONCLUDE_DONE for the client. */
+static void
+send_client_conclude_done (struct ConsensusSession *session)
+{
+  struct GNUNET_MessageHeader *done = GNUNET_malloc (sizeof *done);
+  done->size = htons (sizeof *done);
+  done->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
+  session->current_round = CONSENSUS_ROUND_FINISH;
+  queue_client_message (session, done);
+  client_send_next (session);
+}
/**
- * Select and kick off the next round, based on the current round.
+ * Start the next round.
+ * This function can be invoked as a timeout task, or called manually (tc will
be NULL then).
*
* @param cls the session
* @param tc task context, for when this task is invoked by the scheduler,
@@ -2144,17 +2458,18 @@
round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct ConsensusSession *session;
- int i;
/* don't kick off next round if we're shutting down */
if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
session = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n",
session->local_peer_idx);
+ /*
for (i = 0; i < session->num_peers; i++)
clear_peer_messages (&session->info[i]);
+ */
if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
{
@@ -2162,8 +2477,6 @@
session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
}
- /* FIXME: cancel current round */
-
switch (session->current_round)
{
case CONSENSUS_ROUND_BEGIN:
@@ -2172,31 +2485,24 @@
subround_over (session, NULL);
break;
case CONSENSUS_ROUND_EXCHANGE:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n",
session->local_peer_idx);
-
- if (0)
+ /* handle two peers specially */
+ if (session->num_peers <= 2)
{
- struct GNUNET_MessageHeader *msg;
- msg = GNUNET_malloc (sizeof *msg);
- msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
- msg->size = htons (sizeof *msg);
- queue_client_message (session, msg);
- client_send_next (session);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n",
session->local_peer_idx);
+ send_client_conclude_done (session);
+ return;
}
-
- if (0)
- {
- session->current_round = CONSENSUS_ROUND_INVENTORY;
- start_inventory (session);
- }
+ session->current_round = CONSENSUS_ROUND_INVENTORY;
+ start_inventory (session);
break;
case CONSENSUS_ROUND_INVENTORY:
session->current_round = CONSENSUS_ROUND_STOCK;
- /* FIXME: exchange stock */
+ session->exp_round = 0;
+ subround_over (session, NULL);
break;
case CONSENSUS_ROUND_STOCK:
session->current_round = CONSENSUS_ROUND_FINISH;
- /* FIXME: send elements to client */
+ send_client_conclude_done (session);
break;
default:
GNUNET_assert (0);
@@ -2241,11 +2547,17 @@
return;
}
- session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
+ if (session->num_peers <= 1)
+ {
+ send_client_conclude_done (session);
+ }
+ else
+ {
+ session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
+ /* the 'begin' round is over, start with the next, real round */
+ round_over (session, NULL);
+ }
- /* the 'begin' round is over, start with the next, real round */
- round_over (session, NULL);
-
GNUNET_SERVER_receive_done (client, GNUNET_OK);
client_send_next (session);
}
@@ -2267,7 +2579,6 @@
struct GNUNET_CONSENSUS_AckMessage *msg;
struct PendingElement *pending;
struct GNUNET_CONSENSUS_Element *element;
- struct GNUNET_HashCode key;
session = sessions_head;
while (NULL != session)
@@ -2291,25 +2602,17 @@
if (msg->keep)
{
- int i;
element = pending->element;
- hash_for_ibf (element->data, element->size, &key);
-
- GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ insert_element (session, element);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
- strata_estimator_insert (session->se, &key);
-
- for (i = 0; i <= MAX_IBF_ORDER; i++)
- {
- if (NULL != session->ibfs[i])
- ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key));
- }
}
+ GNUNET_free (pending);
+
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+
/**
* Task that disconnects from core.
*
@@ -2366,7 +2669,18 @@
socket = incoming_sockets_head;
if (NULL == socket->cpi)
{
+ if (NULL != socket->rh)
+ {
+ GNUNET_STREAM_read_cancel (socket->rh);
+ socket->rh = NULL;
+ }
GNUNET_STREAM_close (socket->socket);
+ socket->socket = NULL;
+ if (NULL != socket->mst)
+ {
+ GNUNET_SERVER_mst_destroy (socket->mst);
+ socket->mst = NULL;
+ }
}
incoming_sockets_head = incoming_sockets_head->next;
GNUNET_free (socket);
@@ -2375,26 +2689,9 @@
while (NULL != sessions_head)
{
struct ConsensusSession *session;
- int i;
-
- session = sessions_head;
-
- if (NULL != session->info)
- for (i = 0; i < session->num_peers; i++)
- {
- struct ConsensusPeerInformation *cpi;
- cpi = &session->info[i];
- if ((NULL != cpi) && (NULL != cpi->socket))
- {
- GNUNET_STREAM_close (cpi->socket);
- }
- }
-
- if (NULL != session->client)
- GNUNET_SERVER_client_disconnect (session->client);
-
- sessions_head = sessions_head->next;
- GNUNET_free (session);
+ session = sessions_head->next;
+ destroy_session (sessions_head);
+ sessions_head = session;
}
if (NULL != core)
Modified: gnunet/src/consensus/ibf.c
===================================================================
--- gnunet/src/consensus/ibf.c 2013-03-20 19:15:55 UTC (rev 26525)
+++ gnunet/src/consensus/ibf.c 2013-03-21 01:06:40 UTC (rev 26526)
@@ -136,7 +136,7 @@
* Insert an element into an IBF.
*
* @param ibf the IBF
- * @param id the element's hash code
+ * @param key the element's hash code
*/
void
ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
@@ -171,10 +171,10 @@
* Decode and remove an element from the IBF, if possible.
*
* @param ibf the invertible bloom filter to decode
- * @param side sign of the cell's count where the decoded element came from.
- * A negative sign indicates that the element was recovered
- * resides in an IBF that was previously subtracted from.
- * @param ret_id the hash code of the decoded element, if successful
+ * @param ret_side sign of the cell's count where the decoded element came
from.
+ * A negative sign indicates that the recovered element
+ * resides in an IBF that was previously subtracted from.
+ * @param ret_key receives the hash code of the decoded element, if successful
* @return GNUNET_YES if decoding an element was successful,
* GNUNET_NO if the IBF is empty,
* GNUNET_SYSERR if the decoding has failed
@@ -284,7 +284,7 @@
* @param size size of the buffer, will be updated
* @param start which bucket to start at
* @param count how many buckets to read
- * @param dst ibf to write buckets to
+ * @param ibf the ibf to read from
* @return GNUNET_OK on success
*/
int
@@ -325,8 +325,6 @@
* Write an ibf.
*
* @param ibf the ibf to write
- * @param start with which bucket to start
- * @param count how many buckets to write
* @param buf buffer to write the data to, will be updated to point to the
* first byte after the written data
* @param size pointer to the size of the buffer, will be updated, can be NULL
@@ -344,8 +342,6 @@
* @param buf pointer to the buffer to write to, will point to first
* byte after the written data
* @param size size of the buffer, will be updated
- * @param start which bucket to start at
- * @param count how many buckets to read
* @param dst ibf to write buckets to
* @return GNUNET_OK on success
*/
Modified: gnunet/src/consensus/ibf.h
===================================================================
--- gnunet/src/consensus/ibf.h 2013-03-20 19:15:55 UTC (rev 26525)
+++ gnunet/src/consensus/ibf.h 2013-03-21 01:06:40 UTC (rev 26526)
@@ -128,7 +128,7 @@
* @param size size of the buffer, will be updated
* @param start which bucket to start at
* @param count how many buckets to read
- * @param dst ibf to write buckets to
+ * @param ibf the ibf to read from
* @return GNUNET_OK on success
*/
int
@@ -139,8 +139,6 @@
* Write an ibf.
*
* @param ibf the ibf to write
- * @param start with which bucket to start
- * @param count how many buckets to write
* @param buf buffer to write the data to, will be updated to point to the
* first byte after the written data
* @param size pointer to the size of the buffer, will be updated, can be NULL
@@ -149,14 +147,13 @@
ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size);
+
/**
* Read an ibf.
*
* @param buf pointer to the buffer to write to, will point to first
* byte after the written data
* @param size size of the buffer, will be updated
- * @param start which bucket to start at
- * @param count how many buckets to read
* @param dst ibf to write buckets to
* @return GNUNET_OK on success
*/
@@ -223,15 +220,16 @@
* Decode and remove an element from the IBF, if possible.
*
* @param ibf the invertible bloom filter to decode
- * @param side sign of the cell's count where the decoded element came from.
- * A negative sign indicates that the element was recovered
resides in an IBF
- * that was previously subtracted from.
- * @param ret_id the hash code of the decoded element, if successful
- * @return GNUNET_YES if decoding an element was successful, GNUNET_NO if the
IBF is empty,
- * GNUNET_SYSERR if the decoding has faile
+ * @param ret_side sign of the cell's count where the decoded element came
from.
+ * A negative sign indicates that the recovered element
+ * resides in an IBF that was previously subtracted from.
+ * @param ret_key receives the hash code of the decoded element, if successful
+ * @return GNUNET_YES if decoding an element was successful,
+ * GNUNET_NO if the IBF is empty,
+ * GNUNET_SYSERR if the decoding has failed
*/
int
-ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct IBF_Key
*ret_key);
+ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key
*ret_key);
/**
Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf 2013-03-20 19:15:55 UTC (rev
26525)
+++ gnunet/src/consensus/test_consensus.conf 2013-03-21 01:06:40 UTC (rev
26526)
@@ -5,7 +5,7 @@
HOME = $SERVICEHOME
BINARY = gnunet-service-consensus
#PREFIX = gdbserver :12345
-#PREFIX = valgrind
+PREFIX = valgrind --leak-check=full
ACCEPT_FROM = 127.0.0.1;
ACCEPT_FROM6 = ::1;
UNIXPATH = /tmp/gnunet-service-consensus.sock
Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h 2013-03-20 19:15:55 UTC (rev
26525)
+++ gnunet/src/include/gnunet_container_lib.h 2013-03-21 01:06:40 UTC (rev
26526)
@@ -1355,7 +1355,7 @@
*/
void *
GNUNET_CONTAINER_slist_get (const struct GNUNET_CONTAINER_SList_Iterator *i,
- size_t * len);
+ size_t *len);
/**
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r26526 - in gnunet/src: consensus include,
gnunet <=