[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r8762 - GNUnet/src/applications/dv_dht/module
From: gnunet
Subject: [GNUnet-SVN] r8762 - GNUnet/src/applications/dv_dht/module
Date: Tue, 21 Jul 2009 12:03:30 -0600
Author: nevans
Date: 2009-07-21 12:03:30 -0600 (Tue, 21 Jul 2009)
New Revision: 8762
Modified:
GNUnet/src/applications/dv_dht/module/routing.c
GNUnet/src/applications/dv_dht/module/table.c
Log:
Using heap and hashmap for efficient return route storage
Modified: GNUnet/src/applications/dv_dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dv_dht/module/routing.c 2009-07-21 18:02:00 UTC (rev 8761)
+++ GNUnet/src/applications/dv_dht/module/routing.c 2009-07-21 18:03:30 UTC (rev 8762)
@@ -43,7 +43,7 @@
/**
* What is the request priority for DV_DHT operations?
*/
-#define DV_DHT_PRIORITY 0
+#define DV_DHT_PRIORITY GNUNET_EXTREME_PRIORITY / 4
/*
* Number of hash functions for bloom filter
@@ -61,7 +61,7 @@
* (this is how much we will request from the GNUnet core);
* Must not be zero!
*/
-#define DV_DHT_DELAY (500 * GNUNET_CRON_MILLISECONDS)
+#define DV_DHT_DELAY (2500 * GNUNET_CRON_MILLISECONDS)
/**
* What is the maximum number of results returned by any DV_DHT
@@ -210,16 +210,32 @@
} DV_DHTQueryRecord;
-/**
- * Array of active records.
+/*
+ * DV_DHT Routing results structure
*/
-static DV_DHTQueryRecord *records;
+typedef struct DV_DHTResults
+{
+ /*
+ * Min heap for removal upon reaching limit
+ */
+ struct GNUNET_CONTAINER_Heap *minHeap;
+ /*
+ * Hashmap for fast key based lookup
+ */
+ struct GNUNET_MultiHashMap *hashmap;
+
+} DV_DHTResults;
+
+/*
+ * Container of active records
+ */
+static DV_DHTResults new_records;
+
/**
* Size of records
*/
static unsigned int rt_size;
-static unsigned int next_record;
#if DEBUG_INSANE
static unsigned int indentation;
@@ -346,7 +362,7 @@
struct GNUNET_BloomFilter *bloom;
unsigned int routed;
unsigned int tracked;
- unsigned int i;
+
int match;
int cost;
DV_DHT_Source_Route *pos;
@@ -438,24 +454,17 @@
tracked = 0;
GNUNET_mutex_lock (lock);
- for (i = 0; i < rt_size; i++)
+ if (GNUNET_multi_hash_map_contains (new_records.hashmap, key))
{
- q = &records[i];
- tracked++;
- if ((ntohl (q->get.type) != type)
- || (0 != memcmp (key, &q->get.key, sizeof (GNUNET_HashCode))))
- continue;
- else
- {
+ q = GNUNET_multi_hash_map_get (new_records.hashmap, key);
#if DEBUG_ROUTING
- GNUNET_hash_to_enc (&q->get.key, &enc);
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
- GNUNET_GE_USER | GNUNET_GE_BULK,
- "Found matching request for reply `%s'\n", &enc);
+ GNUNET_hash_to_enc (&q->get.key, &enc);
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK,
+ "Found matching request (in hashmap) for reply `%s'\n",
+ &enc);
#endif
- }
- routed++;
pos = q->sources;
prev = NULL;
while (pos != NULL)
@@ -469,33 +478,42 @@
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
GNUNET_GE_USER | GNUNET_GE_BULK,
- "Routing result to `%s'\n", &enc);
+ "Routing result (in hashmap) to `%s'\n", &enc);
#endif
match = GNUNET_NO;
- match = GNUNET_bloomfilter_test (bloom, &pos->source.hashPubKey);
+ match =
+ GNUNET_bloomfilter_test (bloom, &pos->source.hashPubKey);
if (match == GNUNET_YES)
- {
- pos = pos->next;
- continue;
- }
+ {
+ pos = pos->next;
+ continue;
+ }
cost = dvapi->dv_send (&pos->source,
- &result->header, DV_DHT_PRIORITY, DV_DHT_DELAY);
+ &result->header, DV_DHT_PRIORITY,
+ DV_DHT_DELAY);
+ /* Need to change this piece, because we may want to try another path for the reply in the case of failure from
+ * the DV subsystem. (Likely to another close peer that may know of this request)
+ */
+ if (cost == GNUNET_SYSERR)
+ break;
+
if ((debug_routes_extended) && (dhtlog != NULL))
{
queryuid = ntohl (result->queryuid);
dhtlog->insert_route (NULL, queryuid,
DHTLOG_RESULT,
- ntohl (result->hop_count), cost, GNUNET_NO,
- coreAPI->my_identity, key, NULL,
- &pos->source);
+ ntohl (result->hop_count), cost,
+ GNUNET_NO, coreAPI->my_identity, key,
+ NULL, &pos->source);
}
if (stats != NULL)
stats->change (stat_replies_routed, 1);
}
+
if ((pos->receiver != NULL) && (pos->received != GNUNET_YES))
{
#if DEBUG_ROUTING
@@ -519,9 +537,9 @@
queryuid = ntohl (result->queryuid);
dhtlog->insert_route (NULL, queryuid,
DHTLOG_RESULT,
- ntohl (result->hop_count), 0, GNUNET_YES,
- coreAPI->my_identity, key, NULL,
- NULL);
+ ntohl (result->hop_count), 0,
+ GNUNET_YES, coreAPI->my_identity, key,
+ NULL, NULL);
}
if (stats != NULL)
stats->change (stat_replies_routed, 1);
@@ -560,9 +578,13 @@
unsigned int diameter;
unsigned int hops;
struct DV_DHT_Source_Route *pos;
+ unsigned int routes_size;
+ unsigned int heap_size;
+ GNUNET_CronTime now;
hops = ntohl (get->hop_count);
diameter = GNUNET_DV_DHT_estimate_network_diameter ();
+ now = GNUNET_get_time ();
/*if (hops > 2 * diameter) */
if (hops > 2 * diameter)
{
@@ -572,26 +594,63 @@
return GNUNET_SYSERR;
}
- GNUNET_mutex_lock (lock);
-
- q = &records[next_record];
+ routes_size = GNUNET_multi_hash_map_size (new_records.hashmap);
+ heap_size = GNUNET_CONTAINER_heap_get_size (new_records.minHeap);
+ if (routes_size != heap_size)
+ {
#if DEBUG_ROUTING
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "Tracking request in slot %u\n",
- next_record);
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK,
+ "Size of record hash map %u, size of heap %u. Bad!\n",
+ routes_size,
+ GNUNET_CONTAINER_heap_get_size (new_records.minHeap));
#endif
- if (q->sources != NULL)
+ return GNUNET_SYSERR;
+ }
+ GNUNET_mutex_lock (lock);
+ while (routes_size >= (rt_size - 1))
{
- while (q->sources != NULL)
+ q = GNUNET_CONTAINER_heap_remove_root (new_records.minHeap);
+ if (q->sources != NULL)
{
- pos = q->sources;
- q->sources = pos->next;
- GNUNET_free (pos);
+ while (q->sources != NULL)
+ {
+ pos = q->sources;
+ q->sources = pos->next;
+ GNUNET_free (pos);
+ }
+ GNUNET_array_grow (q->results, q->result_count, 0);
}
- GNUNET_array_grow (q->results, q->result_count, 0);
+ GNUNET_multi_hash_map_remove_all (new_records.hashmap, &q->get.key);
}
+ routes_size = GNUNET_multi_hash_map_size (new_records.hashmap);
+ heap_size = GNUNET_CONTAINER_heap_get_size (new_records.minHeap);
+ if (routes_size != heap_size)
+ {
+#if DEBUG_ROUTING
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK,
+ "Size of record hash map %u, size of heap %u. Bad!\n",
+ routes_size,
+ GNUNET_CONTAINER_heap_get_size (new_records.minHeap));
+#endif
+ return GNUNET_SYSERR;
+ }
+
+ if (GNUNET_multi_hash_map_contains (new_records.hashmap, &get->key))
+ {
+ q = GNUNET_multi_hash_map_get (new_records.hashmap, &get->key);
+ GNUNET_CONTAINER_heap_remove_node (new_records.minHeap, q);
+ }
+ else
+ {
+ q = GNUNET_malloc (sizeof (DV_DHTQueryRecord));
+ q->sources = NULL;
+ }
+
q->get = *get;
pos = GNUNET_malloc (sizeof (DV_DHT_Source_Route));
pos->next = q->sources;
@@ -603,16 +662,9 @@
pos->receiver = handler;
pos->receiver_closure = cls;
- /* We loop through the records, allows peer
- * to control how many concurrent responses
- * are known about.
- */
- if (next_record == rt_size - 1)
- {
- next_record = 0;
- }
- else
- next_record++;
+ GNUNET_CONTAINER_heap_insert (new_records.minHeap, q, now);
+ GNUNET_multi_hash_map_put (new_records.hashmap, &get->key, q,
+ GNUNET_MultiHashMapOption_REPLACE);
GNUNET_mutex_unlock (lock);
if (stats != NULL)
@@ -706,8 +758,9 @@
{
queryuid = ntohl (get->queryuid);
dhtlog->insert_route (NULL, ntohl (get->queryuid), DHTLOG_GET,
- hop_count, 0, GNUNET_YES, coreAPI->my_identity,
- &get->key, sender, NULL);
+ hop_count, 0, GNUNET_YES,
+ coreAPI->my_identity, &get->key, sender,
+ NULL);
}
}
@@ -764,13 +817,19 @@
"Forwarding DV_DHT GET request to peer `%s'.\n", &enc);
#endif
- cost = dvapi->dv_send (&next[j], &aget.header, DV_DHT_PRIORITY, DV_DHT_DELAY);
+ cost =
+ dvapi->dv_send (&next[j], &aget.header, DV_DHT_PRIORITY,
+ DV_DHT_DELAY);
+ if (cost == GNUNET_SYSERR)
+ continue;
+
if ((debug_routes_extended) && (dhtlog != NULL))
{
queryuid = ntohl (get->queryuid);
dhtlog->insert_route (NULL, ntohl (get->queryuid), DHTLOG_GET,
- hop_count, cost, GNUNET_NO, coreAPI->my_identity,
- &get->key, sender, &next[j]);
+ hop_count, cost, GNUNET_NO,
+ coreAPI->my_identity, &get->key, sender,
+ &next[j]);
}
j++;
}
@@ -859,17 +918,25 @@
GNUNET_GE_BULK,
"Forwarding DV_DHT PUT request to peer `%s'.\n", &enc);
#endif
- cost = dvapi->dv_send (&next[j], &aput->header, DV_DHT_PRIORITY, DV_DHT_DELAY);
+ cost =
+ dvapi->dv_send (&next[j], &aput->header, DV_DHT_PRIORITY,
+ DV_DHT_DELAY);
+
#if DEBUG_ROUTING
- if (cost == GNUNET_SYSERR)
- {
- GNUNET_hash_to_enc (&next[j].hashPubKey, &enc);
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "Forwarding DV_DHT PUT request FAILED (dv unknown) to peer `%s'.\n", &enc);
- }
+ if (cost == GNUNET_SYSERR)
+ {
+ GNUNET_hash_to_enc (&next[j].hashPubKey, &enc);
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER
+ | GNUNET_GE_BULK,
+ "Forwarding DV_DHT PUT request FAILED (dv unknown) to peer `%s'.\n",
+ &enc);
+ }
#endif
+
+ if (cost == GNUNET_SYSERR)
+ continue;
+
if ((debug_routes_extended) && (dhtlog != NULL))
{
queryuid = ntohl (put->queryuid);
@@ -1032,50 +1099,39 @@
unsigned int type, GNUNET_ResultProcessor handler,
void *cls)
{
- unsigned int i;
struct DV_DHT_Source_Route *pos;
- struct DV_DHT_Source_Route *prev;
int done;
unsigned int records_removed;
+ DV_DHTQueryRecord *q;
done = GNUNET_NO;
GNUNET_mutex_lock (lock);
records_removed = 0;
- for (i = 0; i < rt_size; i++)
+ while (GNUNET_YES ==
+ GNUNET_multi_hash_map_contains (new_records.hashmap, key))
{
- prev = NULL;
- pos = records[i].sources;
- while (pos != NULL)
+ q = GNUNET_multi_hash_map_get (new_records.hashmap, key);
+ if (q->sources != NULL)
{
- if ((pos->receiver == handler) &&
- (pos->receiver_closure == cls) &&
- (0 == memcmp (key,
- &records[i].get.key, sizeof (GNUNET_HashCode)))
- &&
- (0 == memcmp (&pos->source.hashPubKey,
- &coreAPI->my_identity->hashPubKey, sizeof (GNUNET_HashCode))))
- {
- if (prev == NULL)
- records[i].sources = pos->next;
- else
- prev->next = pos->next;
+ while (q->sources != NULL)
+ {
+ pos = q->sources;
+ q->sources = pos->next;
GNUNET_free (pos);
- records_removed++;
- break;
- }
- prev = pos;
- pos = prev->next;
+ }
+ GNUNET_array_grow (q->results, q->result_count, 0);
}
- if (records[i].sources == NULL)
- {
- GNUNET_array_grow (records[i].results, records[i].result_count, 0);
- }
+ GNUNET_multi_hash_map_remove (new_records.hashmap, key, q);
+ GNUNET_CONTAINER_heap_remove_node (new_records.minHeap, q);
+ records_removed++;
}
+
GNUNET_mutex_unlock (lock);
#if DEBUG_ROUTING
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "Removed %u total records\n", records_removed);
+ GNUNET_GE_BULK, "Removed %u total records\n",
+ records_removed);
#endif
if (done != GNUNET_YES)
return GNUNET_SYSERR;
@@ -1180,8 +1236,12 @@
dvapi = coreAPI->service_request ("dv");
if (dvapi == NULL)
return GNUNET_SYSERR;
- GNUNET_array_grow (records, rt_size, rts);
+ rt_size = (unsigned int) rts;
+
+ new_records.hashmap = GNUNET_multi_hash_map_create ((unsigned int) rts);
+ new_records.minHeap = GNUNET_CONTAINER_heap_create (GNUNET_MIN_HEAP);
+
lock = GNUNET_mutex_create (GNUNET_NO);
stats = capi->service_request ("stats");
if (stats != NULL)
@@ -1284,8 +1344,6 @@
int
GNUNET_DV_DHT_done_routing ()
{
- unsigned int i;
- struct DV_DHT_Source_Route *pos;
coreAPI->send_callback_unregister (sizeof (DV_DHT_MESSAGE),
&extra_get_callback);
@@ -1313,18 +1371,10 @@
dvapi = NULL;
}
GNUNET_mutex_destroy (lock);
- for (i = 0; i < rt_size; i++)
- {
- while (records[i].sources != NULL)
- {
- pos = records[i].sources;
- records[i].sources = pos->next;
- GNUNET_free (pos);
- }
- GNUNET_array_grow (records[i].results, records[i].result_count, 0);
- }
- GNUNET_array_grow (records, rt_size, 0);
+
coreAPI->service_release (dstore);
+ GNUNET_multi_hash_map_destroy (new_records.hashmap);
+ GNUNET_CONTAINER_heap_destroy (new_records.minHeap);
return GNUNET_OK;
}
Modified: GNUnet/src/applications/dv_dht/module/table.c
===================================================================
--- GNUnet/src/applications/dv_dht/module/table.c 2009-07-21 18:02:00 UTC (rev 8761)
+++ GNUnet/src/applications/dv_dht/module/table.c 2009-07-21 18:03:30 UTC (rev 8762)
@@ -67,19 +67,12 @@
* How often should the cron job for maintaining the DV_DHT
* run?
*/
-#define MAINTAIN_FREQUENCY 5000 * GNUNET_CRON_MILLISECONDS
+#define MAINTAIN_FREQUENCY 10000 * GNUNET_CRON_MILLISECONDS
/**
- * What is the chance (1 in XXX) that we send DISCOVERY messages
- * to another peer?
- */
-#define MAINTAIN_CHANCE (10 + 75 * total_peers)
-/*#define MAINTAIN_CHANCE (10 + 100 * total_peers)*/
-
-/**
* How long can a peer be inactive before we time it out?
*/
-#define MAINTAIN_PEER_TIMEOUT MAINTAIN_FREQUENCY * MAINTAIN_CHANCE * 4
+#define MAINTAIN_PEER_TIMEOUT MAINTAIN_FREQUENCY * 4
/**
* What is the maximum number of known DV_DHT-enabled peers
@@ -329,10 +322,7 @@
i = bucketCount - 1;
while ((buckets[i].bstart > index) && (i > 0))
i--;
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "index is %d, bucket start is %d\n", index,
- buckets[i].bstart);
+
if ((buckets[i].bstart <= index) && (buckets[i].bend >= index))
return &buckets[i];
GNUNET_GE_BREAK (NULL, 0);
@@ -463,7 +453,8 @@
largest_distance)
{
chosen = bucket->peers[ec];
- largest_distance = inverse_distance (target, &pi->id.hashPubKey);
+ largest_distance =
+ inverse_distance (target, &pi->id.hashPubKey);
}
}
}
@@ -584,7 +575,8 @@
largest_distance)
{
chosen = bucket->peers[ec];
- largest_distance = inverse_distance (target, &pi->id.hashPubKey);
+ largest_distance =
+ inverse_distance (target, &pi->id.hashPubKey);
}
}
}
@@ -654,7 +646,7 @@
{
GNUNET_PeerIdentity closest;
- memset(&closest, 0, sizeof(GNUNET_PeerIdentity));
+ memset (&closest, 0, sizeof (GNUNET_PeerIdentity));
find_closest_peer (&closest, target);
if (&closest == NULL)
return GNUNET_SYSERR;
@@ -679,7 +671,8 @@
inverse_distance (target,
&coreAPI->my_identity->hashPubKey));
if (inverse_distance (target, &coreAPI->my_identity->hashPubKey) >=
- inverse_distance (target, &closest.hashPubKey) && (inverse_distance (target, &coreAPI->my_identity->hashPubKey) > 0))
+ inverse_distance (target, &closest.hashPubKey)
+ && (inverse_distance (target, &coreAPI->my_identity->hashPubKey) > 0))
{
return GNUNET_YES;
}
@@ -915,16 +908,6 @@
#endif
considerPeer (other, other);
- /*
- if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, MAINTAIN_CHANCE) > (MAINTAIN_CHANCE / 2))
- {
- #if DEBUG_TABLE
- print_exit ("broadcast_dht_discovery_prob");
- #endif
- return;
- } */
- /*fprintf(stderr, "sending discovery message\n");
- broadcast_dht_discovery (other, cls); */
#if DEBUG_TABLE
print_exit ("broadcast_dht_discovery_prob");
@@ -941,18 +924,8 @@
#if DEBUG_TABLE
print_entry ("maintain_dht_job");
#endif
- P2P_DV_DHT_Discovery disc;
- if (total_peers == 0)
- {
- disc.header.size = htons (sizeof (P2P_DV_DHT_Discovery));
- disc.header.type = htons (GNUNET_P2P_PROTO_DHT_DISCOVERY);
- disc.space_available = -1; /* FIXME */
- dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, &disc);
- }
- else
- {
- dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, NULL);
- }
+ dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, NULL);
+
#if DEBUG_TABLE
print_exit ("maintain_dht_job");
#endif
@@ -1191,8 +1164,6 @@
GNUNET_cron_add_job (coreAPI->cron, &maintain_dht_job, MAINTAIN_FREQUENCY,
MAINTAIN_FREQUENCY, NULL);
- GNUNET_cron_add_job (coreAPI->cron, &print_buckets, MAINTAIN_FREQUENCY * 30,
- MAINTAIN_FREQUENCY * 30, NULL);
return GNUNET_OK;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r8762 - GNUnet/src/applications/dv_dht/module, gnunet <=