[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3849 - in GNUnet/src: applications/dht/module include
From: |
grothoff |
Subject: |
[GNUnet-SVN] r3849 - in GNUnet/src: applications/dht/module include |
Date: |
Sat, 2 Dec 2006 14:01:40 -0800 (PST) |
Author: grothoff
Date: 2006-12-02 14:01:36 -0800 (Sat, 02 Dec 2006)
New Revision: 3849
Added:
GNUnet/src/applications/dht/module/table.h
Modified:
GNUnet/src/applications/dht/module/Makefile.am
GNUnet/src/applications/dht/module/table.c
GNUnet/src/include/gnunet_protocols.h
Log:
dht hacking
Modified: GNUnet/src/applications/dht/module/Makefile.am
===================================================================
--- GNUnet/src/applications/dht/module/Makefile.am 2006-12-02 05:45:23 UTC
(rev 3848)
+++ GNUnet/src/applications/dht/module/Makefile.am 2006-12-02 22:01:36 UTC
(rev 3849)
@@ -28,7 +28,7 @@
$(top_builddir)/src/util/crypto/libgnunetutil_crypto.la \
$(top_builddir)/src/util/libgnunetutil.la
libgnunetdht_datastore_memory_la_LDFLAGS = \
- -export-dynamic
+ -export-dynamic -lm
check_PROGRAMS = \
Modified: GNUnet/src/applications/dht/module/table.c
===================================================================
--- GNUnet/src/applications/dht/module/table.c 2006-12-02 05:45:23 UTC (rev
3848)
+++ GNUnet/src/applications/dht/module/table.c 2006-12-02 22:01:36 UTC (rev
3849)
@@ -1,4 +1,4 @@
- /*
+/*
This file is part of GNUnet
(C) 2006 Christian Grothoff (and other contributing authors)
@@ -20,7 +20,7 @@
/**
* @file module/table.c
- * @brief maintains routing table
+ * @brief maintains table of DHT connections of this peer
* @author Christian Grothoff
*
* New DHT infrastructure plan:
@@ -33,30 +33,21 @@
* + table.c: DHT-peer table, peer discovery cron jobs;
* code tries to fill table "as much as possible" over time;
* reliabily metrics (to be added later)
- * + discovery.c: support code to supply peers with neighbour
- * information to improve routing tables (HELLO lookup)
* + routing.c: tracking of get/put operations, retry, reply handling
* code tries best-match routing among entries in table
* + dstore (plugin): SQL-based datastore: key, value, expiration
* (bounded FIFO-datastore, when full, kill oldest entry first)
* [?: better replacement policy to guard against attacks?]
- *
- * TODO:
- * - peer-in-proximity selection
- * - public internal table API
- * - HELLO request when learning
- * - tracking of live connections, expiration of stale entries
- * - tracking of peer latency and drop rates
- * - extension of protocols.h header with new DHT ID
*/
#include "platform.h"
-#include "gnunet_util.h"
-#include "gnunet_core.h"
+#include "table.h"
#include "gnunet_dht_service.h"
#include "gnunet_stats_service.h"
#include "gnunet_identity_service.h"
+#include "gnunet_pingpong_service.h"
+
/**
* How often should the cron job for maintaining the DHT
* run?
@@ -70,6 +61,11 @@
#define MAINTAIN_CHANCE 100
/**
+ * How long can a peer be inactive before we time it out?
+ */
+#define MAINTAIN_PEER_TIMEOUT (MAINTAIN_FREQUENCY * MAINTAIN_CHANCE * 4)
+
+/**
* What is the maximum number of known DHT-enabled peers
* advertised for each DISCOVERY message?
*/
@@ -78,8 +74,9 @@
/**
* Target number of peers per bucket
*/
-#define MAINTAIN_BUCKET_SIZE 2
+#define MAINTAIN_BUCKET_SIZE 4
+
/**
* Per-peer information.
*/
@@ -91,12 +88,6 @@
cron_t lastActivity;
/**
- * What was the last time we received a table status message
- * from this peer?
- */
- cron_t lastTableRefresh;
-
- /**
* What was the last time we send a PING to this peer?
*/
cron_t lastTimePingSend;
@@ -107,11 +98,16 @@
cron_t expected_latency;
/**
- * What is the average response rate?
+ * Number of responses received
*/
- double drop_rate;
+ unsigned long long response_count;
/**
+ * Number of requests sent
+ */
+ unsigned long long request_count;
+
+ /**
* What is the identity of the peer?
*/
PeerIdentity id;
@@ -179,6 +175,10 @@
*/
static Stats_ServiceAPI * stats;
+/**
+ * Pingpong service.
+ */
+static PingPong_ServiceAPI * pingpong;
static int stat_dht_total_peers;
@@ -186,6 +186,11 @@
static int stat_dht_route_looks;
+/**
+ * The struct is followed by zero or more
+ * PeerIdentities that the sender knows to
+ * be participating in the DHT.
+ */
typedef struct {
MESSAGE_HEADER header;
@@ -194,8 +199,55 @@
} P2P_DHT_Discovery;
-static PeerBucket * findBucketFor(const PeerIdentity * peer) {
- /* FIXME! */
+/**
+ * Request for a HELLO for another peer that is participating in the
+ * DHT. Receiver is expected to send back a HELLO for the peer that
+ * is being requested.
+ */
+typedef struct {
+
+ MESSAGE_HEADER header; /* considerPeer sets header.size to sizeof(P2P_DHT_ASK_HELLO) */
+
+ unsigned int reserved; /* sent as 0 by considerPeer; handleAskHello does not read it */
+
+ PeerIdentity peer; /* identity of the peer whose HELLO is being requested */
+
+} P2P_DHT_ASK_HELLO;
+
+/**
+ * Locate the first position (counting up from bit 0) at which the
+ * two hash codes disagree; yields the bit count if they are equal.
+ */
+static unsigned int get_bit_distance(const HashCode512 * h1,
+                                     const HashCode512 * h2) {
+  const unsigned int bits = sizeof(HashCode512) * 8;
+  unsigned int pos;
+
+  pos = 0;
+  /* advance while both hashes carry the same bit value */
+  while ( (pos < bits) &&
+          (getHashCodeBit(h1, pos) == getHashCodeBit(h2, pos)) )
+    pos++;
+  return pos;
+}
+
+/**
+ * Map a peer to the bucket covering its bit distance from our own identity. @return NULL if peer is the current host
+ */
+static PeerBucket *
+findBucketFor(const PeerIdentity * peer) {
+ unsigned int index;
+ int i;
+
+ index = get_bit_distance(&peer->hashPubKey,
+ &coreAPI->myIdentity->hashPubKey); /* first bit at which peer and our identity differ */
+ i = bucketCount-1; /* start at the last bucket and walk down */
+ while ( (buckets[i].bstart >= index) &&
+ (i > 0) )
+ i--; /* stops at the first bucket with bstart < index, or at bucket 0 */
+ if ( (buckets[i].bstart < index) &&
+ (buckets[i].bend >= index) )
+ return &buckets[i]; /* index lies in (bstart, bend] of this bucket */
return NULL; /* NOTE(review): also reached when index <= buckets[0].bstart - confirm that is intended */
}
@@ -203,14 +255,56 @@
* Find the PeerInfo for the given peer. Returns NULL if peer is not
* in our DHT routing table.
*/
-static PeerInfo * findPeerEntry(const PeerIdentity * peer) {
- PeerBucket * bucket;
+static PeerInfo *
+findPeerEntryInBucket(PeerBucket * bucket,
+ const PeerIdentity * peer) {
+ unsigned int i;
- bucket = findBucketFor(peer);
+ if (bucket == NULL)
+ return NULL;
+ for (i=0;i<bucket->peers_size;i++)
+ if (0 == memcmp(peer,
+ &bucket->peers[i]->id,
+ sizeof(PeerIdentity)))
+ return bucket->peers[i];
return NULL;
}
/**
+ * Find the PeerInfo for the given peer. Returns NULL if peer is not
+ * in our DHT routing table.
+ */
+static PeerInfo *
+findPeerEntry(const PeerIdentity * peer) {
+  /* findPeerEntryInBucket returns NULL itself when handed a NULL bucket */
+  return findPeerEntryInBucket(findBucketFor(peer), peer);
+}
+
+/**
+ * Weight a candidate by its closeness to a target: the later
+ * the first differing bit between "have" and "target" (as
+ * reported by get_bit_distance), the larger the result.  That
+ * bit index is rescaled to an exponent between 0 and 32 and
+ * 2^exponent is returned, capped at the largest value an
+ * unsigned int can hold.
+ *
+ * @return inverse distance metric, non-zero.
+ */
+static unsigned int inverse_distance(const HashCode512 * target,
+                                     const HashCode512 * have) {
+  const unsigned int ceiling = (unsigned int) -1;
+  unsigned int shared;
+  double weight;
+
+  shared = get_bit_distance(target, have);
+  weight = shared * 32;
+  weight = exp2(weight / (sizeof(HashCode512)*8));
+  if (weight > ceiling)
+    return ceiling;
+  return (unsigned int) weight;
+}
+
+/**
* Select a peer from the routing table that would be a good routing
* destination for sending a message for "target". The resulting peer
* must not be in the set of blocked peers.<p>
@@ -222,18 +316,80 @@
*
* @return OK on success, SYSERR on error
*/
-static int selectPeer(PeerIdentity * set,
- const HashCode512 * target,
- const PeerIdentity * blocked,
- unsigned int blocked_size) {
+int select_dht_peer(PeerIdentity * set,
+ const HashCode512 * target,
+ const PeerIdentity * blocked,
+ unsigned int blocked_size) {
+ unsigned long long total_distance = 0; /* summed weight of all non-blocked peers */
+ unsigned long long selected;
+ unsigned int distance;
+ unsigned int bc;
+ unsigned int ec;
+ unsigned int i;
+ int match;
+ PeerBucket * bucket;
+ PeerInfo * pi;
+
MUTEX_LOCK(lock);
- /* fixme: select peers */
- MUTEX_UNLOCK(lock);
if (stats != NULL)
stats->update(stat_dht_route_looks, 1);
+ for (bc=0;bc<bucketCount;bc++) { /* pass 1: total up the weights */
+ bucket = &buckets[bc];
+ for (ec=0;ec<bucket->peers_size;ec++) {
+ pi = bucket->peers[ec];
+ match = NO;
+ for (i=0;i<blocked_size;i++) {
+ if (0 == memcmp(&pi->id,
+ &blocked[i], sizeof(PeerIdentity))) {
+ match = YES;
+ break;
+ }
+ }
+ if (match == YES)
+ continue;
+ total_distance += inverse_distance(target,
+ &pi->id.hashPubKey);
+ }
+ }
+ if (total_distance == 0) { /* no eligible (non-blocked) peer in the table */
+ MUTEX_UNLOCK(lock);
+ return SYSERR;
+ }
+ selected = weak_randomi64(total_distance); /* presumably a value below total_distance - confirm */
+ for (bc=0;bc<bucketCount;bc++) { /* pass 2: find the peer whose weight range covers "selected" */
+ bucket = &buckets[bc];
+ for (ec=0;ec<bucket->peers_size;ec++) {
+ pi = bucket->peers[ec];
+ match = NO;
+ for (i=0;i<blocked_size;i++) {
+ if (0 == memcmp(&pi->id,
+ &blocked[i], sizeof(PeerIdentity))) {
+ match = YES;
+ break;
+ }
+ }
+ if (match == YES)
+ continue;
+ distance = inverse_distance(target,
+ &pi->id.hashPubKey);
+ if (distance > selected) {
+ *set = pi->id; /* copy the identity into the caller's slot */
+ MUTEX_UNLOCK(lock);
+ return OK;
+ }
+ selected -= distance;
+ }
+ }
+ GE_BREAK(NULL, 0); /* not expected: pass 2 should always end on some peer */
+ MUTEX_UNLOCK(lock);
return SYSERR;
}
+/**
+ * Send a discovery message to the other peer.
+ *
+ * @param cls NULL or pre-built discovery message
+ */
static void broadcast_dht_discovery(const PeerIdentity * other,
void * cls) {
P2P_DHT_Discovery * disco = cls;
@@ -241,8 +397,6 @@
unsigned int i;
PeerIdentity * pos;
- if (weak_randomi(MAINTAIN_CHANCE) != 0)
- return;
if (disco != NULL) {
coreAPI->unicast(other,
&disco->header,
@@ -267,10 +421,10 @@
i = 1;
}
while (i < pc) {
- if (OK != selectPeer(&pos[i],
- &other->hashPubKey,
- pos,
- i))
+ if (OK != select_dht_peer(&pos[i],
+ &other->hashPubKey,
+ pos,
+ i))
pc--;
else
i++;
@@ -283,6 +437,13 @@
FREE(disco);
}
+static void broadcast_dht_discovery_prob(const PeerIdentity * other,
+                                         void * cls) {
+  /* forward to broadcast_dht_discovery only when the draw yields 0 */
+  if (0 == weak_randomi(MAINTAIN_CHANCE))
+    broadcast_dht_discovery(other, cls);
+}
+
/**
* Cron job to maintain DHT routing table.
*/
@@ -293,23 +454,158 @@
disc.header.size = htons(sizeof(P2P_DHT_Discovery));
disc.header.type = htons(P2P_PROTO_DHT_DISCOVERY);
disc.space_available = -1; /* FIXME */
- coreAPI->forAllConnectedNodes(&broadcast_dht_discovery,
+ coreAPI->forAllConnectedNodes(&broadcast_dht_discovery_prob,
&disc);
} else {
- coreAPI->forAllConnectedNodes(&broadcast_dht_discovery,
+ coreAPI->forAllConnectedNodes(&broadcast_dht_discovery_prob,
NULL);
}
}
/**
+ * Pong callback: the peer whose identity copy arrives in cls has
+ * answered; update its table entry (if any) and free the copy.
+ */
+static void pongNotify(void * cls) {
+  PeerIdentity * who = cls;
+  PeerInfo * entry = findPeerEntry(who);
+  cron_t now;
+  if (entry != NULL) {
+    now = get_time();
+    entry->lastActivity = now;
+    entry->expected_latency = now - entry->lastTimePingSend;
+    entry->response_count++;
+  }
+  FREE(who);
+}
+
+/**
+ * Send a ping to pi to check if it is still running; pongNotify
+ * is handed a heap copy of the identity and frees it on reply.
+ */
+static void pingPeer(PeerInfo * pi) {
+  PeerIdentity * p;
+
+  p = MALLOC(sizeof(PeerIdentity));
+  *p = pi->id;
+  if (OK == pingpong->ping(p, NO, &pongNotify, p)) {
+    pi->lastTimePingSend = get_time();
+    pi->request_count++;
+  } else {
+    /* presumably pongNotify never runs for a failed ping, so release the copy here */
+    FREE(p);
+  }
+}
+
+/**
+ * Decide whether a table entry has gone stale.  Entries idle for
+ * more than half the timeout get pinged as a liveness probe.
+ *
+ * @return YES if the peer should be removed from the DHT table
+ */
+static int checkExpired(PeerInfo * pi) {
+  const cron_t now = get_time();
+  cron_t idle;
+  if (now <= pi->lastActivity)
+    return NO;
+  idle = now - pi->lastActivity;
+  if (idle > MAINTAIN_PEER_TIMEOUT)
+    return YES;
+  if (idle > MAINTAIN_PEER_TIMEOUT / 2)
+    pingPeer(pi);
+  return NO;
+}
+
+/**
+ * Remove every peer from the bucket that checkExpired reports as expired.
+ */
+static void checkExpiration(PeerBucket * bucket) {
+  unsigned int i = 0;
+  PeerInfo * peer;
+
+  while (i < bucket->peers_size) {
+    peer = bucket->peers[i];
+    if (checkExpired(peer) == YES) {
+      total_peers--;
+      if (stats != NULL)
+        stats->update(stat_dht_total_peers, -1);
+      FREE(peer);
+      /* fill the hole with the last entry; slot i is examined again next round */
+      bucket->peers[i] = bucket->peers[bucket->peers_size-1];
+      GROW(bucket->peers, bucket->peers_size, bucket->peers_size - 1);
+    } else
+      i++;
+  }
+}
+
+/**
+ * Consider adding the given peer to the DHT: it is skipped when it
+ * has no bucket, its bucket stays full, or it is already listed;
+ * it is added only once a HELLO is known and core is connected.
+ */
+static void considerPeer(const PeerIdentity * peer) {
+  PeerInfo * pi;
+  PeerBucket * bucket;
+  P2P_DHT_ASK_HELLO ask;
+  P2P_hello_MESSAGE * hello;
+
+  bucket = findBucketFor(peer);
+  if (bucket == NULL)
+    return; /* peer == self */
+  if (bucket->peers_size >= MAINTAIN_BUCKET_SIZE)
+    checkExpiration(bucket);
+  if (bucket->peers_size >= MAINTAIN_BUCKET_SIZE)
+    return; /* do not care */
+  if (NULL != findPeerEntryInBucket(bucket,
+                                    peer))
+    return; /* already have this peer in buckets */
+  /* do we know how to contact this peer? */
+  hello = identity->identity2Helo(peer,
+                                  ANY_PROTOCOL_NUMBER,
+                                  NO);
+  if (hello == NULL) {
+    /* if identity not known, ask sender for HELLO of other peer */
+    ask.header.size = htons(sizeof(P2P_DHT_ASK_HELLO));
+    ask.header.type = htons(P2P_PROTO_DHT_ASK_HELLO);
+    ask.reserved = 0;
+    ask.peer = *peer;
+    coreAPI->unicast(sender, /* NOTE(review): "sender" is not declared in this function; the caller must supply it */
+                     &ask.header,
+                     0, /* FIXME: priority */
+                     5 * CRON_SECONDS);
+    return;
+  }
+  FREE(hello);
+  /* check if connected, if not, send discovery */
+  if (0 == coreAPI->queryBPMfromPeer(peer)) {
+    /* not yet connected; connect sending DISCOVERY */
+    broadcast_dht_discovery(peer,
+                            NULL);
+    return;
+  }
+  /* we are connected (in core), add to bucket */
+  pi = MALLOC(sizeof(PeerInfo));
+  memset(pi,
+         0,
+         sizeof(PeerInfo));
+  pi->id = *peer;
+  pingPeer(pi);
+  GROW(bucket->peers,
+       bucket->peers_size,
+       bucket->peers_size + 1);
+  bucket->peers[bucket->peers_size-1] = pi;
+  total_peers++;
+  if (stats != NULL)
+    stats->update(stat_dht_total_peers, 1);
+}
+
+/**
* Handle discovery message.
*/
static int handleDiscovery(const PeerIdentity * sender,
const MESSAGE_HEADER * msg) {
unsigned int pc;
unsigned int i;
- PeerBucket * bucket;
- const PeerIdentity * peers;
const P2P_DHT_Discovery * disco;
pc = (ntohs(msg->size) - sizeof(P2P_DHT_Discovery)) / sizeof(PeerIdentity);
@@ -331,22 +627,41 @@
return OK;
}
MUTEX_LOCK(lock);
+ considerPeer(sender);
peers = (const PeerIdentity*) &disco[1];
- for (i=0;i<pc;i++) {
- bucket = findBucketFor(&peers[i]);
- if (bucket->peers_size >= MAINTAIN_BUCKET_SIZE)
- continue; /* do not care */
- /* FIXME: learn about connection opportunities */
- /* if identity not known, ask sender for HELLO of other peer */
- /* if identity known, connect (sending DISCOVERY) */
- /* if connected (in core), add to bucket */
-
- }
+ for (i=0;i<pc;i++)
+ considerPeer(&peers[i]);
MUTEX_UNLOCK(lock);
return OK;
}
/**
+ * Handle ask hello message: reply with the HELLO of the requested peer, if known.
+ */
+static int handleAskHello(const PeerIdentity * sender,
+                          const MESSAGE_HEADER * msg) {
+  const P2P_DHT_ASK_HELLO * request = (const P2P_DHT_ASK_HELLO *) msg;
+  P2P_hello_MESSAGE * reply;
+
+  if (sizeof(P2P_DHT_ASK_HELLO) != ntohs(msg->size))
+    return SYSERR;
+  /* requests for a peer without a matching bucket are silently ignored */
+  if (findBucketFor(&request->peer) != NULL) {
+    reply = identity->identity2Helo(&request->peer,
+                                    ANY_PROTOCOL_NUMBER,
+                                    NO);
+    if (reply != NULL) {
+      coreAPI->unicast(sender,
+                       &reply->header,
+                       0,
+                       5 * CRON_SECONDS);
+      FREE(reply);
+    }
+  }
+  return OK;
+}
+
+/**
* Initialize table DHT component.
*
* @param capi the core API
@@ -358,16 +673,11 @@
coreAPI = capi;
ectx = capi->ectx;
- /* FIXME: this should depend on core's target
- connection count, not on the end-user! */
- if (-1 == GC_get_configuration_value_number(capi->cfg,
- "DHT",
- "BUCKETCOUNT",
- 1,
- 512,
- 512,
- &i))
- return SYSERR;
+ /* use less than 50% of peer's ideal number of
+ connections for DHT table size */
+ i = coreAPI->getSlotCount() / MAINTAIN_BUCKET_SIZE / 2;
+ if (i < 4)
+ i = 4;
GROW(buckets,
bucketCount,
i);
@@ -384,9 +694,12 @@
}
identity = coreAPI->requestService("identity");
GE_ASSERT(ectx, identity != NULL);
-
+ pingpong = coreAPI->requestService("pingpong");
+ GE_ASSERT(ectx, pingpong != NULL);
capi->registerHandler(P2P_PROTO_DHT_DISCOVERY,
&handleDiscovery);
+ capi->registerHandler(P2P_PROTO_ASK_HELLO,
+ &handleAskHello);
cron_add_job(coreAPI->cron_manager,
&maintain_dht_job,
MAINTAIN_FREQUENCY,
@@ -405,6 +718,8 @@
capi->unregisterHandler(P2P_PROTO_DHT_DISCOVERY,
&handleDiscovery);
+ capi->unregisterHandler(P2P_PROTO_ASK_HELLO,
+ &handleAskHello);
cron_del_job(coreAPI->cron_manager,
&maintain_dht_job,
MAINTAIN_FREQUENCY);
@@ -414,6 +729,8 @@
}
coreAPI->releaseService(identity);
identity = NULL;
+ coreAPI->releaseService(pingpong);
+ pingpong = NULL;
for (i=0;i<bucketCount;i++) {
GROW(buckets[i]->peers,
buckets[i]->peers_size,
Added: GNUnet/src/applications/dht/module/table.h
===================================================================
--- GNUnet/src/applications/dht/module/table.h 2006-12-02 05:45:23 UTC (rev
3848)
+++ GNUnet/src/applications/dht/module/table.h 2006-12-02 22:01:36 UTC (rev
3849)
@@ -0,0 +1,66 @@
+/*
+ This file is part of GNUnet
+ (C) 2006 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file module/table.h
+ * @brief DHT connection table internal API
+ * @author Christian Grothoff
+ */
+
+#ifndef DHT_TABLE_H
+#define DHT_TABLE_H
+
+#include "gnunet_util.h"
+#include "gnunet_core.h"
+
+/**
+ * Select a peer from the routing table that would be a good routing
+ * destination for sending a message for "target". The resulting peer
+ * must not be in the set of blocked peers.<p>
+ *
+ * Note that we should not ALWAYS select the closest peer to the
+ * target, peers further away from the target should be chosen with
+ * exponentially declining probability (this function is also used for
+ * populating the target's routing table).
+ * @param set presumably the slot receiving the chosen peer (confirm); @param blocked array of blocked_size peers to exclude
+ * @return OK on success, SYSERR on error
+ */
+int select_dht_peer(PeerIdentity * set,
+ const HashCode512 * target,
+ const PeerIdentity * blocked,
+ unsigned int blocked_size);
+
+/**
+ * Initialize table DHT component.
+ *
+ * @param capi the core API
+ * @return OK on success
+ */
+int init_dht_table(CoreAPIForApplication * capi);
+
+/**
+ * Shutdown table DHT component.
+ *
+ * Takes no arguments.
+ * @return OK on success
+ */
+int done_dht_table(void);
+
+#endif
Modified: GNUnet/src/include/gnunet_protocols.h
===================================================================
--- GNUnet/src/include/gnunet_protocols.h 2006-12-02 05:45:23 UTC (rev
3848)
+++ GNUnet/src/include/gnunet_protocols.h 2006-12-02 22:01:36 UTC (rev
3849)
@@ -402,6 +402,14 @@
#define P2P_PROTO_rpc_RES 43
#define P2P_PROTO_rpc_ACK 44
+/************** p2p DHT application messages ************/
+
+#define P2P_PROTO_DHT_DISCOVERY 45
+#define P2P_PROTO_DHT_ASK_HELLO 46
+#define P2P_PROTO_DHT_GET 47
+#define P2P_PROTO_DHT_PUT 48
+
+
/* ************* p2p VPN messages ************* */
#define P2P_PROTO_aip_IP 64 /* contains IPv6 frame */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3849 - in GNUnet/src: applications/dht/module include,
grothoff <=