gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5904 - GNUnet/src/applications/dht/module


From: gnunet
Subject: [GNUnet-SVN] r5904 - GNUnet/src/applications/dht/module
Date: Sat, 15 Dec 2007 02:12:53 -0700 (MST)

Author: grothoff
Date: 2007-12-15 02:12:52 -0700 (Sat, 15 Dec 2007)
New Revision: 5904

Modified:
   GNUnet/src/applications/dht/module/routing.c
   GNUnet/src/applications/dht/module/table.c
   GNUnet/src/applications/dht/module/table.h
Log:
dht hacking

Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c        2007-12-15 08:56:22 UTC 
(rev 5903)
+++ GNUnet/src/applications/dht/module/routing.c        2007-12-15 09:12:52 UTC 
(rev 5904)
@@ -23,12 +23,8 @@
  * @brief state for active DHT routing operations
  * @author Christian Grothoff
  *
- * LATER:
- * - better selection of timeouts / htl
- * - add code to cleanup routing/request table!
- * - optimize memory use of routing table
+ * TODO:
  * - implement extra_get_callback
- * - add similar callback for discovery in table.c
  */
 
 #include "platform.h"
@@ -40,40 +36,46 @@
 
 #define DEBUG_ROUTING GNUNET_YES
 
+/**
+ * What is the request priority for DHT operations?
+ */
 #define DHT_PRIORITY 0
 
+/**
+ * What is the estimated per-hop delay for DHT operations
+ * (this is how much we will request from the GNUnet core)
+ */
 #define DHT_DELAY (5 * GNUNET_CRON_SECONDS)
 
 /**
+ * What is the maximum number of results returned by any DHT
+ * operation?
+ */
+#define MAX_RESULTS 64
+
+/**
+ * How many peers should a DHT GET request reach on averge?
+ *
  * Larger factors will result in more aggressive routing of GET
  * operations (each peer will either forward to GET_TRIES peers that
  * are closer to the key).
  */
-#define GET_TRIES 4
+#define GET_TRIES 7
 
 /**
+ * At how many peers should a DHT PUT request be replicated
+ * on average?
+ *
  * Larger factors will result in more replication and
  * more aggressive routing of PUT operations (each
  * peer will either forward to PUT_TRIES peers that
  * are closer to the key, or replicate the content).
  */
-#define PUT_TRIES 2
+#define PUT_TRIES 3
 
 /**
- * FIXME: replace this by an estimate of the
- * network size (and then log of that).
+ * How long do we keep content after receiving a PUT request for it?
  */
-#define MAX_HOPS 10
-
-/**
- * How long do we keep track of requests?
- */
-#define MAX_REQUEST_LIFETIME (5 * GNUNET_CRON_MINUTES)
-
-
-/**
- * How long do we keep track of requests?
- */
 #define CONTENT_LIFETIME (12 * GNUNET_CRON_HOURS)
 
 /**
@@ -105,7 +107,7 @@
    * At what time will this record automatically
    * expire?
    */
-  GNUNET_CronTime expires;
+  GNUNET_CronTime expire;
 
 } DHT_Source_Route;
 
@@ -149,11 +151,10 @@
 {
 
   /**
-   * When do we stop forwarding this request? (Note that
-   * this time should be before the last of the source
-   * routes expire).
+   * When does this record expire?  Should be the max
+   * of the individual source records.
    */
-  GNUNET_CronTime expires;
+  GNUNET_CronTime expire;
 
   /**
    * Information about where to send the results back to.
@@ -163,7 +164,7 @@
   /**
    * GET message of this record (what we are forwarding).
    */
-  DHT_MESSAGE *get;
+  DHT_MESSAGE get;
 
   /**
    * Hashcodes of the results that we have send back
@@ -181,7 +182,7 @@
 /**
  * Linked list of active records.
  */
-static DHTQueryRecord **records;
+static DHTQueryRecord * records;
 
 /**
  * Size of records
@@ -209,7 +210,34 @@
 
 static unsigned int stat_put_requests_received;
 
+
 /**
+ * To how many peers should we (on average) 
+ * forward the request to obtain the desired
+ * target_replication count (on average).
+ */
+static unsigned int
+get_forward_count(unsigned int hop_count,
+                 double target_replication) 
+{
+  double target_count;
+  unsigned int target_value;
+  unsigned int diameter;
+
+  diameter = GNUNET_DHT_estimate_network_diameter();
+  if (hop_count > (diameter + 1) * 2)
+    return 0;
+  target_count = target_replication / (target_replication * hop_count + 
diameter);
+  target_value = 0;
+  while (target_value < target_count)
+    target_value++;
+  if ( (target_count + 1 - target_value ) > drand48())
+    target_value++;
+  return target_value;
+}
+
+
+/**
  * Given a result, lookup in the routing table
  * where to send it next.
  */
@@ -219,14 +247,15 @@
              unsigned int size, const char *data, void *cls)
 {
   DHTQueryRecord *q;
-  int i;
-  int j;
+  unsigned int i;
+  unsigned int j;
   int found;
   GNUNET_HashCode hc;
   DHT_MESSAGE *result;
   unsigned int routed;
   unsigned int tracked;
   DHT_Source_Route *pos;
+  DHT_Source_Route *prev;
   GNUNET_CronTime now;
 #if DEBUG_ROUTING
   GNUNET_EncName enc;
@@ -260,12 +289,10 @@
   now = GNUNET_get_time ();
   for (i = 0; i < rt_size; i++)
     {
-      q = records[i];
-      if (q == NULL)
-        continue;
+      q = &records[i];
       tracked++;
-      if ((ntohl (q->get->type) != type) ||
-          (0 != memcmp (key, &q->get->key, sizeof (GNUNET_HashCode))))
+      if ((ntohl (q->get.type) != type) ||
+          (0 != memcmp (key, &q->get.key, sizeof (GNUNET_HashCode))))
         continue;
       found = GNUNET_NO;
       for (j = 0; j < q->result_count; j++)
@@ -282,15 +309,16 @@
                          GNUNET_GE_DEVELOPER,
                          "Seen the same result earlier, not routing it 
again.\n");
 #endif
-          continue;
+          break;
         }
       routed++;
       GNUNET_array_grow (q->results, q->result_count, q->result_count + 1);
       q->results[q->result_count - 1] = hc;
       pos = q->sources;
+      prev = NULL;
       while (pos != NULL)
         {
-          if (pos->expires < now)
+          if (pos->expire < now)
             {
 #if DEBUG_ROUTING
               GNUNET_hash_to_enc (&pos->source.hashPubKey, &enc);
@@ -298,8 +326,17 @@
                              GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
                              GNUNET_GE_DEVELOPER,
                              "Route to peer `%s' has expired (%llu < %llu)\n",
-                             &enc, pos->expires, now);
+                             &enc, pos->expire, now);
 #endif
+             if (prev == NULL)
+               q->sources = pos->next;
+             else
+               prev->next = pos->next;
+             GNUNET_free(pos);
+             if (prev == NULL)
+               pos = q->sources;
+             else
+               pos = prev->next;             
               continue;
             }
           if (0 != memcmp (&pos->source,
@@ -314,7 +351,7 @@
 #endif
               coreAPI->unicast (&pos->source,
                                 &result->header, DHT_PRIORITY,
-                                pos->expires - now);
+                                DHT_DELAY);
               if (stats != NULL)
                 stats->change (stat_replies_routed, 1);
             }
@@ -332,6 +369,9 @@
             }
           pos = pos->next;
         }
+      if (q->result_count >= MAX_RESULTS)
+         q->expire = 0;          
+      break;
     }
   GNUNET_mutex_unlock (lock);
 #if DEBUG_ROUTING
@@ -354,47 +394,54 @@
   DHTQueryRecord *q;
   unsigned int i;
   unsigned int rt_pos;
+  unsigned int diameter;
   GNUNET_CronTime expire;
   GNUNET_CronTime now;
   unsigned int hops;
   struct DHT_Source_Route *pos;
 
   hops = ntohl (get->hop_count);
-  if (hops > MAX_HOPS)
+  diameter = GNUNET_DHT_estimate_network_diameter();
+  if (hops > 2 * diameter)
     return GNUNET_SYSERR;
   now = GNUNET_get_time ();
-  expire = now + MAX_REQUEST_LIFETIME;
+  expire = now + DHT_DELAY * diameter * 4;
   GNUNET_mutex_lock (lock);
   rt_pos = rt_size;
   for (i = 0; i < rt_size; i++)
     {
-      if ((records[i] != NULL) &&
-          (0 == memcmp (&records[i]->get->key,
-                        &get->key,
-                        sizeof (GNUNET_HashCode))) &&
-          (records[i]->get->type == get->type))
+      q = &records[i];
+      if ( (q->expire > now) &&
+          ( (0 != memcmp (&q->get.key,
+                          &get->key,
+                          sizeof (GNUNET_HashCode))) ||
+            (q->get.type == get->type)) )
+       continue; /* used and not an identical request */
+      if (q->expire < now)
         {
           rt_pos = i;
-          break;
-        }
-      if (records[i] == NULL)
-        {
-          rt_pos = i;
-          break;
-        }
-      if (records[i]->expires < now)
-        {
-          rt_pos = i;
-          GNUNET_free (records[rt_pos]->get);
-          records[rt_pos]->get = NULL;
-          while (records[rt_pos]->sources != NULL)
+          while (q->sources != NULL)
             {
-              pos = records[rt_pos]->sources;
-              records[rt_pos]->sources = pos->next;
+              pos = q->sources;
+              q->sources = pos->next;
               GNUNET_free (pos);
             }
-          break;
+         GNUNET_array_grow(q->results,
+                           q->result_count,
+                           0);
+         q->expire = 0;
         }
+      if ( (0 == memcmp (&q->get.key,
+                        &get->key,
+                        sizeof (GNUNET_HashCode)) &&
+           (q->get.type == get->type)) ) 
+       {
+         GNUNET_array_grow(q->results,
+                           q->result_count,
+                           0);
+         rt_pos = i;
+         break;
+       }
     }
   if (rt_pos == rt_size)
     {
@@ -402,15 +449,10 @@
       GNUNET_mutex_unlock (lock);
       return GNUNET_SYSERR;
     }
-  if (records[rt_pos] == NULL)
-    {
-      records[rt_pos] = GNUNET_malloc (sizeof (DHTQueryRecord));
-      memset (records[rt_pos], 0, sizeof (DHTQueryRecord));
-    }
-  q = records[rt_pos];
-  q->expires = expire;
-  q->get = GNUNET_malloc (ntohs (get->header.size));
-  memcpy (q->get, get, ntohs (get->header.size));
+  q = &records[rt_pos];
+  if (q->expire < expire)
+    q->expire = expire;
+  q->get = *get;
   pos = GNUNET_malloc (sizeof (DHT_Source_Route));
   pos->next = q->sources;
   q->sources = pos;
@@ -418,7 +460,7 @@
     pos->source = *sender;
   else
     pos->source = *coreAPI->myIdentity;
-  pos->expires = expire;
+  pos->expire = expire;
   pos->receiver = handler;
   pos->receiver_closure = cls;
 #if DEBUG_ROUTING
@@ -442,6 +484,8 @@
   GNUNET_PeerIdentity next[GET_TRIES];
   const DHT_MESSAGE *get;
   DHT_MESSAGE aget;
+  unsigned int target_value;
+  unsigned int hop_count;
   int total;
   int i;
 #if DEBUG_ROUTING
@@ -477,7 +521,7 @@
       return GNUNET_OK;         /* could not route */
     }
   total = dstore->get (&get->key, ntohl (get->type), &routeResult, NULL);
-  if ((total > GET_TRIES) && (sender != NULL))
+  if (total > MAX_RESULTS)
     {
 #if DEBUG_ROUTING
       GNUNET_GE_LOG (coreAPI->ectx,
@@ -489,8 +533,10 @@
       return GNUNET_OK;
     }
   aget = *get;
-  aget.hop_count = htonl (1 + ntohl (get->hop_count));
-  for (i = 0; i < GET_TRIES; i++)
+  hop_count = ntohl(get->hop_count);
+  target_value = get_forward_count(hop_count, GET_TRIES);
+  aget.hop_count = htonl (1 + hop_count);
+  for (i = 0; i < target_value; i++)
     {
       if (GNUNET_OK !=
           GNUNET_DHT_select_peer (&next[i], &get->key, &next[0], i))
@@ -504,28 +550,14 @@
 #endif
           break;
         }
-      if (-1 == GNUNET_hash_xorcmp (&next[i].hashPubKey,
-                                    &coreAPI->myIdentity->hashPubKey,
-                                    &get->key))
-        {
 #if DEBUG_ROUTING
-          GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
-          GNUNET_GE_LOG (coreAPI->ectx,
-                         GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
-                         GNUNET_GE_DEVELOPER,
+      GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
+      GNUNET_GE_LOG (coreAPI->ectx,
+                    GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
+                    GNUNET_GE_DEVELOPER,
                          "Forwarding DHT GET request to peer `%s'.\n", &enc);
 #endif
-          coreAPI->unicast (&next[i], &aget.header, DHT_PRIORITY, DHT_DELAY);
-        }
-      else
-        {
-#if DEBUG_ROUTING
-          GNUNET_GE_LOG (coreAPI->ectx,
-                         GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
-                         GNUNET_GE_DEVELOPER,
-                         "Will not foward message to peer, we are closer!\n");
-#endif
-        }
+      coreAPI->unicast (&next[i], &aget.header, DHT_PRIORITY, DHT_DELAY);
     }
   return GNUNET_OK;
 }
@@ -538,8 +570,11 @@
            const GNUNET_MessageHeader * msg)
 {
   GNUNET_PeerIdentity next[PUT_TRIES];
-  const DHT_MESSAGE *put;
+  const DHT_MESSAGE * put;
+  DHT_MESSAGE * aput;
   GNUNET_CronTime now;
+  unsigned int hop_count;
+  unsigned int target_value;
   int store;
   int i;
 #if DEBUG_ROUTING
@@ -561,8 +596,16 @@
                  "Received DHT PUT for key `%s'.\n", &enc);
 #endif
   store = 0;
-  /* FIXME: increase hop counter! */
-  for (i = 0; i < PUT_TRIES; i++)
+  hop_count = htons(put->hop_count);
+  target_value = get_forward_count(hop_count, PUT_TRIES);
+  aput = GNUNET_malloc(ntohs(msg->size));
+  memcpy(aput,
+        put,
+        ntohs(msg->size));
+  aput->hop_count = htons(hop_count + 1);
+
+
+  for (i = 0; i < target_value; i++)
     {
       if (GNUNET_OK !=
           GNUNET_DHT_select_peer (&next[i], &put->key, &next[0], i))
@@ -575,33 +618,21 @@
                          i, PUT_TRIES);
 #endif
           store = 1;
-          break;
         }
       if (1 == GNUNET_hash_xorcmp (&next[i].hashPubKey,
                                    &coreAPI->myIdentity->hashPubKey,
                                    &put->key))
-        {
+       store = 1;            /* we're closer than the selected target */
 #if DEBUG_ROUTING
-          GNUNET_GE_LOG (coreAPI->ectx,
-                         GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
-                         GNUNET_GE_DEVELOPER,
-                         "We are closer than selected peer for PUT in round 
%d/%d\n",
-                         i, PUT_TRIES);
+      GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
+      GNUNET_GE_LOG (coreAPI->ectx,
+                    GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
+                    GNUNET_GE_DEVELOPER,
+                    "Forwarding DHT PUT request to peer `%s'.\n", &enc);
 #endif
-          store = 1;            /* we're closer than the selected target */
-        }
-      else
-        {
-#if DEBUG_ROUTING
-          GNUNET_hash_to_enc (&next[i].hashPubKey, &enc);
-          GNUNET_GE_LOG (coreAPI->ectx,
-                         GNUNET_GE_DEBUG | GNUNET_GE_REQUEST |
-                         GNUNET_GE_DEVELOPER,
-                         "Forwarding DHT PUT request to peer `%s'.\n", &enc);
-#endif
-          coreAPI->unicast (&next[i], msg, DHT_PRIORITY, DHT_DELAY);
-        }
+      coreAPI->unicast (&next[i], &aput->header, DHT_PRIORITY, DHT_DELAY);
     }
+  GNUNET_free(aput);
   if (store != 0)
     {
       now = GNUNET_get_time ();
@@ -707,7 +738,7 @@
                      unsigned int type, GNUNET_ResultProcessor handler,
                      void *cls)
 {
-  int i;
+  unsigned int i;
   struct DHT_Source_Route *pos;
   struct DHT_Source_Route *prev;
   int done;
@@ -716,19 +747,17 @@
   GNUNET_mutex_lock (lock);
   for (i = 0; i < rt_size; i++)
     {
-      if (records[i] == NULL)
-        continue;
       prev = NULL;
-      pos = records[i]->sources;
+      pos = records[i].sources;
       while (pos != NULL)
         {
-          if ((pos->receiver == handler) &&
-              (pos->receiver_closure == cls) &&
-              (0 == memcmp (key,
-                            &records[i]->get->key, sizeof (GNUNET_HashCode))))
+          if ( (pos->receiver == handler) &&
+              (pos->receiver_closure == cls) &&
+              (0 == memcmp (key,
+                            &records[i].get.key, sizeof (GNUNET_HashCode))))
             {
               if (prev == NULL)
-                records[i]->sources = pos->next;
+                records[i].sources = pos->next;
               else
                 prev->next = pos->next;
               GNUNET_free (pos);
@@ -738,11 +767,12 @@
           prev = pos;
           pos = prev->next;
         }
-      if (records[i]->sources == NULL)
+      if (records[i].sources == NULL)
         {
-          GNUNET_free (records[i]->get);
-          GNUNET_free (records[i]);
-          records[i] = NULL;
+         GNUNET_array_grow(records[i].results,
+                           records[i].result_count,
+                           0);
+          records[i].expire = 0;
         }
       if (done == GNUNET_YES)
         break;
@@ -858,6 +888,7 @@
 GNUNET_DHT_done_routing ()
 {
   unsigned int i;
+  struct DHT_Source_Route *pos;
 
   coreAPI->
     connection_unregister_send_callback (sizeof (DHT_MESSAGE),
@@ -871,13 +902,17 @@
       stats = NULL;
     }
   GNUNET_mutex_destroy (lock);
-  for (i = 0; i < rt_size; i++)
+ for (i = 0; i < rt_size; i++)
     {
-      if (records[i] != NULL)
+      while (records[i].sources != NULL)
         {
-          GNUNET_free (records[i]->get);
-          GNUNET_free (records[i]);
+         pos = records[i].sources;
+         records[i].sources = pos->next;
+         GNUNET_free (pos);
         }
+      GNUNET_array_grow(records[i].results,
+                       records[i].result_count,
+                       0);
     }
   GNUNET_array_grow (records, rt_size, 0);
   coreAPI->release_service (dstore);

Modified: GNUnet/src/applications/dht/module/table.c
===================================================================
--- GNUnet/src/applications/dht/module/table.c  2007-12-15 08:56:22 UTC (rev 
5903)
+++ GNUnet/src/applications/dht/module/table.c  2007-12-15 09:12:52 UTC (rev 
5904)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2006 Christian Grothoff (and other contributing authors)
+      (C) 2006, 2007 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -34,8 +34,9 @@
  *   + table.c: DHT-peer table, peer discovery cron jobs;
  *     code tries to fill table "as much as possible" over time;
  *     TODO: expose and improve reliabily metrics (to be added later)
- *   + dstore.c + plugin: SQL-based datastore: key, value, expiration
- *     (bounded FIFO-datastore, when full, kill oldest entry first)
+ *     TODO: better randomized neighbor selection in DHT_select_peer
+ *     TODO: add callback for discovery-message padding (use core callback
+ *           for extra-available bandwidth)
  *   + routing.c: tracking of get/put operations, retry, reply handling
  *     code tries best-match routing among entries in table
  *   + service.c: provide DHT services to rest of GNUnet process
@@ -226,6 +227,22 @@
 } P2P_DHT_ASK_HELLO;
 
 /**
+ * Compute a (rough) estimate of the networks diameter.
+ *
+ * @return estimated network diameter
+ */
+unsigned int GNUNET_DHT_estimate_network_diameter () 
+{
+  unsigned int i;
+  for (i=bucketCount-1;i>0;i--)
+    {
+      if (buckets[i].peers_size > 0)
+       break;
+    }
+  return i + 1;
+}
+
+/**
  * Get the index of the lowest bit of the two GNUNET_hash codes that
  * differs.
  */
@@ -574,6 +591,7 @@
           bucket->peers[i] = bucket->peers[bucket->peers_size - 1];
           GNUNET_array_grow (bucket->peers, bucket->peers_size,
                              bucket->peers_size - 1);
+         i--;
         }
     }
 }
@@ -652,13 +670,13 @@
      sizeof (P2P_DHT_Discovery)) / sizeof (GNUNET_PeerIdentity);
   if (pc > MAINTAIN_ADV_CAP * 8)
     {
-      GNUNET_GE_BREAK (coreAPI->ectx, 0);
+      GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
       return GNUNET_SYSERR;     /* far too big */
     }
   if (ntohs (msg->size) !=
       sizeof (P2P_DHT_Discovery) + pc * sizeof (GNUNET_PeerIdentity))
     {
-      GNUNET_GE_BREAK (coreAPI->ectx, 0);
+      GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
       return GNUNET_SYSERR;     /* malformed */
     }
   disco = (const P2P_DHT_Discovery *) msg;
@@ -689,7 +707,10 @@
   GNUNET_MessageHello *hello;
 
   if (ntohs (msg->size) != sizeof (P2P_DHT_ASK_HELLO))
-    return GNUNET_SYSERR;
+    {
+      GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
+      return GNUNET_SYSERR;
+    }
   ask = (const P2P_DHT_ASK_HELLO *) msg;
   if (NULL == findBucketFor (&ask->peer))
     return GNUNET_OK;

Modified: GNUnet/src/applications/dht/module/table.h
===================================================================
--- GNUnet/src/applications/dht/module/table.h  2007-12-15 08:56:22 UTC (rev 
5903)
+++ GNUnet/src/applications/dht/module/table.h  2007-12-15 09:12:52 UTC (rev 
5904)
@@ -48,6 +48,13 @@
                             unsigned int blocked_size);
 
 /**
+ * Compute a (rough) estimate of the networks diameter.
+ *
+ * @return estimated network diameter
+ */
+unsigned int GNUNET_DHT_estimate_network_diameter (void);
+
+/**
  * Initialize table DHT component.
  *
  * @param capi the core API





reply via email to

[Prev in Thread] Current Thread [Next in Thread]