gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r12764 - gnunet/src/dht


From: gnunet
Subject: [GNUnet-SVN] r12764 - gnunet/src/dht
Date: Mon, 30 Aug 2010 18:33:42 +0200

Author: nevans
Date: 2010-08-30 18:33:42 +0200 (Mon, 30 Aug 2010)
New Revision: 12764

Modified:
   gnunet/src/dht/dht.h
   gnunet/src/dht/dht_api.c
   gnunet/src/dht/gnunet-dht-driver.c
   gnunet/src/dht/gnunet-service-dht.c
Log:
assorted dht changes, fixes, etc.

Modified: gnunet/src/dht/dht.h
===================================================================
--- gnunet/src/dht/dht.h        2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/dht.h        2010-08-30 16:33:42 UTC (rev 12764)
@@ -61,6 +61,8 @@
 #define STAT_GET_RESPONSE_START "# DHT GET Responses Initiated"
 #define STAT_HELLOS_PROVIDED "# HELLO Messages given to transport"
 #define STAT_DISCONNECTS "# Disconnects received"
+#define STAT_DUPLICATE_UID "# Duplicate UID's encountered (bad if any!)"
+#define STAT_RECENT_SEEN "# recent requests seen again (routing loops, 
alternate paths)"
 
 typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls,
                                                    const struct 
GNUNET_MessageHeader

Modified: gnunet/src/dht/dht_api.c
===================================================================
--- gnunet/src/dht/dht_api.c    2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/dht_api.c    2010-08-30 16:33:42 UTC (rev 12764)
@@ -872,6 +872,55 @@
 }
 
 /**
+ * Send a message to the DHT telling it to issue a single find
+ * peer request using the peer's unique identifier as key.  This
+ * is used to fill the routing table, and is normally controlled
+ * by the DHT itself.  However, for testing and perhaps more
+ * close control over the DHT, this can be explicitly managed.
+ *
+ * The request is refused when another message is currently being
+ * processed on the handle, unless the handle is in the
+ * DHT_RETRANSMITTING stage, in which case the message is stored in
+ * the retransmission buffer instead of being processed right away.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was handed off for
+ *         processing (or stored in the retransmission buffer),
+ *         GNUNET_NO if the handle was busy; in the GNUNET_NO case
+ *         nothing is allocated and cont is not registered
+ */
+int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
+                           GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != 
DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
  * Send a message to the DHT telling it to start issuing random PUT
  * requests every 'frequency' milliseconds.
  *

Modified: gnunet/src/dht/gnunet-dht-driver.c
===================================================================
--- gnunet/src/dht/gnunet-dht-driver.c  2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/gnunet-dht-driver.c  2010-08-30 16:33:42 UTC (rev 12764)
@@ -50,12 +50,21 @@
 /* Timeout for waiting for puts to be sent to the service */
 #define DEFAULT_PUT_DELAY 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 10)
 
+/* Time to wait after a find peer request is sent before it is counted as finished */
+#define DEFAULT_FIND_PEER_DELAY 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 40)
+
 #define DEFAULT_SECONDS_PER_PEER_START 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 45)
 
 #define DEFAULT_TEST_DATA_SIZE 8
 
+#define DEFAULT_BUCKET_SIZE 4
+
+/* A round of find peer requests must gain more than this many connections for another round to be scheduled */
+#define FIND_PEER_THRESHOLD (DEFAULT_BUCKET_SIZE * 2)
+
 #define DEFAULT_MAX_OUTSTANDING_PUTS 10
 
+#define DEFAULT_MAX_OUTSTANDING_FIND_PEERS 1
+
 #define DEFAULT_MAX_OUTSTANDING_GETS 10
 
 #define DEFAULT_CONNECT_TIMEOUT 60
@@ -97,6 +106,30 @@
   int malicious_type;
 };
 
+struct TestFindPeer
+{
+  /* This is a linked list */
+  struct TestFindPeer *next;
+
+  /* Handle to the bigger context */
+  struct FindPeerContext *find_peer_context;
+
+  /**
+   * Handle to the peer's DHT service (via the API)
+   */
+  struct GNUNET_DHT_Handle *dht_handle;
+
+  /**
+   *  Handle to the peer daemon
+   */
+  struct GNUNET_TESTING_Daemon *daemon;
+
+  /**
+   * Task for disconnecting DHT handles
+   */
+  GNUNET_SCHEDULER_TaskIdentifier disconnect_task;
+};
+
 struct TestPutContext
 {
   /* This is a linked list */
@@ -232,8 +265,12 @@
 
 static struct GNUNET_TIME_Relative put_delay;
 
+static struct GNUNET_TIME_Relative find_peer_delay;
+
 static struct GNUNET_TIME_Relative seconds_per_peer_start;
 
+static int do_find_peer;
+
 static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE;
 
 static unsigned long long max_outstanding_puts = DEFAULT_MAX_OUTSTANDING_PUTS;
@@ -242,6 +279,8 @@
 
 static unsigned long long malicious_getters;
 
+static unsigned long long max_outstanding_find_peers;
+
 static unsigned long long malicious_putters;
 
 static unsigned long long malicious_droppers;
@@ -651,6 +690,7 @@
       stats_ctx->peer = peer;
       GNUNET_CONTAINER_multihashmap_put(stats_map, &peer->hashPubKey, 
stats_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
     }
+  GNUNET_assert(stats_ctx != NULL);
 
   if (strcmp(name, STAT_ROUTES) == 0)
     stats_ctx->stat_routes = value;
@@ -696,6 +736,7 @@
 log_dht_statistics (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   stats_map = GNUNET_CONTAINER_multihashmap_create(num_peers);
+  fprintf(stderr, "Starting statistics logging\n");
   GNUNET_TESTING_get_statistics(pg, &stats_finished, &stats_handle, NULL);
 }
 
@@ -1005,8 +1046,171 @@
   GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, 
test_put->next);
 }
 
+/**
+ * Context for sending out find peer requests.
+ */
+struct FindPeerContext
+{
+  /* NOTE(review): not referenced by the find peer code in this patch - confirm whether it is still needed */
+  struct GNUNET_DHT_Handle *dht_handle;
+  /* Absolute time after which no further find peer rounds are started */
+  struct GNUNET_TIME_Absolute endtime;
+  /* Connections counted so far during the most recent topology iteration */
+  unsigned int current_peers;
+  /* Value of current_peers saved when the most recent count was started */
+  unsigned int previous_peers;
+  /* Requests that have been issued and not yet retired by decrement_find_peers */
+  unsigned int outstanding;
+  /* Requests scheduled in the current round that have not yet been retired */
+  unsigned int total;
+};
 
+static void
+schedule_find_peer_requests (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext * tc);
+
 /**
+ * Given a number of total peers and a bucket size, estimate the number of
+ * connections in a perfect kademlia topology.
+ *
+ * The peer count is halved until it no longer exceeds the bucket size; every
+ * halving step counts as one completely filled bucket per peer.
+ *
+ * @param peer_count number of peers in the topology
+ * @param bucket_size number of peers that fit into one bucket
+ *
+ * @return filled buckets * bucket_size * peer_count
+ */
+static unsigned int
+connection_estimate (unsigned int peer_count, unsigned int bucket_size)
+{
+  unsigned int remaining;
+  unsigned int filled;
+
+  /* Start from the argument, not from the global num_peers, so that the
+   * result actually depends on the peer count the caller asked about. */
+  remaining = peer_count;
+  filled = 0;
+  while (remaining > bucket_size)
+    {
+      filled++;
+      remaining = remaining / 2;
+    }
+  return filled * bucket_size * peer_count;
+}
+
+/**
+ * Callback for iterating over all the peer connections of a peer group.
+ * A call with both peers non-NULL is counted as one connection; any other
+ * call is treated as the end of the iteration, at which point we decide
+ * whether another round of find peer requests is worthwhile.
+ *
+ * @param cls closure - the struct FindPeerContext holding the counters
+ * @param first one end of the connection (NULL when the iteration is over)
+ * @param second other end of the connection (NULL when the iteration is over)
+ * @param latency not used here
+ * @param distance not used here
+ * @param emsg not used here
+ */
+void count_peers_cb (void *cls,
+                      const struct GNUNET_PeerIdentity *first,
+                      const struct GNUNET_PeerIdentity *second,
+                      struct GNUNET_TIME_Relative latency,
+                      uint32_t distance,
+                      const char *emsg)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  unsigned int new_peers;
+  unsigned int estimate;
+
+  if ((first != NULL) && (second != NULL))
+    {
+      find_peer_context->current_peers++;
+      return;
+    }
+
+  GNUNET_assert(dhtlog_handle != NULL);
+
+  /* The counters are unsigned: if connections were lost since the previous
+   * count the plain difference would wrap around to a huge value and look
+   * like massive progress, so clamp it to zero instead. */
+  new_peers = 0;
+  if (find_peer_context->current_peers > find_peer_context->previous_peers)
+    new_peers = find_peer_context->current_peers
+                - find_peer_context->previous_peers;
+  estimate = connection_estimate(num_peers, DEFAULT_BUCKET_SIZE);
+
+  fprintf(stderr,
+          "peer count finished (%u connections), %u new peers, "
+          "connection estimate %u\n",
+          find_peer_context->current_peers, new_peers, estimate);
+
+  /* Keep going only while we make progress, are below the estimate for a
+   * perfect topology and still have settle time left. */
+  if ((new_peers > FIND_PEER_THRESHOLD) &&
+      (find_peer_context->current_peers < estimate) &&
+      (GNUNET_TIME_absolute_get_remaining
+       (find_peer_context->endtime).value > 0))
+    {
+      fprintf(stderr, "Scheduling another round of find peer requests.\n");
+      GNUNET_SCHEDULER_add_now(sched, schedule_find_peer_requests,
+                               find_peer_context);
+    }
+  else
+    {
+      fprintf(stderr, "Not sending any more find peer requests.\n");
+    }
+}
+
+/**
+ * Start a fresh count of the connections in the peer group: remember the
+ * previous count, reset the current one and hand count_peers_cb to
+ * GNUNET_TESTING_get_topology so that it is invoked with the topology.
+ *
+ * @param cls closure - the struct FindPeerContext holding the counters
+ * @param tc context the task is being called with (not used here)
+ */
+static void
+count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  find_peer_context->previous_peers = find_peer_context->current_peers;
+  find_peer_context->current_peers = 0;
+  GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context);
+}
+
+
+/**
+ * Retire a single find peer request: update the counters of the enclosing
+ * context, start a new connection count once the last request of the round
+ * is done (and settle time remains), and release the request.
+ *
+ * @param cls closure - the struct TestFindPeer being retired (freed here)
+ * @param tc context the task is being called with (not used here)
+ */
+static void
+decrement_find_peers (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+  struct FindPeerContext *find_peer_context;
+
+  find_peer_context = test_find_peer->find_peer_context;
+  GNUNET_assert(find_peer_context->outstanding > 0);
+  find_peer_context->outstanding--;
+  find_peer_context->total--;
+  /* total is unsigned, so it has to be printed with %u rather than %d */
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%u find_peers remaining\n",
+             find_peer_context->total);
+  if ((0 == find_peer_context->total) &&
+      (GNUNET_TIME_absolute_get_remaining
+       (find_peer_context->endtime).value > 0))
+  {
+    GNUNET_SCHEDULER_add_now(sched, &count_new_peers, find_peer_context);
+  }
+  GNUNET_free(test_find_peer);
+}
+
+/**
+ * A find peer request has been sent to the server, now we will schedule a task
+ * to wait the appropriate time to allow the request to go out and back.
+ * The DHT handle used for the request is disconnected right away; the
+ * request itself is retired by decrement_find_peers after find_peer_delay.
+ *
+ * @param cls closure - a TestFindPeer struct
+ * @param tc context the task is being called with
+ */
+static void
+handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  GNUNET_DHT_disconnect(test_find_peer->dht_handle);
+  GNUNET_SCHEDULER_add_delayed(sched, find_peer_delay, &decrement_find_peers, 
test_find_peer);
+}
+
+/**
+ * Issue a single find peer request through a fresh DHT connection to the
+ * request's daemon.  If too many requests are outstanding the attempt is
+ * retried 300 ms later; if the settle time is already over the request is
+ * retired immediately without being sent.
+ *
+ * @param cls closure - the struct TestFindPeer describing the request
+ * @param tc context the task is being called with (not used here)
+ */
+static void
+send_find_peer_request (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+  struct FindPeerContext *find_peer_context;
+
+  find_peer_context = test_find_peer->find_peer_context;
+  if (find_peer_context->outstanding > max_outstanding_find_peers)
+  {
+    GNUNET_SCHEDULER_add_delayed(sched,
+                                 GNUNET_TIME_relative_multiply
+                                 (GNUNET_TIME_UNIT_MILLISECONDS, 300),
+                                 &send_find_peer_request, test_find_peer);
+    return;
+  }
+
+  find_peer_context->outstanding++;
+  if (GNUNET_TIME_absolute_get_remaining
+      (find_peer_context->endtime).value == 0)
+  {
+    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
+    return;
+  }
+
+  test_find_peer->dht_handle = GNUNET_DHT_connect(sched,
+                                                  test_find_peer->daemon->cfg,
+                                                  1);
+  GNUNET_assert(test_find_peer->dht_handle != NULL);
+  fprintf(stderr, "calling GNUNET_DHT_find_peers\n");
+  if (GNUNET_YES != GNUNET_DHT_find_peers (test_find_peer->dht_handle,
+                                           &handle_find_peer_sent,
+                                           test_find_peer))
+  {
+    /* The request was refused, so handle_find_peer_sent will never run.
+     * Clean up here; otherwise the request is leaked and the outstanding
+     * and total counters never drain. */
+    GNUNET_DHT_disconnect(test_find_peer->dht_handle);
+    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
+  }
+}
+
+/**
+ * Start one round of find peer requests: create max_outstanding_find_peers
+ * requests, each directed at a peer of the group picked via
+ * GNUNET_CRYPTO_random_u32, and schedule them to be sent.  Further rounds
+ * are scheduled from count_peers_cb, limited by the settle time
+ * and the time allowed for each request.
+ *
+ * @param cls closure - the struct FindPeerContext for this test run
+ * @param tc context the task is being called with (not used here)
+ */
+static void
+schedule_find_peer_requests (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_ctx = cls;
+  struct TestFindPeer *test_find_peer;
+  uint32_t i;
+  uint32_t random;
+
+  for (i = 0; i < max_outstanding_find_peers; i++)
+    {
+      test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer));
+      random = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+      test_find_peer->daemon  = GNUNET_TESTING_daemon_get(pg, random);
+      test_find_peer->find_peer_context = find_peer_ctx;
+      find_peer_ctx->total++;
+      GNUNET_SCHEDULER_add_now(sched, &send_find_peer_request, test_find_peer);
+    }
+}
+
+/**
  * Set up all of the put and get operations we want
  * to do.  Allocate data structure for each, add to list,
  * then call actual insert functions.
@@ -1061,6 +1265,7 @@
   int i;
   int max;
   struct TopologyIteratorContext *topo_ctx;
+  struct FindPeerContext *find_peer_context;
   if (dhtlog_handle != NULL)
     {
       if (settle_time >= 60 * 2)
@@ -1078,7 +1283,14 @@
       GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), 
&capture_current_topology, topo_ctx);
     }
   else
-    GNUNET_SCHEDULER_add_now (sched, &setup_puts_and_gets, NULL);
+    GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), 
&setup_puts_and_gets, NULL);
+
+  if (GNUNET_YES == do_find_peer)
+  {
+    find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+    find_peer_context->endtime = 
GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
 settle_time));
+    GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, 
find_peer_context);
+  }
 }
 
 /**
@@ -1509,6 +1721,7 @@
     hostfile = NULL;
 
   hosts = NULL;
+  temphost = NULL;
   if (hostfile != NULL)
     {
       if (GNUNET_OK != GNUNET_DISK_file_test (hostfile))
@@ -1533,6 +1746,7 @@
                   "Could not read file %s specified for host list, ending 
test!", hostfile);
         GNUNET_free (hostfile);
         GNUNET_free (data);
+        GNUNET_free_non_null(trialmessage);
         return;
       }
 
@@ -1588,6 +1802,20 @@
     num_gets = num_peers;
 
   if (GNUNET_OK ==
+        GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", 
"find_peer_delay",
+                                               &temp_config_number))
+    find_peer_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 
temp_config_number);
+  else
+    find_peer_delay = DEFAULT_FIND_PEER_DELAY;
+
+  if (GNUNET_OK ==
+        GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", 
"concurrent_find_peers",
+                                               &temp_config_number))
+    max_outstanding_find_peers = temp_config_number;
+  else
+    max_outstanding_find_peers = DEFAULT_MAX_OUTSTANDING_FIND_PEERS;
+
+  if (GNUNET_OK ==
         GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", 
"get_timeout",
                                                &temp_config_number))
     get_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 
temp_config_number);
@@ -1658,6 +1886,15 @@
                                                           
&malicious_put_frequency))
     malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
 
+  if (GNUNET_NO ==
+        GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+                                             "find_peers"))
+    {
+      do_find_peer = GNUNET_NO;
+    }
+  else
+    do_find_peer = GNUNET_YES;
+
   topology_str = NULL;
   if ((GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology",
@@ -1745,7 +1982,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Number of peers must be specified in section %s option 
%s\n", topology_str, "TESTING", "NUM_PEERS");
     }
-
+  GNUNET_assert(num_peers > 0 && num_peers < (unsigned long long)-1);
   /* Set peers_left so we know when all peers started */
   peers_left = num_peers;
 
@@ -1800,6 +2037,7 @@
                                      &topology_callback, NULL,
                                      hosts);
 
+  GNUNET_free_non_null(temphost);
 }
 
 

Modified: gnunet/src/dht/gnunet-service-dht.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht.c 2010-08-30 14:30:49 UTC (rev 12763)
+++ gnunet/src/dht/gnunet-service-dht.c 2010-08-30 16:33:42 UTC (rev 12764)
@@ -67,14 +67,14 @@
  */
 #define MINIMUM_PEER_THRESHOLD 20
 
+#define DHT_MAX_RECENT 100
 
+#define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 60)
 
-#define DHT_MAX_RECENT 100
-
 /**
  * Default time to wait to send messages on behalf of other peers.
  */
-#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 10);
+#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 10)
 
 /**
  * Default importance for handling messages on behalf of other peers.
@@ -82,9 +82,14 @@
 #define DHT_DEFAULT_P2P_IMPORTANCE 0
 
 /**
+ * How long to keep recent requests around by default.
+ */
+#define DEFAULT_RECENT_REMOVAL 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60)
+
+/**
  * Default time to wait to send find peer messages sent by the dht service.
  */
-#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 30);
+#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 30)
 
 /**
  * Default importance for find peer messages sent by the dht service.
@@ -99,7 +104,7 @@
 /**
  * Default options for find peer requests sent by the dht service.
  */
-#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
+#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE
 
 /**
  * How long at least to wait before sending another find peer request.
@@ -109,7 +114,7 @@
 /**
  * How long at most to wait before sending another find peer request.
  */
-#define DHT_MAXIMUM_FIND_PEER_INTERVAL 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
+#define DHT_MAXIMUM_FIND_PEER_INTERVAL 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
 
 /**
  * How often to update our preference levels for peers in our routing tables.
@@ -117,6 +122,12 @@
 #define DHT_DEFAULT_PREFERENCE_INTERVAL 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
 
 /**
+ * How long at most on average will we allow a reply forward to take
+ * (before we quit sending out new requests)
+ */
+#define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 1)
+
+/**
  * How many initial requests to send out (in true Kademlia fashion)
  */
 #define DHT_KADEMLIA_REPLICATION 3
@@ -145,6 +156,12 @@
 #define MAX_HOPS 20
 
 /**
+ * How many time differences between requesting a core send and
+ * the actual callback to remember.
+ */
+#define MAX_REPLY_TIMES 8
+
+/**
  * Linked list of messages to send to clients.
  */
 struct P2PPendingMessage
@@ -165,6 +182,11 @@
   unsigned int importance;
 
   /**
+   * Time when this request was scheduled to be sent.
+   */
+  struct GNUNET_TIME_Absolute scheduled;
+
+  /**
    * How long to wait before sending message.
    */
   struct GNUNET_TIME_Relative timeout;
@@ -251,7 +273,6 @@
    * Task for scheduling periodic ping messages for this peer.
    */
   GNUNET_SCHEDULER_TaskIdentifier ping_task;
-
 };
 
 /**
@@ -329,7 +350,6 @@
    * Tail of linked list of pending messages for this client
    */
   struct PendingMessage *pending_tail;
-
 };
 
 
@@ -353,7 +373,7 @@
   /**
    * The key this request was about
    */
-  const GNUNET_HashCode *key;
+  GNUNET_HashCode key;
 
   /**
    * The unique identifier of this request
@@ -484,6 +504,21 @@
 };
 
 /**
+ * Context used to calculate the number of find peer messages
+ * per X time units since our last scheduled find peer message
+ * was sent.  If we have seen too many messages, delay or don't
+ * send our own out.
+ */
+struct FindPeerMessageContext
+{
+  unsigned int count;
+
+  struct GNUNET_TIME_Absolute start;
+
+  struct GNUNET_TIME_Absolute end;
+};
+
+/**
  * DHT Routing results structure
  */
 struct DHTResults
@@ -518,18 +553,50 @@
 
 struct RecentRequest
 {
+  /**
+   * Position of this node in the min heap.
+   */
+  struct GNUNET_CONTAINER_HeapNode *heap_node;
+
+  /**
+   * Bloomfilter containing entries for peers
+   * we forwarded this request to.
+   */
+  struct GNUNET_CONTAINER_BloomFilter *bloom;
+
+  /**
+   * Timestamp of this request, for ordering
+   * the min heap.
+   */
+  struct GNUNET_TIME_Absolute timestamp;
+
+  /**
+   * Key of this request.
+   */
   GNUNET_HashCode key;
+
+  /**
+   * Unique identifier for this request.
+   */
   uint64_t uid;
+
+  /**
+   * Task to remove this entry on timeout.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier remove_task;
 };
 
-
-#if 0
 /**
  * Recent requests by hash/uid and by time inserted.
  */
 static struct RecentRequests recent;
-#endif
+
 /**
+ * Context to use to calculate find peer rates.
+ */
+static struct FindPeerMessageContext find_peer_context;
+
+/**
  * Don't use our routing algorithm, always route
  * to closest peer; initially send requests to 3
  * peers.
@@ -547,6 +614,12 @@
 static int stop_on_found;
 
 /**
+ * Whether DHT needs to manage find peer requests, or
+ * an external force will do it on behalf of the DHT.
+ */
+static int do_find_peer;
+
+/**
  * How many peers have we added since we sent out our last
  * find peer request?
  */
@@ -662,26 +735,114 @@
  */
 static unsigned int malicious_getter;
 
-/*
+/**
  * GNUNET_YES or GNUNET_NO, whether or not to act as
  * a malicious node which sends out lots of PUTS
  */
 static unsigned int malicious_putter;
 
+/**
+ * Frequency for malicious get requests.
+ */
 static unsigned long long malicious_get_frequency;
 
+/**
+ * Frequency for malicious put requests.
+ */
 static unsigned long long malicious_put_frequency;
 
 /**
+ * Reply times for requests, if we are busy, don't send any
+ * more requests!
+ */
+static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
+
+/**
+ * Current counter for replies.
+ */
+static unsigned int reply_counter;
+
+/**
  * Forward declaration.
  */
 static size_t send_generic_reply (void *cls, size_t size, void *buf);
 
-/* Declare here so retry_core_send is aware of it */
+/** Declare here so retry_core_send is aware of it */
 size_t core_transmit_notify (void *cls,
                              size_t size, void *buf);
 
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
 static void
+hash_from_uid (uint64_t uid,
+               GNUNET_HashCode *hash)
+{
+  memset (hash, 0, sizeof(GNUNET_HashCode));
+  /* Copy the bytes instead of storing through a casted uint64_t pointer:
+   * same resulting bytes, but no strict-aliasing or alignment assumptions
+   * about GNUNET_HashCode. */
+  memcpy (hash, &uid, sizeof(uid));
+}
+
+#if AVG
+/**
+ * Calculate the average send time between messages so that we can
+ * ignore certain requests if we get too busy.
+ *
+ * @return the average time between asking core to send a message
+ *         and when the buffer for copying it is passed
+ */
+static struct GNUNET_TIME_Relative get_average_send_delay()
+{
+  unsigned int i;
+  unsigned int divisor;
+  struct GNUNET_TIME_Relative average_time;
+  average_time = GNUNET_TIME_relative_get_zero();
+  divisor = 0;
+  for (i = 0; i < MAX_REPLY_TIMES; i++)
+  {
+    average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]);
+    if (reply_times[i].value == (uint64_t)0)
+      continue;
+    else
+      divisor++;
+  }
+  if (divisor == 0)
+  {
+    return average_time;
+  }
+
+  average_time = GNUNET_TIME_relative_divide(average_time, divisor);
+  fprintf(stderr, "Avg send delay: %u sends is %llu\n", divisor, (long long 
unsigned int)average_time.value);
+  return average_time;
+}
+#endif
+
+/**
+ * Find the maximum send time of the recently sent values.
+ *
+ * @return the largest recorded time between asking core to send a message
+ *         and when the buffer for copying it is passed (zero if no
+ *         non-zero time has been recorded)
+ */
+static struct GNUNET_TIME_Relative get_max_send_delay()
+{
+  unsigned int i;
+  struct GNUNET_TIME_Relative max_time;
+  max_time = GNUNET_TIME_relative_get_zero();
+
+  for (i = 0; i < MAX_REPLY_TIMES; i++)
+  {
+    if (reply_times[i].value > max_time.value)
+      max_time.value = reply_times[i].value;
+  }
+
+  if (max_time.value > MAX_REQUEST_TIME.value)
+    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", (long 
long unsigned int)max_time.value);
+  return max_time;
+}
+
+static void
 increment_stats(const char *value)
 {
   if (stats != NULL)
@@ -718,6 +879,10 @@
                 "`%s:%s': Calling notify_transmit_ready with size %d for peer 
%s\n", my_short_id,
                 "DHT", ssize, GNUNET_i2s(&peer->id));
 #endif
+      pending->scheduled = GNUNET_TIME_absolute_get();
+      reply_counter++;
+      if (reply_counter >= MAX_REPLY_TIMES)
+       reply_counter = 0;
       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, 
pending->importance,
                                                    pending->timeout, &peer->id,
                                                    ssize, 
&core_transmit_notify, peer);
@@ -759,7 +924,7 @@
   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
   GNUNET_assert(GNUNET_OK == 
GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, 
result_message->bloomfilter, DHT_BLOOM_SIZE));
   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
-  memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+  memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
   memcpy(&result_message[1], msg, ntohs(msg->size));
 #if DEBUG_DHT > 1
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d 
for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
@@ -802,6 +967,7 @@
   peer->th = NULL;
   off = 0;
   pending = peer->head;
+  reply_times[reply_counter] = 
GNUNET_TIME_absolute_get_difference(pending->scheduled, 
GNUNET_TIME_absolute_get());
   msize = ntohs(pending->msg->size);
   if (msize <= size)
     {
@@ -1246,6 +1412,7 @@
   struct PeerInfo *peer = value;
   int new_bucket;
 
+  GNUNET_assert(lowest_bucket > 0);
   new_bucket = lowest_bucket - 1;
   remove_peer(peer, lowest_bucket);
   GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
@@ -1358,7 +1525,7 @@
 
   increment_stats(STAT_ROUTE_FORWARDS);
 
-  if ((msg_ctx->closest != GNUNET_YES) && (peer == 
find_closest_peer(msg_ctx->key)))
+  if ((msg_ctx->closest != GNUNET_YES) && (peer == 
find_closest_peer(&msg_ctx->key)))
     increment_stats(STAT_ROUTE_FORWARDS_CLOSEST);
 
   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
@@ -1378,8 +1545,7 @@
   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
   if (msg_ctx->bloom != NULL)
     GNUNET_assert(GNUNET_OK == 
GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, 
route_message->bloomfilter, DHT_BLOOM_SIZE));
-  if (msg_ctx->key != NULL)
-    memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+  memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
   memcpy(&route_message[1], msg, ntohs(msg->size));
 #if DEBUG_DHT > 1
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d 
for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
@@ -1625,6 +1791,8 @@
   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, 
&peer->hashPubKey))
     return GNUNET_NO; /* We already know this peer (are connected even!) */
   bucket = find_current_bucket(&peer->hashPubKey);
+  if (bucket == GNUNET_SYSERR)
+    return GNUNET_NO;
   if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == 
lowest_bucket) && (lowest_bucket > 0)))
     return GNUNET_YES;
 
@@ -1667,6 +1835,7 @@
       }
       else /* We have a valid hello, and peer id stored in new_peer */
       {
+        find_peer_context.count++;
         increment_stats(STAT_FIND_PEER_REPLY);
         if (GNUNET_YES == consider_peer(&new_peer))
         {
@@ -1682,7 +1851,7 @@
   if (malicious_dropper == GNUNET_YES)
     record = NULL;
   else
-    record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, 
message_context->key);
+    record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, 
&message_context->key);
 
   if (record == NULL) /* No record of this message! */
     {
@@ -1701,7 +1870,7 @@
                                        message_context->hop_count,
                                        GNUNET_SYSERR,
                                        &my_identity,
-                                       message_context->key,
+                                       &message_context->key,
                                        message_context->peer, NULL);
         }
 #endif
@@ -1728,7 +1897,7 @@
             {
               dhtlog_handle->insert_route (NULL, message_context->unique_id, 
DHTLOG_RESULT,
                                            message_context->hop_count,
-                                           GNUNET_YES, &my_identity, 
message_context->key,
+                                           GNUNET_YES, &my_identity, 
&message_context->key,
                                            message_context->peer, NULL);
             }
 #endif
@@ -1763,7 +1932,7 @@
                   dhtlog_handle->insert_route (NULL, 
message_context->unique_id,
                                                DHTLOG_RESULT,
                                                message_context->hop_count,
-                                               GNUNET_NO, &my_identity, 
message_context->key,
+                                               GNUNET_NO, &my_identity, 
&message_context->key,
                                                message_context->peer, 
&pos->source);
                 }
 #endif
@@ -1876,7 +2045,7 @@
 
   if (datacache != NULL)
     results =
-      GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
+      GNUNET_DATACACHE_get (datacache, &message_context->key, get_type,
                             &datacache_get_iterator, message_context);
 
   if (results >= 1)
@@ -1891,14 +2060,14 @@
         {
           dhtlog_handle->insert_query (NULL, message_context->unique_id, 
DHTLOG_GET,
                                 message_context->hop_count, GNUNET_YES, 
&my_identity,
-                                message_context->key);
+                                &message_context->key);
         }
 
       if ((debug_routes_extended) && (dhtlog_handle != NULL))
         {
           dhtlog_handle->insert_route (NULL, message_context->unique_id, 
DHTLOG_ROUTE,
                                        message_context->hop_count, GNUNET_YES,
-                                       &my_identity, message_context->key, 
message_context->peer,
+                                       &my_identity, &message_context->key, 
message_context->peer,
                                        NULL);
         }
 #endif
@@ -1911,7 +2080,7 @@
       {
         dhtlog_handle->insert_query (NULL, message_context->unique_id, 
DHTLOG_GET,
                                       message_context->hop_count, GNUNET_NO, 
&my_identity,
-                                      message_context->key);
+                                      &message_context->key);
       }
 #endif
     }
@@ -2000,7 +2169,7 @@
     {
       dhtlog_handle->insert_query (NULL, message_context->unique_id, 
DHTLOG_FIND_PEER,
                                    message_context->hop_count, GNUNET_YES, 
&my_identity,
-                                   message_context->key);
+                                   &message_context->key);
     }
 #endif
   GNUNET_free(find_peer_result);
@@ -2046,7 +2215,7 @@
         {
           dhtlog_handle->insert_query (NULL, message_context->unique_id, 
DHTLOG_PUT,
                                        message_context->hop_count, GNUNET_NO, 
&my_identity,
-                                       message_context->key);
+                                       &message_context->key);
         }
     }
 #endif
@@ -2059,7 +2228,7 @@
     {
       dhtlog_handle->insert_route (NULL, message_context->unique_id, 
DHTLOG_ROUTE,
                                    message_context->hop_count, GNUNET_YES,
-                                   &my_identity, message_context->key, 
message_context->peer,
+                                   &my_identity, &message_context->key, 
message_context->peer,
                                    NULL);
     }
 
@@ -2067,13 +2236,13 @@
     {
       dhtlog_handle->insert_query (NULL, message_context->unique_id, 
DHTLOG_PUT,
                                    message_context->hop_count, GNUNET_YES, 
&my_identity,
-                                   message_context->key);
+                                   &message_context->key);
     }
 #endif
 
   increment_stats(STAT_PUTS_INSERTED);
   if (datacache != NULL)
-    GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
+    GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
                           (char *) &put_msg[1], put_type,
                           GNUNET_TIME_absolute_ntoh(put_msg->expiration));
   else
@@ -2154,9 +2323,8 @@
     }
   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
     target_replication / (target_replication * (hop_count + 1) + diameter);
+#if NONSENSE
   target_value = 0;
-
-#if NONSENSE
   while (target_value < target_count)
     target_value++; /* target_value is ALWAYS 1 after this "loop" */
 #else
@@ -2171,33 +2339,44 @@
 
 /*
  * Check whether my identity is closer than any known peers.
+ * If a non-null bloomfilter is given, check if this is the closest
+ * peer that hasn't already been routed to.
  *
  * @param target hash code to check closeness to
+ * @param bloom bloomfilter, exclude these entries from the decision
  *
  * Return GNUNET_YES if node location is closest, GNUNET_NO
  * otherwise.
  */
 int
-am_closest_peer (const GNUNET_HashCode * target)
+am_closest_peer (const GNUNET_HashCode * target, struct 
GNUNET_CONTAINER_BloomFilter *bloom)
 {
   int bits;
   int other_bits;
   int bucket_num;
   int count;
   struct PeerInfo *pos;
+#if INTEGER_DISTANCE
   unsigned int my_distance;
-
+#endif
   bucket_num = find_current_bucket(target);
   if (bucket_num == GNUNET_SYSERR) /* Same key! */
     return GNUNET_YES;
 
   bits = matching_bits(&my_identity.hashPubKey, target);
+#if INTEGER_DISTANCE
   my_distance = distance(&my_identity.hashPubKey, target);
-
+#endif
   pos = k_buckets[bucket_num].head;
   count = 0;
   while ((pos != NULL) && (count < bucket_size))
     {
+      if ((bloom != NULL) && (GNUNET_YES == 
GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey)))
+        {
+          pos = pos->next;
+          continue; /* Skip already checked entries */
+        }
+
       other_bits = matching_bits(&pos->id.hashPubKey, target);
       if (other_bits > bits)
         return GNUNET_NO;
@@ -2361,7 +2540,34 @@
     }
 }
 
+/**
+ * Task that drops one recent-request entry from the recent map and heap and
+ * frees it; on shutdown, destroys both containers once they are both empty.
+ *
+ * @param cls the struct RecentRequest to remove (must not be NULL)
+ * @param tc scheduler context; tc->reason is checked for shutdown
+ */
+static void
+remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct RecentRequest *req = cls;
+  static GNUNET_HashCode hash;
 
+  GNUNET_assert(req != NULL);
+  hash_from_uid(req->uid, &hash); /* entries are looked up by the hash of their uid */
+  GNUNET_assert (GNUNET_YES == 
GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req));
+  GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node);
+  GNUNET_CONTAINER_bloomfilter_free(req->bloom);
+  GNUNET_free(req);
+
+  if ((tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) && (0 == 
GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == 
GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
+  {
+    GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap); /* last entry gone during shutdown: release the containers */
+    GNUNET_CONTAINER_heap_destroy(recent.minHeap);
+  }
+}
+
+
 /**
  * Task used to remove forwarding entries, either
  * after timeout, when full, or on shutdown.
@@ -2408,6 +2614,7 @@
   while (current_size >= MAX_OUTSTANDING_FORWARDS)
     {
       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
+      GNUNET_assert(source_info != NULL);
       record = source_info->record;
       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
       if (record->head == NULL) /* No more entries in DLL */
@@ -2420,7 +2627,7 @@
       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
     }
   now = GNUNET_TIME_absolute_get();
-  record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, 
msg_ctx->key);
+  record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, 
&msg_ctx->key);
   if (record != NULL) /* Already know this request! */
     {
       pos = record->head;
@@ -2439,8 +2646,8 @@
   else
     {
       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
-      GNUNET_assert(GNUNET_OK == 
GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-      memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+      GNUNET_assert(GNUNET_OK == 
GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+      memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
     }
 
   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
@@ -2480,11 +2687,12 @@
 {
   int i;
   struct PeerInfo *selected;
+#if DEBUG_DHT_ROUTING > 1
   struct PeerInfo *nearest;
+#endif
   unsigned int forward_count;
-#if DEBUG_DHT
-  char *nearest_buf;
-#endif
+  struct RecentRequest *recent_req;
+  GNUNET_HashCode unique_hash;
 #if DEBUG_DHT_ROUTING
   int ret;
 #endif
@@ -2496,7 +2704,7 @@
         {
           dhtlog_handle->insert_route (NULL, message_context->unique_id, 
DHTLOG_ROUTE,
                                        message_context->hop_count, 
GNUNET_SYSERR,
-                                       &my_identity, message_context->key, 
message_context->peer,
+                                       &my_identity, &message_context->key, 
message_context->peer,
                                        NULL);
         }
 #endif
@@ -2506,15 +2714,16 @@
     }
 
   increment_stats(STAT_ROUTES);
-  message_context->closest = am_closest_peer(message_context->key);
+  /* Determine whether we are the closest peer to the key among the peers that
+   * have NOT already been routed to on this message's path (the bloomfilter excludes those).
+   */
+  message_context->closest = am_closest_peer(&message_context->key, 
message_context->bloom);
   forward_count = get_forward_count(message_context->hop_count, 
message_context->replication);
-  nearest = find_closest_peer(message_context->key);
-
+  
   if (message_context->bloom == NULL)
     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, 
DHT_BLOOM_SIZE, DHT_BLOOM_K);
 
   if ((stop_on_closest == GNUNET_YES) && (message_context->closest == 
GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
-/*      || ((strict_kademlia == GNUNET_YES) && (message_context->closest == 
GNUNET_YES))) */
     forward_count = 0;
 
 #if DEBUG_DHT_ROUTING
@@ -2527,7 +2736,7 @@
     {
       dhtlog_handle->insert_route (NULL, message_context->unique_id, 
DHTLOG_ROUTE,
                                    message_context->hop_count, ret,
-                                   &my_identity, message_context->key, 
message_context->peer,
+                                   &my_identity, &message_context->key, 
message_context->peer,
                                    NULL);
     }
 #endif
@@ -2556,10 +2765,10 @@
         {
           if ((debug_routes) && (dhtlog_handle != NULL))
             {
-              dhtlog_handle->insert_dhtkey(NULL, message_context->key);
+              dhtlog_handle->insert_dhtkey(NULL, &message_context->key);
               dhtlog_handle->insert_query (NULL, message_context->unique_id, 
DHTLOG_FIND_PEER,
                                            message_context->hop_count, 
GNUNET_NO, &my_identity,
-                                           message_context->key);
+                                           &message_context->key);
             }
         }
 #endif
@@ -2570,42 +2779,47 @@
     }
 
   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, 
&my_identity.hashPubKey);
-#if 0
-  if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent->hashmap, 
message_context->key))
-    {
-      if (GNUNET_SYSERR = GNUNET_CONTAINER_multihashmap_get_multiple 
(recent->hashmap, message_context->key, &find_matching_recent, 
&message_context)) /* Have too recently seen this request! */
-        {
-          forward_count = 0;
-        }
-      else /* Exact match not found, but same key found */
-        {
-          recent_req = GNUNET_CONTAINER_multihashmap_get(recent->hashmap, 
message_context->key);
-        }
+  hash_from_uid(message_context->unique_id, &unique_hash);
+  if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent.hashmap, 
&unique_hash))
+  {
+      recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap, 
&unique_hash);
+      GNUNET_assert(recent_req != NULL);
+      if (0 != memcmp(&recent_req->key, &message_context->key, 
sizeof(GNUNET_HashCode)))
+        increment_stats(STAT_DUPLICATE_UID);
+      else
+      {
+        increment_stats(STAT_RECENT_SEEN);
+        GNUNET_CONTAINER_bloomfilter_or2(message_context->bloom, 
recent_req->bloom, DHT_BLOOM_SIZE);
+      }
     }
   else
     {
       recent_req = GNUNET_malloc(sizeof(struct RecentRequest));
       recent_req->uid = message_context->unique_id;
-      memcmp(&recent_req->key, message_context->key, sizeof(GNUNET_HashCode));
+      memcpy(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode));
       recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(sched, 
DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req);
-      GNUNET_CONTAINER_heap_insert(recent->minHeap, recent_req, 
GNUNET_TIME_absolute_get());
-      GNUNET_CONTAINER_multihashmap_put(recent->hashmap, message_context->key, 
recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+      recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap, 
recent_req, GNUNET_TIME_absolute_get().value);
+      recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, 
DHT_BLOOM_SIZE, DHT_BLOOM_K);
+      GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash, 
recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
     }
 
-  if (GNUNET_CONTAINER_multihashmap_size(recent->hashmap) > DHT_MAX_RECENT)
+  if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT)
     {
-      remove_oldest_recent();
+      recent_req = GNUNET_CONTAINER_heap_peek(recent.minHeap);
+      GNUNET_assert(recent_req != NULL);
+      GNUNET_SCHEDULER_cancel(sched, recent_req->remove_task);
+      GNUNET_SCHEDULER_add_now(sched, &remove_recent, recent_req);
     }
-#endif
 
   for (i = 0; i < forward_count; i++)
     {
-      selected = select_peer(message_context->key, message_context->bloom);
+      selected = select_peer(&message_context->key, message_context->bloom);
 
       if (selected != NULL)
         {
           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, 
&selected->id.hashPubKey);
 #if DEBUG_DHT_ROUTING > 1
+          nearest = find_closest_peer(&message_context->key);
           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "`%s:%s': Forwarding request key %s uid %llu to peer %s 
(closest %s, bits %d, distance %u)\n", my_short_id,
@@ -2616,7 +2830,7 @@
             {
               dhtlog_handle->insert_route (NULL, message_context->unique_id, 
DHTLOG_ROUTE,
                                            message_context->hop_count, 
GNUNET_NO,
-                                           &my_identity, message_context->key, 
message_context->peer,
+                                           &my_identity, 
&message_context->key, message_context->peer,
                                            &selected->id);
             }
           forward_message(cls, msg, selected, message_context);
@@ -2640,7 +2854,10 @@
 #endif
 
   if (message_context->bloom != NULL)
-    GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
+    {
+      GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom, 
message_context->bloom, DHT_BLOOM_SIZE);
+      GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
+    }
 
   return forward_count;
 }
@@ -2684,7 +2901,6 @@
   static struct GNUNET_DHT_PutMessage put_message;
   static struct DHT_MessageContext message_context;
   static GNUNET_HashCode key;
-  unsigned int mcsize;
   uint32_t random_key;
 
   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
@@ -2694,12 +2910,11 @@
   put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
   put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
   put_message.expiration = 
GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever());
-  mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode);
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.client = NULL;
   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
(uint32_t)-1);
   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
-  message_context.key = &key;
+  memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll 
(GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
   message_context.msg_options = ntohl (0);
@@ -2726,9 +2941,8 @@
 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   static struct GNUNET_DHT_GetMessage get_message;
-  static struct DHT_MessageContext message_context;
+  struct DHT_MessageContext message_context;
   static GNUNET_HashCode key;
-  unsigned int mcsize;
   uint32_t random_key;
 
   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
@@ -2737,12 +2951,11 @@
   get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
   get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
   get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
-  mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode);
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.client = NULL;
   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
(uint32_t)-1);
   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
-  message_context.key = &key;
+  memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll 
(GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
   message_context.msg_options = ntohl (0);
@@ -2754,13 +2967,10 @@
     dhtlog_handle->insert_dhtkey(NULL, &key);
   increment_stats(STAT_GET_START);
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message 
with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
-  route_message(NULL, &get_message.header, &message_context);
+  route_message (NULL, &get_message.header, &message_context);
   GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 
malicious_get_frequency), &malicious_get_task, NULL);
 }
 
-#if DO_FIND_PEER
-
-
 /**
  * Iterator over hash map entries.
  *
@@ -2797,12 +3007,42 @@
   int ret;
   struct GNUNET_TIME_Relative next_send_time;
   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
-
+#if COUNT_INTERVAL
+  struct GNUNET_TIME_Relative time_diff;
+  struct GNUNET_TIME_Absolute end;
+  double multiplier;
+  double count_per_interval;
+#endif
   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
     return;
 
+  if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If 
we are finding peers already, no need to send out our request right now! */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since 
last find peer message sent!\n", newly_found_peers);
+      GNUNET_SCHEDULER_add_delayed (sched,
+                                    GNUNET_TIME_UNIT_MINUTES,
+                                    &send_find_peer_message, NULL);
+      newly_found_peers = 0;
+      return;
+    }
+    
   increment_stats(STAT_FIND_PEER_START);
+#if COUNT_INTERVAL
+  end = GNUNET_TIME_absolute_get();
+  time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start, 
end);
 
+  if (time_diff.value > FIND_PEER_CALC_INTERVAL.value)
+    {
+      multiplier = time_diff.value / FIND_PEER_CALC_INTERVAL.value;
+      count_per_interval = find_peer_context.count / multiplier;
+    }
+  else
+    {
+      multiplier = FIND_PEER_CALC_INTERVAL.value / time_diff.value;
+      count_per_interval = find_peer_context.count * multiplier;
+    }
+#endif
+
   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage));
   find_peer_msg->header.size = htons(sizeof(struct 
GNUNET_DHT_FindPeerMessage));
   find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
@@ -2810,10 +3050,10 @@
   GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom, 
temp_bloom);
   GNUNET_assert(GNUNET_OK == 
GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom, 
find_peer_msg->bloomfilter, DHT_BLOOM_SIZE));
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
-  message_context.key = &my_identity.hashPubKey;
+  memcpy(&message_context.key, &my_identity.hashPubKey, 
sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll 
(GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
-  message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
-  message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS);
+  message_context.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
+  message_context.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
   message_context.network_size = estimate_diameter();
   message_context.peer = &my_identity;
   message_context.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
@@ -2836,12 +3076,18 @@
                              
GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                                       
DHT_MAXIMUM_FIND_PEER_INTERVAL.value - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
     }
+
+  GNUNET_assert (next_send_time.value != 0);
+  find_peer_context.count = 0;
   newly_found_peers = 0;
-  GNUNET_SCHEDULER_add_delayed (sched,
-                                next_send_time,
-                                &send_find_peer_message, NULL);
+  find_peer_context.start = GNUNET_TIME_absolute_get();
+  if (GNUNET_YES == do_find_peer)
+  {
+    GNUNET_SCHEDULER_add_delayed (sched,
+                                  next_send_time,
+                                         &send_find_peer_message, NULL);
+  }
 }
-#endif
 
 /**
  * Handler for any generic DHT messages, calls the appropriate handler
@@ -2859,11 +3105,7 @@
   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct 
GNUNET_DHT_RouteMessage *) message;
   const struct GNUNET_MessageHeader *enc_msg;
   struct DHT_MessageContext message_context;
-  size_t enc_type;
-
   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
-  enc_type = ntohs (enc_msg->type);
-
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s:%s': Received `%s' request from client, message type %d, 
key %s, uid %llu\n",
@@ -2876,7 +3118,7 @@
 #endif
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.client = find_active_client (client);
-  message_context.key = &dht_msg->key;
+  memcpy(&message_context.key, &dht_msg->key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
   message_context.replication = ntohl (dht_msg->desired_replication_level);
   message_context.msg_options = ntohl (dht_msg->options);
@@ -2920,6 +3162,10 @@
 
   switch (ntohs(dht_control_msg->command))
   {
+  case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
+    GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending self seeking find peer 
request!\n");
+    GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL);
+    break;
   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
     if (ntohs(dht_control_msg->variable) > 0)
       malicious_get_frequency = ntohs(dht_control_msg->variable);
@@ -2971,15 +3217,11 @@
     (const struct GNUNET_DHT_StopMessage *) message;
   struct DHTQueryRecord *record;
   struct DHTRouteSource *pos;
-  uint64_t uid;
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s:%s': Received `%s' request from client, uid %llu\n", 
my_short_id, "DHT",
               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
 #endif
-
-  uid = GNUNET_ntohll(dht_stop_msg->unique_id);
-
   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, 
&dht_stop_msg->key);
   if (record != NULL)
     {
@@ -3017,6 +3259,12 @@
   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader 
*)&incoming[1];
   struct DHT_MessageContext *message_context;
 
+  if (get_max_send_delay().value > MAX_REQUEST_TIME.value)
+  {
+    fprintf(stderr, "Sending of previous requests has taken far too long, 
backing off!\n");
+    return GNUNET_YES;
+  }
+
   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these 
away. FIXME: Don't throw these away? (reply)*/
     {
 #if DEBUG_PING
@@ -3034,7 +3282,7 @@
   message_context->bloom = 
GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, 
DHT_BLOOM_K);
   GNUNET_assert(message_context->bloom != NULL);
   message_context->hop_count = ntohl(incoming->hop_count);
-  message_context->key = &incoming->key;
+  memcpy(&message_context->key, &incoming->key, sizeof(GNUNET_HashCode));
   message_context->replication = ntohl(incoming->desired_replication_level);
   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
   message_context->msg_options = ntohl(incoming->options);
@@ -3074,7 +3322,7 @@
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.bloom = 
GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, 
DHT_BLOOM_K);
   GNUNET_assert(message_context.bloom != NULL);
-  message_context.key = &incoming->key;
+  memcpy(&message_context.key, &incoming->key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
   message_context.msg_options = ntohl(incoming->options);
   message_context.hop_count = ntohl(incoming->hop_count);
@@ -3322,7 +3570,7 @@
   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
                                  cfg,   /* Main configuration */
                                  GNUNET_TIME_UNIT_FOREVER_REL,
-                                 NULL,  /* Closure passed to DHT functionas 
around? */
+                                 NULL,  /* Closure passed to DHT functions */
                                  &core_init,    /* Call core_init once 
connected */
                                  &handle_core_connect,  /* Handle connects */
                                  &handle_core_disconnect,  /* remove peers on 
disconnects */
@@ -3402,6 +3650,15 @@
       malicious_dropper = GNUNET_YES;
     }
 
+  if (GNUNET_NO ==
+        GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+                                             "do_find_peer"))
+    {
+      do_find_peer = GNUNET_NO;
+    }
+  else
+    do_find_peer = GNUNET_YES;
+
   if (GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
                                            "mysql_logging_extended"))
@@ -3445,14 +3702,19 @@
       GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
       GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO);
     }
-#if DO_FIND_PEER
-  next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
-                         GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
-                                                  
(DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - 
DHT_MINIMUM_FIND_PEER_INTERVAL.value);
-  GNUNET_SCHEDULER_add_delayed (sched,
-                                next_send_time,
-                                &send_find_peer_message, NULL);
-#endif
+  /* FIXME: if there are no recent requests then these never get freed, but 
alternative is _annoying_! */
+  recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2);
+  recent.minHeap = 
GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  if (GNUNET_YES == do_find_peer)
+  {
+    next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
+                           
GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
+                                                    
(DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - 
DHT_MINIMUM_FIND_PEER_INTERVAL.value);
+    find_peer_context.start = GNUNET_TIME_absolute_get();
+    GNUNET_SCHEDULER_add_delayed (sched,
+                                  next_send_time,
+                                  &send_find_peer_message, &find_peer_context);
+  }
 
   /* Scheduled the task to clean up when shutdown is called */
   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]