[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r5904 - GNUnet/src/applications/dht/module
From: |
gnunet |
Subject: |
[GNUnet-SVN] r5904 - GNUnet/src/applications/dht/module |
Date: |
Sat, 15 Dec 2007 02:12:53 -0700 (MST) |
Author: grothoff
Date: 2007-12-15 02:12:52 -0700 (Sat, 15 Dec 2007)
New Revision: 5904
Modified:
GNUnet/src/applications/dht/module/routing.c
GNUnet/src/applications/dht/module/table.c
GNUnet/src/applications/dht/module/table.h
Log:
dht hacking
Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c 2007-12-15 08:56:22 UTC
(rev 5903)
+++ GNUnet/src/applications/dht/module/routing.c 2007-12-15 09:12:52 UTC
(rev 5904)
@@ -23,12 +23,8 @@
* @brief state for active DHT routing operations
* @author Christian Grothoff
*
- * LATER:
- * - better selection of timeouts / htl
- * - add code to cleanup routing/request table!
- * - optimize memory use of routing table
+ * TODO:
* - implement extra_get_callback
- * - add similar callback for discovery in table.c
*/
#include "platform.h"
@@ -40,40 +36,46 @@
#define DEBUG_ROUTING GNUNET_YES
+/**
+ * What is the request priority for DHT operations?
+ */
#define DHT_PRIORITY 0
+/**
+ * What is the estimated per-hop delay for DHT operations
+ * (this is how much we will request from the GNUnet core)
+ */
#define DHT_DELAY (5 * GNUNET_CRON_SECONDS)
/**
+ * What is the maximum number of results returned by any DHT
+ * operation?
+ */
+#define MAX_RESULTS 64
+
+/**
+ * How many peers should a DHT GET request reach on average?
+ *
* Larger factors will result in more aggressive routing of GET
* operations (each peer will either forward to GET_TRIES peers that
* are closer to the key).
*/
-#define GET_TRIES 4
+#define GET_TRIES 7
/**
+ * At how many peers should a DHT PUT request be replicated
+ * on average?
+ *
* Larger factors will result in more replication and
* more aggressive routing of PUT operations (each
* peer will either forward to PUT_TRIES peers that
* are closer to the key, or replicate the content).
*/
-#define PUT_TRIES 2
+#define PUT_TRIES 3
/**
- * FIXME: replace this by an estimate of the
- * network size (and then log of that).
+ * How long do we keep content after receiving a PUT request for it?
*/
-#define MAX_HOPS 10
-
-/**
- * How long do we keep track of requests?
- */
-#define MAX_REQUEST_LIFETIME (5 * GNUNET_CRON_MINUTES)
-
-
-/**
- * How long do we keep track of requests?
- */
#define CONTENT_LIFETIME (12 * GNUNET_CRON_HOURS)
/**
@@ -105,7 +107,7 @@
* At what time will this record automatically
* expire?
*/
- GNUNET_CronTime expires;
+ GNUNET_CronTime expire;
} DHT_Source_Route;
@@ -149,11 +151,10 @@
{
/**
- * When do we stop forwarding this request? (Note that
- * this time should be before the last of the source
- * routes expire).
+ * When does this record expire? Should be the max
+ * of the individual source records.
*/
- GNUNET_CronTime expires;
+ GNUNET_CronTime expire;
/**
* Information about where to send the results back to.
@@ -163,7 +164,7 @@
/**
* GET message of this record (what we are forwarding).
*/
- DHT_MESSAGE *get;
+ DHT_MESSAGE get;
/**
* Hashcodes of the results that we have send back
@@ -181,7 +182,7 @@
/**
* Linked list of active records.
*/
-static DHTQueryRecord **records;
+static DHTQueryRecord * records;
/**
* Size of records
@@ -209,7 +210,34 @@
static unsigned int stat_put_requests_received;
+
/**
+ * Compute the number of peers to which we should forward
+ * the request so that, on average, the desired
+ * target_replication count is obtained.
+ */
+static unsigned int
+get_forward_count(unsigned int hop_count,
+ double target_replication)
+{
+ double target_count;
+ unsigned int target_value;
+ unsigned int diameter;
+
+ diameter = GNUNET_DHT_estimate_network_diameter();
+ if (hop_count > (diameter + 1) * 2)
+ return 0;
+ target_count = target_replication / (target_replication * hop_count +
diameter);
+ target_value = 0;
+ while (target_value < target_count)
+ target_value++;
+ if ( (target_count + 1 - target_value ) > drand48())
+ target_value++;
+ return target_value;
+}
+
+
+/**
* Given a result, lookup in the routing table
* where to send it next.
*/
@@ -219,14 +247,15 @@
unsigned int size, const char *data, void *cls)
{
DHTQueryRecord *q;
- int i;
- int j;
+ unsigned int i;
+ unsigned int j;
int found;
GNUNET_HashCode hc;
DHT_MESSAGE *result;
unsigned int routed;
unsigned int tracked;
DHT_Source_Route *pos;
+ DHT_Source_Route *prev;
GNUNET_CronTime now;
#if DEBUG_ROUTING
GNUNET_EncName enc;
@@ -260,12 +289,10 @@
now = GNUNET_get_time ();
for (i = 0; i < rt_size; i++)
{
- q = records[i];
- if (q == NULL)
- continue;
+ q = &records[i];
tracked++;
- if ((ntohl (q->get->type) != type) ||
- (0 != memcmp (key, &q->get->key, sizeof (GNUNET_HashCode))))
+ if ((ntohl (q->get.type) != type) ||
+ (0 != memcmp (key, &q->get.key, sizeof (GNUNET_HashCode))))
continue;
found = GNUNET_NO;
for (j = 0; j < q->result_count; j++)
@@ -282,15 +309,16 @@
GNUNET_GE_DEVELOPER,
"Seen the same result earlier, not routing it
again.\n");
#endif
- continue;
+ break;
}
routed++;
GNUNET_array_grow (q->results, q->result_count, q->result_count + 1);
q->results[q->result_count - 1] = hc;
pos = q->sources;
+ prev = NULL;
while (pos != NULL)
{
- if (pos->expires < now)
+ if (pos->expire < now)
{
#if DEBUG_ROUTING
GNUNET_hash_to_enc (&pos->source.hashPubKey, &enc);
@@ -298,8 +326,17 @@
GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
GNUNET_GE_DEVELOPER,
"Route to peer `%s' has expired (%llu < %llu)\n",
- &enc, pos->expires, now);
+ &enc, pos->expire, now);
#endif
+ if (prev == NULL)
+ q->sources = pos->next;
+ else
+ prev->next = pos->next;
+ GNUNET_free(pos);
+ if (prev == NULL)
+ pos = q->sources;
+ else
+ pos = prev->next;
continue;
}
if (0 != memcmp (&pos->source,
@@ -314,7 +351,7 @@
#endif
coreAPI->unicast (&pos->source,
&result->header, DHT_PRIORITY,
- pos->expires - now);
+ DHT_DELAY);
if (stats != NULL)
stats->change (stat_replies_routed, 1);
}
@@ -332,6 +369,9 @@
}
pos = pos->next;
}
+ if (q->result_count >= MAX_RESULTS)
+ q->expire = 0;
+ break;
}
GNUNET_mutex_unlock (lock);
#if DEBUG_ROUTING
@@ -354,47 +394,54 @@
DHTQueryRecord *q;
unsigned int i;
unsigned int rt_pos;
+ unsigned int diameter;
GNUNET_CronTime expire;
GNUNET_CronTime now;
unsigned int hops;
struct DHT_Source_Route *pos;
hops = ntohl (get->hop_count);
- if (hops > MAX_HOPS)
+ diameter = GNUNET_DHT_estimate_network_diameter();
+ if (hops > 2 * diameter)
return GNUNET_SYSERR;
now = GNUNET_get_time ();
- expire = now + MAX_REQUEST_LIFETIME;
+ expire = now + DHT_DELAY * diameter * 4;
GNUNET_mutex_lock (lock);
rt_pos = rt_size;
for (i = 0; i < rt_size; i++)
{
- if ((records[i] != NULL) &&
- (0 == memcmp (&records[i]->get->key,
- &get->key,
- sizeof (GNUNET_HashCode))) &&
- (records[i]->get->type == get->type))
+ q = &records[i];
+ if ( (q->expire > now) &&
+ ( (0 != memcmp (&q->get.key,
+ &get->key,
+ sizeof (GNUNET_HashCode))) ||
+ (q->get.type == get->type)) )
+ continue; /* used and not an identical request */
+ if (q->expire < now)
{
rt_pos = i;
- break;
- }
- if (records[i] == NULL)
- {
- rt_pos = i;
- break;
- }
- if (records[i]->expires < now)
- {
- rt_pos = i;
- GNUNET_free (records[rt_pos]->get);
- records[rt_pos]->get = NULL;
- while (records[rt_pos]->sources != NULL)
+ while (q->sources != NULL)
{
- pos = records[rt_pos]->sources;
- records[rt_pos]->sources = pos->next;
+ pos = q->sources;
+ q->sources = pos->next;
GNUNET_free (pos);
}
- break;
+ GNUNET_array_grow(q->results,
+ q->result_count,
+ 0);
+ q->expire = 0;
}
+ if ( (0 == memcmp (&q->get.key,
+ &get->key,
+ sizeof (GNUNET_HashCode)) &&
+ (q->get.type == get->type)) )
+ {
+ GNUNET_array_grow(q->results,
+ q->result_count,
+ 0);
+ rt_pos = i;
+ break;
+ }
}
if (rt_pos == rt_size)
{
@@ -402,15 +449,10 @@
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
- if (records[rt_pos] == NULL)
- {
- records[rt_pos] = GNUNET_malloc (sizeof (DHTQueryRecord));
- memset (records[rt_pos], 0, sizeof (DHTQueryRecord));
- }
- q = records[rt_pos];
- q->expires = expire;
- q->get = GNUNET_malloc (ntohs (get->header.size));
- memcpy (q->get, get, ntohs (get->header.size));
+ q = &records[rt_pos];
+ if (q->expire < expire)
+ q->expire = expire;
+ q->get = *get;
pos = GNUNET_malloc (sizeof (DHT_Source_Route));
pos->next = q->sources;
q->sources = pos;
@@ -418,7 +460,7 @@
pos->source = *sender;
else
pos->source = *coreAPI->myIdentity;
- pos->expires = expire;
+ pos->expire = expire;
pos->receiver = handler;
pos->receiver_closure = cls;
#if DEBUG_ROUTING
@@ -442,6 +484,8 @@
GNUNET_PeerIdentity next[GET_TRIES];
const DHT_MESSAGE *get;
DHT_MESSAGE aget;
+ unsigned int target_value;
+ unsigned int hop_count;
int total;
int i;
#if DEBUG_ROUTING
@@ -477,7 +521,7 @@
return GNUNET_OK; /* could not route */
}
total = dstore->get (&get->key, ntohl (get->type), &routeResult, NULL);
- if ((total > GET_TRIES) && (sender != NULL))
+ if (total > MAX_RESULTS)
{
#if DEBUG_ROUTING
GNUNET_GE_LOG (coreAPI->ectx,
@@ -489,8 +533,10 @@
return GNUNET_OK;
}
aget = *get;
- aget.hop_count = htonl (1 + ntohl (get->hop_count));
- for (i = 0; i < GET_TRIES; i++)
+ hop_count = ntohl(get->hop_count);
+ target_value = get_forward_count(hop_count, GET_TRIES);
+ aget.hop_count = htonl (1 + hop_count);
+ for (i = 0; i < target_value; i++)
{
if (GNUNET_OK !=
GNUNET_DHT_select_peer (&next[i], &get->key, &next[0], i))
@@ -504,28 +550,14 @@
#endif
break;
}
- if (-1 == GNUNET_hash_xorcmp (&next[i].hashPubKey,
- &coreAPI->myIdentity->hashPubKey,
- &get->key))
- {
#if DEBUG_ROUTING
- GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
- GNUNET_GE_DEVELOPER,
+ GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
+ GNUNET_GE_DEVELOPER,
"Forwarding DHT GET request to peer `%s'.\n", &enc);
#endif
- coreAPI->unicast (&next[i], &aget.header, DHT_PRIORITY, DHT_DELAY);
- }
- else
- {
-#if DEBUG_ROUTING
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
- GNUNET_GE_DEVELOPER,
- "Will not foward message to peer, we are closer!\n");
-#endif
- }
+ coreAPI->unicast (&next[i], &aget.header, DHT_PRIORITY, DHT_DELAY);
}
return GNUNET_OK;
}
@@ -538,8 +570,11 @@
const GNUNET_MessageHeader * msg)
{
GNUNET_PeerIdentity next[PUT_TRIES];
- const DHT_MESSAGE *put;
+ const DHT_MESSAGE * put;
+ DHT_MESSAGE * aput;
GNUNET_CronTime now;
+ unsigned int hop_count;
+ unsigned int target_value;
int store;
int i;
#if DEBUG_ROUTING
@@ -561,8 +596,16 @@
"Received DHT PUT for key `%s'.\n", &enc);
#endif
store = 0;
- /* FIXME: increase hop counter! */
- for (i = 0; i < PUT_TRIES; i++)
+ hop_count = htons(put->hop_count);
+ target_value = get_forward_count(hop_count, PUT_TRIES);
+ aput = GNUNET_malloc(ntohs(msg->size));
+ memcpy(aput,
+ put,
+ ntohs(msg->size));
+ aput->hop_count = htons(hop_count + 1);
+
+
+ for (i = 0; i < target_value; i++)
{
if (GNUNET_OK !=
GNUNET_DHT_select_peer (&next[i], &put->key, &next[0], i))
@@ -575,33 +618,21 @@
i, PUT_TRIES);
#endif
store = 1;
- break;
}
if (1 == GNUNET_hash_xorcmp (&next[i].hashPubKey,
&coreAPI->myIdentity->hashPubKey,
&put->key))
- {
+ store = 1; /* we're closer than the selected target */
#if DEBUG_ROUTING
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
- GNUNET_GE_DEVELOPER,
- "We are closer than selected peer for PUT in round
%d/%d\n",
- i, PUT_TRIES);
+ GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
+ GNUNET_GE_DEVELOPER,
+ "Forwarding DHT PUT request to peer `%s'.\n", &enc);
#endif
- store = 1; /* we're closer than the selected target */
- }
- else
- {
-#if DEBUG_ROUTING
- GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
- GNUNET_GE_DEVELOPER,
- "Forwarding DHT PUT request to peer `%s'.\n", &enc);
-#endif
- coreAPI->unicast (&next[i], msg, DHT_PRIORITY, DHT_DELAY);
- }
+ coreAPI->unicast (&next[i], &aput->header, DHT_PRIORITY, DHT_DELAY);
}
+ GNUNET_free(aput);
if (store != 0)
{
now = GNUNET_get_time ();
@@ -707,7 +738,7 @@
unsigned int type, GNUNET_ResultProcessor handler,
void *cls)
{
- int i;
+ unsigned int i;
struct DHT_Source_Route *pos;
struct DHT_Source_Route *prev;
int done;
@@ -716,19 +747,17 @@
GNUNET_mutex_lock (lock);
for (i = 0; i < rt_size; i++)
{
- if (records[i] == NULL)
- continue;
prev = NULL;
- pos = records[i]->sources;
+ pos = records[i].sources;
while (pos != NULL)
{
- if ((pos->receiver == handler) &&
- (pos->receiver_closure == cls) &&
- (0 == memcmp (key,
- &records[i]->get->key, sizeof (GNUNET_HashCode))))
+ if ( (pos->receiver == handler) &&
+ (pos->receiver_closure == cls) &&
+ (0 == memcmp (key,
+ &records[i].get.key, sizeof (GNUNET_HashCode))))
{
if (prev == NULL)
- records[i]->sources = pos->next;
+ records[i].sources = pos->next;
else
prev->next = pos->next;
GNUNET_free (pos);
@@ -738,11 +767,12 @@
prev = pos;
pos = prev->next;
}
- if (records[i]->sources == NULL)
+ if (records[i].sources == NULL)
{
- GNUNET_free (records[i]->get);
- GNUNET_free (records[i]);
- records[i] = NULL;
+ GNUNET_array_grow(records[i].results,
+ records[i].result_count,
+ 0);
+ records[i].expire = 0;
}
if (done == GNUNET_YES)
break;
@@ -858,6 +888,7 @@
GNUNET_DHT_done_routing ()
{
unsigned int i;
+ struct DHT_Source_Route *pos;
coreAPI->
connection_unregister_send_callback (sizeof (DHT_MESSAGE),
@@ -871,13 +902,17 @@
stats = NULL;
}
GNUNET_mutex_destroy (lock);
- for (i = 0; i < rt_size; i++)
+ for (i = 0; i < rt_size; i++)
{
- if (records[i] != NULL)
+ while (records[i].sources != NULL)
{
- GNUNET_free (records[i]->get);
- GNUNET_free (records[i]);
+ pos = records[i].sources;
+ records[i].sources = pos->next;
+ GNUNET_free (pos);
}
+ GNUNET_array_grow(records[i].results,
+ records[i].result_count,
+ 0);
}
GNUNET_array_grow (records, rt_size, 0);
coreAPI->release_service (dstore);
Modified: GNUnet/src/applications/dht/module/table.c
===================================================================
--- GNUnet/src/applications/dht/module/table.c 2007-12-15 08:56:22 UTC (rev
5903)
+++ GNUnet/src/applications/dht/module/table.c 2007-12-15 09:12:52 UTC (rev
5904)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2006 Christian Grothoff (and other contributing authors)
+ (C) 2006, 2007 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -34,8 +34,9 @@
* + table.c: DHT-peer table, peer discovery cron jobs;
* code tries to fill table "as much as possible" over time;
* TODO: expose and improve reliabily metrics (to be added later)
- * + dstore.c + plugin: SQL-based datastore: key, value, expiration
- * (bounded FIFO-datastore, when full, kill oldest entry first)
+ * TODO: better randomized neighbor selection in DHT_select_peer
+ * TODO: add callback for discovery-message padding (use core callback
+ * for extra-available bandwidth)
* + routing.c: tracking of get/put operations, retry, reply handling
* code tries best-match routing among entries in table
* + service.c: provide DHT services to rest of GNUnet process
@@ -226,6 +227,22 @@
} P2P_DHT_ASK_HELLO;
/**
+ * Compute a (rough) estimate of the network's diameter.
+ *
+ * @return estimated network diameter
+ */
+unsigned int GNUNET_DHT_estimate_network_diameter ()
+{
+ unsigned int i;
+ for (i=bucketCount-1;i>0;i--)
+ {
+ if (buckets[i].peers_size > 0)
+ break;
+ }
+ return i + 1;
+}
+
+/**
* Get the index of the lowest bit of the two GNUNET_hash codes that
* differs.
*/
@@ -574,6 +591,7 @@
bucket->peers[i] = bucket->peers[bucket->peers_size - 1];
GNUNET_array_grow (bucket->peers, bucket->peers_size,
bucket->peers_size - 1);
+ i--;
}
}
}
@@ -652,13 +670,13 @@
sizeof (P2P_DHT_Discovery)) / sizeof (GNUNET_PeerIdentity);
if (pc > MAINTAIN_ADV_CAP * 8)
{
- GNUNET_GE_BREAK (coreAPI->ectx, 0);
+ GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
return GNUNET_SYSERR; /* far too big */
}
if (ntohs (msg->size) !=
sizeof (P2P_DHT_Discovery) + pc * sizeof (GNUNET_PeerIdentity))
{
- GNUNET_GE_BREAK (coreAPI->ectx, 0);
+ GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
return GNUNET_SYSERR; /* malformed */
}
disco = (const P2P_DHT_Discovery *) msg;
@@ -689,7 +707,10 @@
GNUNET_MessageHello *hello;
if (ntohs (msg->size) != sizeof (P2P_DHT_ASK_HELLO))
- return GNUNET_SYSERR;
+ {
+ GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
+ return GNUNET_SYSERR;
+ }
ask = (const P2P_DHT_ASK_HELLO *) msg;
if (NULL == findBucketFor (&ask->peer))
return GNUNET_OK;
Modified: GNUnet/src/applications/dht/module/table.h
===================================================================
--- GNUnet/src/applications/dht/module/table.h 2007-12-15 08:56:22 UTC (rev
5903)
+++ GNUnet/src/applications/dht/module/table.h 2007-12-15 09:12:52 UTC (rev
5904)
@@ -48,6 +48,13 @@
unsigned int blocked_size);
/**
+ * Compute a (rough) estimate of the network's diameter.
+ *
+ * @return estimated network diameter
+ */
+unsigned int GNUNET_DHT_estimate_network_diameter (void);
+
+/**
* Initialize table DHT component.
*
* @param capi the core API
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r5904 - GNUnet/src/applications/dht/module,
gnunet <=