gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r8762 - GNUnet/src/applications/dv_dht/module


From: gnunet
Subject: [GNUnet-SVN] r8762 - GNUnet/src/applications/dv_dht/module
Date: Tue, 21 Jul 2009 12:03:30 -0600

Author: nevans
Date: 2009-07-21 12:03:30 -0600 (Tue, 21 Jul 2009)
New Revision: 8762

Modified:
   GNUnet/src/applications/dv_dht/module/routing.c
   GNUnet/src/applications/dv_dht/module/table.c
Log:
Using heap and hashmap for efficient return route storage


Modified: GNUnet/src/applications/dv_dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dv_dht/module/routing.c     2009-07-21 18:02:00 UTC (rev 8761)
+++ GNUnet/src/applications/dv_dht/module/routing.c     2009-07-21 18:03:30 UTC (rev 8762)
@@ -43,7 +43,7 @@
 /**
  * What is the request priority for DV_DHT operations?
  */
-#define DV_DHT_PRIORITY 0
+#define DV_DHT_PRIORITY GNUNET_EXTREME_PRIORITY / 4
 
 /*
  * Number of hash functions for bloom filter
@@ -61,7 +61,7 @@
  * (this is how much we will request from the GNUnet core);
  * Must not be zero!
  */
-#define DV_DHT_DELAY (500 * GNUNET_CRON_MILLISECONDS)
+#define DV_DHT_DELAY (2500 * GNUNET_CRON_MILLISECONDS)
 
 /**
  * What is the maximum number of results returned by any DV_DHT
@@ -210,16 +210,32 @@
 
 } DV_DHTQueryRecord;
 
-/**
- * Array of active records.
+/*
+ * DV_DHT Routing results structure
  */
-static DV_DHTQueryRecord *records;
+typedef struct DV_DHTResults
+{
+  /*
+   * Min heap for removal upon reaching limit
+   */
+  struct GNUNET_CONTAINER_Heap *minHeap;
 
+  /*
+   * Hashmap for fast key based lookup
+   */
+  struct GNUNET_MultiHashMap *hashmap;
+
+} DV_DHTResults;
+
+/*
+ * Container of active records
+ */
+static DV_DHTResults new_records;
+
 /**
  * Size of records
  */
 static unsigned int rt_size;
-static unsigned int next_record;
 
 #if DEBUG_INSANE
 static unsigned int indentation;
@@ -346,7 +362,7 @@
   struct GNUNET_BloomFilter *bloom;
   unsigned int routed;
   unsigned int tracked;
-  unsigned int i;
+
   int match;
   int cost;
   DV_DHT_Source_Route *pos;
@@ -438,24 +454,17 @@
   tracked = 0;
   GNUNET_mutex_lock (lock);
 
-  for (i = 0; i < rt_size; i++)
+  if (GNUNET_multi_hash_map_contains (new_records.hashmap, key))
     {
-      q = &records[i];
-      tracked++;
-      if ((ntohl (q->get.type) != type)
-          || (0 != memcmp (key, &q->get.key, sizeof (GNUNET_HashCode))))
-        continue;
-      else
-        {
+      q = GNUNET_multi_hash_map_get (new_records.hashmap, key);
 #if DEBUG_ROUTING
-          GNUNET_hash_to_enc (&q->get.key, &enc);
-          GNUNET_GE_LOG (coreAPI->ectx,
-                         GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
-                         GNUNET_GE_USER | GNUNET_GE_BULK,
-                         "Found matching request for reply `%s'\n", &enc);
+      GNUNET_hash_to_enc (&q->get.key, &enc);
+      GNUNET_GE_LOG (coreAPI->ectx,
+                     GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+                     GNUNET_GE_USER | GNUNET_GE_BULK,
+                     "Found matching request (in hashmap) for reply `%s'\n",
+                     &enc);
 #endif
-        }
-      routed++;
       pos = q->sources;
       prev = NULL;
       while (pos != NULL)
@@ -469,33 +478,42 @@
               GNUNET_GE_LOG (coreAPI->ectx,
                              GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
                              GNUNET_GE_USER | GNUNET_GE_BULK,
-                             "Routing result to `%s'\n", &enc);
+                             "Routing result (in hashmap) to `%s'\n", &enc);
 #endif
               match = GNUNET_NO;
-              match = GNUNET_bloomfilter_test (bloom, &pos->source.hashPubKey);
+              match =
+                GNUNET_bloomfilter_test (bloom, &pos->source.hashPubKey);
               if (match == GNUNET_YES)
-              {
-                pos = pos->next;
-                continue;
-              }
+                {
+                  pos = pos->next;
+                  continue;
+                }
 
               cost = dvapi->dv_send (&pos->source,
-                              &result->header, DV_DHT_PRIORITY, DV_DHT_DELAY);
+                                     &result->header, DV_DHT_PRIORITY,
+                                     DV_DHT_DELAY);
 
+              /* Need to change this piece, because we may want to try another path for the reply in the case of failure from
+               * the DV subsystem. (Likely to another close peer that may know of this request)
+               */
+              if (cost == GNUNET_SYSERR)
+                break;
 
+
               if ((debug_routes_extended) && (dhtlog != NULL))
                 {
                   queryuid = ntohl (result->queryuid);
                   dhtlog->insert_route (NULL, queryuid,
                                         DHTLOG_RESULT,
-                                        ntohl (result->hop_count), cost, 
GNUNET_NO,
-                                        coreAPI->my_identity, key, NULL,
-                                        &pos->source);
+                                        ntohl (result->hop_count), cost,
+                                        GNUNET_NO, coreAPI->my_identity, key,
+                                        NULL, &pos->source);
                 }
 
               if (stats != NULL)
                 stats->change (stat_replies_routed, 1);
             }
+
           if ((pos->receiver != NULL) && (pos->received != GNUNET_YES))
             {
 #if DEBUG_ROUTING
@@ -519,9 +537,9 @@
                   queryuid = ntohl (result->queryuid);
                   dhtlog->insert_route (NULL, queryuid,
                                         DHTLOG_RESULT,
-                                        ntohl (result->hop_count), 0, 
GNUNET_YES,
-                                        coreAPI->my_identity, key, NULL,
-                                        NULL);
+                                        ntohl (result->hop_count), 0,
+                                        GNUNET_YES, coreAPI->my_identity, key,
+                                        NULL, NULL);
                 }
               if (stats != NULL)
                 stats->change (stat_replies_routed, 1);
@@ -560,9 +578,13 @@
   unsigned int diameter;
   unsigned int hops;
   struct DV_DHT_Source_Route *pos;
+  unsigned int routes_size;
+  unsigned int heap_size;
+  GNUNET_CronTime now;
 
   hops = ntohl (get->hop_count);
   diameter = GNUNET_DV_DHT_estimate_network_diameter ();
+  now = GNUNET_get_time ();
   /*if (hops > 2 * diameter) */
   if (hops > 2 * diameter)
     {
@@ -572,26 +594,63 @@
       return GNUNET_SYSERR;
     }
 
-  GNUNET_mutex_lock (lock);
-
-  q = &records[next_record];
+  routes_size = GNUNET_multi_hash_map_size (new_records.hashmap);
+  heap_size = GNUNET_CONTAINER_heap_get_size (new_records.minHeap);
+  if (routes_size != heap_size)
+    {
 #if DEBUG_ROUTING
-  GNUNET_GE_LOG (coreAPI->ectx,
-                 GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
-                 GNUNET_GE_BULK, "Tracking request in slot %u\n",
-                 next_record);
+      GNUNET_GE_LOG (coreAPI->ectx,
+                     GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+                     GNUNET_GE_BULK,
+                     "Size of record hash map %u, size of heap %u. Bad!\n",
+                     routes_size,
+                     GNUNET_CONTAINER_heap_get_size (new_records.minHeap));
 #endif
-  if (q->sources != NULL)
+      return GNUNET_SYSERR;
+    }
+  GNUNET_mutex_lock (lock);
+  while (routes_size >= (rt_size - 1))
     {
-      while (q->sources != NULL)
+      q = GNUNET_CONTAINER_heap_remove_root (new_records.minHeap);
+      if (q->sources != NULL)
         {
-          pos = q->sources;
-          q->sources = pos->next;
-          GNUNET_free (pos);
+          while (q->sources != NULL)
+            {
+              pos = q->sources;
+              q->sources = pos->next;
+              GNUNET_free (pos);
+            }
+          GNUNET_array_grow (q->results, q->result_count, 0);
         }
-      GNUNET_array_grow (q->results, q->result_count, 0);
+      GNUNET_multi_hash_map_remove_all (new_records.hashmap, &q->get.key);
     }
 
+  routes_size = GNUNET_multi_hash_map_size (new_records.hashmap);
+  heap_size = GNUNET_CONTAINER_heap_get_size (new_records.minHeap);
+  if (routes_size != heap_size)
+    {
+#if DEBUG_ROUTING
+      GNUNET_GE_LOG (coreAPI->ectx,
+                     GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+                     GNUNET_GE_BULK,
+                     "Size of record hash map %u, size of heap %u. Bad!\n",
+                     routes_size,
+                     GNUNET_CONTAINER_heap_get_size (new_records.minHeap));
+#endif
+      return GNUNET_SYSERR;
+    }
+
+  if (GNUNET_multi_hash_map_contains (new_records.hashmap, &get->key))
+    {
+      q = GNUNET_multi_hash_map_get (new_records.hashmap, &get->key);
+      GNUNET_CONTAINER_heap_remove_node (new_records.minHeap, q);
+    }
+  else
+    {
+      q = GNUNET_malloc (sizeof (DV_DHTQueryRecord));
+      q->sources = NULL;
+    }
+
   q->get = *get;
   pos = GNUNET_malloc (sizeof (DV_DHT_Source_Route));
   pos->next = q->sources;
@@ -603,16 +662,9 @@
   pos->receiver = handler;
   pos->receiver_closure = cls;
 
-  /* We loop through the records, allows peer
-   * to control how many concurrent responses
-   * are known about.
-   */
-  if (next_record == rt_size - 1)
-    {
-      next_record = 0;
-    }
-  else
-    next_record++;
+  GNUNET_CONTAINER_heap_insert (new_records.minHeap, q, now);
+  GNUNET_multi_hash_map_put (new_records.hashmap, &get->key, q,
+                             GNUNET_MultiHashMapOption_REPLACE);
 
   GNUNET_mutex_unlock (lock);
   if (stats != NULL)
@@ -706,8 +758,9 @@
         {
           queryuid = ntohl (get->queryuid);
           dhtlog->insert_route (NULL, ntohl (get->queryuid), DHTLOG_GET,
-                                hop_count, 0, GNUNET_YES, coreAPI->my_identity,
-                                &get->key, sender, NULL);
+                                hop_count, 0, GNUNET_YES,
+                                coreAPI->my_identity, &get->key, sender,
+                                NULL);
         }
     }
 
@@ -764,13 +817,19 @@
                      "Forwarding DV_DHT GET request to peer `%s'.\n", &enc);
 #endif
 
-      cost = dvapi->dv_send (&next[j], &aget.header, DV_DHT_PRIORITY, 
DV_DHT_DELAY);
+      cost =
+        dvapi->dv_send (&next[j], &aget.header, DV_DHT_PRIORITY,
+                        DV_DHT_DELAY);
+      if (cost == GNUNET_SYSERR)
+        continue;
+
       if ((debug_routes_extended) && (dhtlog != NULL))
         {
           queryuid = ntohl (get->queryuid);
           dhtlog->insert_route (NULL, ntohl (get->queryuid), DHTLOG_GET,
-                                hop_count, cost, GNUNET_NO, 
coreAPI->my_identity,
-                                &get->key, sender, &next[j]);
+                                hop_count, cost, GNUNET_NO,
+                                coreAPI->my_identity, &get->key, sender,
+                                &next[j]);
         }
       j++;
     }
@@ -859,17 +918,25 @@
                      GNUNET_GE_BULK,
                      "Forwarding DV_DHT PUT request to peer `%s'.\n", &enc);
 #endif
-      cost = dvapi->dv_send (&next[j], &aput->header, DV_DHT_PRIORITY, 
DV_DHT_DELAY);
+      cost =
+        dvapi->dv_send (&next[j], &aput->header, DV_DHT_PRIORITY,
+                        DV_DHT_DELAY);
+
 #if DEBUG_ROUTING
-  if (cost == GNUNET_SYSERR)
-  {
-      GNUNET_hash_to_enc (&next[j].hashPubKey, &enc);
-      GNUNET_GE_LOG (coreAPI->ectx,
-                     GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
-                     GNUNET_GE_BULK,
-                     "Forwarding DV_DHT PUT request FAILED (dv unknown) to 
peer `%s'.\n", &enc);
-  }
+      if (cost == GNUNET_SYSERR)
+        {
+          GNUNET_hash_to_enc (&next[j].hashPubKey, &enc);
+          GNUNET_GE_LOG (coreAPI->ectx,
+                         GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER
+                         | GNUNET_GE_BULK,
+                         "Forwarding DV_DHT PUT request FAILED (dv unknown) to 
peer `%s'.\n",
+                         &enc);
+        }
 #endif
+
+      if (cost == GNUNET_SYSERR)
+        continue;
+
       if ((debug_routes_extended) && (dhtlog != NULL))
         {
           queryuid = ntohl (put->queryuid);
@@ -1032,50 +1099,39 @@
                         unsigned int type, GNUNET_ResultProcessor handler,
                         void *cls)
 {
-  unsigned int i;
   struct DV_DHT_Source_Route *pos;
-  struct DV_DHT_Source_Route *prev;
   int done;
   unsigned int records_removed;
+  DV_DHTQueryRecord *q;
 
   done = GNUNET_NO;
   GNUNET_mutex_lock (lock);
   records_removed = 0;
-  for (i = 0; i < rt_size; i++)
+  while (GNUNET_YES ==
+         GNUNET_multi_hash_map_contains (new_records.hashmap, key))
     {
-      prev = NULL;
-      pos = records[i].sources;
-      while (pos != NULL)
+      q = GNUNET_multi_hash_map_get (new_records.hashmap, key);
+      if (q->sources != NULL)
         {
-          if ((pos->receiver == handler) &&
-              (pos->receiver_closure == cls) &&
-              (0 == memcmp (key,
-                            &records[i].get.key, sizeof (GNUNET_HashCode)))
-                            &&
-              (0 == memcmp (&pos->source.hashPubKey,
-                            &coreAPI->my_identity->hashPubKey, sizeof 
(GNUNET_HashCode))))
-          {
-              if (prev == NULL)
-                records[i].sources = pos->next;
-              else
-                prev->next = pos->next;
+          while (q->sources != NULL)
+            {
+              pos = q->sources;
+              q->sources = pos->next;
               GNUNET_free (pos);
-              records_removed++;
-              break;
-          }
-          prev = pos;
-          pos = prev->next;
+            }
+          GNUNET_array_grow (q->results, q->result_count, 0);
         }
-      if (records[i].sources == NULL)
-        {
-          GNUNET_array_grow (records[i].results, records[i].result_count, 0);
-        }
+      GNUNET_multi_hash_map_remove (new_records.hashmap, key, q);
+      GNUNET_CONTAINER_heap_remove_node (new_records.minHeap, q);
+      records_removed++;
     }
+
   GNUNET_mutex_unlock (lock);
 #if DEBUG_ROUTING
   GNUNET_GE_LOG (coreAPI->ectx,
                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
-                 GNUNET_GE_BULK, "Removed %u total records\n", 
records_removed);
+                 GNUNET_GE_BULK, "Removed %u total records\n",
+                 records_removed);
 #endif
   if (done != GNUNET_YES)
     return GNUNET_SYSERR;
@@ -1180,8 +1236,12 @@
   dvapi = coreAPI->service_request ("dv");
   if (dvapi == NULL)
     return GNUNET_SYSERR;
-  GNUNET_array_grow (records, rt_size, rts);
 
+  rt_size = (unsigned int) rts;
+
+  new_records.hashmap = GNUNET_multi_hash_map_create ((unsigned int) rts);
+  new_records.minHeap = GNUNET_CONTAINER_heap_create (GNUNET_MIN_HEAP);
+
   lock = GNUNET_mutex_create (GNUNET_NO);
   stats = capi->service_request ("stats");
   if (stats != NULL)
@@ -1284,8 +1344,6 @@
 int
 GNUNET_DV_DHT_done_routing ()
 {
-  unsigned int i;
-  struct DV_DHT_Source_Route *pos;
 
   coreAPI->send_callback_unregister (sizeof (DV_DHT_MESSAGE),
                                      &extra_get_callback);
@@ -1313,18 +1371,10 @@
       dvapi = NULL;
     }
   GNUNET_mutex_destroy (lock);
-  for (i = 0; i < rt_size; i++)
-    {
-      while (records[i].sources != NULL)
-        {
-          pos = records[i].sources;
-          records[i].sources = pos->next;
-          GNUNET_free (pos);
-        }
-      GNUNET_array_grow (records[i].results, records[i].result_count, 0);
-    }
-  GNUNET_array_grow (records, rt_size, 0);
+
   coreAPI->service_release (dstore);
+  GNUNET_multi_hash_map_destroy (new_records.hashmap);
+  GNUNET_CONTAINER_heap_destroy (new_records.minHeap);
   return GNUNET_OK;
 }
 

Modified: GNUnet/src/applications/dv_dht/module/table.c
===================================================================
--- GNUnet/src/applications/dv_dht/module/table.c       2009-07-21 18:02:00 UTC (rev 8761)
+++ GNUnet/src/applications/dv_dht/module/table.c       2009-07-21 18:03:30 UTC (rev 8762)
@@ -67,19 +67,12 @@
  * How often should the cron job for maintaining the DV_DHT
  * run?
  */
-#define MAINTAIN_FREQUENCY 5000 * GNUNET_CRON_MILLISECONDS
+#define MAINTAIN_FREQUENCY 10000 * GNUNET_CRON_MILLISECONDS
 
 /**
- * What is the chance (1 in XXX) that we send DISCOVERY messages
- * to another peer?
- */
-#define MAINTAIN_CHANCE (10 + 75 * total_peers)
-/*#define MAINTAIN_CHANCE (10 + 100 * total_peers)*/
-
-/**
  * How long can a peer be inactive before we time it out?
  */
-#define MAINTAIN_PEER_TIMEOUT MAINTAIN_FREQUENCY * MAINTAIN_CHANCE * 4
+#define MAINTAIN_PEER_TIMEOUT MAINTAIN_FREQUENCY * 4
 
 /**
  * What is the maximum number of known DV_DHT-enabled peers
@@ -329,10 +322,7 @@
   i = bucketCount - 1;
   while ((buckets[i].bstart > index) && (i > 0))
     i--;
-  GNUNET_GE_LOG (coreAPI->ectx,
-                 GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
-                 GNUNET_GE_BULK, "index is %d, bucket start is %d\n", index,
-                 buckets[i].bstart);
+
   if ((buckets[i].bstart <= index) && (buckets[i].bend >= index))
     return &buckets[i];
   GNUNET_GE_BREAK (NULL, 0);
@@ -463,7 +453,8 @@
               largest_distance)
             {
               chosen = bucket->peers[ec];
-              largest_distance = inverse_distance (target, &pi->id.hashPubKey);
+              largest_distance =
+                inverse_distance (target, &pi->id.hashPubKey);
             }
         }
     }
@@ -584,7 +575,8 @@
               largest_distance)
             {
               chosen = bucket->peers[ec];
-              largest_distance = inverse_distance (target, &pi->id.hashPubKey);
+              largest_distance =
+                inverse_distance (target, &pi->id.hashPubKey);
             }
         }
     }
@@ -654,7 +646,7 @@
 {
 
   GNUNET_PeerIdentity closest;
-  memset(&closest, 0, sizeof(GNUNET_PeerIdentity));
+  memset (&closest, 0, sizeof (GNUNET_PeerIdentity));
   find_closest_peer (&closest, target);
   if (&closest == NULL)
     return GNUNET_SYSERR;
@@ -679,7 +671,8 @@
                  inverse_distance (target,
                                    &coreAPI->my_identity->hashPubKey));
   if (inverse_distance (target, &coreAPI->my_identity->hashPubKey) >=
-      inverse_distance (target, &closest.hashPubKey) && (inverse_distance 
(target, &coreAPI->my_identity->hashPubKey) > 0))
+      inverse_distance (target, &closest.hashPubKey)
+      && (inverse_distance (target, &coreAPI->my_identity->hashPubKey) > 0))
     {
       return GNUNET_YES;
     }
@@ -915,16 +908,6 @@
 #endif
 
   considerPeer (other, other);
-  /*
-     if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, MAINTAIN_CHANCE) > (MAINTAIN_CHANCE / 2))
-     {
-     #if DEBUG_TABLE
-     print_exit ("broadcast_dht_discovery_prob");
-     #endif
-     return;
-     } */
-  /*fprintf(stderr, "sending discovery message\n");
-     broadcast_dht_discovery (other, cls); */
 
 #if DEBUG_TABLE
   print_exit ("broadcast_dht_discovery_prob");
@@ -941,18 +924,8 @@
 #if DEBUG_TABLE
   print_entry ("maintain_dht_job");
 #endif
-  P2P_DV_DHT_Discovery disc;
-  if (total_peers == 0)
-    {
-      disc.header.size = htons (sizeof (P2P_DV_DHT_Discovery));
-      disc.header.type = htons (GNUNET_P2P_PROTO_DHT_DISCOVERY);
-      disc.space_available = -1;        /* FIXME */
-      dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, &disc);
-    }
-  else
-    {
-      dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, NULL);
-    }
+  dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, NULL);
+
 #if DEBUG_TABLE
   print_exit ("maintain_dht_job");
 #endif
@@ -1191,8 +1164,6 @@
   GNUNET_cron_add_job (coreAPI->cron, &maintain_dht_job, MAINTAIN_FREQUENCY,
                        MAINTAIN_FREQUENCY, NULL);
 
-  GNUNET_cron_add_job (coreAPI->cron, &print_buckets, MAINTAIN_FREQUENCY * 30,
-                       MAINTAIN_FREQUENCY * 30, NULL);
   return GNUNET_OK;
 }
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]