[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] 02/02: cleaning up set handlers, eliminating 2nd l
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] 02/02: cleaning up set handlers, eliminating 2nd level demultiplexing and improving use of types |
Date: |
Sat, 11 Mar 2017 18:15:42 +0100 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
commit abdec5e11ff11bb10d32c013e11344a54786f80f
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Mar 11 18:15:38 2017 +0100
cleaning up set handlers, eliminating 2nd level demultiplexing and
improving use of types
---
src/set/Makefile.am | 2 +-
src/set/gnunet-service-set.c | 284 ++++++-------
src/set/gnunet-service-set.h | 50 ++-
src/set/gnunet-service-set_intersection.c | 139 +++----
src/set/gnunet-service-set_union.c | 645 +++++++++++++++++-------------
src/set/set_api.c | 12 +-
src/set/test_set_union_copy.c | 1 +
7 files changed, 598 insertions(+), 535 deletions(-)
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index cfe95bc1a..03c258352 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -51,7 +51,7 @@ gnunet_set_ibf_profiler_LDADD = \
gnunet_service_set_SOURCES = \
gnunet-service-set.c gnunet-service-set.h \
- gnunet-service-set_union.c \
+ gnunet-service-set_union.c gnunet-service-set_union.h \
gnunet-service-set_intersection.c \
ibf.c ibf.h \
gnunet-service-set_union_strata_estimator.c
gnunet-service-set_union_strata_estimator.h \
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 454ad9784..8f1506c6a 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -24,6 +24,8 @@
* @author Christian Grothoff
*/
#include "gnunet-service-set.h"
+#include "gnunet-service-set_union.h"
+#include "gnunet-service-set_intersection.h"
#include "gnunet-service-set_protocol.h"
#include "gnunet_statistics_service.h"
@@ -476,6 +478,7 @@ _GSS_operation_destroy (struct Operation *op,
op->channel = NULL;
GNUNET_CADET_channel_destroy (channel);
}
+
if (GNUNET_YES == gc)
collect_generation_garbage (set);
/* We rely on the channel end handler to free 'op'. When 'op->channel' was
NULL,
@@ -682,7 +685,7 @@ client_disconnect_cb (void *cls,
{
struct Operation *curr = op;
op = op->next;
- if ( (GNUNET_YES == curr->is_incoming) &&
+ if ( (GNUNET_YES == curr->is_incoming) &&
(curr->listener == listener) )
incoming_destroy (curr);
}
@@ -733,6 +736,38 @@ incoming_suggest (struct Operation *incoming,
/**
+ * Check a request for a set operation from another peer.
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ * #GNUNET_SYSERR to destroy the channel
+ */
+static int
+check_incoming_msg (void *cls,
+ const struct OperationRequestMessage *msg)
+{
+ struct Operation *op = cls;
+ const struct GNUNET_MessageHeader *nested_context;
+
+ /* double operation request */
+ if (NULL != op->spec)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ nested_context = GNUNET_MQ_extract_nested_mh (msg);
+ if ( (NULL != nested_context) &&
+ (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
* Handle a request for a set operation from another peer. Checks if we
* have a listener waiting for such a request (and in that case initiates
* asking the listener about accepting the connection). If no listener
@@ -744,42 +779,23 @@ incoming_suggest (struct Operation *incoming,
* our virtual table and subsequent msgs would be routed differently (as
* we then know what type of operation this is).
*
- * @param op the operation state
- * @param mh the received message
+ * @param cls the operation state
+ * @param msg the received message
* @return #GNUNET_OK if the channel should be kept alive,
* #GNUNET_SYSERR to destroy the channel
*/
-static int
-handle_incoming_msg (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
+static void
+handle_incoming_msg (void *cls,
+ const struct OperationRequestMessage *msg)
{
- const struct OperationRequestMessage *msg;
+ struct Operation *op = cls;
struct Listener *listener = op->listener;
struct OperationSpecification *spec;
const struct GNUNET_MessageHeader *nested_context;
- msg = (const struct OperationRequestMessage *) mh;
GNUNET_assert (GNUNET_YES == op->is_incoming);
- if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- /* double operation request */
- if (NULL != op->spec)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
spec = GNUNET_new (struct OperationSpecification);
nested_context = GNUNET_MQ_extract_nested_mh (msg);
- if ( (NULL != nested_context) &&
- (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
- {
- GNUNET_break_op (0);
- GNUNET_free (spec);
- return GNUNET_SYSERR;
- }
/* Make a copy of the nested_context (application-specific context
information that is opaque to set) so we can pass it to the
listener later on */
@@ -792,7 +808,6 @@ handle_incoming_msg (struct Operation *op,
spec->peer = op->peer;
spec->remote_element_count = ntohl (msg->element_count);
op->spec = spec;
-
listener = op->listener;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received P2P operation request (op %u, port %s) for active
listener\n",
@@ -800,7 +815,6 @@ handle_incoming_msg (struct Operation *op,
GNUNET_h2s (&listener->app_id));
incoming_suggest (op,
listener);
- return GNUNET_OK;
}
@@ -1103,9 +1117,11 @@ handle_client_create_set (void *cls,
{
case GNUNET_SET_OPERATION_INTERSECTION:
set->vt = _GSS_intersection_vt ();
+ set->type = OT_INTERSECTION;
break;
case GNUNET_SET_OPERATION_UNION:
set->vt = _GSS_union_vt ();
+ set->type = OT_UNION;
break;
default:
GNUNET_free (set);
@@ -1196,7 +1212,6 @@ channel_new_cb (void *cls,
const struct GNUNET_PeerIdentity *source)
{
static const struct SetVT incoming_vt = {
- .msg_handler = &handle_incoming_msg,
.peer_disconnect = &handle_incoming_disconnect
};
struct Listener *listener = cls;
@@ -1290,60 +1305,6 @@ channel_window_cb (void *cls,
/* FIXME: not implemented, we could do flow control here... */
}
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static int
-check_p2p_message (void *cls,
- const struct GNUNET_MessageHeader *message)
-{
- return GNUNET_OK;
-}
-
-
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * Functions with this signature are called whenever a message is
- * received via a cadet channel.
- *
- * The msg_handler is a virtual table set in initially either when a peer
- * creates a new channel with us, or once we create a new channel
- * ourselves (evaluate).
- *
- * Once we know the exact type of operation (union/intersection), the vt is
- * replaced with an operation specific instance (_GSS_[op]_vt).
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static void
-handle_p2p_message (void *cls,
- const struct GNUNET_MessageHeader *message)
-{
- struct Operation *op = cls;
- int ret;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dispatching cadet message (type: %u)\n",
- ntohs (message->type));
- /* do this before the handler, as the handler might kill the channel */
- GNUNET_CADET_receive_done (op->channel);
- if (NULL != op->vt)
- ret = op->vt->msg_handler (op,
- message);
- else
- ret = GNUNET_SYSERR;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Handled cadet message (type: %u)\n",
- ntohs (message->type));
- if (GNUNET_OK != ret)
- GNUNET_CADET_channel_destroy (op->channel);
-}
-
/**
* Called when a client wants to create a new listener.
@@ -1357,66 +1318,66 @@ handle_client_listen (void *cls,
{
struct GNUNET_SERVICE_Client *client = cls;
struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (incoming_msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- struct GNUNET_MessageHeader,
+ struct OperationRequestMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_ibf,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
- struct GNUNET_MessageHeader,
+ struct IBFMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_elements,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_offer,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
struct GNUNET_MessageHeader,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_inquiry,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
- struct GNUNET_MessageHeader,
+ struct InquiryMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_demand,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
struct GNUNET_MessageHeader,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (union_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_full_element,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
-
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+ struct IntersectionElementInfoMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (intersection_p2p_bf,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
- struct GNUNET_MessageHeader,
+ struct BFMessage,
NULL),
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+ struct IntersectionDoneMessage,
+ NULL),
GNUNET_MQ_handler_end ()
};
struct Listener *listener;
@@ -1623,66 +1584,66 @@ handle_client_evaluate (void *cls,
struct GNUNET_SERVICE_Client *client = cls;
struct Operation *op = GNUNET_new (struct Operation);
const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (incoming_msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- struct GNUNET_MessageHeader,
+ struct OperationRequestMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_ibf,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
- struct GNUNET_MessageHeader,
+ struct IBFMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_elements,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_offer,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
struct GNUNET_MessageHeader,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_inquiry,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
- struct GNUNET_MessageHeader,
+ struct InquiryMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_demand,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
struct GNUNET_MessageHeader,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (union_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_full_element,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
-
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+ struct IntersectionElementInfoMessage,
+ op),
+ GNUNET_MQ_hd_var_size (intersection_p2p_bf,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
- struct GNUNET_MessageHeader,
+ struct BFMessage,
op),
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+ struct IntersectionDoneMessage,
+ op),
GNUNET_MQ_handler_end ()
};
struct Set *set;
@@ -1717,7 +1678,7 @@ handle_client_evaluate (void *cls,
// mutations won't interfer with the running operation.
op->generation_created = set->current_generation;
advance_generation (set);
-
+ op->type = set->type;
op->vt = set->vt;
GNUNET_CONTAINER_DLL_insert (set->ops_head,
set->ops_tail,
@@ -1886,9 +1847,11 @@ handle_client_copy_lazy_connect (void *cls,
{
case GNUNET_SET_OPERATION_INTERSECTION:
set->vt = _GSS_intersection_vt ();
+ set->type = OT_INTERSECTION;
break;
case GNUNET_SET_OPERATION_UNION:
set->vt = _GSS_union_vt ();
+ set->type = OT_UNION;
break;
default:
GNUNET_assert (0);
@@ -2057,6 +2020,7 @@ handle_client_accept (void *cls,
advance_generation (set);
op->vt = set->vt;
+ op->type = set->type;
op->vt->accept (op);
GNUNET_SERVICE_client_continue (client);
}
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 68d8fe81f..c981430ef 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -213,20 +213,6 @@ typedef void
/**
- * Signature of functions that implement the message handling for
- * the different set operations.
- *
- * @param op operation state
- * @param msg received message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to
- * destroy the operation and the tunnel
- */
-typedef int
-(*MsgHandlerImpl) (struct Operation *op,
- const struct GNUNET_MessageHeader *msg);
-
-
-/**
* Signature of functions that implement operation cancellation
*
* @param op operation state
@@ -276,11 +262,6 @@ struct SetVT
DestroySetImpl destroy_set;
/**
- * Callback for handling operation-specific messages.
- */
- MsgHandlerImpl msg_handler;
-
- /**
* Callback for handling the remote peer's disconnect.
*/
PeerDisconnectImpl peer_disconnect;
@@ -364,6 +345,27 @@ struct Listener;
/**
+ * Possible set operations.
+ */
+enum OperationType {
+ /**
+ * Operation type unknown.
+ */
+ OT_UNKNOWN = 0,
+
+ /**
+ * We are performing a union.
+ */
+ OT_UNION,
+
+ /**
+ * We are performing an intersection.
+ */
+ OT_INTERSECTION
+};
+
+
+/**
* Operation context used to execute a set operation.
*/
struct Operation
@@ -427,6 +429,11 @@ struct Operation
struct GNUNET_SCHEDULER_Task *timeout_task;
/**
+ * What type of operation is this?
+ */
+ enum OperationType type;
+
+ /**
* Unique request id for the request from a remote peer, sent to the
* client, which will accept or reject the request. Set to '0' iff
* the request has not been suggested yet.
@@ -582,6 +589,11 @@ struct Set
struct Operation *ops_tail;
/**
+ * What type of operation is this set for?
+ */
+ enum OperationType type;
+
+ /**
* Current generation, that is, number of previously executed
* operations and lazy copies on the underlying set content.
*/
diff --git a/src/set/gnunet-service-set_intersection.c
b/src/set/gnunet-service-set_intersection.c
index 9fe1eabe6..b298f7b41 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- Copyright (C) 2013, 2014 GNUnet e.V.
+ Copyright (C) 2013-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -28,6 +28,7 @@
#include "gnunet-service-set.h"
#include "gnunet_block_lib.h"
#include "gnunet-service-set_protocol.h"
+#include "gnunet-service-set_intersection.h"
#include <gcrypt.h>
@@ -550,6 +551,8 @@ send_remaining_elements (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending done and destroy because iterator ran out\n");
op->keep--;
+ GNUNET_CONTAINER_multihashmap_iterator_destroy
(op->state->full_result_iter);
+ op->state->full_result_iter = NULL;
send_client_done_and_destroy (op);
return;
}
@@ -627,9 +630,6 @@ process_bf (struct Operation *op)
case PHASE_COUNT_SENT:
/* This is the first BF being sent, build our initial map with
filtering in place */
- op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
- GNUNET_YES);
op->state->my_element_count = 0;
GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
&filtered_map_initialization,
@@ -665,41 +665,53 @@ process_bf (struct Operation *op)
/**
+ * Check an BF message from a remote peer.
+ *
+ * @param cls the intersection operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+int
+check_intersection_p2p_bf (void *cls,
+ const struct BFMessage *msg)
+{
+ struct Operation *op = cls;
+
+ if (OT_INTERSECTION != op->type)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
* Handle an BF message from a remote peer.
*
* @param cls the intersection operation
- * @param mh the header of the message
+ * @param msg the header of the message
*/
-static void
-handle_p2p_bf (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_bf (void *cls,
+ const struct BFMessage *msg)
{
struct Operation *op = cls;
- const struct BFMessage *msg;
uint32_t bf_size;
uint32_t chunk_size;
uint32_t bf_bits_per_element;
- uint16_t msize;
- msize = htons (mh->size);
- if (msize < sizeof (struct BFMessage))
- {
- GNUNET_break_op (0);
- fail_intersection_operation (op);
- return;
- }
- msg = (const struct BFMessage *) mh;
switch (op->state->phase)
{
case PHASE_INITIAL:
GNUNET_break_op (0);
fail_intersection_operation (op);
- break;
+ return;
case PHASE_COUNT_SENT:
case PHASE_BF_EXCHANGE:
bf_size = ntohl (msg->bloomfilter_total_length);
bf_bits_per_element = ntohl (msg->bits_per_element);
- chunk_size = msize - sizeof (struct BFMessage);
+ chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
op->state->other_xor = msg->element_xor_hash;
if (bf_size == chunk_size)
{
@@ -717,7 +729,7 @@ handle_p2p_bf (void *cls,
op->state->salt = ntohl (msg->sender_mutator);
op->spec->remote_element_count = ntohl (msg->sender_element_count);
process_bf (op);
- return;
+ break;
}
/* multipart chunk */
if (NULL == op->state->bf_data)
@@ -764,8 +776,9 @@ handle_p2p_bf (void *cls,
default:
GNUNET_break_op (0);
fail_intersection_operation (op);
- break;
+ return;
}
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -836,6 +849,7 @@ static void
begin_bf_exchange (struct Operation *op)
{
op->state->phase = PHASE_BF_EXCHANGE;
+ GNUNET_assert (NULL == op->state->my_elements);
op->state->my_elements
= GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
GNUNET_YES);
@@ -853,20 +867,18 @@ begin_bf_exchange (struct Operation *op)
* @param cls the intersection operation
* @param mh the header of the message
*/
-static void
-handle_p2p_element_info (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_element_info (void *cls,
+ const struct
IntersectionElementInfoMessage *msg)
{
struct Operation *op = cls;
- const struct IntersectionElementInfoMessage *msg;
- if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
+ if (OT_INTERSECTION != op->type)
{
GNUNET_break_op (0);
fail_intersection_operation(op);
return;
}
- msg = (const struct IntersectionElementInfoMessage *) mh;
op->spec->remote_element_count = ntohl (msg->sender_element_count);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received remote element count (%u), I have %u\n",
@@ -884,6 +896,7 @@ handle_p2p_element_info (void *cls,
}
GNUNET_break (NULL == op->state->remote_bf);
begin_bf_exchange (op);
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -955,28 +968,26 @@ filter_all (void *cls,
* @param cls the intersection operation
* @param mh the message
*/
-static void
-handle_p2p_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_done (void *cls,
+ const struct IntersectionDoneMessage *idm)
{
struct Operation *op = cls;
- const struct IntersectionDoneMessage *idm;
- if (PHASE_BF_EXCHANGE != op->state->phase)
+ if (OT_INTERSECTION != op->type)
{
- /* wrong phase to conclude? FIXME: Or should we allow this
- if the other peer has _initially_ already an empty set? */
GNUNET_break_op (0);
- fail_intersection_operation (op);
+ fail_intersection_operation(op);
return;
}
- if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
+ if (PHASE_BF_EXCHANGE != op->state->phase)
{
+ /* wrong phase to conclude? FIXME: Or should we allow this
+ if the other peer has _initially_ already an empty set? */
GNUNET_break_op (0);
fail_intersection_operation (op);
return;
}
- idm = (const struct IntersectionDoneMessage *) mh;
if (0 == ntohl (idm->final_element_count))
{
/* other peer determined empty set is the intersection,
@@ -1000,6 +1011,7 @@ handle_p2p_done (void *cls,
op->state->my_element_count);
op->state->phase = PHASE_FINISHED;
finish_and_destroy (op);
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -1064,11 +1076,11 @@ intersection_accept (struct Operation *op)
op->state->phase = PHASE_INITIAL;
op->state->my_element_count
= op->spec->set->state->current_set_element_count;
+ GNUNET_assert (NULL == op->state->my_elements);
op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create
- (GNUNET_MIN (op->state->my_element_count,
- op->spec->remote_element_count),
- GNUNET_YES);
+ = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN
(op->state->my_element_count,
+
op->spec->remote_element_count),
+ GNUNET_YES);
if (op->spec->remote_element_count < op->state->my_element_count)
{
/* If the other peer (Alice) has fewer elements than us (Bob),
@@ -1083,43 +1095,6 @@ intersection_accept (struct Operation *op)
/**
- * Dispatch messages for a intersection operation.
- *
- * @param op the state of the intersection evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
- */
-static int
-intersection_handle_p2p_message (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received p2p message (t: %u, s: %u)\n",
- ntohs (mh->type), ntohs (mh->size));
- switch (ntohs (mh->type))
- {
- /* this message handler is not active until after we received an
- * operation request message, thus the ops request is not handled here
- */
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
- handle_p2p_element_info (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
- handle_p2p_bf (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
- handle_p2p_done (op, mh);
- break;
- default:
- /* something wrong with cadet's message handlers? */
- GNUNET_assert (0);
- }
- return GNUNET_OK;
-}
-
-
-/**
* Handler for peer-disconnects, notifies the client about the aborted
* operation. If we did not expect anything from the other peer, we
* gracefully terminate the operation.
@@ -1168,6 +1143,11 @@ intersection_op_cancel (struct Operation *op)
GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
op->state->my_elements = NULL;
}
+ if (NULL != op->state->full_result_iter)
+ {
+ GNUNET_CONTAINER_multihashmap_iterator_destroy
(op->state->full_result_iter);
+ op->state->full_result_iter = NULL;
+ }
GNUNET_free (op->state);
op->state = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1245,7 +1225,6 @@ _GSS_intersection_vt ()
{
static const struct SetVT intersection_vt = {
.create = &intersection_set_create,
- .msg_handler = &intersection_handle_p2p_message,
.add = &intersection_add,
.remove = &intersection_remove,
.destroy_set = &intersection_set_destroy,
diff --git a/src/set/gnunet-service-set_union.c
b/src/set/gnunet-service-set_union.c
index b5b602074..200bd4b8e 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- Copyright (C) 2013-2016 GNUnet e.V.
+ Copyright (C) 2013-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -19,15 +19,16 @@
*/
/**
* @file set/gnunet-service-set_union.c
-
* @brief two-peer set operations
* @author Florian Dold
+ * @author Christian Grothoff
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
#include "gnunet-service-set.h"
#include "ibf.h"
+#include "gnunet-service-set_union.h"
#include "gnunet-service-set_union_strata_estimator.h"
#include "gnunet-service-set_protocol.h"
#include <gcrypt.h>
@@ -813,42 +814,56 @@ send_full_set (struct Operation *op)
* Handle a strata estimator from a remote peer
*
* @param cls the union operation
- * @param mh the message
- * @param is_compressed #GNUNET_YES if the estimator is compressed
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
+ * @param msg the message
*/
-static int
-handle_p2p_strata_estimator (void *cls,
- const struct GNUNET_MessageHeader *mh,
- int is_compressed)
+int
+check_union_p2p_strata_estimator (void *cls,
+ const struct StrataEstimatorMessage *msg)
{
struct Operation *op = cls;
- struct StrataEstimator *remote_se;
- struct StrataEstimatorMessage *msg = (void *) mh;
- unsigned int diff;
- uint64_t other_size;
+ int is_compressed;
size_t len;
- GNUNET_STATISTICS_update (_GSS_statistics,
- "# bytes of SE received",
- ntohs (mh->size),
- GNUNET_NO);
-
if (op->state->phase != PHASE_EXPECT_SE)
{
GNUNET_break (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
- len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
+ is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons
(msg->header.type));
+ len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
if ( (GNUNET_NO == is_compressed) &&
(len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
{
- fail_union_operation (op);
GNUNET_break (0);
return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_strata_estimator (void *cls,
+ const struct StrataEstimatorMessage *msg)
+{
+ struct Operation *op = cls;
+ struct StrataEstimator *remote_se;
+ unsigned int diff;
+ uint64_t other_size;
+ size_t len;
+ int is_compressed;
+
+ is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons
(msg->header.type));
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# bytes of SE received",
+ ntohs (msg->header.size),
+ GNUNET_NO);
+ len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
other_size = GNUNET_ntohll (msg->set_size);
remote_se = strata_estimator_create (SE_STRATA_COUNT,
SE_IBF_SIZE,
@@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls,
{
/* insufficient resources, fail */
fail_union_operation (op);
- return GNUNET_SYSERR;
+ return;
}
if (GNUNET_OK !=
strata_estimator_read (&msg[1],
@@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls,
remote_se))
{
/* decompression failed */
- fail_union_operation (op);
strata_estimator_destroy (remote_se);
- return GNUNET_SYSERR;
+ fail_union_operation (op);
+ return;
}
GNUNET_assert (NULL != op->state->se);
diff = strata_estimator_difference (remote_se,
op->state->se);
if (diff > 200)
- diff = diff * 3 / 2;
-
-
+ diff = diff * 3 / 2;
strata_estimator_destroy (remote_se);
strata_estimator_destroy (op->state->se);
@@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got se diff=%d, using ibf size %d\n",
diff,
- 1<<get_order_from_difference (diff));
+ 1U << get_order_from_difference (diff));
{
char *set_debug;
+
set_debug = getenv ("GNUNET_SET_BENCHMARK");
- if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) )
+ if ( (NULL != set_debug) &&
+ (0 == strcmp (set_debug, "1")) )
{
FILE *f = fopen ("set.log", "a");
fprintf (f, "%llu\n", (unsigned long long) diff);
@@ -898,15 +913,16 @@ handle_p2p_strata_estimator (void *cls,
}
}
- if ((GNUNET_YES == op->spec->byzantine) && (other_size <
op->spec->byzantine_lower_bound))
+ if ( (GNUNET_YES == op->spec->byzantine) &&
+ (other_size < op->spec->byzantine_lower_bound) )
{
GNUNET_break (0);
fail_union_operation (op);
- return GNUNET_SYSERR;
+ return;
}
-
- if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size
/ 4))
+ if ( (GNUNET_YES == op->spec->force_full) ||
+ (diff > op->state->initial_size / 4))
{
LOG (GNUNET_ERROR_TYPE_INFO,
"Sending full set (diff=%d, own set=%u)\n",
@@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls,
else
{
struct GNUNET_MQ_Envelope *ev;
+
op->state->phase = PHASE_EXPECT_IBF;
ev = GNUNET_MQ_msg_header
(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
GNUNET_MQ_send (op->mq, ev);
@@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to send IBF, closing connection\n");
fail_union_operation (op);
- return GNUNET_SYSERR;
+ return;
}
}
-
- return GNUNET_OK;
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op)
/**
- * Handle an IBF message from a remote peer.
+ * Check an IBF message from a remote peer.
*
* Reassemble the IBF from multiple pieces, and
* process the whole IBF once possible.
*
* @param cls the union operation
- * @param mh the header of the message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
*/
-static int
-handle_p2p_ibf (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_ibf (void *cls,
+ const struct IBFMessage *msg)
{
struct Operation *op = cls;
- const struct IBFMessage *msg;
unsigned int buckets_in_message;
- if (ntohs (mh->size) < sizeof (struct IBFMessage))
+ if (OT_UNION != op->type)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
- msg = (const struct IBFMessage *) mh;
- if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
- (op->state->phase == PHASE_EXPECT_IBF) )
+ buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) /
IBF_BUCKET_SIZE;
+ if (0 == buckets_in_message)
{
- op->state->phase = PHASE_EXPECT_IBF_CONT;
- GNUNET_assert (NULL == op->state->remote_ibf);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new ibf of size %u\n",
- 1 << msg->order);
- op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
- op->state->salt_receive = ntohl (msg->salt);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n",
op->state->salt_receive);
- if (NULL == op->state->remote_ibf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to parse remote IBF, closing connection\n");
- fail_union_operation (op);
- return GNUNET_SYSERR;
- }
- op->state->ibf_buckets_received = 0;
- if (0 != ntohl (msg->offset))
- {
- GNUNET_break_op (0);
- fail_union_operation (op);
- return GNUNET_SYSERR;
- }
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
+ if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message *
IBF_BUCKET_SIZE)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (op->state->phase == PHASE_EXPECT_IBF_CONT)
{
if (ntohl (msg->offset) != op->state->ibf_buckets_received)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
if (1<<msg->order != op->state->remote_ibf->size)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
if (ntohl (msg->salt) != op->state->salt_receive)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
}
- else
+ else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+ (op->state->phase != PHASE_EXPECT_IBF) )
{
- GNUNET_assert (0);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) /
IBF_BUCKET_SIZE;
+ return GNUNET_OK;
+}
- if (0 == buckets_in_message)
+
+/**
+ * Handle an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ */
+void
+handle_union_p2p_ibf (void *cls,
+ const struct IBFMessage *msg)
+{
+ struct Operation *op = cls;
+ unsigned int buckets_in_message;
+
+ buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) /
IBF_BUCKET_SIZE;
+ if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+ (op->state->phase == PHASE_EXPECT_IBF) )
{
- GNUNET_break_op (0);
- fail_union_operation (op);
- return GNUNET_SYSERR;
+ op->state->phase = PHASE_EXPECT_IBF_CONT;
+ GNUNET_assert (NULL == op->state->remote_ibf);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new ibf of size %u\n",
+ 1 << msg->order);
+ op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+ op->state->salt_receive = ntohl (msg->salt);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving new IBF with salt %u\n",
+ op->state->salt_receive);
+ if (NULL == op->state->remote_ibf)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to parse remote IBF, closing connection\n");
+ fail_union_operation (op);
+ return;
+ }
+ op->state->ibf_buckets_received = 0;
+ if (0 != ntohl (msg->offset))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
}
-
- if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message *
IBF_BUCKET_SIZE)
+ else
{
- GNUNET_break_op (0);
- fail_union_operation (op);
- return GNUNET_SYSERR;
+ GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
}
-
GNUNET_assert (NULL != op->state->remote_ibf);
ibf_read_slice (&msg[1],
@@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls,
/* Internal error, best we can do is shut down */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to decode IBF, closing connection\n");
- return GNUNET_SYSERR;
+ fail_union_operation (op);
+ return;
}
}
- return GNUNET_OK;
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls)
}
+/**
+ * Tests if the operation is finished, and if so notify.
+ *
+ * @param op operation to check
+ */
static void
maybe_finish (struct Operation *op)
{
@@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op)
/**
- * Handle an element message from a remote peer.
- * Sent by the other peer either because we decoded an IBF and placed a demand,
- * or because the other peer switched to full set transmission.
+ * Check an element message from a remote peer.
*
* @param cls the union operation
- * @param mh the message
+ * @param emsg the message
*/
-static void
-handle_p2p_elements (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_elements (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
{
struct Operation *op = cls;
- struct ElementEntry *ee;
- const struct GNUNET_SET_ElementMessage *emsg;
- uint16_t element_size;
- if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
+ if (OT_UNION != op->type)
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
- if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+ if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
- emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+/**
+ * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_elements (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct KeyEntry *ke;
+ uint16_t element_size;
- element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+ element_size = ntohs (emsg->header.size) - sizeof (struct
GNUNET_SET_ElementMessage);
ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
- GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+ GNUNET_memcpy (&ee[1],
+ &emsg[1],
+ element_size);
ee->element.size = element_size;
ee->element.data = &ee[1];
ee->element.element_type = ntohs (emsg->element_type);
ee->remote = GNUNET_YES;
- GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
-
+ GNUNET_SET_element_hash (&ee->element,
+ &ee->element_hash);
if (GNUNET_NO ==
GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
&ee->element_hash,
@@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls,
{
/* We got something we didn't demand, since it's not in our map. */
GNUNET_break_op (0);
- GNUNET_free (ee);
fail_union_operation (op);
return;
}
@@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls,
1,
GNUNET_NO);
- op->state->received_total += 1;
-
- struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+ op->state->received_total++;
+ ke = op_get_element (op, &ee->element_hash);
if (NULL != ke)
{
/* Got repeated element. Should not happen since
@@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls,
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Registering new element from remote peer\n");
- op->state->received_fresh += 1;
+ op->state->received_fresh++;
op_register_element (op, ee, GNUNET_YES);
/* only send results immediately if the client wants it */
switch (op->spec->result_mode)
@@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls,
}
}
- if (op->state->received_total > 8 && op->state->received_fresh <
op->state->received_total / 3)
+ if ( (op->state->received_total > 8) &&
+ (op->state->received_fresh < op->state->received_total / 3) )
{
/* The other peer gave us lots of old elements, there's something wrong. */
GNUNET_break_op (0);
fail_union_operation (op);
return;
}
-
+ GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
}
/**
- * Handle an element message from a remote peer.
+ * Check a full element message from a remote peer.
*
* @param cls the union operation
- * @param mh the message
+ * @param emsg the message
*/
-static void
-handle_p2p_full_element (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_full_element (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
{
struct Operation *op = cls;
- struct ElementEntry *ee;
- const struct GNUNET_SET_ElementMessage *emsg;
- uint16_t element_size;
- if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+ if (OT_UNION != op->type)
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ // FIXME: check that we expect full elements here?
+ return GNUNET_OK;
+}
- emsg = (const struct GNUNET_SET_ElementMessage *) mh;
- element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_full_element (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct KeyEntry *ke;
+ uint16_t element_size;
+
+ element_size = ntohs (emsg->header.size) - sizeof (struct
GNUNET_SET_ElementMessage);
ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
GNUNET_memcpy (&ee[1], &emsg[1], element_size);
ee->element.size = element_size;
@@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls,
1,
GNUNET_NO);
- op->state->received_total += 1;
-
- struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+ op->state->received_total++;
+ ke = op_get_element (op, &ee->element_hash);
if (NULL != ke)
{
/* Got repeated element. Should not happen since
@@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls,
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Registering new element from remote peer\n");
- op->state->received_fresh += 1;
+ op->state->received_fresh++;
op_register_element (op, ee, GNUNET_YES);
/* only send results immediately if the client wants it */
switch (op->spec->result_mode)
@@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls,
}
}
- if ( (GNUNET_YES == op->spec->byzantine) &&
- (op->state->received_total > 384 + op->state->received_fresh * 4) &&
+ if ( (GNUNET_YES == op->spec->byzantine) &&
+ (op->state->received_total > 384 + op->state->received_fresh * 4) &&
(op->state->received_fresh < op->state->received_total / 6) )
{
/* The other peer gave us lots of old elements, there's something wrong. */
@@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls,
fail_union_operation (op);
return;
}
+ GNUNET_CADET_receive_done (op->channel);
}
+
/**
* Send offers (for GNUNET_Hash-es) in response
* to inquiries (for IBF_Key-s).
*
* @param cls the union operation
- * @param mh the message
+ * @param msg the message
*/
-static void
-handle_p2p_inquiry (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_inquiry (void *cls,
+ const struct InquiryMessage *msg)
{
struct Operation *op = cls;
- const struct IBF_Key *ibf_key;
unsigned int num_keys;
- struct InquiryMessage *msg;
- /* look up elements and send them */
+ if (OT_UNION != op->type)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
if (op->state->phase != PHASE_INVENTORY_PASSIVE)
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
- num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
- / sizeof (struct IBF_Key);
- if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
+ num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+ / sizeof (struct IBF_Key);
+ if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
!= num_keys * sizeof (struct IBF_Key))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
- msg = (struct InquiryMessage *) mh;
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_inquiry (void *cls,
+ const struct InquiryMessage *msg)
+{
+ struct Operation *op = cls;
+ const struct IBF_Key *ibf_key;
+ unsigned int num_keys;
+
+ num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+ / sizeof (struct IBF_Key);
ibf_key = (const struct IBF_Key *) &msg[1];
while (0 != num_keys--)
{
struct IBF_Key unsalted_key;
+
unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
send_offers_for_key (op, unsalted_key);
ibf_key++;
}
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls,
/**
- * Handle a
+ * Handle a request for full set transmission.
*
* @parem cls closure, a set union operation
* @param mh the demand message
*/
-static void
-handle_p2p_request_full (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_request_full (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- if (PHASE_EXPECT_IBF != op->state->phase)
+ if (OT_UNION != op->type)
{
+ GNUNET_break_op (0);
fail_union_operation (op);
+ return;
+ }
+ if (PHASE_EXPECT_IBF != op->state->phase)
+ {
GNUNET_break_op (0);
+ fail_union_operation (op);
return;
}
// FIXME: we need to check that our set is larger than the
// byzantine_lower_bound by some threshold
send_full_set (op);
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls,
* @parem cls closure, a set union operation
* @param mh the demand message
*/
-static void
-handle_p2p_full_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_full_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- if (PHASE_EXPECT_IBF == op->state->phase)
+ switch (op->state->phase)
{
- struct GNUNET_MQ_Envelope *ev;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other
peer is missing\n");
+ case PHASE_EXPECT_IBF:
+ {
+ struct GNUNET_MQ_Envelope *ev;
- /* send all the elements that did not come from the remote peer */
- GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
- &send_missing_elements_iter,
- op);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got FULL DONE, sending elements that other peer is missing\n");
- ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
- GNUNET_MQ_send (op->mq, ev);
- op->state->phase = PHASE_DONE;
+ /* send all the elements that did not come from the remote peer */
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &send_missing_elements_iter,
+ op);
- /* we now wait until the other peer shuts the tunnel down*/
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+ op->state->phase = PHASE_DONE;
+ /* we now wait until the other peer shuts the tunnel down*/
+ }
+ break;
+ case PHASE_FULL_SENDING:
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got FULL DONE, finishing\n");
+ /* We sent the full set, and got the response for that. We're done. */
+ op->state->phase = PHASE_DONE;
+ GNUNET_CADET_receive_done (op->channel);
+ send_done_and_destroy (op);
+ return;
+ }
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Handle full done phase is %u\n",
+ (unsigned) op->state->phase);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
}
- else if (PHASE_FULL_SENDING == op->state->phase)
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+int
+check_union_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ unsigned int num_hashes;
+
+ if (OT_UNION != op->type)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
- /* We sent the full set, and got the response for that. We're done. */
- op->state->phase = PHASE_DONE;
- send_done_and_destroy (op);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- else
+ num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ / sizeof (struct GNUNET_HashCode);
+ if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ != num_hashes * sizeof (struct GNUNET_HashCode))
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n",
(unsigned) op->state->phase);
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
}
/**
* Handle a demand by the other peer for elements based on a list
- * of GNUNET_HashCode-s.
+ * of `struct GNUNET_HashCode`s.
*
* @parem cls closure, a set union operation
* @param mh the demand message
*/
-static void
-handle_p2p_demand (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
struct ElementEntry *ee;
@@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls,
num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
/ sizeof (struct GNUNET_HashCode);
- if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
- != num_hashes * sizeof (struct GNUNET_HashCode))
- {
- GNUNET_break_op (0);
- fail_union_operation (op);
- return;
- }
-
for (hash = (const struct GNUNET_HashCode *) &mh[1];
num_hashes > 0;
hash++, num_hashes--)
{
- ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
+ hash);
if (NULL == ee)
{
/* Demand for non-existing element. */
@@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls,
break;
}
}
+ GNUNET_CADET_receive_done (op->channel);
}
/**
- * Handle offers (of GNUNET_HashCode-s) and
- * respond with demands (of GNUNET_HashCode-s).
+ * Check offer (of `struct GNUNET_HashCode`s).
*
* @param cls the union operation
* @param mh the message
+ * @return #GNUNET_OK if @a mh is well-formed
*/
-static void
-handle_p2p_offer (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- const struct GNUNET_HashCode *hash;
unsigned int num_hashes;
+ if (OT_UNION != op->type)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
/* look up elements and send them */
if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
(op->state->phase != PHASE_INVENTORY_ACTIVE))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
/ sizeof (struct GNUNET_HashCode);
@@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls,
!= num_hashes * sizeof (struct GNUNET_HashCode))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
+/**
+ * Handle offers (of `struct GNUNET_HashCode`s) and
+ * respond with demands (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+void
+handle_union_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ const struct GNUNET_HashCode *hash;
+ unsigned int num_hashes;
+
+ num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ / sizeof (struct GNUNET_HashCode);
for (hash = (const struct GNUNET_HashCode *) &mh[1];
num_hashes > 0;
hash++, num_hashes--)
@@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls,
*(struct GNUNET_HashCode *) &demands[1] = *hash;
GNUNET_MQ_send (op->mq, ev);
}
+ GNUNET_CADET_receive_done (op->channel);
}
@@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls,
* @param cls the union operation
* @param mh the message
*/
-static void
-handle_p2p_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- if (op->state->phase == PHASE_INVENTORY_PASSIVE)
+ if (OT_UNION != op->type)
{
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ switch (op->state->phase)
+ {
+ case PHASE_INVENTORY_PASSIVE:
/* We got all requests, but still have to send our elements in response. */
-
op->state->phase = PHASE_FINISH_WAITING;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls,
* all our demands are satisfied, so that the active
* peer can quit if we gave him everything.
*/
+ GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
return;
- }
- if (op->state->phase == PHASE_INVENTORY_ACTIVE)
- {
+ case PHASE_INVENTORY_ACTIVE:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got DONE (as active partner), waiting to finish\n");
/* All demands of the other peer are satisfied,
@@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls,
* to the other peer once our demands are met.
*/
op->state->phase = PHASE_FINISH_CLOSING;
+ GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
return;
+ default:
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
}
- GNUNET_break_op (0);
- fail_union_operation (op);
}
@@ -2119,62 +2277,6 @@ union_set_destroy (struct SetState *set_state)
/**
- * Dispatch messages for a union operation.
- *
- * @param op the state of the union evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
- */
-int
-union_handle_p2p_message (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
-{
- //LOG (GNUNET_ERROR_TYPE_DEBUG,
- // "received p2p message (t: %u, s: %u)\n",
- // ntohs (mh->type),
- // ntohs (mh->size));
- switch (ntohs (mh->type))
- {
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
- return handle_p2p_ibf (op, mh);
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
- return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
- return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
- case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
- handle_p2p_elements (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
- handle_p2p_full_element (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
- handle_p2p_inquiry (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
- handle_p2p_done (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
- handle_p2p_offer (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
- handle_p2p_demand (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
- handle_p2p_full_done (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
- handle_p2p_request_full (op, mh);
- break;
- default:
- /* Something wrong with cadet's message handlers? */
- GNUNET_assert (0);
- }
- return GNUNET_OK;
-}
-
-
-/**
* Handler for peer-disconnects, notifies the client
* about the aborted operation in case the op was not concluded.
*
@@ -2240,7 +2342,6 @@ _GSS_union_vt ()
{
static const struct SetVT union_vt = {
.create = &union_set_create,
- .msg_handler = &union_handle_p2p_message,
.add = &union_add,
.remove = &union_remove,
.destroy_set = &union_set_destroy,
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 04a4e4910..bc428f9f6 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -76,6 +76,8 @@ struct GNUNET_SET_Handle
/**
* Should the set be destroyed once all operations are gone?
+ * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
+ * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
*/
int destroy_requested;
@@ -345,11 +347,13 @@ handle_iter_done (void *cls,
if (NULL == iter)
return;
+ set->destroy_requested = GNUNET_SYSERR;
set->iterator = NULL;
set->iteration_id++;
iter (set->iterator_cls,
NULL);
-
+ if (GNUNET_SYSERR == set->destroy_requested)
+ set->destroy_requested = GNUNET_NO;
if (GNUNET_YES == set->destroy_requested)
GNUNET_SET_destroy (set);
}
@@ -736,7 +740,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
/* destroying set while iterator is active is currently
not supported; we should expand the API to allow
clients to explicitly cancel the iteration! */
- if ( (NULL != set->ops_head) || (NULL != set->iterator) )
+ if ( (NULL != set->ops_head) ||
+ (NULL != set->iterator) ||
+ (GNUNET_SYSERR == set->destroy_requested) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Set operations are pending, delaying set destruction\n");
@@ -809,7 +815,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity
*other_peer,
msg->force_delta = GNUNET_YES;
break;
default:
- LOG (GNUNET_ERROR_TYPE_ERROR,
+ LOG (GNUNET_ERROR_TYPE_ERROR,
"Option with type %d not recognized\n", (int) opt->type);
}
}
diff --git a/src/set/test_set_union_copy.c b/src/set/test_set_union_copy.c
index c887a8958..a1eba6311 100644
--- a/src/set/test_set_union_copy.c
+++ b/src/set/test_set_union_copy.c
@@ -122,6 +122,7 @@ check_count_iter (void *cls,
return GNUNET_NO;
}
ci_cls->cont (ci_cls->cont_cls);
+ GNUNET_free (ci_cls);
return GNUNET_NO;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
--
To stop receiving notification emails like this one, please contact
address@hidden