[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r12764 - gnunet/src/dht
From: |
gnunet |
Subject: |
[GNUnet-SVN] r12764 - gnunet/src/dht |
Date: |
Mon, 30 Aug 2010 18:33:42 +0200 |
Author: nevans
Date: 2010-08-30 18:33:42 +0200 (Mon, 30 Aug 2010)
New Revision: 12764
Modified:
gnunet/src/dht/dht.h
gnunet/src/dht/dht_api.c
gnunet/src/dht/gnunet-dht-driver.c
gnunet/src/dht/gnunet-service-dht.c
Log:
assorted dht changes, fixes, etc.
Modified: gnunet/src/dht/dht.h
===================================================================
--- gnunet/src/dht/dht.h 2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/dht.h 2010-08-30 16:33:42 UTC (rev 12764)
@@ -61,6 +61,8 @@
#define STAT_GET_RESPONSE_START "# DHT GET Responses Initiated"
#define STAT_HELLOS_PROVIDED "# HELLO Messages given to transport"
#define STAT_DISCONNECTS "# Disconnects received"
+#define STAT_DUPLICATE_UID "# Duplicate UID's encountered (bad if any!)"
+#define STAT_RECENT_SEEN "# recent requests seen again (routing loops,
alternate paths)"
typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls,
const struct
GNUNET_MessageHeader
Modified: gnunet/src/dht/dht_api.c
===================================================================
--- gnunet/src/dht/dht_api.c 2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/dht_api.c 2010-08-30 16:33:42 UTC (rev 12764)
@@ -872,6 +872,55 @@
}
/**
+ * Send a message to the DHT telling it to issue a single find
+ * peer request using the peers unique identifier as key. This
+ * is used to fill the routing table, and is normally controlled
+ * by the DHT itself. However, for testing and perhaps more
+ * close control over the DHT, this can be explicitly managed.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+ struct GNUNET_DHT_ControlMessage *msg;
+ struct PendingMessage *pending;
+
+ if ((handle->current != NULL) && (handle->retransmit_stage !=
DHT_RETRANSMITTING))
+ return GNUNET_NO;
+
+ msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+ msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+ msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+ msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+
+ pending = GNUNET_malloc (sizeof (struct PendingMessage));
+ pending->msg = &msg->header;
+ pending->timeout = GNUNET_TIME_relative_get_forever();
+ pending->free_on_send = GNUNET_YES;
+ pending->cont = cont;
+ pending->cont_cls = cont_cls;
+ pending->unique_id = 0;
+
+ if (handle->current == NULL)
+ {
+ handle->current = pending;
+ process_pending_message (handle);
+ }
+ else
+ {
+ handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+ handle->retransmission_buffer = pending;
+ }
+
+ return GNUNET_YES;
+}
+
+/**
* Send a message to the DHT telling it to start issuing random PUT
* requests every 'frequency' milliseconds.
*
Modified: gnunet/src/dht/gnunet-dht-driver.c
===================================================================
--- gnunet/src/dht/gnunet-dht-driver.c 2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/gnunet-dht-driver.c 2010-08-30 16:33:42 UTC (rev 12764)
@@ -50,12 +50,21 @@
/* Timeout for waiting for puts to be sent to the service */
#define DEFAULT_PUT_DELAY
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 10)
+/* Timeout for waiting for puts to be sent to the service */
+#define DEFAULT_FIND_PEER_DELAY
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 40)
+
#define DEFAULT_SECONDS_PER_PEER_START
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 45)
#define DEFAULT_TEST_DATA_SIZE 8
+#define DEFAULT_BUCKET_SIZE 4
+
+#define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2
+
#define DEFAULT_MAX_OUTSTANDING_PUTS 10
+#define DEFAULT_MAX_OUTSTANDING_FIND_PEERS 1
+
#define DEFAULT_MAX_OUTSTANDING_GETS 10
#define DEFAULT_CONNECT_TIMEOUT 60
@@ -97,6 +106,30 @@
int malicious_type;
};
+struct TestFindPeer
+{
+ /* This is a linked list */
+ struct TestFindPeer *next;
+
+ /* Handle to the bigger context */
+ struct FindPeerContext *find_peer_context;
+
+ /**
+ * Handle to the peer's DHT service (via the API)
+ */
+ struct GNUNET_DHT_Handle *dht_handle;
+
+ /**
+ * Handle to the peer daemon
+ */
+ struct GNUNET_TESTING_Daemon *daemon;
+
+ /**
+ * Task for disconnecting DHT handles
+ */
+ GNUNET_SCHEDULER_TaskIdentifier disconnect_task;
+};
+
struct TestPutContext
{
/* This is a linked list */
@@ -232,8 +265,12 @@
static struct GNUNET_TIME_Relative put_delay;
+static struct GNUNET_TIME_Relative find_peer_delay;
+
static struct GNUNET_TIME_Relative seconds_per_peer_start;
+static int do_find_peer;
+
static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE;
static unsigned long long max_outstanding_puts = DEFAULT_MAX_OUTSTANDING_PUTS;
@@ -242,6 +279,8 @@
static unsigned long long malicious_getters;
+static unsigned long long max_outstanding_find_peers;
+
static unsigned long long malicious_putters;
static unsigned long long malicious_droppers;
@@ -651,6 +690,7 @@
stats_ctx->peer = peer;
GNUNET_CONTAINER_multihashmap_put(stats_map, &peer->hashPubKey,
stats_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
+ GNUNET_assert(stats_ctx != NULL);
if (strcmp(name, STAT_ROUTES) == 0)
stats_ctx->stat_routes = value;
@@ -696,6 +736,7 @@
log_dht_statistics (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
{
stats_map = GNUNET_CONTAINER_multihashmap_create(num_peers);
+ fprintf(stderr, "Starting statistics logging\n");
GNUNET_TESTING_get_statistics(pg, &stats_finished, &stats_handle, NULL);
}
@@ -1005,8 +1046,171 @@
GNUNET_SCHEDULER_add_delayed(sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put,
test_put->next);
}
+/**
+ * Context for sending out find peer requests.
+ */
+struct FindPeerContext
+{
+ struct GNUNET_DHT_Handle *dht_handle;
+ struct GNUNET_TIME_Absolute endtime;
+ unsigned int current_peers;
+ unsigned int previous_peers;
+ unsigned int outstanding;
+ unsigned int total;
+};
+static void
+schedule_find_peer_requests (void *cls, const struct
GNUNET_SCHEDULER_TaskContext * tc);
+
/**
+ * Given a number of total peers and a bucket size, estimate the number of
+ * connections in a perfect kademlia topology.
+ */
+static unsigned int connection_estimate(unsigned int peer_count, unsigned int
bucket_size)
+{
+ unsigned int i;
+ unsigned int filled;
+ i = num_peers;
+
+ filled = 0;
+ while (i > bucket_size)
+ {
+ filled++;
+ i = i/2;
+ }
+ return filled * bucket_size * peer_count;
+
+}
+
+/**
+ * Callback for iterating over all the peer connections of a peer group.
+ */
+void count_peers_cb (void *cls,
+ const struct GNUNET_PeerIdentity *first,
+ const struct GNUNET_PeerIdentity *second,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance,
+ const char *emsg)
+{
+ struct FindPeerContext *find_peer_context = cls;
+ if ((first != NULL) && (second != NULL))
+ {
+ find_peer_context->current_peers++;
+ }
+ else
+ {
+ GNUNET_assert(dhtlog_handle != NULL);
+ fprintf(stderr, "peer count finished (%u connections), %u new peers,
connection estimate %u\n", find_peer_context->current_peers,
find_peer_context->current_peers - find_peer_context->previous_peers,
connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));
+ if ((find_peer_context->current_peers -
find_peer_context->previous_peers > FIND_PEER_THRESHOLD) &&
+ (find_peer_context->current_peers < connection_estimate(num_peers,
DEFAULT_BUCKET_SIZE)) &&
+
(GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0))
+ {
+ fprintf(stderr, "Scheduling another round of find peer requests.\n");
+ GNUNET_SCHEDULER_add_now(sched, schedule_find_peer_requests,
find_peer_context);
+ }
+ else
+ {
+ fprintf(stderr, "Not sending any more find peer requests.\n");
+ }
+ }
+}
+
+/**
+ * Connect to all peers in the peer group and iterate over their
+ * connections.
+ */
+static void
+count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+ struct FindPeerContext *find_peer_context = cls;
+ find_peer_context->previous_peers = find_peer_context->current_peers;
+ find_peer_context->current_peers = 0;
+ GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context);
+}
+
+
+static void
+decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *
tc)
+{
+ struct TestFindPeer *test_find_peer = cls;
+ GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0);
+ test_find_peer->find_peer_context->outstanding--;
+ test_find_peer->find_peer_context->total--;
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%d find_peers remaining\n",
test_find_peer->find_peer_context->total);
+ if ((0 == test_find_peer->find_peer_context->total) &&
+
(GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value
> 0))
+ {
+ GNUNET_SCHEDULER_add_now(sched, &count_new_peers,
test_find_peer->find_peer_context);
+ }
+ GNUNET_free(test_find_peer);
+}
+
+/**
+ * A find peer request has been sent to the server, now we will schedule a task
+ * to wait the appropriate time to allow the request to go out and back.
+ *
+ * @param cls closure - a TestFindPeer struct
+ * @param tc context the task is being called with
+ */
+static void
+handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *
tc)
+{
+ struct TestFindPeer *test_find_peer = cls;
+
+ GNUNET_DHT_disconnect(test_find_peer->dht_handle);
+ GNUNET_SCHEDULER_add_delayed(sched, find_peer_delay, &decrement_find_peers,
test_find_peer);
+}
+
+static void
+send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *
tc)
+{
+ struct TestFindPeer *test_find_peer = cls;
+
+ if (test_find_peer->find_peer_context->outstanding >
max_outstanding_find_peers)
+ {
+ GNUNET_SCHEDULER_add_delayed(sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 300),
&send_find_peer_request, test_find_peer);
+ return;
+ }
+
+ test_find_peer->find_peer_context->outstanding++;
+ if
(GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value
== 0)
+ {
+ GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
+ return;
+ }
+
+ test_find_peer->dht_handle = GNUNET_DHT_connect(sched,
test_find_peer->daemon->cfg, 1);
+ GNUNET_assert(test_find_peer->dht_handle != NULL);
+ fprintf(stderr, "calling GNUNET_DHT_find_peers\n");
+ GNUNET_DHT_find_peers (test_find_peer->dht_handle,
+ &handle_find_peer_sent, test_find_peer);
+}
+
+/**
+ * Set up a single find peer request for each peer in the topology. Do this
+ * until the settle time is over, limited by the number of outstanding requests
+ * and the time allowed for each one!
+ */
+static void
+schedule_find_peer_requests (void *cls, const struct
GNUNET_SCHEDULER_TaskContext * tc)
+{
+ struct FindPeerContext *find_peer_ctx = cls;
+ struct TestFindPeer *test_find_peer;
+ uint32_t i;
+ uint32_t random;
+
+ for (i = 0; i < max_outstanding_find_peers; i++)
+ {
+ test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer));
+ random = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+ test_find_peer->daemon = GNUNET_TESTING_daemon_get(pg, random);
+ test_find_peer->find_peer_context = find_peer_ctx;
+ find_peer_ctx->total++;
+ GNUNET_SCHEDULER_add_now(sched, &send_find_peer_request, test_find_peer);
+ }
+}
+
+/**
* Set up some all of the put and get operations we want
* to do. Allocate data structure for each, add to list,
* then call actual insert functions.
@@ -1061,6 +1265,7 @@
int i;
int max;
struct TopologyIteratorContext *topo_ctx;
+ struct FindPeerContext *find_peer_context;
if (dhtlog_handle != NULL)
{
if (settle_time >= 60 * 2)
@@ -1078,7 +1283,14 @@
GNUNET_SCHEDULER_add_delayed(sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time),
&capture_current_topology, topo_ctx);
}
else
- GNUNET_SCHEDULER_add_now (sched, &setup_puts_and_gets, NULL);
+ GNUNET_SCHEDULER_add_delayed(sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time),
&setup_puts_and_gets, NULL);
+
+ if (GNUNET_YES == do_find_peer)
+ {
+ find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+ find_peer_context->endtime =
GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
settle_time));
+ GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests,
find_peer_context);
+ }
}
/**
@@ -1509,6 +1721,7 @@
hostfile = NULL;
hosts = NULL;
+ temphost = NULL;
if (hostfile != NULL)
{
if (GNUNET_OK != GNUNET_DISK_file_test (hostfile))
@@ -1533,6 +1746,7 @@
"Could not read file %s specified for host list, ending
test!", hostfile);
GNUNET_free (hostfile);
GNUNET_free (data);
+ GNUNET_free_non_null(trialmessage);
return;
}
@@ -1588,6 +1802,20 @@
num_gets = num_peers;
if (GNUNET_OK ==
+ GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing",
"find_peer_delay",
+ &temp_config_number))
+ find_peer_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
temp_config_number);
+ else
+ find_peer_delay = DEFAULT_FIND_PEER_DELAY;
+
+ if (GNUNET_OK ==
+ GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing",
"concurrent_find_peers",
+ &temp_config_number))
+ max_outstanding_find_peers = temp_config_number;
+ else
+ max_outstanding_find_peers = DEFAULT_MAX_OUTSTANDING_FIND_PEERS;
+
+ if (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing",
"get_timeout",
&temp_config_number))
get_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
temp_config_number);
@@ -1658,6 +1886,15 @@
&malicious_put_frequency))
malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
+ if (GNUNET_NO ==
+ GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+ "find_peers"))
+ {
+ do_find_peer = GNUNET_NO;
+ }
+ else
+ do_find_peer = GNUNET_YES;
+
topology_str = NULL;
if ((GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology",
@@ -1745,7 +1982,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Number of peers must be specified in section %s option
%s\n", topology_str, "TESTING", "NUM_PEERS");
}
-
+ GNUNET_assert(num_peers > 0 && num_peers < (unsigned long long)-1);
/* Set peers_left so we know when all peers started */
peers_left = num_peers;
@@ -1800,6 +2037,7 @@
&topology_callback, NULL,
hosts);
+ GNUNET_free_non_null(temphost);
}
Modified: gnunet/src/dht/gnunet-service-dht.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht.c 2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/gnunet-service-dht.c 2010-08-30 16:33:42 UTC (rev 12764)
@@ -67,14 +67,14 @@
*/
#define MINIMUM_PEER_THRESHOLD 20
+#define DHT_MAX_RECENT 100
+#define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 60)
-#define DHT_MAX_RECENT 100
-
/**
* Default time to wait to send messages on behalf of other peers.
*/
-#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10);
+#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10)
/**
* Default importance for handling messages on behalf of other peers.
@@ -82,9 +82,14 @@
#define DHT_DEFAULT_P2P_IMPORTANCE 0
/**
+ * How long to keep recent requests arounds by default.
+ */
+#define DEFAULT_RECENT_REMOVAL
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60)
+
+/**
* Default time to wait to send find peer messages sent by the dht service.
*/
-#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 30);
+#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 30)
/**
* Default importance for find peer messages sent by the dht service.
@@ -99,7 +104,7 @@
/**
* Default options for find peer requests sent by the dht service.
*/
-#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
+#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE
/**
* How long at least to wait before sending another find peer request.
@@ -109,7 +114,7 @@
/**
* How long at most to wait before sending another find peer request.
*/
-#define DHT_MAXIMUM_FIND_PEER_INTERVAL
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
+#define DHT_MAXIMUM_FIND_PEER_INTERVAL
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
/**
* How often to update our preference levels for peers in our routing tables.
@@ -117,6 +122,12 @@
#define DHT_DEFAULT_PREFERENCE_INTERVAL
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
/**
+ * How long at most on average will we allow a reply forward to take
+ * (before we quit sending out new requests)
+ */
+#define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 1)
+
+/**
* How many initial requests to send out (in true Kademlia fashion)
*/
#define DHT_KADEMLIA_REPLICATION 3
@@ -145,6 +156,12 @@
#define MAX_HOPS 20
/**
+ * How many time differences between requesting a core send and
+ * the actual callback to remember.
+ */
+#define MAX_REPLY_TIMES 8
+
+/**
* Linked list of messages to send to clients.
*/
struct P2PPendingMessage
@@ -165,6 +182,11 @@
unsigned int importance;
/**
+ * Time when this request was scheduled to be sent.
+ */
+ struct GNUNET_TIME_Absolute scheduled;
+
+ /**
* How long to wait before sending message.
*/
struct GNUNET_TIME_Relative timeout;
@@ -251,7 +273,6 @@
* Task for scheduling periodic ping messages for this peer.
*/
GNUNET_SCHEDULER_TaskIdentifier ping_task;
-
};
/**
@@ -329,7 +350,6 @@
* Tail of linked list of pending messages for this client
*/
struct PendingMessage *pending_tail;
-
};
@@ -353,7 +373,7 @@
/**
* The key this request was about
*/
- const GNUNET_HashCode *key;
+ GNUNET_HashCode key;
/**
* The unique identifier of this request
@@ -484,6 +504,21 @@
};
/**
+ * Context used to calculate the number of find peer messages
+ * per X time units since our last scheduled find peer message
+ * was sent. If we have seen too many messages, delay or don't
+ * send our own out.
+ */
+struct FindPeerMessageContext
+{
+ unsigned int count;
+
+ struct GNUNET_TIME_Absolute start;
+
+ struct GNUNET_TIME_Absolute end;
+};
+
+/**
* DHT Routing results structure
*/
struct DHTResults
@@ -518,18 +553,50 @@
struct RecentRequest
{
+ /**
+ * Position of this node in the min heap.
+ */
+ struct GNUNET_CONTAINER_HeapNode *heap_node;
+
+ /**
+ * Bloomfilter containing entries for peers
+ * we forwarded this request to.
+ */
+ struct GNUNET_CONTAINER_BloomFilter *bloom;
+
+ /**
+ * Timestamp of this request, for ordering
+ * the min heap.
+ */
+ struct GNUNET_TIME_Absolute timestamp;
+
+ /**
+ * Key of this request.
+ */
GNUNET_HashCode key;
+
+ /**
+ * Unique identifier for this request.
+ */
uint64_t uid;
+
+ /**
+ * Task to remove this entry on timeout.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier remove_task;
};
-
-#if 0
/**
* Recent requests by hash/uid and by time inserted.
*/
static struct RecentRequests recent;
-#endif
+
/**
+ * Context to use to calculate find peer rates.
+ */
+static struct FindPeerMessageContext find_peer_context;
+
+/**
* Don't use our routing algorithm, always route
* to closest peer; initially send requests to 3
* peers.
@@ -547,6 +614,12 @@
static int stop_on_found;
/**
+ * Whether DHT needs to manage find peer requests, or
+ * an external force will do it on behalf of the DHT.
+ */
+static int do_find_peer;
+
+/**
* How many peers have we added since we sent out our last
* find peer request?
*/
@@ -662,26 +735,114 @@
*/
static unsigned int malicious_getter;
-/*
+/**
* GNUNET_YES or GNUNET_NO, whether or not to act as
* a malicious node which sends out lots of PUTS
*/
static unsigned int malicious_putter;
+/**
+ * Frequency for malicious get requests.
+ */
static unsigned long long malicious_get_frequency;
+/**
+ * Frequency for malicious put requests.
+ */
static unsigned long long malicious_put_frequency;
/**
+ * Reply times for requests, if we are busy, don't send any
+ * more requests!
+ */
+static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
+
+/**
+ * Current counter for replies.
+ */
+static unsigned int reply_counter;
+
+/**
* Forward declaration.
*/
static size_t send_generic_reply (void *cls, size_t size, void *buf);
-/* Declare here so retry_core_send is aware of it */
+/** Declare here so retry_core_send is aware of it */
size_t core_transmit_notify (void *cls,
size_t size, void *buf);
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
static void
+hash_from_uid (uint64_t uid,
+ GNUNET_HashCode *hash)
+{
+ memset (hash, 0, sizeof(GNUNET_HashCode));
+ *((uint64_t*)hash) = uid;
+}
+
+#if AVG
+/**
+ * Calculate the average send time between messages so that we can
+ * ignore certain requests if we get too busy.
+ *
+ * @return the average time between asking core to send a message
+ * and when the buffer for copying it is passed
+ */
+static struct GNUNET_TIME_Relative get_average_send_delay()
+{
+ unsigned int i;
+ unsigned int divisor;
+ struct GNUNET_TIME_Relative average_time;
+ average_time = GNUNET_TIME_relative_get_zero();
+ divisor = 0;
+ for (i = 0; i < MAX_REPLY_TIMES; i++)
+ {
+ average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]);
+ if (reply_times[i].value == (uint64_t)0)
+ continue;
+ else
+ divisor++;
+ }
+ if (divisor == 0)
+ {
+ return average_time;
+ }
+
+ average_time = GNUNET_TIME_relative_divide(average_time, divisor);
+ fprintf(stderr, "Avg send delay: %u sends is %llu\n", divisor, (long long
unsigned int)average_time.value);
+ return average_time;
+}
+#endif
+
+/**
+ * Find the maximum send time of the recently sent values.
+ *
+ * @return the average time between asking core to send a message
+ * and when the buffer for copying it is passed
+ */
+static struct GNUNET_TIME_Relative get_max_send_delay()
+{
+ unsigned int i;
+ struct GNUNET_TIME_Relative max_time;
+ max_time = GNUNET_TIME_relative_get_zero();
+
+ for (i = 0; i < MAX_REPLY_TIMES; i++)
+ {
+ if (reply_times[i].value > max_time.value)
+ max_time.value = reply_times[i].value;
+ }
+
+ if (max_time.value > MAX_REQUEST_TIME.value)
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", (long
long unsigned int)max_time.value);
+ return max_time;
+}
+
+static void
increment_stats(const char *value)
{
if (stats != NULL)
@@ -718,6 +879,10 @@
"`%s:%s': Calling notify_transmit_ready with size %d for peer
%s\n", my_short_id,
"DHT", ssize, GNUNET_i2s(&peer->id));
#endif
+ pending->scheduled = GNUNET_TIME_absolute_get();
+ reply_counter++;
+ if (reply_counter >= MAX_REPLY_TIMES)
+ reply_counter = 0;
peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI,
pending->importance,
pending->timeout, &peer->id,
ssize,
&core_transmit_notify, peer);
@@ -759,7 +924,7 @@
result_message->hop_count = htonl(msg_ctx->hop_count + 1);
GNUNET_assert(GNUNET_OK ==
GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom,
result_message->bloomfilter, DHT_BLOOM_SIZE));
result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
- memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+ memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
memcpy(&result_message[1], msg, ntohs(msg->size));
#if DEBUG_DHT > 1
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d
for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
@@ -802,6 +967,7 @@
peer->th = NULL;
off = 0;
pending = peer->head;
+ reply_times[reply_counter] =
GNUNET_TIME_absolute_get_difference(pending->scheduled,
GNUNET_TIME_absolute_get());
msize = ntohs(pending->msg->size);
if (msize <= size)
{
@@ -1246,6 +1412,7 @@
struct PeerInfo *peer = value;
int new_bucket;
+ GNUNET_assert(lowest_bucket > 0);
new_bucket = lowest_bucket - 1;
remove_peer(peer, lowest_bucket);
GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
@@ -1358,7 +1525,7 @@
increment_stats(STAT_ROUTE_FORWARDS);
- if ((msg_ctx->closest != GNUNET_YES) && (peer ==
find_closest_peer(msg_ctx->key)))
+ if ((msg_ctx->closest != GNUNET_YES) && (peer ==
find_closest_peer(&msg_ctx->key)))
increment_stats(STAT_ROUTE_FORWARDS_CLOSEST);
msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
@@ -1378,8 +1545,7 @@
route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
if (msg_ctx->bloom != NULL)
GNUNET_assert(GNUNET_OK ==
GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom,
route_message->bloomfilter, DHT_BLOOM_SIZE));
- if (msg_ctx->key != NULL)
- memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+ memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
memcpy(&route_message[1], msg, ntohs(msg->size));
#if DEBUG_DHT > 1
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d
for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
@@ -1625,6 +1791,8 @@
if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers,
&peer->hashPubKey))
return GNUNET_NO; /* We already know this peer (are connected even!) */
bucket = find_current_bucket(&peer->hashPubKey);
+ if (bucket == GNUNET_SYSERR)
+ return GNUNET_NO;
if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket ==
lowest_bucket) && (lowest_bucket > 0)))
return GNUNET_YES;
@@ -1667,6 +1835,7 @@
}
else /* We have a valid hello, and peer id stored in new_peer */
{
+ find_peer_context.count++;
increment_stats(STAT_FIND_PEER_REPLY);
if (GNUNET_YES == consider_peer(&new_peer))
{
@@ -1682,7 +1851,7 @@
if (malicious_dropper == GNUNET_YES)
record = NULL;
else
- record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
message_context->key);
+ record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
&message_context->key);
if (record == NULL) /* No record of this message! */
{
@@ -1701,7 +1870,7 @@
message_context->hop_count,
GNUNET_SYSERR,
&my_identity,
- message_context->key,
+ &message_context->key,
message_context->peer, NULL);
}
#endif
@@ -1728,7 +1897,7 @@
{
dhtlog_handle->insert_route (NULL, message_context->unique_id,
DHTLOG_RESULT,
message_context->hop_count,
- GNUNET_YES, &my_identity,
message_context->key,
+ GNUNET_YES, &my_identity,
&message_context->key,
message_context->peer, NULL);
}
#endif
@@ -1763,7 +1932,7 @@
dhtlog_handle->insert_route (NULL,
message_context->unique_id,
DHTLOG_RESULT,
message_context->hop_count,
- GNUNET_NO, &my_identity,
message_context->key,
+ GNUNET_NO, &my_identity,
&message_context->key,
message_context->peer,
&pos->source);
}
#endif
@@ -1876,7 +2045,7 @@
if (datacache != NULL)
results =
- GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
+ GNUNET_DATACACHE_get (datacache, &message_context->key, get_type,
&datacache_get_iterator, message_context);
if (results >= 1)
@@ -1891,14 +2060,14 @@
{
dhtlog_handle->insert_query (NULL, message_context->unique_id,
DHTLOG_GET,
message_context->hop_count, GNUNET_YES,
&my_identity,
- message_context->key);
+ &message_context->key);
}
if ((debug_routes_extended) && (dhtlog_handle != NULL))
{
dhtlog_handle->insert_route (NULL, message_context->unique_id,
DHTLOG_ROUTE,
message_context->hop_count, GNUNET_YES,
- &my_identity, message_context->key,
message_context->peer,
+ &my_identity, &message_context->key,
message_context->peer,
NULL);
}
#endif
@@ -1911,7 +2080,7 @@
{
dhtlog_handle->insert_query (NULL, message_context->unique_id,
DHTLOG_GET,
message_context->hop_count, GNUNET_NO,
&my_identity,
- message_context->key);
+ &message_context->key);
}
#endif
}
@@ -2000,7 +2169,7 @@
{
dhtlog_handle->insert_query (NULL, message_context->unique_id,
DHTLOG_FIND_PEER,
message_context->hop_count, GNUNET_YES,
&my_identity,
- message_context->key);
+ &message_context->key);
}
#endif
GNUNET_free(find_peer_result);
@@ -2046,7 +2215,7 @@
{
dhtlog_handle->insert_query (NULL, message_context->unique_id,
DHTLOG_PUT,
message_context->hop_count, GNUNET_NO,
&my_identity,
- message_context->key);
+ &message_context->key);
}
}
#endif
@@ -2059,7 +2228,7 @@
{
dhtlog_handle->insert_route (NULL, message_context->unique_id,
DHTLOG_ROUTE,
message_context->hop_count, GNUNET_YES,
- &my_identity, message_context->key,
message_context->peer,
+ &my_identity, &message_context->key,
message_context->peer,
NULL);
}
@@ -2067,13 +2236,13 @@
{
dhtlog_handle->insert_query (NULL, message_context->unique_id,
DHTLOG_PUT,
message_context->hop_count, GNUNET_YES,
&my_identity,
- message_context->key);
+ &message_context->key);
}
#endif
increment_stats(STAT_PUTS_INSERTED);
if (datacache != NULL)
- GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
+ GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
(char *) &put_msg[1], put_type,
GNUNET_TIME_absolute_ntoh(put_msg->expiration));
else
@@ -2154,9 +2323,8 @@
}
target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
target_replication / (target_replication * (hop_count + 1) + diameter);
+#if NONSENSE
target_value = 0;
-
-#if NONSENSE
while (target_value < target_count)
target_value++; /* target_value is ALWAYS 1 after this "loop" */
#else
@@ -2171,33 +2339,44 @@
/*
* Check whether my identity is closer than any known peers.
+ * If a non-null bloomfilter is given, check if this is the closest
+ * peer that hasn't already been routed to.
*
* @param target hash code to check closeness to
+ * @param bloom bloomfilter, exclude these entries from the decision
*
* Return GNUNET_YES if node location is closest, GNUNET_NO
* otherwise.
*/
int
-am_closest_peer (const GNUNET_HashCode * target)
+am_closest_peer (const GNUNET_HashCode * target, struct
GNUNET_CONTAINER_BloomFilter *bloom)
{
int bits;
int other_bits;
int bucket_num;
int count;
struct PeerInfo *pos;
+#if INTEGER_DISTANCE
unsigned int my_distance;
-
+#endif
bucket_num = find_current_bucket(target);
if (bucket_num == GNUNET_SYSERR) /* Same key! */
return GNUNET_YES;
bits = matching_bits(&my_identity.hashPubKey, target);
+#if INTEGER_DISTANCE
my_distance = distance(&my_identity.hashPubKey, target);
-
+#endif
pos = k_buckets[bucket_num].head;
count = 0;
while ((pos != NULL) && (count < bucket_size))
{
+ if ((bloom != NULL) && (GNUNET_YES ==
GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey)))
+ {
+ pos = pos->next;
+ continue; /* Skip already checked entries */
+ }
+
other_bits = matching_bits(&pos->id.hashPubKey, target);
if (other_bits > bits)
return GNUNET_NO;
@@ -2361,7 +2540,34 @@
}
}
+/**
+ * Task used to remove recent entries, either
+ * after timeout, when full, or on shutdown.
+ *
+ * @param cls the entry to remove
+ * @param tc context, reason, etc.
+ */
+static void
+remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct RecentRequest *req = cls;
+ static GNUNET_HashCode hash;
+ GNUNET_assert(req != NULL);
+ hash_from_uid(req->uid, &hash);
+ GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req));
+ GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node);
+ GNUNET_CONTAINER_bloomfilter_free(req->bloom);
+ GNUNET_free(req);
+
+ if ((tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) && (0 ==
GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 ==
GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
+ {
+ GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
+ GNUNET_CONTAINER_heap_destroy(recent.minHeap);
+ }
+}
+
+
/**
* Task used to remove forwarding entries, either
* after timeout, when full, or on shutdown.
@@ -2408,6 +2614,7 @@
while (current_size >= MAX_OUTSTANDING_FORWARDS)
{
source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
+ GNUNET_assert(source_info != NULL);
record = source_info->record;
GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
if (record->head == NULL) /* No more entries in DLL */
@@ -2420,7 +2627,7 @@
current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
}
now = GNUNET_TIME_absolute_get();
- record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
msg_ctx->key);
+ record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
&msg_ctx->key);
if (record != NULL) /* Already know this request! */
{
pos = record->head;
@@ -2439,8 +2646,8 @@
else
{
record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
- GNUNET_assert(GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+ GNUNET_assert(GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
}
source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
@@ -2480,11 +2687,12 @@
{
int i;
struct PeerInfo *selected;
+#if DEBUG_DHT_ROUTING > 1
struct PeerInfo *nearest;
+#endif
unsigned int forward_count;
-#if DEBUG_DHT
- char *nearest_buf;
-#endif
+ struct RecentRequest *recent_req;
+ GNUNET_HashCode unique_hash;
#if DEBUG_DHT_ROUTING
int ret;
#endif
@@ -2496,7 +2704,7 @@
{
dhtlog_handle->insert_route (NULL, message_context->unique_id,
DHTLOG_ROUTE,
message_context->hop_count,
GNUNET_SYSERR,
- &my_identity, message_context->key,
message_context->peer,
+ &my_identity, &message_context->key,
message_context->peer,
NULL);
}
#endif
@@ -2506,15 +2714,16 @@
}
increment_stats(STAT_ROUTES);
- message_context->closest = am_closest_peer(message_context->key);
+ /* Semantics of this call means we find whether we are the closest peer out
of those already
+ * routed to on this messages path.
+ */
+ message_context->closest = am_closest_peer(&message_context->key,
message_context->bloom);
forward_count = get_forward_count(message_context->hop_count,
message_context->replication);
- nearest = find_closest_peer(message_context->key);
-
+
if (message_context->bloom == NULL)
message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL,
DHT_BLOOM_SIZE, DHT_BLOOM_K);
if ((stop_on_closest == GNUNET_YES) && (message_context->closest ==
GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
-/* || ((strict_kademlia == GNUNET_YES) && (message_context->closest ==
GNUNET_YES))) */
forward_count = 0;
#if DEBUG_DHT_ROUTING
@@ -2527,7 +2736,7 @@
{
dhtlog_handle->insert_route (NULL, message_context->unique_id,
DHTLOG_ROUTE,
message_context->hop_count, ret,
- &my_identity, message_context->key,
message_context->peer,
+ &my_identity, &message_context->key,
message_context->peer,
NULL);
}
#endif
@@ -2556,10 +2765,10 @@
{
if ((debug_routes) && (dhtlog_handle != NULL))
{
- dhtlog_handle->insert_dhtkey(NULL, message_context->key);
+ dhtlog_handle->insert_dhtkey(NULL, &message_context->key);
dhtlog_handle->insert_query (NULL, message_context->unique_id,
DHTLOG_FIND_PEER,
message_context->hop_count,
GNUNET_NO, &my_identity,
- message_context->key);
+ &message_context->key);
}
}
#endif
@@ -2570,42 +2779,47 @@
}
GNUNET_CONTAINER_bloomfilter_add (message_context->bloom,
&my_identity.hashPubKey);
-#if 0
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent->hashmap,
message_context->key))
- {
- if (GNUNET_SYSERR = GNUNET_CONTAINER_multihashmap_get_multiple
(recent->hashmap, message_context->key, &find_matching_recent,
&message_context)) /* Have too recently seen this request! */
- {
- forward_count = 0;
- }
- else /* Exact match not found, but same key found */
- {
- recent_req = GNUNET_CONTAINER_multihashmap_get(recent->hashmap,
message_context->key);
- }
+ hash_from_uid(message_context->unique_id, &unique_hash);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent.hashmap,
&unique_hash))
+ {
+ recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap,
&unique_hash);
+ GNUNET_assert(recent_req != NULL);
+ if (0 != memcmp(&recent_req->key, &message_context->key,
sizeof(GNUNET_HashCode)))
+ increment_stats(STAT_DUPLICATE_UID);
+ else
+ {
+ increment_stats(STAT_RECENT_SEEN);
+ GNUNET_CONTAINER_bloomfilter_or2(message_context->bloom,
recent_req->bloom, DHT_BLOOM_SIZE);
+ }
}
else
{
recent_req = GNUNET_malloc(sizeof(struct RecentRequest));
recent_req->uid = message_context->unique_id;
- memcmp(&recent_req->key, message_context->key, sizeof(GNUNET_HashCode));
+ memcpy(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode));
recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(sched,
DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req);
- GNUNET_CONTAINER_heap_insert(recent->minHeap, recent_req,
GNUNET_TIME_absolute_get());
- GNUNET_CONTAINER_multihashmap_put(recent->hashmap, message_context->key,
recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap,
recent_req, GNUNET_TIME_absolute_get().value);
+ recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL,
DHT_BLOOM_SIZE, DHT_BLOOM_K);
+ GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash,
recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
- if (GNUNET_CONTAINER_multihashmap_size(recent->hashmap) > DHT_MAX_RECENT)
+ if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT)
{
- remove_oldest_recent();
+ recent_req = GNUNET_CONTAINER_heap_peek(recent.minHeap);
+ GNUNET_assert(recent_req != NULL);
+ GNUNET_SCHEDULER_cancel(sched, recent_req->remove_task);
+ GNUNET_SCHEDULER_add_now(sched, &remove_recent, recent_req);
}
-#endif
for (i = 0; i < forward_count; i++)
{
- selected = select_peer(message_context->key, message_context->bloom);
+ selected = select_peer(&message_context->key, message_context->bloom);
if (selected != NULL)
{
GNUNET_CONTAINER_bloomfilter_add(message_context->bloom,
&selected->id.hashPubKey);
#if DEBUG_DHT_ROUTING > 1
+ nearest = find_closest_peer(&message_context->key);
nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s:%s': Forwarding request key %s uid %llu to peer %s
(closest %s, bits %d, distance %u)\n", my_short_id,
@@ -2616,7 +2830,7 @@
{
dhtlog_handle->insert_route (NULL, message_context->unique_id,
DHTLOG_ROUTE,
message_context->hop_count,
GNUNET_NO,
- &my_identity, message_context->key,
message_context->peer,
+ &my_identity,
&message_context->key, message_context->peer,
&selected->id);
}
forward_message(cls, msg, selected, message_context);
@@ -2640,7 +2854,10 @@
#endif
if (message_context->bloom != NULL)
- GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
+ {
+ GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom,
message_context->bloom, DHT_BLOOM_SIZE);
+ GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
+ }
return forward_count;
}
@@ -2684,7 +2901,6 @@
static struct GNUNET_DHT_PutMessage put_message;
static struct DHT_MessageContext message_context;
static GNUNET_HashCode key;
- unsigned int mcsize;
uint32_t random_key;
if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
@@ -2694,12 +2910,11 @@
put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
put_message.expiration =
GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever());
- mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode);
memset(&message_context, 0, sizeof(struct DHT_MessageContext));
message_context.client = NULL;
random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
(uint32_t)-1);
GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
- message_context.key = &key;
+ memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
message_context.unique_id = GNUNET_ntohll
(GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
message_context.msg_options = ntohl (0);
@@ -2726,9 +2941,8 @@
malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
static struct GNUNET_DHT_GetMessage get_message;
- static struct DHT_MessageContext message_context;
+ struct DHT_MessageContext message_context;
static GNUNET_HashCode key;
- unsigned int mcsize;
uint32_t random_key;
if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
@@ -2737,12 +2951,11 @@
get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
- mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode);
memset(&message_context, 0, sizeof(struct DHT_MessageContext));
message_context.client = NULL;
random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
(uint32_t)-1);
GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
- message_context.key = &key;
+ memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
message_context.unique_id = GNUNET_ntohll
(GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
message_context.msg_options = ntohl (0);
@@ -2754,13 +2967,10 @@
dhtlog_handle->insert_dhtkey(NULL, &key);
increment_stats(STAT_GET_START);
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message
with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
- route_message(NULL, &get_message.header, &message_context);
+ route_message (NULL, &get_message.header, &message_context);
GNUNET_SCHEDULER_add_delayed(sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS,
malicious_get_frequency), &malicious_get_task, NULL);
}
-#if DO_FIND_PEER
-
-
/**
* Iterator over hash map entries.
*
@@ -2797,12 +3007,42 @@
int ret;
struct GNUNET_TIME_Relative next_send_time;
struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
-
+#if COUNT_INTERVAL
+ struct GNUNET_TIME_Relative time_diff;
+ struct GNUNET_TIME_Absolute end;
+ double multiplier;
+ double count_per_interval;
+#endif
if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
return;
+ if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If
we are finding peers already, no need to send out our request right now! */
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since
last find peer message sent!\n", newly_found_peers);
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_TIME_UNIT_MINUTES,
+ &send_find_peer_message, NULL);
+ newly_found_peers = 0;
+ return;
+ }
+
increment_stats(STAT_FIND_PEER_START);
+#if COUNT_INTERVAL
+ end = GNUNET_TIME_absolute_get();
+ time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start,
end);
+ if (time_diff.value > FIND_PEER_CALC_INTERVAL.value)
+ {
+ multiplier = time_diff.value / FIND_PEER_CALC_INTERVAL.value;
+ count_per_interval = find_peer_context.count / multiplier;
+ }
+ else
+ {
+ multiplier = FIND_PEER_CALC_INTERVAL.value / time_diff.value;
+ count_per_interval = find_peer_context.count * multiplier;
+ }
+#endif
+
find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage));
find_peer_msg->header.size = htons(sizeof(struct
GNUNET_DHT_FindPeerMessage));
find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
@@ -2810,10 +3050,10 @@
GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom,
temp_bloom);
GNUNET_assert(GNUNET_OK ==
GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom,
find_peer_msg->bloomfilter, DHT_BLOOM_SIZE));
memset(&message_context, 0, sizeof(struct DHT_MessageContext));
- message_context.key = &my_identity.hashPubKey;
+ memcpy(&message_context.key, &my_identity.hashPubKey,
sizeof(GNUNET_HashCode));
message_context.unique_id = GNUNET_ntohll
(GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
- message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
- message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS);
+ message_context.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
+ message_context.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
message_context.network_size = estimate_diameter();
message_context.peer = &my_identity;
message_context.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
@@ -2836,12 +3076,18 @@
GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
DHT_MAXIMUM_FIND_PEER_INTERVAL.value - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
}
+
+ GNUNET_assert (next_send_time.value != 0);
+ find_peer_context.count = 0;
newly_found_peers = 0;
- GNUNET_SCHEDULER_add_delayed (sched,
- next_send_time,
- &send_find_peer_message, NULL);
+ find_peer_context.start = GNUNET_TIME_absolute_get();
+ if (GNUNET_YES == do_find_peer)
+ {
+ GNUNET_SCHEDULER_add_delayed (sched,
+ next_send_time,
+ &send_find_peer_message, NULL);
+ }
}
-#endif
/**
* Handler for any generic DHT messages, calls the appropriate handler
@@ -2859,11 +3105,7 @@
const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct
GNUNET_DHT_RouteMessage *) message;
const struct GNUNET_MessageHeader *enc_msg;
struct DHT_MessageContext message_context;
- size_t enc_type;
-
enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
- enc_type = ntohs (enc_msg->type);
-
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s:%s': Received `%s' request from client, message type %d,
key %s, uid %llu\n",
@@ -2876,7 +3118,7 @@
#endif
memset(&message_context, 0, sizeof(struct DHT_MessageContext));
message_context.client = find_active_client (client);
- message_context.key = &dht_msg->key;
+ memcpy(&message_context.key, &dht_msg->key, sizeof(GNUNET_HashCode));
message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
message_context.replication = ntohl (dht_msg->desired_replication_level);
message_context.msg_options = ntohl (dht_msg->options);
@@ -2920,6 +3162,10 @@
switch (ntohs(dht_control_msg->command))
{
+ case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending self seeking find peer
request!\n");
+ GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL);
+ break;
case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
if (ntohs(dht_control_msg->variable) > 0)
malicious_get_frequency = ntohs(dht_control_msg->variable);
@@ -2971,15 +3217,11 @@
(const struct GNUNET_DHT_StopMessage *) message;
struct DHTQueryRecord *record;
struct DHTRouteSource *pos;
- uint64_t uid;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s:%s': Received `%s' request from client, uid %llu\n",
my_short_id, "DHT",
"GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
#endif
-
- uid = GNUNET_ntohll(dht_stop_msg->unique_id);
-
record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
&dht_stop_msg->key);
if (record != NULL)
{
@@ -3017,6 +3259,12 @@
struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader
*)&incoming[1];
struct DHT_MessageContext *message_context;
+ if (get_max_send_delay().value > MAX_REQUEST_TIME.value)
+ {
+ fprintf(stderr, "Sending of previous requests has taken far too long,
backing off!\n");
+ return GNUNET_YES;
+ }
+
if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these
away. FIXME: Don't throw these away? (reply)*/
{
#if DEBUG_PING
@@ -3034,7 +3282,7 @@
message_context->bloom =
GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE,
DHT_BLOOM_K);
GNUNET_assert(message_context->bloom != NULL);
message_context->hop_count = ntohl(incoming->hop_count);
- message_context->key = &incoming->key;
+ memcpy(&message_context->key, &incoming->key, sizeof(GNUNET_HashCode));
message_context->replication = ntohl(incoming->desired_replication_level);
message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
message_context->msg_options = ntohl(incoming->options);
@@ -3074,7 +3322,7 @@
memset(&message_context, 0, sizeof(struct DHT_MessageContext));
message_context.bloom =
GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE,
DHT_BLOOM_K);
GNUNET_assert(message_context.bloom != NULL);
- message_context.key = &incoming->key;
+ memcpy(&message_context.key, &incoming->key, sizeof(GNUNET_HashCode));
message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
message_context.msg_options = ntohl(incoming->options);
message_context.hop_count = ntohl(incoming->hop_count);
@@ -3322,7 +3570,7 @@
coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
cfg, /* Main configuration */
GNUNET_TIME_UNIT_FOREVER_REL,
- NULL, /* Closure passed to DHT functionas
around? */
+ NULL, /* Closure passed to DHT functions */
&core_init, /* Call core_init once
connected */
&handle_core_connect, /* Handle connects */
&handle_core_disconnect, /* remove peers on
disconnects */
@@ -3402,6 +3650,15 @@
malicious_dropper = GNUNET_YES;
}
+ if (GNUNET_NO ==
+ GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+ "do_find_peer"))
+ {
+ do_find_peer = GNUNET_NO;
+ }
+ else
+ do_find_peer = GNUNET_YES;
+
if (GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
"mysql_logging_extended"))
@@ -3445,14 +3702,19 @@
GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO);
}
-#if DO_FIND_PEER
- next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
- GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
-
(DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) -
DHT_MINIMUM_FIND_PEER_INTERVAL.value);
- GNUNET_SCHEDULER_add_delayed (sched,
- next_send_time,
- &send_find_peer_message, NULL);
-#endif
+ /* FIXME: if there are no recent requests then these never get freed, but
alternative is _annoying_! */
+ recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2);
+ recent.minHeap =
GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ if (GNUNET_YES == do_find_peer)
+ {
+ next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
+
GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
+
(DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) -
DHT_MINIMUM_FIND_PEER_INTERVAL.value);
+ find_peer_context.start = GNUNET_TIME_absolute_get();
+ GNUNET_SCHEDULER_add_delayed (sched,
+ next_send_time,
+ &send_find_peer_message, &find_peer_context);
+ }
/* Scheduled the task to clean up when shutdown is called */
cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r12764 - gnunet/src/dht,
gnunet <=