gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r13006 - gnunet/src/dht


From: gnunet
Subject: [GNUnet-SVN] r13006 - gnunet/src/dht
Date: Thu, 16 Sep 2010 16:13:57 +0200

Author: nevans
Date: 2010-09-16 16:13:57 +0200 (Thu, 16 Sep 2010)
New Revision: 13006

Modified:
   gnunet/src/dht/gnunet-dht-driver.c
Log:
many useless changes to the dht testing driver, mostly for churn (which may not 
even be useful)

Modified: gnunet/src/dht/gnunet-dht-driver.c
===================================================================
--- gnunet/src/dht/gnunet-dht-driver.c  2010-09-16 14:12:56 UTC (rev 13005)
+++ gnunet/src/dht/gnunet-dht-driver.c  2010-09-16 14:13:57 UTC (rev 13006)
@@ -59,7 +59,7 @@
 
 #define DEFAULT_BUCKET_SIZE 4
 
-#define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2
+#define FIND_PEER_THRESHOLD 1
 
 /* If more than this many peers are added, slow down sending */
 #define MAX_FIND_PEER_CUTOFF 2500
@@ -79,6 +79,8 @@
 
 #define DEFAULT_TOPOLOGY_TIMEOUT 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
 
+#define DEFAULT_RECONNECT_ATTEMPTS 8
+
 /*
  * Default frequency for sending malicious get messages
  */
@@ -259,6 +261,104 @@
   struct GNUNET_TIME_Relative timeout;
 };
 
+
+struct PeerCount
+{
+  /** Node in the heap */
+  struct GNUNET_CONTAINER_HeapNode *heap_node;
+
+  /** Peer the count refers to */
+  struct GNUNET_PeerIdentity peer_id;
+
+  /** Count of connections this peer has */
+  unsigned int count;
+};
+
+/**
+ * Context for sending out find peer requests.
+ */
+struct FindPeerContext
+{
+  /**
+   * How long to send find peer requests, once the settle time
+   * is over don't send any more out!
+   *
+   * TODO: Add option for settle time and find peer sending time?
+   */
+  struct GNUNET_TIME_Absolute endtime;
+
+  /**
+   * Number of connections in the current topology
+   * (after this round of find peer requests has ended).
+   */
+  unsigned int current_peers;
+
+  /**
+   * Number of connections in the current topology
+   * (before this round of find peer requests started).
+   */
+  unsigned int previous_peers;
+
+  /**
+   * Number of find peer requests we have currently
+   * outstanding.
+   */
+  unsigned int outstanding;
+
+  /**
+   * Number of find peer requests to send in this round.
+   */
+  unsigned int total;
+
+  /**
+   * Number of find peer requests sent last time around.
+   */
+  unsigned int last_sent;
+
+  /**
+   * Hashmap of peers in the current topology, value
+   * is a PeerCount, with the number of connections
+   * this peer has.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *peer_hash;
+
+  /**
+   * Min heap which orders values in the peer_hash for
+   * easy lookup.
+   */
+  struct GNUNET_CONTAINER_Heap *peer_min_heap;
+
+  /**
+   * Callback for counting the peers in the current topology.
+   */
+  GNUNET_TESTING_NotifyTopology count_peers_cb;
+};
+
+enum DHT_ROUND_TYPES
+{
+  /**
+   * Next full round (puts + gets).
+   */
+  DHT_ROUND_NORMAL,
+
+  /**
+   * Next round of gets.
+   */
+  DHT_ROUND_GET,
+
+  /**
+   * Next round of puts.
+   */
+  DHT_ROUND_PUT,
+
+  /**
+   * Next round of churn.
+   */
+  DHT_ROUND_CHURN
+};
+
+
+
 /* Globals */
 
 /**
@@ -281,8 +381,10 @@
 
 static struct GNUNET_TIME_Relative seconds_per_peer_start;
 
-static int do_find_peer;
+static unsigned int do_find_peer;
 
+static unsigned int in_dht_replication;
+
 static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE;
 
 static unsigned long long max_outstanding_puts = DEFAULT_MAX_OUTSTANDING_PUTS;
@@ -295,6 +397,8 @@
 
 static unsigned long long malicious_putters;
 
+static unsigned long long round_delay;
+
 static unsigned long long malicious_droppers;
 
 static unsigned long long malicious_get_frequency;
@@ -310,6 +414,42 @@
 static unsigned long long trialuid;
 
 /**
+ * If GNUNET_YES, insert data at the same peers every time.
+ * Otherwise, choose a new random peer to insert at each time.
+ */
+static unsigned int replicate_same;
+
+/**
+ * Number of rounds for testing (PUTS + GETS)
+ */
+static unsigned long long total_rounds;
+
+/**
+ * Number of rounds already run
+ */
+static unsigned int rounds_finished;
+
+/**
+ * Number of rounds of churn to read from the file (first line, should be a 
single number).
+ */
+static unsigned int churn_rounds;
+
+/**
+ * Current round we are in for churn, tells us how many peers to 
connect/disconnect.
+ */
+static unsigned int current_churn_round;
+
+/**
+ * Number of times to churn per round
+ */
+static unsigned long long churns_per_round;
+
+/**
+ * Array of churn values.
+ */
+static unsigned int *churn_array;
+
+/**
  * Hash map of stats contexts.
  */
 struct GNUNET_CONTAINER_MultiHashMap *stats_map;
@@ -449,6 +589,8 @@
 
 static struct ProgressMeter *get_meter;
 
+static GNUNET_HashCode *known_keys;
+
 /* Global return value (0 for success, anything else for failure) */
 static int ok;
 
@@ -516,6 +658,24 @@
 }
 
 /**
+ * Reset progress meter.
+ *
+ * @param meter the meter to reset
+ *
+ * @return GNUNET_YES if meter reset,
+ *         GNUNET_SYSERR on error
+ */
+static int
+reset_meter(struct ProgressMeter *meter)
+{
+  if (meter == NULL)
+    return GNUNET_SYSERR;
+
+  meter->completed = 0;
+  return GNUNET_YES;
+}
+
+/**
  * Release resources for meter
  *
  * @param meter the meter to free
@@ -550,6 +710,7 @@
   test_put->disconnect_task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_DHT_disconnect(test_put->dht_handle);
   test_put->dht_handle = NULL;
+  test_put->daemon = GNUNET_TESTING_daemon_get(pg, 
GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
 }
 
 /**
@@ -826,6 +987,475 @@
 }
 
 /**
+ * Forward declaration.
+ */
+static void
+do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc);
+
+/**
+ * Forward declaration.
+ */
+static void
+do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc);
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int remove_peer_count (void *cls,
+                              const GNUNET_HashCode * key,
+                              void *value)
+{
+  struct FindPeerContext *find_peer_ctx = cls;
+  struct PeerCount *peer_count = value;
+  GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, 
peer_count->heap_node);
+  GNUNET_free(peer_count);
+
+  return GNUNET_YES;
+}
+
+/**
+ * Connect to all peers in the peer group and iterate over their
+ * connections.
+ */
+static void
+count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  find_peer_context->previous_peers = find_peer_context->current_peers;
+  find_peer_context->current_peers = 0;
+  GNUNET_TESTING_get_topology (pg, find_peer_context->count_peers_cb, 
find_peer_context);
+}
+
+static void
+decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+  GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0);
+  test_find_peer->find_peer_context->outstanding--;
+  test_find_peer->find_peer_context->total--;
+  if ((0 == test_find_peer->find_peer_context->total) &&
+      
(GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value
 > 0))
+  {
+    GNUNET_SCHEDULER_add_now(sched, &count_new_peers, 
test_find_peer->find_peer_context);
+  }
+  GNUNET_free(test_find_peer);
+}
+
+/**
+ * A find peer request has been sent to the server, now we will schedule a task
+ * to wait the appropriate time to allow the request to go out and back.
+ *
+ * @param cls closure - a TestFindPeer struct
+ * @param tc context the task is being called with
+ */
+static void
+handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  GNUNET_DHT_disconnect(test_find_peer->dht_handle);
+  GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, 
test_find_peer);
+}
+
+
+static void
+send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  if (test_find_peer->find_peer_context->outstanding > 
max_outstanding_find_peers)
+  {
+    GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, 
&send_find_peer_request, test_find_peer);
+    return;
+  }
+
+  test_find_peer->find_peer_context->outstanding++;
+  if 
(GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value
 == 0)
+  {
+    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
+    return;
+  }
+
+  test_find_peer->dht_handle = GNUNET_DHT_connect(sched, 
test_find_peer->daemon->cfg, 1);
+  GNUNET_assert(test_find_peer->dht_handle != NULL);
+  GNUNET_DHT_find_peers (test_find_peer->dht_handle,
+                         &handle_find_peer_sent, test_find_peer);
+}
+
+/**
+ * Set up a single find peer request for each peer in the topology.  Do this
+ * until the settle time is over, limited by the number of outstanding requests
+ * and the time allowed for each one!
+ */
+static void
+schedule_churn_find_peer_requests (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_ctx = cls;
+  struct TestFindPeer *test_find_peer;
+  struct PeerCount *peer_count;
+  uint32_t i;
+
+  if (find_peer_ctx->previous_peers == 0) /* First time, go slowly */
+    find_peer_ctx->total = 1;
+  else if (find_peer_ctx->current_peers - find_peer_ctx->previous_peers > 
MAX_FIND_PEER_CUTOFF) /* Found LOTS of peers, still go slowly */
+    find_peer_ctx->total = find_peer_ctx->last_sent - 
(find_peer_ctx->last_sent / 8);
+  else
+    find_peer_ctx->total = find_peer_ctx->last_sent * 2;
+
+  if (find_peer_ctx->total > max_outstanding_find_peers)
+    find_peer_ctx->total = max_outstanding_find_peers;
+
+  find_peer_ctx->last_sent = find_peer_ctx->total;
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (after 
churn)\n", find_peer_ctx->total);
+
+  find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, 
find_peer_ctx->total);
+  for (i = 0; i < find_peer_ctx->total; i++)
+    {
+      test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer));
+      /* If we have sent requests, choose peers with a low number of 
connections to send requests from */
+      peer_count = 
GNUNET_CONTAINER_heap_remove_root(find_peer_ctx->peer_min_heap);
+      GNUNET_CONTAINER_multihashmap_remove(find_peer_ctx->peer_hash, 
&peer_count->peer_id.hashPubKey, peer_count);
+      test_find_peer->daemon = GNUNET_TESTING_daemon_get_by_id(pg, 
&peer_count->peer_id);
+      GNUNET_assert(test_find_peer->daemon != NULL);
+      test_find_peer->find_peer_context = find_peer_ctx;
+      GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(find_peer_offset, i), &send_find_peer_request, 
test_find_peer);
+    }
+
+  if ((find_peer_ctx->peer_hash == NULL) && (find_peer_ctx->peer_min_heap == 
NULL))
+    {
+      find_peer_ctx->peer_hash = 
GNUNET_CONTAINER_multihashmap_create(num_peers);
+      find_peer_ctx->peer_min_heap = 
GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
+    }
+  else
+    {
+      GNUNET_CONTAINER_multihashmap_iterate(find_peer_ctx->peer_hash, 
&remove_peer_count, find_peer_ctx);
+      GNUNET_CONTAINER_multihashmap_destroy(find_peer_ctx->peer_hash);
+      find_peer_ctx->peer_hash = 
GNUNET_CONTAINER_multihashmap_create(num_peers);
+    }
+
+  GNUNET_assert(0 == 
GNUNET_CONTAINER_multihashmap_size(find_peer_ctx->peer_hash));
+  GNUNET_assert(0 == 
GNUNET_CONTAINER_heap_get_size(find_peer_ctx->peer_min_heap));
+}
+
+/**
+ * Add a connection to the find_peer_context given.  This may
+ * be complete overkill, but allows us to choose the peers with
+ * the least connections to initiate find peer requests from.
+ */
+static void add_new_connection(struct FindPeerContext *find_peer_context,
+                               const struct GNUNET_PeerIdentity *first,
+                               const struct GNUNET_PeerIdentity *second)
+{
+  struct PeerCount *first_count;
+  struct PeerCount *second_count;
+
+  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, 
&first->hashPubKey))
+  {
+    first_count = 
GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, 
&first->hashPubKey);
+    first_count->count++;
+    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, 
first_count->heap_node, first_count->count);
+  }
+  else
+  {
+    first_count = GNUNET_malloc(sizeof(struct PeerCount));
+    first_count->count = 1;
+    memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity));
+    first_count->heap_node = 
GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, 
first_count->count);
+    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, 
&first->hashPubKey, first_count, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  }
+
+  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, 
&second->hashPubKey))
+  {
+    second_count = 
GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, 
&second->hashPubKey);
+    second_count->count++;
+    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, 
second_count->heap_node, second_count->count);
+  }
+  else
+  {
+    second_count = GNUNET_malloc(sizeof(struct PeerCount));
+    second_count->count = 1;
+    memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity));
+    second_count->heap_node = 
GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, 
second_count->count);
+    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, 
&second->hashPubKey, second_count, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  }
+}
+
+
+/**
+ * Iterate over min heap of connections per peer.  For any
+ * peer that has 0 connections, attempt to connect them to
+ * some random peer.
+ *
+ * @param cls closure a struct FindPeerContext
+ * @param node internal node of the heap
+ * @param element value stored, a struct PeerCount
+ * @param cost cost associated with the node
+ * @return GNUNET_YES if we should continue to iterate,
+ *         GNUNET_NO if not.
+ */
+static int iterate_min_heap_peers (void *cls,
+                                   struct GNUNET_CONTAINER_HeapNode *node,
+                                   void *element,
+                                   GNUNET_CONTAINER_HeapCostType cost)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  struct PeerCount *peer_count = element;
+  struct GNUNET_TESTING_Daemon *d1;
+  struct GNUNET_TESTING_Daemon *d2;
+  struct GNUNET_TIME_Relative timeout;
+  if (cost == 0)
+    {
+      d1 = GNUNET_TESTING_daemon_get_by_id (pg, &peer_count->peer_id);
+      d2 = GNUNET_TESTING_daemon_get(pg, 
GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
+      /** Just try to connect the peers, don't worry about callbacks, etc. **/
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer %s has 0 connections.  
Trying to connect to %s...\n", GNUNET_i2s(&peer_count->peer_id), d2->shortname);
+      timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 
DEFAULT_CONNECT_TIMEOUT);
+      if (GNUNET_TIME_relative_to_absolute(timeout).value > 
find_peer_context->endtime.value)
+        {
+          timeout = 
GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime);
+        }
+      GNUNET_TESTING_daemons_connect(d1, d2, timeout, 
DEFAULT_RECONNECT_ATTEMPTS, NULL, NULL);
+    }
+  if (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)
+    return GNUNET_YES;
+  else
+    return GNUNET_NO;
+}
+
+/**
+ * Callback for iterating over all the peer connections of a peer group.
+ * Used after we have churned on some peers to find which ones have zero
+ * connections so we can make them issue find peer requests.
+ */
+void count_peers_churn_cb (void *cls,
+                           const struct GNUNET_PeerIdentity *first,
+                           const struct GNUNET_PeerIdentity *second,
+                           struct GNUNET_TIME_Relative latency,
+                           uint32_t distance,
+                           const char *emsg)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  struct TopologyIteratorContext *topo_ctx;
+  struct PeerCount *peer_count;
+
+  if ((first != NULL) && (second != NULL))
+    {
+      add_new_connection(find_peer_context, first, second);
+      find_peer_context->current_peers++;
+    }
+  else
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u 
connections)\n",
+                                            find_peer_context->current_peers);
+      peer_count = 
GNUNET_CONTAINER_heap_peek(find_peer_context->peer_min_heap);
+
+      /* WAIT. When peers are churned they will come back with their peers (at 
least in peerinfo), because the HOSTS file doesn't likely get removed. CRAP. */
+      /* NO they won't, because we have disabled peerinfo writing to disk 
(remember?) so we WILL have to give them new connections */
+      /* Best course of action: have DHT automatically try to add peers from 
peerinfo on startup. This way IF peerinfo writes to file
+       * then some peers will end up connected.
+       *
+       * Also, find any peers that have zero connections here and set up a 
task to choose at random another peer in the network to
+       * connect to.  Of course, if they are blacklisted from that peer they 
won't be able to connect, so we will have to keep trying
+       * until they get a peer.
+       */
+      /* However, they won't automatically be connected to any of their 
previous peers... How can we handle that? */
+      /* So now we have choices: do we want them to come back with all their 
connections?  Probably not, but it solves this mess. */
+
+      /* Second problem, which is still a problem, is that a FIND_PEER request 
won't work when a peer has no connections */
+
+      /**
+       * Okay, so here's how this *should* work now.
+       *
+       * 1. We check the min heap for any peers that have 0 connections.
+       *    a. If any are found, we iterate over the heap and just randomly
+       *       choose another peer and ask testing to please connect the two.
+       *       This takes care of the case that a peer just randomly joins the
+       *       network.  However, if there are strict topology restrictions
+       *       (imagine a ring) choosing randomly most likely won't help.
+       *       We make sure the connection attempt doesn't take longer than
+       *       the total timeout, but don't care too much about the result.
+       *    b. After that, we still schedule the find peer requests 
(concurrently
+       *       with the connect attempts most likely).  This handles the case
+       *       that the DHT iterates over peerinfo and just needs to try to 
send
+       *       a message to get connected.  This should handle the case that 
the
+       *       topology is very strict.
+       *
+       * 2. If all peers have > 0 connections, we still send find peer requests
+       *    as long as possible (until timeout is reached) to help out those
+       *    peers that were newly churned and need more connections.  This is 
because
+       *    once all new peers have established a single connection, they 
won't be
+       *    well connected.
+       *
+       * 3. Once we reach the timeout, we can do no more.  We must schedule the
+       *    next iteration of get requests regardless of connections that peers
+       *    may or may not have.
+       *
+       * Caveat: it would be nice to get peers to take data offline with them 
and
+       *         come back with it (or not) based on the testing framework.  
The
+       *         same goes for remembering previous connections, but putting 
either
+       *         into the general testing churn options seems like overkill 
because
+       *         these are very specialized cases.
+       */
+      if ((peer_count->count == 0) && 
(GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0))
+        {
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Found peer with no 
connections, will choose some peers at random to connect to!\n");
+          GNUNET_CONTAINER_heap_iterate (find_peer_context->peer_min_heap, 
&iterate_min_heap_peers, find_peer_context);
+          GNUNET_SCHEDULER_add_now(sched, &schedule_churn_find_peer_requests, 
find_peer_context);
+        }
+      else if 
(GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)
+        {
+          GNUNET_SCHEDULER_add_now(sched, &schedule_churn_find_peer_requests, 
find_peer_context);
+        }
+      else
+        {
+          GNUNET_CONTAINER_multihashmap_iterate(find_peer_context->peer_hash, 
&remove_peer_count, find_peer_context);
+          GNUNET_CONTAINER_multihashmap_destroy(find_peer_context->peer_hash);
+          GNUNET_CONTAINER_heap_destroy(find_peer_context->peer_min_heap);
+          GNUNET_free(find_peer_context);
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn round %u of %llu 
finished, scheduling next GET round.\n", current_churn_round, churn_rounds);
+          if (dhtlog_handle != NULL)
+            {
+              topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+              topo_ctx->cont = &do_get;
+              topo_ctx->cls = all_gets;
+              topo_ctx->timeout = DEFAULT_GET_TIMEOUT;
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, 
all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                       &end_badly, "from do 
gets (count_peers_churn_cb)");
+              GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, 
topo_ctx);
+            }
+          else
+            {
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, 
all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                       &end_badly, "from do 
gets (count_peers_churn_cb)");
+              GNUNET_SCHEDULER_add_now(sched, &do_get, all_gets);
+            }
+        }
+    }
+}
+
+/**
+ * Called when churning of the topology has finished.
+ *
+ * @param cls closure unused
+ * @param emsg NULL on success, or a printable error on failure
+ */
+static void churn_complete (void *cls, const char *emsg)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  struct PeerCount *peer_count;
+  unsigned int i;
+  struct GNUNET_TESTING_Daemon *temp_daemon;
+  struct TopologyIteratorContext *topo_ctx;
+
+  if (emsg != NULL)
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Ending test, churning of peers 
failed with error `%s'", emsg);
+      GNUNET_SCHEDULER_add_now(sched, &end_badly, (void *)emsg);
+      return;
+    }
+
+  /**
+   * If we switched any peers on, we have to somehow force connect the new 
peer to
+   * SOME bootstrap peer in the network.  First schedule a task to find all 
peers
+   * with no connections, then choose a random peer for each and connect them.
+   */
+  if (find_peer_context != NULL)
+    {
+      for (i = 0; i < num_peers; i ++)
+        {
+          temp_daemon = GNUNET_TESTING_daemon_get(pg, i);
+          peer_count = GNUNET_malloc(sizeof(struct PeerCount));
+          memcpy(&peer_count->peer_id, &temp_daemon->id, sizeof(struct 
GNUNET_PeerIdentity));
+          peer_count->heap_node = 
GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, peer_count, 
peer_count->count);
+          GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, 
&temp_daemon->id.hashPubKey, peer_count, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+        }
+      GNUNET_TESTING_get_topology (pg, &count_peers_churn_cb, 
find_peer_context);
+    }
+  else
+    {
+      if (dhtlog_handle != NULL)
+        {
+          topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+          topo_ctx->cont = &do_get;
+          topo_ctx->cls = all_gets;
+          topo_ctx->timeout = DEFAULT_GET_TIMEOUT;
+          die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, 
all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                   &end_badly, "from do gets 
(churn_complete)");
+          GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
+        }
+      else
+        {
+          die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, 
all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                   &end_badly, "from do gets 
(churn_complete)");
+          if (dhtlog_handle != NULL)
+            dhtlog_handle->insert_round(DHT_ROUND_GET, rounds_finished);
+          GNUNET_SCHEDULER_add_now(sched, &do_get, all_gets);
+        }
+    }
+}
+
+/**
+ * Decide how many peers to turn on or off in this round, make sure the
+ * numbers actually make sense, then do so.  This function sets in motion
+ * churn, find peer requests for newly joined peers, and issuing get
+ * requests once the new peers have done so.
+ *
+ * @param cls closure (unused)
+ * @param cls task context (unused)
+ */
+static void
+churn_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  unsigned int count_running;
+  unsigned int churn_up;
+  unsigned int churn_down;
+  struct GNUNET_TIME_Relative timeout;
+  struct FindPeerContext *find_peer_context;
+
+  churn_up = churn_down = 0;
+  count_running = GNUNET_TESTING_daemons_running(pg);
+  if (count_running > churn_array[current_churn_round])
+    churn_down = count_running - churn_array[current_churn_round];
+  else if (count_running < churn_array[current_churn_round])
+    churn_up = churn_array[current_churn_round] - count_running;
+  else
+    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Not churning any peers, topology 
unchanged.\n");
+
+  if (churn_up > num_peers - count_running)
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn file specified %u peers 
(up); only have %u!", churn_array[current_churn_round], num_peers);
+      churn_up = num_peers - count_running;
+    }
+  else if (churn_down > count_running)
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn file specified %u peers 
(down); only have %u!", churn_array[current_churn_round], count_running);
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "This will leave NO peers running 
(mistake in churn configuration?)!");
+      churn_down = count_running;
+    }
+  timeout = GNUNET_TIME_relative_multiply(seconds_per_peer_start, churn_up > 0 
? churn_up : churn_down);
+
+  find_peer_context = NULL;
+  if (churn_up > 0) /* Only need to do find peer requests if we turned new 
peers on */
+    {
+      find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+      find_peer_context->count_peers_cb = &count_peers_churn_cb;
+      find_peer_context->previous_peers = 0;
+      find_peer_context->current_peers = 0;
+      find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(timeout);
+    }
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "churn_peers: want %u total, %u 
running, starting %u, stopping %u\n",
+             churn_array[current_churn_round], count_running, churn_up, 
churn_down);
+  GNUNET_TESTING_daemons_churn(pg, churn_down, churn_up, timeout, 
&churn_complete, find_peer_context);
+}
+
+/**
  * Task to release DHT handle associated with GET request.
  */
 static void
@@ -837,22 +1467,78 @@
   GNUNET_DHT_disconnect(test_get->dht_handle);
   test_get->dht_handle = NULL;
 
+  /* Reset the uid (which item to search for) and the daemon (which peer to 
search from) for later get request iterations */
+  test_get->uid = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
num_puts);
+  test_get->daemon = GNUNET_TESTING_daemon_get(pg, 
GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
+
 #if VERBOSE > 1
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d gets succeeded, %d gets failed!\n", 
gets_completed, gets_failed);
 #endif
   update_meter(get_meter);
   if ((gets_completed + gets_failed == num_gets) && (outstanding_gets == 0))
     {
+      fprintf(stderr, "Canceling die task (get_stop_finished) %llu gets 
completed, %llu gets failed\n", gets_completed, gets_failed);
       GNUNET_SCHEDULER_cancel(sched, die_task);
-      //GNUNET_SCHEDULER_add_now(sched, &finish_testing, NULL);
-      if (dhtlog_handle != NULL)
+      reset_meter(put_meter);
+      reset_meter(get_meter);
+      /**
+       *  Handle all cases:
+       *    1) Testing is completely finished, call the topology iteration 
dealy and die
+       *    2) Testing is not finished, churn the network and do gets again 
(current_churn_round < churn_rounds)
+       *    3) Testing is not finished, reschedule all the PUTS *and* GETS 
again (num_rounds > 1)
+       */
+      if (rounds_finished == total_rounds - 1) /* Everything is finished, end 
testing */
         {
-          topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
-          topo_ctx->cont = &log_dht_statistics;
-          GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
+          if (dhtlog_handle != NULL)
+            {
+              topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+              topo_ctx->cont = &log_dht_statistics;
+              GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, 
topo_ctx);
+            }
+          else
+            GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL);
         }
-      else
-        GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL);
+      else if (current_churn_round < churns_per_round * (rounds_finished + 1)) 
/* Do next round of churn */
+        {
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Current churn round %u, real 
round %u, scheduling next round of churn.\n", current_churn_round, 
rounds_finished + 1);
+          gets_completed = 0;
+          gets_failed = 0;
+          current_churn_round++;
+
+          if (dhtlog_handle != NULL)
+            dhtlog_handle->insert_round(DHT_ROUND_CHURN, rounds_finished);
+
+          GNUNET_SCHEDULER_add_now(sched, &churn_peers, NULL);
+        }
+      else if (rounds_finished < total_rounds - 1) /* Start a new complete 
round */
+        {
+          rounds_finished++;
+          gets_completed = 0;
+          gets_failed = 0;
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Round %u of %llu finished, 
scheduling next round.\n", rounds_finished, total_rounds);
+          /** Make sure we only get here after churning appropriately */
+          GNUNET_assert(current_churn_round == churn_rounds);
+          current_churn_round = 0;
+
+          /** We reset the peer daemon for puts and gets on each disconnect, 
so all we need to do is start another round! */
+          if (GNUNET_YES == in_dht_replication) /* Replication done in DHT, 
don't redo puts! */
+            {
+              if (dhtlog_handle != NULL)
+                dhtlog_handle->insert_round(DHT_ROUND_GET, rounds_finished);
+
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
 round_delay), all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                       &end_badly, "from do 
gets (next round)");
+              GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), &do_get, 
all_gets);
+            }
+          else
+            {
+              if (dhtlog_handle != NULL)
+                dhtlog_handle->insert_round(DHT_ROUND_NORMAL, rounds_finished);
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
 round_delay), GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, num_puts 
* 2)),
+                                                       &end_badly, "from do 
puts");
+              GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), &do_put, 
all_puts);
+            }
+        }
     }
 }
 
@@ -890,18 +1576,14 @@
                           const void *data)
 {
   struct TestGetContext *test_get = cls;
-  GNUNET_HashCode search_key; /* Key stored under */
-  char original_data[test_data_size]; /* Made up data to store */
 
-  memset(original_data, test_get->uid, sizeof(original_data));
-  GNUNET_CRYPTO_hash(original_data, test_data_size, &search_key);
-
   if (test_get->succeeded == GNUNET_YES)
     return; /* Get has already been successful, probably ending now */
 
-  if ((0 != memcmp(&search_key, key, sizeof (GNUNET_HashCode))) || (0 != 
memcmp(original_data, data, sizeof(original_data))))
+  if (0 != memcmp(&known_keys[test_get->uid], key, sizeof (GNUNET_HashCode))) 
/* || (0 != memcmp(original_data, data, sizeof(original_data))))*/
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Key or data is not the same as was 
inserted!\n");
+      gets_completed++;
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Key or data is not the same as 
was inserted!\n");
     }
   else
     {
@@ -933,8 +1615,6 @@
 do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   struct TestGetContext *test_get = cls;
-  GNUNET_HashCode key; /* Made up key to store data under */
-  char data[test_data_size]; /* Made up data to store */
 
   if (num_gets == 0)
     {
@@ -944,23 +1624,26 @@
   if (test_get == NULL)
     return; /* End of the list */
 
-  memset(data, test_get->uid, sizeof(data));
-  GNUNET_CRYPTO_hash(data, test_data_size, &key);
+  /* Set this here in case we are re-running gets */
+  test_get->succeeded = GNUNET_NO;
 
+  /* Check if more gets are outstanding than should be */
   if (outstanding_gets > max_outstanding_gets)
     {
-      GNUNET_SCHEDULER_add_delayed (sched, get_delay, &do_get, test_get);
+      GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 200), &do_get, 
test_get);
       return;
     }
 
+  /* Connect to the first peer's DHT */
   test_get->dht_handle = GNUNET_DHT_connect(sched, test_get->daemon->cfg, 10);
-  /* Insert the data at the first peer */
   GNUNET_assert(test_get->dht_handle != NULL);
   outstanding_gets++;
+
+  /* Insert the data at the first peer */
   test_get->get_handle = GNUNET_DHT_get_start(test_get->dht_handle,
-                                              
GNUNET_TIME_relative_get_forever(),
+                                              get_delay,
                                               1,
-                                              &key,
+                                              &known_keys[test_get->uid],
                                               &get_result_iterator,
                                               test_get,
                                               &get_continuation,
@@ -971,6 +1654,8 @@
              test_get->daemon->shortname);
 #endif
   test_get->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, get_timeout, 
&get_stop_task, test_get);
+
+  /* Schedule the next request in the linked list of get requests */
   GNUNET_SCHEDULER_add_now (sched, &do_get, test_get->next);
 }
 
@@ -989,6 +1674,10 @@
   if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
     fprintf(stderr, "PUT Request failed!\n");
 
+  /* Reset the daemon (which peer to insert at) for later put request 
iterations */
+  if (replicate_same == GNUNET_NO)
+    test_put->daemon = GNUNET_TESTING_daemon_get(pg, 
GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
+
   GNUNET_SCHEDULER_cancel(sched, test_put->disconnect_task);
   test_put->disconnect_task = GNUNET_SCHEDULER_add_now(sched, 
&put_disconnect_task, test_put);
   if (GNUNET_YES == update_meter(put_meter))
@@ -1002,15 +1691,15 @@
           topo_ctx->cls = all_gets;
           topo_ctx->timeout = DEFAULT_GET_TIMEOUT;
           die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, 
all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
-                                                   &end_badly, "from do gets");
+                                                   &end_badly, "from do gets 
(put finished)");
           GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
         }
       else
         {
+          fprintf(stderr, "Scheduling die task (put finished)\n");
           die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout),
-                                                       &end_badly, "from do 
gets");
+                                                   &end_badly, "from do gets 
(put finished)");
           GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_GET_TIMEOUT, &do_get, 
all_gets);
-          GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL);
         }
       return;
     }
@@ -1023,98 +1712,45 @@
 do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   struct TestPutContext *test_put = cls;
-  GNUNET_HashCode key; /* Made up key to store data under */
   char data[test_data_size]; /* Made up data to store */
   uint32_t rand;
+  int i;
 
   if (test_put == NULL)
     return; /* End of list */
 
-  memset(data, test_put->uid, sizeof(data));
-  GNUNET_CRYPTO_hash(data, test_data_size, &key);
+  for (i = 0; i < sizeof(data); i++)
+    {
+      memset(&data[i], GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
(uint32_t)-1), 1);
+    }
 
   if (outstanding_puts > max_outstanding_puts)
     {
-      GNUNET_SCHEDULER_add_delayed (sched, put_delay, &do_put, test_put);
+      GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 200), &do_put, 
test_put);
       return;
     }
 
 #if VERBOSE > 1
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting put for uid %u from peer 
%s\n",
-               test_put->uid,
-               test_put->daemon->shortname);
+                test_put->uid,
+                test_put->daemon->shortname);
 #endif
   test_put->dht_handle = GNUNET_DHT_connect(sched, test_put->daemon->cfg, 10);
 
   GNUNET_assert(test_put->dht_handle != NULL);
   outstanding_puts++;
   GNUNET_DHT_put(test_put->dht_handle,
-                 &key,
+                 &known_keys[test_put->uid],
                  1,
                  sizeof(data), data,
                  GNUNET_TIME_absolute_get_forever(),
-                 GNUNET_TIME_relative_get_forever(),
+                 put_delay,
                  &put_finished, test_put);
   test_put->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_get_forever(), &put_disconnect_task, test_put);
   rand = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 2);
   GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, 
test_put->next);
 }
 
-/**
- * Context for sending out find peer requests.
- */
-struct FindPeerContext
-{
-  /**
-   * How long to send find peer requests, once the settle time
-   * is over don't send any more out!
-   *
-   * TODO: Add option for settle time and find peer sending time?
-   */
-  struct GNUNET_TIME_Absolute endtime;
-
-  /**
-   * Number of connections in the current topology
-   * (after this round of find peer requests has ended).
-   */
-  unsigned int current_peers;
-
-  /**
-   * Number of connections in the current topology
-   * (before this round of find peer requests started).
-   */
-  unsigned int previous_peers;
-
-  /**
-   * Number of find peer requests we have currently
-   * outstanding.
-   */
-  unsigned int outstanding;
-
-  /**
-   * Number of find peer requests to send in this round.
-   */
-  unsigned int total;
-
-  /**
-   * Number of find peer requests sent last time around.
-   */
-  unsigned int last_sent;
-
-  /**
-   * Hashmap of peers in the current topology, value
-   * is a PeerCount, with the number of connections
-   * this peer has.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *peer_hash;
-
-  /**
-   * Min heap which orders values in the peer_hash for
-   * easy lookup.
-   */
-  struct GNUNET_CONTAINER_Heap *peer_min_heap;
-};
-
 static void
 schedule_find_peer_requests (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext * tc);
 
@@ -1139,63 +1775,8 @@
 
 }
 
-struct PeerCount
-{
-  /** Node in the heap */
-  struct GNUNET_CONTAINER_HeapNode *heap_node;
 
-  /** Peer the count refers to */
-  struct GNUNET_PeerIdentity peer_id;
-
-  /** Count of connections this peer has */
-  unsigned int count;
-};
-
-
 /**
- * Add a connection to the find_peer_context given.  This may
- * be complete overkill, but allows us to choose the peers with
- * the least connections to initiate find peer requests from.
- */
-static void add_new_connection(struct FindPeerContext *find_peer_context,
-                               const struct GNUNET_PeerIdentity *first,
-                               const struct GNUNET_PeerIdentity *second)
-{
-  struct PeerCount *first_count;
-  struct PeerCount *second_count;
-
-  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, 
&first->hashPubKey))
-  {
-    first_count = 
GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, 
&first->hashPubKey);
-    first_count->count++;
-    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, 
first_count->heap_node, first_count->count);
-  }
-  else
-  {
-    first_count = GNUNET_malloc(sizeof(struct PeerCount));
-    first_count->count = 1;
-    memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity));
-    first_count->heap_node = 
GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, 
first_count->count);
-    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, 
&first->hashPubKey, first_count, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  }
-
-  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, 
&second->hashPubKey))
-  {
-    second_count = 
GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, 
&second->hashPubKey);
-    second_count->count++;
-    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, 
second_count->heap_node, second_count->count);
-  }
-  else
-  {
-    second_count = GNUNET_malloc(sizeof(struct PeerCount));
-    second_count->count = 1;
-    memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity));
-    second_count->heap_node = 
GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, 
second_count->count);
-    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, 
&second->hashPubKey, second_count, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  }
-}
-
-/**
  * Callback for iterating over all the peer connections of a peer group.
  */
 void count_peers_cb (void *cls,
@@ -1213,114 +1794,31 @@
     }
   else
     {
-      GNUNET_assert(dhtlog_handle != NULL);
-      /*GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u 
connections), %u new peers, connection estimate %u\n", 
find_peer_context->current_peers, find_peer_context->current_peers - 
find_peer_context->previous_peers, connection_estimate(num_peers, 
DEFAULT_BUCKET_SIZE));*/
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u 
connections), %u new peers, connection estimate %u (double %u)\n",
+                                            find_peer_context->current_peers,
+                                            find_peer_context->current_peers - 
find_peer_context->previous_peers,
+                                            connection_estimate(num_peers, 
DEFAULT_BUCKET_SIZE),
+                                            2 * connection_estimate(num_peers, 
DEFAULT_BUCKET_SIZE));
+
       if ((find_peer_context->current_peers - 
find_peer_context->previous_peers > FIND_PEER_THRESHOLD) &&
-          (find_peer_context->current_peers < connection_estimate(num_peers, 
DEFAULT_BUCKET_SIZE)) &&
+          (find_peer_context->current_peers < 2 * 
connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) &&
           
(GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0))
         {
           GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, 
find_peer_context);
         }
       else
         {
+          GNUNET_CONTAINER_multihashmap_iterate(find_peer_context->peer_hash, 
&remove_peer_count, find_peer_context);
+          GNUNET_CONTAINER_multihashmap_destroy(find_peer_context->peer_hash);
+          GNUNET_CONTAINER_heap_destroy(find_peer_context->peer_min_heap);
+          GNUNET_free(find_peer_context);
           fprintf(stderr, "Not sending any more find peer requests.\n");
         }
     }
 }
 
-/**
- * Connect to all peers in the peer group and iterate over their
- * connections.
- */
-static void
-count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
-{
-  struct FindPeerContext *find_peer_context = cls;
-  find_peer_context->previous_peers = find_peer_context->current_peers;
-  find_peer_context->current_peers = 0;
-  GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context);
-}
 
-
-static void
-decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
-{
-  struct TestFindPeer *test_find_peer = cls;
-  GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0);
-  test_find_peer->find_peer_context->outstanding--;
-  test_find_peer->find_peer_context->total--;
-  if ((0 == test_find_peer->find_peer_context->total) &&
-      
(GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value
 > 0))
-  {
-    GNUNET_SCHEDULER_add_now(sched, &count_new_peers, 
test_find_peer->find_peer_context);
-  }
-  GNUNET_free(test_find_peer);
-}
-
 /**
- * A find peer request has been sent to the server, now we will schedule a task
- * to wait the appropriate time to allow the request to go out and back.
- *
- * @param cls closure - a TestFindPeer struct
- * @param tc context the task is being called with
- */
-static void
-handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
-{
-  struct TestFindPeer *test_find_peer = cls;
-
-  GNUNET_DHT_disconnect(test_find_peer->dht_handle);
-  GNUNET_SCHEDULER_add_delayed(sched, 
GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, 
test_find_peer);
-}
-
-static void
-send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * 
tc)
-{
-  struct TestFindPeer *test_find_peer = cls;
-
-  if (test_find_peer->find_peer_context->outstanding > 
max_outstanding_find_peers)
-  {
-    GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, 
&send_find_peer_request, test_find_peer);
-    return;
-  }
-
-  test_find_peer->find_peer_context->outstanding++;
-  if 
(GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value
 == 0)
-  {
-    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
-    return;
-  }
-
-  test_find_peer->dht_handle = GNUNET_DHT_connect(sched, 
test_find_peer->daemon->cfg, 1);
-  GNUNET_assert(test_find_peer->dht_handle != NULL);
-  GNUNET_DHT_find_peers (test_find_peer->dht_handle,
-                         &handle_find_peer_sent, test_find_peer);
-}
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int remove_peer_count (void *cls,
-                              const GNUNET_HashCode * key,
-                              void *value)
-{
-  struct FindPeerContext *find_peer_ctx = cls;
-  struct PeerCount *peer_count = value;
-  GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, 
peer_count->heap_node);
-  GNUNET_free(peer_count);
-
-  return GNUNET_YES;
-}
-
-
-/**
  * Set up a single find peer request for each peer in the topology.  Do this
  * until the settle time is over, limited by the number of outstanding requests
  * and the time allowed for each one!
@@ -1352,7 +1850,7 @@
     find_peer_ctx->total = max_outstanding_find_peers;
 
   find_peer_ctx->last_sent = find_peer_ctx->total;
-  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal 
%u connections)\n", find_peer_ctx->total, connection_estimate(num_peers, 
DEFAULT_BUCKET_SIZE));
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal 
at least %u connections)\n", find_peer_ctx->total, 
connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));
 
   find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, 
find_peer_ctx->total);
   for (i = 0; i < find_peer_ctx->total; i++)
@@ -1420,13 +1918,16 @@
   uint32_t temp_daemon;
   struct TestPutContext *test_put;
   struct TestGetContext *test_get;
+#if REMEMBER
   int remember[num_puts][num_peers];
-
   memset(&remember, 0, sizeof(int) * num_puts * num_peers);
+#endif
+  known_keys = GNUNET_malloc(sizeof(GNUNET_HashCode) * num_puts);
   for (i = 0; i < num_puts; i++)
     {
       test_put = GNUNET_malloc(sizeof(struct TestPutContext));
       test_put->uid = i;
+      GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, 
&known_keys[i]);
       temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
num_peers);
       test_put->daemon = GNUNET_TESTING_daemon_get(pg, temp_daemon);
       test_put->next = all_puts;
@@ -1437,11 +1938,12 @@
     {
       test_get = GNUNET_malloc(sizeof(struct TestGetContext));
       test_get->uid = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
num_puts);
-      temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
num_peers);
+#if REMEMBER
       while (remember[test_get->uid][temp_daemon] == 1)
         temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 
num_peers);
-      test_get->daemon = GNUNET_TESTING_daemon_get(pg, temp_daemon);
       remember[test_get->uid][temp_daemon] = 1;
+#endif
+      test_get->daemon = GNUNET_TESTING_daemon_get(pg, 
GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
       test_get->next = all_gets;
       all_gets = test_get;
     }
@@ -1488,6 +1990,7 @@
   if (GNUNET_YES == do_find_peer)
   {
     find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+    find_peer_context->count_peers_cb = &count_peers_cb;
     find_peer_context->endtime = 
GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
 settle_time));
     GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, 
find_peer_context);
   }
@@ -1627,20 +2130,22 @@
       GNUNET_SCHEDULER_add_now (sched, &set_malicious, ctx);
     }
 
+  /**
+   * If we have any malicious peers to set up,
+   * the malicious callback should call continue_gets_and_puts
+   */
   if (malicious_getters + malicious_putters + malicious_droppers > 0)
-    die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (malicious_getters + 
malicious_putters + malicious_droppers) * 2),
-                                             &end_badly, "from set malicious");
-  else
     {
-      if (dhtlog_handle != NULL)
-        GNUNET_SCHEDULER_add_now (sched,
-                                  &continue_puts_and_gets, NULL);
-      else
-        GNUNET_SCHEDULER_add_delayed (sched,
-                                    
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time),
-                                    &continue_puts_and_gets, NULL);
+      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Giving malicious set tasks some 
time before starting testing!\n");
+      die_task = GNUNET_SCHEDULER_add_delayed (sched, 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (malicious_getters + 
malicious_putters + malicious_droppers) * 2),
+                                               &end_badly, "from set 
malicious");
     }
-
+  else /* Otherwise, continue testing */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Scheduling continue_puts_and_gets 
now!\n");
+      GNUNET_SCHEDULER_add_now (sched,
+                                &continue_puts_and_gets, NULL);
+    }
 }
 
 /**
@@ -1674,15 +2179,16 @@
                  distance);
 #endif
     }
-#if VERBOSE
   else
     {
       failed_connections++;
+#if VERBOSE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to connect peer %s to peer 
%s with error :\n%s\n",
                   first_daemon->shortname,
                   second_daemon->shortname, emsg);
+#endif
     }
-#endif
+
   GNUNET_assert(peer_connect_meter != NULL);
   if (GNUNET_YES == update_meter(peer_connect_meter))
     {
@@ -1855,6 +2361,7 @@
   struct GNUNET_DHTLOG_TrialInfo trial_info;
   struct GNUNET_TESTING_Host *hosts;
   struct GNUNET_TESTING_Host *temphost;
+  struct GNUNET_TESTING_Host *tempnext;
   char *topology_str;
   char *connect_topology_str;
   char *blacklist_topology_str;
@@ -1872,10 +2379,15 @@
   int strict_kademlia;
   char *buf;
   char *data;
+  char *churn_data;
+  char *churn_filename;
   int count;
+  int ret;
+  unsigned int line_number;
 
   sched = s;
   config = cfg;
+  rounds_finished = 0;
   memset(&trial_info, 0, sizeof(struct GNUNET_DHTLOG_TrialInfo));
   /* Get path from configuration file */
   if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_string(cfg, "paths", 
"servicehome", &test_directory))
@@ -1917,6 +2429,100 @@
                                              &trialmessage))
     trialmessage = NULL;
 
+  churn_data = NULL;
+  /** Check for a churn file to do churny simulation */
+  if (GNUNET_OK ==
+      GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "churn_file",
+                                            &churn_filename))
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Reading churn data from %s\n", 
churn_filename);
+      if (GNUNET_OK != GNUNET_DISK_file_test (churn_filename))
+        {
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading churn file!\n");
+          return;
+        }
+      if ((0 != STAT (churn_filename, &frstat)) || (frstat.st_size == 0))
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                      "Could not open file specified for churn data, ending 
test!");
+          ok = 1119;
+          GNUNET_free_non_null(trialmessage);
+          GNUNET_free(churn_filename);
+          return;
+        }
+
+      churn_data = GNUNET_malloc_large (frstat.st_size);
+      GNUNET_assert(churn_data != NULL);
+      if (frstat.st_size !=
+          GNUNET_DISK_fn_read (churn_filename, churn_data, frstat.st_size))
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                    "Could not read file %s specified for churn, ending 
test!", churn_filename);
+          GNUNET_free (churn_filename);
+          GNUNET_free (churn_data);
+          GNUNET_free_non_null(trialmessage);
+          return;
+        }
+
+      GNUNET_free_non_null(churn_filename);
+
+      buf = churn_data;
+      count = 0;
+      /* Read the first line */
+      while (count < frstat.st_size)
+        {
+          count++;
+          if (((churn_data[count] == '\n')) && (buf != &churn_data[count]))
+            {
+              churn_data[count] = '\0';
+              if (1 != sscanf(buf, "%u", &churn_rounds))
+                {
+                  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to read number 
of rounds from %s, ending test!\n", churn_filename);
+                  GNUNET_free_non_null(trialmessage);
+                  GNUNET_free(churn_filename);
+                  ret = 4200;
+                  return;
+                }
+              GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Read %u rounds from churn 
file\n", churn_rounds);
+              buf = &churn_data[count + 1];
+              churn_array = GNUNET_malloc(sizeof(unsigned int) * churn_rounds);
+            }
+        }
+
+      if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number(cfg, 
"dht_testing", "churns_per_round", &churns_per_round))
+        {
+          churns_per_round = (unsigned long long)churn_rounds;
+        }
+
+      line_number = 0;
+      while ((count < frstat.st_size) && (line_number < churn_rounds))
+        {
+          count++;
+          if (((churn_data[count] == '\n')) && (buf != &churn_data[count]))
+            {
+              churn_data[count] = '\0';
+
+              ret = sscanf(buf, "%u", &churn_array[line_number]);
+              if (1 == ret)
+                {
+                  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Read %u peers in 
round %u\n", churn_array[line_number], line_number);
+                  line_number++;
+                }
+              else
+                {
+                  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading line 
`%s' in hostfile\n", buf);
+                  buf = &churn_data[count + 1];
+                  continue;
+                }
+              buf = &churn_data[count + 1];
+            }
+          else if (churn_data[count] == '\n') /* Blank line */
+            buf = &churn_data[count + 1];
+        }
+    }
+  GNUNET_free_non_null(churn_data);
+
+  /** Check for a hostfile containing address@hidden:port triples */
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_string (cfg, "testing", "hostfile",
                                              &hostfile))
@@ -1959,11 +2565,24 @@
     while (count < frstat.st_size)
       {
         count++;
-        if (((data[count] == '\n') || (data[count] == '\0')) && (buf != 
&data[count]))
+        /* if (((data[count] == '\n') || (data[count] == '\0')) && (buf != 
&data[count]))*/
+        if (((data[count] == '\n')) && (buf != &data[count]))
           {
             data[count] = '\0';
             temphost = GNUNET_malloc(sizeof(struct GNUNET_TESTING_Host));
-            temphost->hostname = buf;
+            ret = sscanf(buf, "address@hidden:%hd", &temphost->username, 
&temphost->hostname, &temphost->port);
+            if (3 == ret)
+              {
+                GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Successfully read host 
%s, port %d and user %s from file\n", temphost->hostname, temphost->port, 
temphost->username);
+              }
+            else
+              {
+                GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading line `%s' 
in hostfile\n", buf);
+                GNUNET_free(temphost);
+                buf = &data[count + 1];
+                continue;
+              }
+            /* temphost->hostname = buf; */
             temphost->next = hosts;
             hosts = temphost;
             buf = &data[count + 1];
@@ -2043,7 +2662,7 @@
                                                &temp_config_number))
     all_get_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 
temp_config_number);
   else
-    all_get_timeout.value = get_timeout.value * ((num_gets / 
max_outstanding_gets) + 1);
+    all_get_timeout.value = get_timeout.value * num_gets;
 
   if (GNUNET_OK ==
         GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "get_delay",
@@ -2076,6 +2695,10 @@
   /**
    * Get testing related options.
    */
+  if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "DHT_TESTING", 
"REPLICATE_SAME"))
+    {
+      replicate_same = GNUNET_YES;
+    }
 
   if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING",
                                                           
"MALICIOUS_GET_FREQUENCY",
@@ -2088,15 +2711,26 @@
                                                           
&malicious_put_frequency))
     malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
 
+
+  /* The normal behavior of the DHT is to do find peer requests
+   * on its own.  Only if this is explicitly turned off should
+   * the testing driver issue find peer requests (even though
+   * this is likely the default when testing).
+   */
   if (GNUNET_NO ==
         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
-                                             "find_peers"))
+                                             "do_find_peer"))
     {
-      do_find_peer = GNUNET_NO;
+      do_find_peer = GNUNET_YES;
     }
-  else
-    do_find_peer = GNUNET_YES;
 
+  if (GNUNET_YES ==
+        GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+                                             "republish"))
+    {
+      in_dht_replication = GNUNET_YES;
+    }
+
   if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING",
                                                           "TRIAL_TO_RUN",
                                                           &trial_to_run))
@@ -2113,6 +2747,9 @@
   else
     find_peer_delay = DEFAULT_FIND_PEER_DELAY;
 
+  if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number(cfg, "DHT_TESTING", 
"ROUND_DELAY", &round_delay))
+    round_delay = 0;
+
   if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING",
                                                             
"OUTSTANDING_FIND_PEERS",
                                                             
&max_outstanding_find_peers))
@@ -2123,6 +2760,13 @@
 
   find_peer_offset = GNUNET_TIME_relative_divide (find_peer_delay, 
max_outstanding_find_peers);
 
+  if (GNUNET_SYSERR ==
+        GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", 
"num_rounds",
+                                               &total_rounds))
+    {
+      total_rounds = 1;
+    }
+
   topology_str = NULL;
   if ((GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology",
@@ -2214,6 +2858,7 @@
   /* Set peers_left so we know when all peers started */
   peers_left = num_peers;
 
+
   /* Set up a task to end testing if peer start fails */
   die_task = GNUNET_SCHEDULER_add_delayed (sched,
                                            
GNUNET_TIME_relative_multiply(seconds_per_peer_start, num_peers),
@@ -2268,8 +2913,15 @@
                                      &peers_started_callback, NULL,
                                      &topology_callback, NULL,
                                      hosts);
-
-  GNUNET_free_non_null(temphost);
+  temphost = hosts;
+  while (temphost != NULL)
+    {
+      tempnext = temphost->next;
+      GNUNET_free (temphost->username);
+      GNUNET_free (temphost->hostname);
+      GNUNET_free (temphost);
+      temphost = tempnext;
+    }
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]