[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3850 - in GNUnet/src: applications/dht/module applications/dht/tools applications/fs/module include
From: grothoff
Subject: [GNUnet-SVN] r3850 - in GNUnet/src: applications/dht/module applications/dht/tools applications/fs/module include
Date: Sat, 2 Dec 2006 14:28:43 -0800 (PST)
Author: grothoff
Date: 2006-12-02 14:28:37 -0800 (Sat, 02 Dec 2006)
New Revision: 3850
Modified:
GNUnet/src/applications/dht/module/cs.c
GNUnet/src/applications/dht/module/dht.c
GNUnet/src/applications/dht/module/table.c
GNUnet/src/applications/dht/module/table.h
GNUnet/src/applications/dht/tools/Makefile.am
GNUnet/src/applications/dht/tools/dht-query.c
GNUnet/src/applications/dht/tools/dht_api.c
GNUnet/src/applications/fs/module/fs.c
GNUnet/src/include/gnunet_dht.h
GNUnet/src/include/gnunet_dht_lib.h
GNUnet/src/include/gnunet_dht_service.h
GNUnet/src/include/gnunet_protocols.h
Log:
dht hacking
Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/module/cs.c 2006-12-02 22:28:37 UTC (rev 3850)
@@ -42,66 +42,7 @@
*/
static DHT_ServiceAPI * dhtAPI;
-/**
- * Information for each table for which persistence is provided
- * by a local client via the TCP link.
- */
typedef struct {
- /**
- * Handle to access the client.
- */
- struct ClientHandle * handler;
- /**
- * For which table is this client responsible?
- */
- DHT_TableId table;
-
- /**
- * What was the Blockstore that was passed to the DHT service?
- * (must be a pointer since this reference is passed out).
- */
- Blockstore * store;
-
- /**
- * Semaphore that is aquired before using the maxResults
- * and results fields for sending a request to the client.
- * Released after the request has been processed.
- */
- struct SEMAPHORE * prerequest;
-
- /**
- * Semaphore that is up'ed by the client handler whenever a reply
- * was received. The client exit handler also needs to up this
- * semaphore to unblock threads that wait for replies.
- */
- struct SEMAPHORE * prereply;
-
- /**
- * Semaphore that is down'ed by the client handler before storing
- * the data from a reply. The cs-functions need to up it
- * once they have prepared the handlers.
- */
- struct SEMAPHORE * postreply;
-
- /**
- * Function to call for results
- */
- DataProcessor resultCallback;
-
- /**
- * Extra argument to result callback.
- */
- void * resultCallbackClosure;
-
- /**
- * Status value; used to communciate errors (typically using
- * SYSERR/OK or number of results).
- */
- int status;
-
-} DHT_CLIENT_TableHandlers;
-
-typedef struct {
struct ClientHandle * client;
struct DHT_PUT_RECORD * put_record;
DHT_TableId table;
@@ -110,13 +51,6 @@
typedef struct {
struct ClientHandle * client;
- struct DHT_REMOVE_RECORD * remove_record;
- DHT_TableId table;
- unsigned int replicas; /* confirmed dels? */
-} DHT_CLIENT_REMOVE_RECORD;
-
-typedef struct {
- struct ClientHandle * client;
struct DHT_GET_RECORD * get_record;
DHT_TableId table;
unsigned int count;
@@ -130,216 +64,14 @@
static unsigned int putRecordsSize;
-static DHT_CLIENT_REMOVE_RECORD ** removeRecords;
-
-static unsigned int removeRecordsSize;
-
/**
- * If clients provide a datastore implementation for a table,
- * we keep the corresponding client handler in this array.
+ * Lock.
*/
-static DHT_CLIENT_TableHandlers ** csHandlers;
-
-/**
- * Size of the csHandlers array.
- */
-static unsigned int csHandlersCount;
-
-/**
- * Lock for accessing csHandlers.
- */
static struct MUTEX * csLock;
static struct GE_Context * ectx;
-/* ******* implementation of Blockstore via TCP link ********** */
-/**
- * Lookup an item in the datastore.
- *
- * @param key the value to lookup
- * @param maxResults maximum number of results
- * @param results where to store the result
- * @return number of results, SYSERR on error
- */
-static int tcp_get(void * closure,
- unsigned int type,
- unsigned int prio,
- unsigned int keyCount,
- const HashCode512 * keys,
- DataProcessor resultCallback,
- void * resCallbackClosure) {
- CS_dht_request_get_MESSAGE * req;
- unsigned short size;
- DHT_CLIENT_TableHandlers * handlers = closure;
- int ret;
-
- if (keyCount < 1) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
-
- SEMAPHORE_DOWN(handlers->prerequest, YES);
- handlers->resultCallback = resultCallback;
- handlers->resultCallbackClosure = resCallbackClosure;
- handlers->status = 0;
- size = sizeof(CS_dht_request_get_MESSAGE) +
- (keyCount-1) * sizeof(HashCode512);
- if (((unsigned int)size)
- != sizeof(CS_dht_request_get_MESSAGE) +
- (keyCount-1) * sizeof(HashCode512)) {
- SEMAPHORE_UP(handlers->prerequest);
- return SYSERR; /* too many keys, size > rangeof(short) */
- }
- req = MALLOC(size);
- req->header.size = htons(size);
- req->header.type = htons(CS_PROTO_dht_REQUEST_GET);
- req->type = htonl(type);
- req->priority = htonl(prio);
- req->table = handlers->table;
- memcpy(&req->keys,
- keys,
- sizeof(HashCode512) * keyCount);
- req->timeout = htonll(0);
- if (OK != coreAPI->sendToClient(handlers->handler,
- &req->header)) {
- SEMAPHORE_UP(handlers->prerequest);
- return SYSERR;
- }
- FREE(req);
- SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply, YES);
- ret = handlers->status;
- SEMAPHORE_UP(handlers->prerequest);
- return ret;
-}
-
-/**
- * Store an item in the datastore.
- *
- * @param key the key of the item
- * @param value the value to store
- * @param prio the priority for the store
- * @return OK if the value could be stored, SYSERR if not (i.e. out of space)
- */
-static int tcp_put(void * closure,
- const HashCode512 * key,
- const DataContainer * value,
- unsigned int prio) {
- CS_dht_request_put_MESSAGE * req;
- DHT_CLIENT_TableHandlers * handlers = closure;
- int ret;
- size_t n;
-
- n = sizeof(CS_dht_request_put_MESSAGE) + ntohl(value->size);
- req = MALLOC(n);
- SEMAPHORE_DOWN(handlers->prerequest, YES);
- handlers->status = 0;
- req->header.size = htons(n);
- req->header.type = htons(CS_PROTO_dht_REQUEST_PUT);
- req->table = handlers->table;
- req->key = *key;
- req->timeout = htonl(0);
- req->priority = htonl(prio);
- memcpy(&req[1],
- value,
- ntohl(value->size));
- if (OK != coreAPI->sendToClient(handlers->handler,
- &req->header)) {
- FREE(req);
- SEMAPHORE_UP(handlers->prerequest);
- return SYSERR;
- }
- FREE(req);
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "Sending STORE request to client!\n");
- SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply, YES);
- ret = handlers->status;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "Client confirmed STORE request with status %d!\n",
- ret);
- SEMAPHORE_UP(handlers->prerequest);
- return ret;
-}
-
-/**
- * Remove an item from the datastore.
- * @param key the key of the item
- * @param value the value to remove, NULL for all values of the key
- * @return OK if the value could be removed, SYSERR if not (i.e. not present)
- */
-static int tcp_del(void * closure,
- const HashCode512 * key,
- const DataContainer * value) {
- CS_dht_request_remove_MESSAGE * req;
- DHT_CLIENT_TableHandlers * handlers = closure;
- int ret;
- size_t n;
-
- n = sizeof(CS_dht_request_remove_MESSAGE);
- if (value != NULL)
- n += htonl(value->size);
- req = MALLOC(n);
- SEMAPHORE_DOWN(handlers->prerequest, YES);
- handlers->status = 0;
- req->header.size = htons(n);
- req->header.type = htons(CS_PROTO_dht_REQUEST_REMOVE);
- req->table = handlers->table;
- req->key = *key;
- req->timeout = htonl(0);
- if (value != NULL)
- memcpy(&req[1],
- value,
- htonl(value->size));
- if (OK != coreAPI->sendToClient(handlers->handler,
- &req->header)) {
- FREE(req);
- SEMAPHORE_UP(handlers->prerequest);
- return SYSERR;
- }
- FREE(req);
- SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply, YES);
- ret = handlers->status;
- SEMAPHORE_UP(handlers->prerequest);
- return ret;
-}
-
-/**
- * Iterate over all keys in the local datastore
- *
- * @param processor function to call on each item
- * @param cls argument to processor
- * @return number of results, SYSERR on error
- */
-static int tcp_iterate(void * closure,
- DataProcessor processor,
- void * cls) {
- CS_dht_request_iterate_MESSAGE req;
- DHT_CLIENT_TableHandlers * handlers = closure;
- int ret;
-
- SEMAPHORE_DOWN(handlers->prerequest, YES);
- handlers->status = 0;
- handlers->resultCallback = processor;
- handlers->resultCallbackClosure = cls;
- req.header.size = htons(sizeof(CS_dht_request_iterate_MESSAGE));
- req.header.type = htons(CS_PROTO_dht_REQUEST_ITERATE);
- if (OK != coreAPI->sendToClient(handlers->handler,
- &req.header)) {
- SEMAPHORE_UP(handlers->prerequest);
- return SYSERR;
- }
- SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply, YES);
- ret = handlers->status;
- SEMAPHORE_UP(handlers->prerequest);
- return ret;
-}
-
-/* *********************** CS handlers *********************** */
-
static int sendAck(struct ClientHandle * client,
DHT_TableId * table,
int value) {
@@ -353,121 +85,23 @@
&msg.header);
}
-/**
- * CS handler for joining existing DHT-table.
- */
-static int csJoin(struct ClientHandle * client,
- const MESSAGE_HEADER * message) {
- DHT_CLIENT_TableHandlers * ptr;
- CS_dht_request_join_MESSAGE * req;
- int ret;
-
- if (ntohs(message->size) != sizeof(CS_dht_request_join_MESSAGE)) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- req = (CS_dht_request_join_MESSAGE*) message;
- MUTEX_LOCK(csLock);
- ptr = MALLOC(sizeof(DHT_CLIENT_TableHandlers));
- ptr->store = MALLOC(sizeof(Blockstore));
- ptr->store->iterate = &tcp_iterate;
- ptr->store->del = &tcp_del;
- ptr->store->put = &tcp_put;
- ptr->store->get = &tcp_get;
- ptr->store->closure = ptr;
- ptr->handler = client;
- ptr->table = req->table;
- ptr->prerequest = SEMAPHORE_CREATE(1);
- ptr->prereply = SEMAPHORE_CREATE(0);
- ptr->postreply = SEMAPHORE_CREATE(0);
- ret = dhtAPI->join(ptr->store,
- &req->table);
- if (ret == OK) {
- GROW(csHandlers,
- csHandlersCount,
- csHandlersCount+1);
- csHandlers[csHandlersCount-1] = ptr;
- } else {
- SEMAPHORE_DESTROY(ptr->prerequest);
- SEMAPHORE_DESTROY(ptr->prereply);
- SEMAPHORE_DESTROY(ptr->postreply);
- FREE(ptr->store);
- FREE(ptr);
- }
- ret = sendAck(client,
- &req->table,
- ret);
- MUTEX_UNLOCK(csLock);
- return ret;
-}
-
-/**
- * CS handler for leaving DHT-table.
- */
-static int csLeave(struct ClientHandle * client,
- const MESSAGE_HEADER * message) {
-
- CS_dht_request_leave_MESSAGE * req;
- int i;
- DHT_CLIENT_TableHandlers * ptr;
-
- if (ntohs(message->size) != sizeof(CS_dht_request_leave_MESSAGE)) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- req = (CS_dht_request_leave_MESSAGE*) message;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "Client leaving request received!\n");
-
- MUTEX_LOCK(csLock);
- for (i=0;i<csHandlersCount;i++) {
- ptr = csHandlers[i];
- if ( (equalsHashCode512(&ptr->table,
- &req->table)) ) {
- csHandlers[i] = csHandlers[csHandlersCount-1];
- GROW(csHandlers,
- csHandlersCount,
- csHandlersCount-1);
- MUTEX_UNLOCK(csLock);
-
- /* release clients waiting on this DHT */
- ptr->status = SYSERR;
- SEMAPHORE_UP(ptr->prereply);
- SEMAPHORE_DOWN(ptr->prerequest, YES);
- SEMAPHORE_DESTROY(ptr->prerequest);
- SEMAPHORE_DESTROY(ptr->prereply);
- SEMAPHORE_DESTROY(ptr->postreply);
- FREE(ptr->store);
- FREE(ptr);
- return sendAck(client,
- &req->table,
- OK);
- }
- }
- MUTEX_UNLOCK(csLock);
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("`%s' failed: table not found!\n"),
- "CS_DHT_LEAVE");
- return sendAck(client,
- &req->table,
- SYSERR);
-}
-
static void cs_put_abort(void * cls) {
DHT_CLIENT_PUT_RECORD * record = cls;
int i;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Signaling client put completion: %d\n",
- record->replicas);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Signaling client put completion: %d\n",
+ record->replicas);
MUTEX_LOCK(csLock);
dhtAPI->put_stop(record->put_record);
if (OK != sendAck(record->client,
&record->table,
record->replicas)) {
- GE_LOG(ectx, GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendAck");
+ GE_LOG(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_USER,
+ _("`%s' failed. Terminating connection to client.\n"),
+ "sendAck");
coreAPI->terminateClientConnection(record->client);
}
for (i=putRecordsSize-1;i>=0;i--)
@@ -535,110 +169,6 @@
return OK;
}
-static void cs_remove_abort(DHT_CLIENT_REMOVE_RECORD * record) {
- int i;
-
- dhtAPI->remove_stop(record->remove_record);
- if (OK != sendAck(record->client,
- &record->table,
- record->replicas)) {
- GE_LOG(ectx, GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("sendAck failed. Terminating connection to client.\n"));
- coreAPI->terminateClientConnection(record->client);
- }
- MUTEX_LOCK(csLock);
- for (i=removeRecordsSize-1;i>=0;i--)
- if (removeRecords[i] == record) {
- removeRecords[i] = removeRecords[removeRecordsSize-1];
- GROW(removeRecords,
- removeRecordsSize,
- removeRecordsSize-1);
- break;
- }
- MUTEX_UNLOCK(csLock);
-
- FREE(record);
-}
-
-struct CSRemoveClosure {
- struct ClientHandle * client;
- CS_dht_request_remove_MESSAGE * message;
-};
-
-/**
- * CronJob for removing <key,value>-pairs inserted by this node.
- */
-static void csRemoveJob(struct CSRemoveClosure * cpc) {
- CS_dht_request_remove_MESSAGE * req;
- DataContainer * data;
- DHT_CLIENT_REMOVE_RECORD * ptr;
- struct ClientHandle * client;
- unsigned int size;
-
- req = cpc->message;
- client = cpc->client;
- FREE(cpc);
- size = ntohs(req->header.size)
- - sizeof(CS_dht_request_remove_MESSAGE)
- + sizeof(DataContainer);
- GE_ASSERT(ectx, size < 0xFFFF);
- if (size == 0) {
- data = NULL;
- } else {
- data = MALLOC(size);
- data->size = htonl(size);
- memcpy(&data[1],
- &req[1],
- size - sizeof(DataContainer));
- }
- ptr = MALLOC(sizeof(DHT_CLIENT_REMOVE_RECORD));
- ptr->client = client;
- ptr->replicas = 0;
- ptr->table = req->table;
- ptr->remove_record = NULL;
- MUTEX_LOCK(csLock);
- GROW(removeRecords,
- removeRecordsSize,
- removeRecordsSize+1);
- removeRecords[removeRecordsSize-1] = ptr;
- MUTEX_UNLOCK(csLock);
- ptr->remove_record = dhtAPI->remove_start(&req->table,
- &req->key,
- ntohll(req->timeout),
- data,
- (DHT_OP_Complete) &cs_remove_abort,
- ptr);
- FREE(req);
- FREE(data);
-}
-
-/**
- * CS handler for inserting <key,value>-pair into DHT-table.
- */
-static int csRemove(struct ClientHandle * client,
- const MESSAGE_HEADER * message) {
- struct CSRemoveClosure * cpc;
-
- if (ntohs(message->size) < sizeof(CS_dht_request_remove_MESSAGE)) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- cpc = MALLOC(sizeof(struct CSRemoveClosure));
- cpc->message = MALLOC(ntohs(message->size));
- memcpy(cpc->message,
- message,
- ntohs(message->size));
- cpc->client = client;
- cron_add_job(coreAPI->cron,
- (CronJob)&csRemoveJob,
- 0,
- 0,
- cpc);
- return OK;
-}
-
-
-
static int cs_get_result_callback(const HashCode512 * key,
const DataContainer * value,
void * cls) {
@@ -652,19 +182,21 @@
memcpy(&msg[1],
value,
ntohl(value->size));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "`%s' processes reply '%.*s'\n",
- __FUNCTION__,
- ntohl(value->size) - sizeof(DataContainer),
- &value[1]);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "`%s' processes reply '%.*s'\n",
+ __FUNCTION__,
+ ntohl(value->size) - sizeof(DataContainer),
+ &value[1]);
msg->table = record->table;
msg->header.size = htons(n);
msg->header.type = htons(CS_PROTO_dht_REPLY_GET);
if (OK != coreAPI->sendToClient(record->client,
&msg->header)) {
- GE_LOG(ectx, GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendToClient");
+ GE_LOG(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_USER,
+ _("`%s' failed. Terminating connection to client.\n"),
+ "sendToClient");
coreAPI->terminateClientConnection(record->client);
}
FREE(msg);
@@ -680,18 +212,20 @@
if (OK != sendAck(record->client,
&record->table,
SYSERR)) {
- GE_LOG(ectx, GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendAck");
+ GE_LOG(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_USER,
+ _("`%s' failed. Terminating connection to client.\n"),
+ "sendAck");
coreAPI->terminateClientConnection(record->client);
}
} else {
if (OK != sendAck(record->client,
&record->table,
record->count)) {
- GE_LOG(ectx, GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendAck");
+ GE_LOG(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_USER,
+ _("`%s' failed. Terminating connection to client.\n"),
+ "sendAck");
coreAPI->terminateClientConnection(record->client);
}
}
@@ -778,98 +312,6 @@
}
/**
- * CS handler for ACKs. Finds the appropriate handler entry, stores
- * the status value in status and up's the semaphore to signal
- * that we received a reply.
- */
-static int csACK(struct ClientHandle * client,
- const MESSAGE_HEADER * message) {
- DHT_CLIENT_TableHandlers * ptr;
- CS_dht_reply_ack_MESSAGE * req;
- int i;
-
- if (ntohs(message->size) != sizeof(CS_dht_reply_ack_MESSAGE)) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- req =(CS_dht_reply_ack_MESSAGE*) message;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' received from client.\n",
- "CS_dht_reply_ack_MESSAGE");
- MUTEX_LOCK(csLock);
- for (i=0;i<csHandlersCount;i++) {
- ptr = csHandlers[i];
- if ( (ptr->handler == client) &&
- (equalsHashCode512(&ptr->table,
- &req->table)) ) {
- SEMAPHORE_DOWN(ptr->postreply, YES);
- ptr->status = ntohl(req->status);
- SEMAPHORE_UP(ptr->prereply);
- MUTEX_UNLOCK(csLock);
- return OK;
- }
- }
- MUTEX_UNLOCK(csLock);
- GE_LOG(ectx, GE_ERROR | GE_BULK | GE_USER,
- _("Failed to deliver `%s' message.\n"),
- "CS_dht_reply_ack_MESSAGE");
- return SYSERR; /* failed to signal */
-}
-
-/**
- * CS handler for results. Finds the appropriate record
- * and passes on the new result. If all results have been
- * collected, signals using the semaphore.
- */
-static int csResults(struct ClientHandle * client,
- const MESSAGE_HEADER * message) {
- CS_dht_reply_results_MESSAGE * req;
- DHT_CLIENT_TableHandlers * ptr;
- unsigned int dataLength;
- int i;
-
- if (ntohs(message->size) < sizeof(CS_dht_reply_results_MESSAGE)) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- req = (CS_dht_reply_results_MESSAGE*) message;
- dataLength = ntohs(message->size) - sizeof(CS_dht_reply_results_MESSAGE);
- if (dataLength != ntohl(req->data.size)) {
- GE_BREAK(ectx, 0);
- return SYSERR;
- }
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' received from client.\n",
- "CS_dht_reply_results_MESSAGE");
- MUTEX_LOCK(csLock);
- for (i=0;i<csHandlersCount;i++) {
- if ( (csHandlers[i]->handler == client) &&
- (equalsHashCode512(&csHandlers[i]->table,
- &req->table)) ) {
- ptr = csHandlers[i];
- SEMAPHORE_DOWN(ptr->postreply, YES);
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' received result '%.*s'!\n",
- __FUNCTION__,
- dataLength - sizeof(DataContainer),
- &(&req->data)[1]);
-
- ptr->resultCallback(&req->key,
- &req->data,
- ptr->resultCallbackClosure);
- ptr->status++;
- MUTEX_UNLOCK(csLock);
- return OK;
- }
- }
- MUTEX_UNLOCK(csLock);
- GE_LOG(ectx, GE_ERROR | GE_BULK | GE_USER,
- _("Failed to deliver `%s' message.\n"),
- "CS_dht_reply_results_MESSAGE");
- return SYSERR; /* failed to deliver */
-}
-
-/**
* CS handler for handling exiting client. Triggers
* csLeave for all tables that rely on this client.
*/
@@ -877,22 +319,7 @@
int i;
DHT_CLIENT_GET_RECORD * gr;
DHT_CLIENT_PUT_RECORD * pr;
- DHT_CLIENT_REMOVE_RECORD * rr;
- MUTEX_LOCK(csLock);
- for (i=0;i<csHandlersCount;i++) {
- if (csHandlers[i]->handler == client) {
- CS_dht_request_leave_MESSAGE message;
-
- message.header.size = ntohs(sizeof(CS_dht_request_leave_MESSAGE));
- message.header.type = ntohs(CS_PROTO_dht_REQUEST_LEAVE);
- message.table = csHandlers[i]->table;
- csLeave(client,
- &message.header);
- i--;
- }
- }
- MUTEX_UNLOCK(csLock);
cron_suspend(coreAPI->cron,
YES);
MUTEX_LOCK(csLock);
@@ -926,21 +353,6 @@
putRecordsSize-1);
}
}
- for (i=0;i<removeRecordsSize;i++) {
- if (removeRecords[i]->client == client) {
- rr = removeRecords[i];
-
- cron_del_job(coreAPI->cron,
- (CronJob) &cs_remove_abort,
- 0,
- rr);
- dhtAPI->remove_stop(rr->remove_record);
- removeRecords[i] = removeRecords[removeRecordsSize-1];
- GROW(removeRecords,
- removeRecordsSize,
- removeRecordsSize-1);
- }
- }
MUTEX_UNLOCK(csLock);
cron_resume_jobs(coreAPI->cron,
YES);
@@ -956,37 +368,19 @@
coreAPI = capi;
GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
"DHT registering client handlers: "
- "%d %d %d %d %d %d %d\n",
- CS_PROTO_dht_REQUEST_JOIN,
- CS_PROTO_dht_REQUEST_LEAVE,
+ "%d %d %d %d\n",
CS_PROTO_dht_REQUEST_PUT,
CS_PROTO_dht_REQUEST_GET,
- CS_PROTO_dht_REQUEST_REMOVE,
CS_PROTO_dht_REPLY_GET,
CS_PROTO_dht_REPLY_ACK);
status = OK;
csLock = MUTEX_CREATE(YES);
- if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_JOIN,
- &csJoin))
- status = SYSERR;
- if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_LEAVE,
- &csLeave))
- status = SYSERR;
if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_PUT,
&csPut))
status = SYSERR;
if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_GET,
&csGet))
status = SYSERR;
- if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_REMOVE,
- &csRemove))
- status = SYSERR;
- if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REPLY_GET,
- &csResults))
- status = SYSERR;
- if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REPLY_ACK,
- &csACK))
- status = SYSERR;
if (SYSERR == capi->registerClientExitHandler(&csClientExit))
status = SYSERR;
return status;
@@ -999,29 +393,15 @@
int status;
status = OK;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "DHT: shutdown\n");
- if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REQUEST_JOIN,
- &csJoin))
- status = SYSERR;
- if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REQUEST_LEAVE,
- &csLeave))
- status = SYSERR;
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "DHT: shutdown\n");
if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REQUEST_PUT,
&csPut))
status = SYSERR;
if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REQUEST_GET,
&csGet))
status = SYSERR;
- if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REQUEST_REMOVE,
- &csRemove))
- status = SYSERR;
- if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REPLY_GET,
- &csResults))
- status = SYSERR;
- if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REPLY_ACK,
- &csACK))
- status = SYSERR;
if (OK != coreAPI->unregisterClientExitHandler(&csClientExit))
status = SYSERR;
@@ -1033,14 +413,6 @@
cs_put_abort(putRecords[0]);
}
- while (removeRecordsSize > 0) {
- cron_del_job(coreAPI->cron,
- (CronJob) &cs_remove_abort,
- 0,
- removeRecords[0]);
- cs_remove_abort(removeRecords[0]);
- }
-
while (getRecordsSize > 0) {
cron_del_job(coreAPI->cron,
&cs_get_abort,
@@ -1048,10 +420,6 @@
getRecords[0]);
cs_get_abort(getRecords[0]);
}
-
- /* simulate client-exit for all existing handlers */
- while (csHandlersCount > 0)
- csClientExit(csHandlers[0]->handler);
coreAPI->releaseService(dhtAPI);
dhtAPI = NULL;
coreAPI = NULL;
@@ -1060,3 +428,9 @@
}
/* end of cs.c */
+
+
+
+
+
+
Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/module/dht.c 2006-12-02 22:28:37 UTC (rev 3850)
@@ -3658,14 +3658,10 @@
rpcAPI->RPC_register_async("DHT_remove",
&rpc_DHT_remove);
lock = coreAPI->getConnectionModuleLock();
- api.join = &dht_join;
- api.leave = &dht_leave;
api.get_start = &dht_get_async_start;
api.get_stop = &dht_get_async_stop;
api.put_start = &dht_put_async_start;
api.put_stop = &dht_put_async_stop;
- api.remove_start = &dht_remove_async_start;
- api.remove_stop = &dht_remove_async_stop;
memset(&masterTableId, 0, sizeof(HashCode512));
/* join the master table */
Modified: GNUnet/src/applications/dht/module/table.c
===================================================================
--- GNUnet/src/applications/dht/module/table.c 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/module/table.c 2006-12-02 22:28:37 UTC (rev 3850)
@@ -35,7 +35,8 @@
* reliabily metrics (to be added later)
* + routing.c: tracking of get/put operations, retry, reply handling
* code tries best-match routing among entries in table
- * + dstore (plugin): SQL-based datastore: key, value, expiration
+ * + cs.c: services to out-of-process DHT clients (via dht-lib)
+ * + dstore.c + plugin: SQL-based datastore: key, value, expiration
* (bounded FIFO-datastore, when full, kill oldest entry first)
* [?: better replacement policy to guard against attacks?]
*/
@@ -710,7 +711,6 @@
/**
* Shutdown table DHT component.
*
- * @param capi the core API
* @return OK on success
*/
int done_dht_table() {
Modified: GNUnet/src/applications/dht/module/table.h
===================================================================
--- GNUnet/src/applications/dht/module/table.h 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/module/table.h 2006-12-02 22:28:37 UTC (rev 3850)
@@ -58,7 +58,6 @@
/**
* Shutdown table DHT component.
*
- * @param capi the core API
* @return OK on success
*/
int done_dht_table(void);
Modified: GNUnet/src/applications/dht/tools/Makefile.am
===================================================================
--- GNUnet/src/applications/dht/tools/Makefile.am 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/tools/Makefile.am 2006-12-02 22:28:37 UTC (rev 3850)
@@ -4,7 +4,6 @@
libgnunetdht_api.la
bin_PROGRAMS = \
- gnunet-dht-join \
gnunet-dht-query
libgnunetdht_api_la_SOURCES = \
@@ -13,17 +12,6 @@
$(top_builddir)/src/util/network_client/libgnunetutil_network_client.la \
$(top_builddir)/src/util/libgnunetutil.la
-
-gnunet_dht_join_SOURCES = \
- dht-join.c
-gnunet_dht_join_LDADD = \
- $(top_builddir)/src/applications/dht/module/libgnunetdht_datastore_memory.la \
- $(top_builddir)/src/applications/dht/tools/libgnunetdht_api.la \
- $(top_builddir)/src/util/boot/libgnunetutil_boot.la \
- $(top_builddir)/src/util/network_client/libgnunetutil_network_client.la \
- $(top_builddir)/src/util/libgnunetutil.la
-
-
gnunet_dht_query_SOURCES = \
dht-query.c
gnunet_dht_query_LDADD = \
Modified: GNUnet/src/applications/dht/tools/dht-query.c
===================================================================
--- GNUnet/src/applications/dht/tools/dht-query.c 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/tools/dht-query.c 2006-12-02 22:28:37 UTC (rev 3850)
@@ -20,7 +20,7 @@
/**
* @file dht-query.c
- * @brief perform DHT operations (insert, lookup, remove)
+ * @brief perform DHT operations (insert, lookup)
* @author Christian Grothoff
*/
@@ -50,7 +50,7 @@
*/
static struct CommandLineOption gnunetqueryOptions[] = {
COMMAND_LINE_OPTION_CFG_FILE(&cfgFilename), /* -c */
- COMMAND_LINE_OPTION_HELP(gettext_noop("Query (get KEY, put KEY VALUE, remove KEY VALUE) a DHT table.")), /* -h */
+ COMMAND_LINE_OPTION_HELP(gettext_noop("Query (get KEY, put KEY VALUE) a DHT table.")), /* -h */
COMMAND_LINE_OPTION_HOSTNAME, /* -H */
COMMAND_LINE_OPTION_LOGGING, /* -L */
{ 't', "table", "NAME",
@@ -138,40 +138,6 @@
FREE(dc);
}
-static void do_remove(struct ClientServerConnection * sock,
- const char * key,
- const char * value) {
- DataContainer * dc;
- HashCode512 hc;
-
- hash(key, strlen(key), &hc);
- dc = MALLOC(sizeof(DataContainer)
- + strlen(value));
- dc->size = htonl(strlen(value)
- + sizeof(DataContainer));
- memcpy(&dc[1], value, strlen(value));
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "Issuing '%s(%s,%s)' command.\n",
- "remove", key, value);
- if (OK == DHT_LIB_remove(cfg,
- ectx,
- &table,
- &hc,
- timeout,
- dc)) {
- printf(_("'%s(%s,%s)' succeeded\n"),
- "remove",
- key, value);
- } else {
- printf(_("'%s(%s,%s)' failed.\n"),
- "remove",
- key, value);
- }
- FREE(dc);
-}
-
-
int main(int argc,
char * const * argv) {
int i;
@@ -240,20 +206,6 @@
}
continue;
}
- if (0 == strcmp("remove", argv[i])) {
- if (i+3 > argc) {
- fprintf(stderr,
- _("Command `%s' requires two arguments (`%s' and `%s').\n"),
- "remove",
- "key",
- "value");
- break;
- } else {
- do_remove(handle, argv[i+1], argv[i+2]);
- i+=2;
- }
- continue;
- }
fprintf(stderr,
_("Unsupported command `%s'. Aborting.\n"),
argv[i]);
Modified: GNUnet/src/applications/dht/tools/dht_api.c
===================================================================
--- GNUnet/src/applications/dht/tools/dht_api.c 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/dht/tools/dht_api.c 2006-12-02 22:28:37 UTC (rev 3850)
@@ -31,573 +31,21 @@
#include "gnunet_util_network_client.h"
/**
- * Information for each table that this client is responsible
- * for.
- */
-typedef struct {
-
- /**
- * ID of the table.
- */
- DHT_TableId table;
-
- /**
- * The socket that was used to join GNUnet to receive
- * requests for this table.
- */
- struct ClientServerConnection * sock;
-
- /**
- * The thread that is processing the requests received
- * from GNUnet on sock.
- */
- struct PTHREAD * processor;
-
- /**
- * The Datastore provided by the client that performs the
- * actual storage operations.
- */
- Blockstore * store;
-
- /**
- * Did we receive a request to leave the table?
- */
- int leave_request;
-
- struct MUTEX * lock;
-
- struct GC_Configuration * cfg;
-
- struct GE_Context * ectx;
-
-} TableList;
-
-/**
- * Connections to GNUnet helt by this module.
- */
-static TableList ** tables;
-
-/**
- * Size of the tables array.
- */
-static unsigned int tableCount;
-
-/**
- * Lock for access to tables array.
- */
-static struct MUTEX * lock;
-
-/**
- * FIXME -- avoid this global!
- */
-static struct GE_Context * ectx;
-
-
-/**
* Check if the given message is an ACK. If so,
* return the status, otherwise SYSERR.
*/
static int checkACK(MESSAGE_HEADER * reply) {
- GE_LOG(ectx,
+ GE_LOG(NULL,
GE_DEBUG | GE_REQUEST | GE_USER,
"received ACK from gnunetd\n");
- if ( (sizeof(CS_dht_reply_ack_MESSAGE) == ntohs(reply->size)) &&
+ if ( (sizeof(CS_dht_reply_ack_MESSAGE) == ntohs(reply->size)) &&
(CS_PROTO_dht_REPLY_ACK == ntohs(reply->type)) )
return ntohl(((CS_dht_reply_ack_MESSAGE*)reply)->status);
return SYSERR;
}
-/**
- * Send an ACK message of the given value to gnunetd.
- */
-static int sendAck(struct ClientServerConnection * sock,
- DHT_TableId * table,
- int value) {
- CS_dht_reply_ack_MESSAGE msg;
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "sending ACK to gnunetd\n");
- msg.header.size = htons(sizeof(CS_dht_reply_ack_MESSAGE));
- msg.header.type = htons(CS_PROTO_dht_REPLY_ACK);
- msg.status = htonl(value);
- msg.table = *table;
- return connection_write(sock,
- &msg.header);
-}
-
-static int sendAllResults(const HashCode512 * key,
- const DataContainer * value,
- void * cls) {
- TableList * list = (TableList*) cls;
- CS_dht_reply_results_MESSAGE * reply;
-
- reply = MALLOC(sizeof(CS_dht_reply_results_MESSAGE) + ntohl(value->size) + sizeof(HashCode512));
- reply->header.size = htons(sizeof(CS_dht_reply_results_MESSAGE) + ntohl(value->size) + sizeof(HashCode512));
- reply->header.type = htons(CS_PROTO_dht_REPLY_GET);
- reply->totalResults = htonl(1);
- reply->table = list->table;
- reply->key = *key;
- memcpy(&reply->data,
- value,
- ntohl(value->size));
- if (OK != connection_write(list->sock,
- &reply->header)) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Failed to send `%s'. Closing connection.\n"),
- "CS_dht_reply_results_MESSAGE");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- FREE(reply);
- return SYSERR;
- }
- FREE(reply);
- return OK;
-
-}
-
/**
- * Thread that processes requests from gnunetd (by forwarding
- * them to the implementation of list->store).
- */
-static void * process_thread(void * cls) {
- TableList * list = cls;
- MESSAGE_HEADER * buffer;
- MESSAGE_HEADER * reply;
- CS_dht_request_join_MESSAGE req;
- int ok;
-
- req.header.size = htons(sizeof(CS_dht_request_join_MESSAGE));
- req.header.type = htons(CS_PROTO_dht_REQUEST_JOIN);
- req.table = list->table;
-
- while (list->leave_request == NO) {
- if (list->sock == NULL) {
- PTHREAD_SLEEP(500 * cronMILLIS);
- MUTEX_LOCK(list->lock);
- if (list->leave_request == NO)
- list->sock = client_connection_create(ectx,
- list->cfg);
- MUTEX_UNLOCK(list->lock);
- }
- if (list->sock == NULL)
- continue;
-
- ok = NO;
- /* send 'join' message via socket! */
- if (OK == connection_write(list->sock,
- &req.header)) {
- reply = NULL;
- if (OK == connection_read(list->sock,
- &reply)) {
- if (OK == checkACK(reply))
- ok = YES;
- FREENONNULL(reply);
- }
- }
- if (ok == NO) {
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- continue; /* retry... */
- }
-
- buffer = NULL;
- while (OK == connection_read(list->sock,
- &buffer)) {
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "Received message of type %d from gnunetd\n",
- ntohs(buffer->type));
-
- switch (ntohs(buffer->type)) {
- case CS_PROTO_dht_REQUEST_GET: {
- CS_dht_request_get_MESSAGE * req;
- int resCount;
- int keyCount;
-
- if (sizeof(CS_dht_request_get_MESSAGE) != ntohs(buffer->size)) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (size %d)\n"),
- "GET",
- ntohs(buffer->size));
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- FREE(buffer);
- }
- req = (CS_dht_request_get_MESSAGE*) buffer;
- if (0 != memcmp(&req->table,
- &list->table,
- sizeof(HashCode512))) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (wrong table)\n"),
- "GET");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- break;
- }
-
- keyCount = 1 + ( (ntohs(req->header.size) - sizeof(CS_dht_request_get_MESSAGE)) / sizeof(HashCode512));
- resCount = list->store->get(list->store->closure,
- ntohl(req->type),
- ntohl(req->priority),
- keyCount,
- &req->keys,
- &sendAllResults,
- list);
- if ( (resCount != SYSERR) &&
- (OK != sendAck(list->sock,
- &list->table,
- resCount)) ) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Failed to send `%s'. Closing connection.\n"),
- "ACK");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- }
- break;
- }
-
-
- case CS_PROTO_dht_REQUEST_PUT: {
- CS_dht_request_put_MESSAGE * req;
- DataContainer * value;
-
- if (sizeof(CS_dht_request_put_MESSAGE) > ntohs(buffer->size)) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (size %d)\n"),
- "PUT",
- ntohs(buffer->size));
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- break;
- }
- req = (CS_dht_request_put_MESSAGE*) buffer;
- if (0 != memcmp(&req->table,
- &list->table,
- sizeof(HashCode512))) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (wrong table)\n"),
- "PUT");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- break;
- }
- value = MALLOC(sizeof(DataContainer) +
- ntohs(buffer->size) - sizeof(CS_dht_request_put_MESSAGE));
- value->size = htonl(sizeof(DataContainer) +
- ntohs(buffer->size) - sizeof(CS_dht_request_put_MESSAGE));
- memcpy(&value[1],
- &req[1],
- ntohs(buffer->size) - sizeof(CS_dht_request_put_MESSAGE));
- if (OK !=
- sendAck(list->sock,
- &req->table,
- list->store->put(list->store->closure,
- &req->key,
- value,
- ntohl(req->priority)))) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Failed to send `%s'. Closing connection.\n"),
- "ACK");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- }
- FREE(value);
- break;
- }
-
-
- case CS_PROTO_dht_REQUEST_REMOVE: {
- CS_dht_request_remove_MESSAGE * req;
- DataContainer * value;
-
- if (sizeof(CS_dht_request_remove_MESSAGE) > ntohs(buffer->size)) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (size %d)\n"),
- "REMOVE",
- ntohs(buffer->size));
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- break;
- }
- req = (CS_dht_request_remove_MESSAGE*) buffer;
- if (0 != memcmp(&req->table,
- &list->table,
- sizeof(HashCode512))) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (wrong table)\n"),
- "REMOVE");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- break;
- }
-
- value = MALLOC(sizeof(DataContainer) +
- ntohs(buffer->size) - sizeof(CS_dht_request_remove_MESSAGE));
- value->size = htonl(sizeof(DataContainer) +
- ntohs(buffer->size) - sizeof(CS_dht_request_remove_MESSAGE));
- memcpy(&value[1],
- &req[1],
- ntohs(buffer->size) - sizeof(CS_dht_request_remove_MESSAGE));
- if (OK !=
- sendAck(list->sock,
- &req->table,
- list->store->del(list->store->closure,
- &req->key,
- value))) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Failed to send `%s'. Closing connection.\n"),
- "ACK");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- }
- FREE(value);
- break;
- }
-
- case CS_PROTO_dht_REQUEST_ITERATE: {
- CS_dht_request_iterate_MESSAGE * req;
- int resCount;
-
- if (sizeof(CS_dht_request_iterate_MESSAGE) != ntohs(buffer->size)) {
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received invalid `%s' request (size %d)\n"),
- "ITERATE",
- ntohs(buffer->size));
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- FREE(buffer);
- }
- req = (CS_dht_request_iterate_MESSAGE*) buffer;
- resCount = list->store->iterate(list->store->closure,
- &sendAllResults,
- list);
- if (OK != sendAck(list->sock,
- &list->table,
- resCount)) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Failed to send `%s'. Closing connection.\n"),
- "ACK");
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- }
- break;
- }
-
-
- default:
- GE_LOG(ectx,
- GE_ERROR | GE_BULK | GE_USER,
- _("Received unknown request type %d at %s:%d\n"),
- ntohs(buffer->type),
- __FILE__, __LINE__);
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- } /* end of switch */
- FREE(buffer);
- buffer = NULL;
- }
- MUTEX_LOCK(list->lock);
- connection_destroy(list->sock);
- list->sock = NULL;
- MUTEX_UNLOCK(list->lock);
- }
-
- return NULL;
-}
-
-
-/**
- * Join a table (start storing data for the table). Join
- * fails if the node is already joint with the particular
- * table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @return SYSERR on error, OK on success
- */
-int DHT_LIB_join(Blockstore * store,
- struct GC_Configuration * cfg,
- struct GE_Context * ectx,
- const DHT_TableId * table) {
- TableList * list;
- int i;
-
- MUTEX_LOCK(lock);
- for (i=0;i<tableCount;i++)
- if (0 == memcmp(&tables[i]->table,
- table,
- sizeof(HashCode512))) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("This client already participates in the given DHT!\n"));
- MUTEX_UNLOCK(lock);
- return SYSERR;
- }
- list = MALLOC(sizeof(TableList));
- list->cfg = cfg;
- list->ectx = ectx;
- list->table = *table;
- list->store = store;
- list->leave_request = NO;
- list->sock = client_connection_create(ectx,
- cfg);
- if (list->sock == NULL) {
- FREE(list);
- MUTEX_UNLOCK(lock);
- return SYSERR;
- }
- list->lock = MUTEX_CREATE(NO);
- list->processor = PTHREAD_CREATE(&process_thread,
- list,
- 32 * 1024);
- if (list->processor == NULL) {
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
- "pthread_create");
- connection_destroy(list->sock);
- MUTEX_DESTROY(list->lock);
- FREE(list);
- MUTEX_UNLOCK(lock);
- return SYSERR;
- }
- GROW(tables,
- tableCount,
- tableCount+1);
- tables[tableCount-1] = list;
- MUTEX_UNLOCK(lock);
- return OK;
-}
-
-
-/**
- * Leave a table (stop storing data for the table). Leave
- * fails if the node is not joint with the table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @return SYSERR on error, OK on success
- */
-int DHT_LIB_leave(const DHT_TableId * table) {
- TableList * list;
- int i;
- void * unused;
- CS_dht_request_leave_MESSAGE req;
- MESSAGE_HEADER * reply;
- int ret;
- struct ClientServerConnection * sock;
-
- list = NULL;
- MUTEX_LOCK(lock);
- for (i=0;i<tableCount;i++) {
- if (0 == memcmp(&tables[i]->table,
- table,
- sizeof(HashCode512))) {
- list = tables[i];
- tables[i] = tables[tableCount-1];
- GROW(tables,
- tableCount,
- tableCount-1);
- break;
- }
- }
- MUTEX_UNLOCK(lock);
- if (list == NULL) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Cannot leave DHT: table not known!\n"));
- return SYSERR; /* no such table! */
- }
-
- list->leave_request = YES;
- /* send LEAVE message! */
- req.header.size = htons(sizeof(CS_dht_request_leave_MESSAGE));
- req.header.type = htons(CS_PROTO_dht_REQUEST_LEAVE);
- req.table = *table;
-
- ret = SYSERR;
- sock = client_connection_create(list->ectx,
- list->cfg);
- if (sock != NULL) {
- if (OK == connection_write(sock,
- &req.header)) {
- reply = NULL;
- if (OK == connection_read(sock,
- &reply)) {
- if (OK == checkACK(reply))
- ret = OK;
- else
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("gnunetd signaled error in response to `%s' message\n"),
- "CS_dht_request_leave_MESSAGE");
- FREE(reply);
- } else {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Failed to receive response to `%s' message from gnunetd\n"),
- "CS_dht_request_leave_MESSAGE");
- }
- } else {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Failed to send `%s' message to gnunetd\n"),
- "CS_dht_request_leave_MESSAGE");
- }
- connection_destroy(sock);
- }
- MUTEX_LOCK(list->lock);
- if (list->sock != NULL)
- connection_close_forever(list->sock); /* signal process_thread */
- MUTEX_UNLOCK(list->lock);
- unused = NULL;
- PTHREAD_JOIN(list->processor, &unused);
- if (list->sock != NULL)
- connection_destroy(list->sock);
- MUTEX_DESTROY(list->lock);
- FREE(list);
- return ret;
-}
-
-
-/**
* Perform a synchronous GET operation on the DHT identified by
* 'table' using 'key' as the key; store the result in 'result'. If
* result->dataLength == 0 the result size is unlimited and
@@ -652,7 +100,7 @@
keys,
keyCount * sizeof(HashCode512));
if (OK != connection_write(sock,
- &req->header)) {
+ &req->header)) {
connection_destroy(sock);
return SYSERR;
}
@@ -673,9 +121,10 @@
}
if ( (sizeof(CS_dht_reply_results_MESSAGE) > ntohs(reply->size)) ||
(CS_PROTO_dht_REPLY_GET != ntohs(reply->type)) ) {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Unexpected reply to `%s' operation.\n"),
- "GET");
+ GE_LOG(ectx,
+ GE_WARNING | GE_BULK | GE_USER,
+ _("Unexpected reply to `%s' operation.\n"),
+ "GET");
connection_destroy(sock);
FREE(reply);
return SYSERR;
@@ -755,7 +204,7 @@
ntohl(value->size) - sizeof(DataContainer));
ret = SYSERR;
if (OK == connection_write(sock,
- &req->header))
+ &req->header))
reply = NULL;
if (OK == connection_read(sock,
&reply)) {
@@ -767,73 +216,4 @@
return ret;
}
-/**
- * Perform a synchronous remove operation. The peer does not have
- * to be part of the table!
- *
- * @param table table to use for the lookup
- * @param key the key to store
- * @param timeout how long to wait until this operation should
- * automatically time-out
- * @param value what to remove; NULL for all values matching the key
- * @return OK on success, SYSERR on error (or timeout)
- */
-int DHT_LIB_remove(struct GC_Configuration * cfg,
- struct GE_Context * ectx,
- const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- const DataContainer * value) {
- struct ClientServerConnection * sock;
- CS_dht_request_remove_MESSAGE * req;
- MESSAGE_HEADER * reply;
- int ret;
- size_t n;
-
- sock = client_connection_create(ectx, cfg);
- if (sock == NULL)
- return SYSERR;
- n = sizeof(CS_dht_request_remove_MESSAGE);
- if (value != NULL)
- n += ntohl(value->size) - sizeof(DataContainer);
- req = MALLOC(n);
- req->header.size = htons(n);
- req->header.type = htons(CS_PROTO_dht_REQUEST_REMOVE);
- req->table = *table;
- req->key = *key;
- req->timeout = htonll(timeout);
- if (value != NULL)
- memcpy(&req[1],
- &value[1],
- ntohl(value->size) - sizeof(DataContainer));
- ret = SYSERR;
- if (OK == connection_write(sock,
- &req->header))
- reply = NULL;
- if (OK == connection_read(sock,
- &reply)) {
- if (OK == checkACK(reply))
- ret = OK;
- FREE(reply);
- }
- connection_destroy(sock);
- return ret;
-}
-
-
-/**
- * Initialize DHT_LIB.
- */
-void __attribute__ ((constructor)) DHT_LIB_init() {
- lock = MUTEX_CREATE(NO);
-}
-
-/**
- * Shutdown DHT_LIB.
- */
-void __attribute__ ((destructor)) DHT_LIB_fini() {
- MUTEX_DESTROY(lock);
-}
-
-
/* end of dht_api.c */
Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/applications/fs/module/fs.c 2006-12-02 22:28:37 UTC (rev 3850)
@@ -876,132 +876,6 @@
return SYSERR;
}
-
-/**
- * Callback that converts the Datastore_Value values
- * from the datastore to Blockstore values for the
- * DHT routing protocol.
- */
-static int dhtGetConverter(const HashCode512 * key,
- const Datastore_Value * invalue,
- void * cls) {
- GGC * ggc = (GGC*) cls;
- GapWrapper * gw;
- int ret;
- unsigned int size;
- cron_t et;
- cron_t now;
- const Datastore_Value * value;
- Datastore_Value * xvalue;
-
- if (ntohl(invalue->type) == ONDEMAND_BLOCK) {
- if (OK != ONDEMAND_getIndexed(datastore,
- invalue,
- key,
- &xvalue))
- return SYSERR;
- value = xvalue;
- } else {
- xvalue = NULL;
- value = invalue;
- }
-
- ret = isDatumApplicable(ntohl(value->type),
- ntohl(value->size) - sizeof(Datastore_Value),
- (const DBlock*) &value[1],
- key,
- ggc->keyCount,
- ggc->keys);
- if (ret == SYSERR) {
- FREENONNULL(xvalue);
- return SYSERR; /* no query will ever match */
- }
- if (ret == NO) {
- FREENONNULL(xvalue);
- return OK; /* Additional filtering based on type;
- i.e., namespace request and namespace
- in reply does not match namespace in query */
- }
- size = sizeof(GapWrapper) +
- ntohl(value->size) -
- sizeof(Datastore_Value);
-
- if (ntohl(value->anonymityLevel) != 0) {
- FREENONNULL(xvalue);
- return OK; /* do not allow anonymous content to leak through DHT */
- }
-
- gw = MALLOC(size);
- gw->dc.size = htonl(size);
- et = ntohll(value->expirationTime);
- /* expiration time normalization and randomization */
- now = get_time();
- if (et > now) {
- et -= now;
- et = et % MAX_MIGRATION_EXP;
- if (et > 0)
- et = weak_randomi(et);
- et = et + now;
- }
- gw->timeout = htonll(et);
- memcpy(&gw[1],
- &value[1],
- size - sizeof(GapWrapper));
-
- if (ggc->resultCallback != NULL)
- ret = ggc->resultCallback(key,
- &gw->dc,
- ggc->resCallbackClosure);
- else
- ret = OK;
- FREE(gw);
- FREENONNULL(xvalue);
- return ret;
-}
-
-/**
- * Lookup an item in the datastore.
- *
- * @param key the value to lookup
- * @param resultCallback function to call for each result that was found
- * @param resCallbackClosure extra argument to resultCallback
- * @return number of results, SYSERR on error
- */
-static int dhtGet(void * closure,
- unsigned int type,
- unsigned int prio,
- unsigned int keyCount,
- const HashCode512 * keys,
- DataProcessor resultCallback,
- void * resCallbackClosure) {
- int ret;
- GGC myClosure;
- EncName enc;
-
- IF_GELOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&keys[0],
- &enc));
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "DHT requests content for %s of type %u\n",
- &enc,
- type);
- myClosure.keyCount = keyCount;
- myClosure.keys = keys;
- myClosure.resultCallback = resultCallback;
- myClosure.resCallbackClosure = resCallbackClosure;
- ret = datastore->get(&keys[0],
- type,
- &dhtGetConverter,
- &myClosure);
- if (ret != SYSERR)
- ret = myClosure.count; /* return number of actual
- results (unfiltered) that
- were found */
- return ret;
-}
-
static int replyHashFunction(const DataContainer * content,
HashCode512 * id) {
const GapWrapper * gw;
@@ -1239,7 +1113,6 @@
*/
int initialize_module_fs(CoreAPIForApplication * capi) {
static Blockstore dsGap;
- static Blockstore dsDht;
unsigned long long quota;
ectx = capi->ectx;
@@ -1307,17 +1180,6 @@
gap->init(&dsGap,
&uniqueReplyIdentifier,
(ReplyHashFunction) &replyHashFunction);
-
- if (dht != NULL) {
- dsDht.closure = NULL;
- dsDht.get = &dhtGet;
- dsDht.put = &gapPut; /* exactly the same method for gap/dht*/
- dsDht.del = &gapDel; /* exactly the same method for gap/dht*/
- dsDht.iterate = &gapIterate; /* exactly the same method for gap/dht*/
- dsDht.fast_get = &fastGet;
- dht->join(&dsDht, &dht_table);
- }
-
GE_LOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
_("`%s' registering client handlers %d %d %d %d %d %d %d %d %d\n"),
@@ -1369,16 +1231,6 @@
void * unused;
doneMigration();
- if (dht != NULL) {
- GE_LOG(ectx,
- GE_INFO | GE_REQUEST | GE_USER,
- "Leaving DHT (this may take a while).");
- dht->leave(&dht_table);
- GE_LOG(ectx,
- GE_INFO | GE_REQUEST | GE_USER,
- "Leaving DHT complete.");
-
- }
GE_ASSERT(ectx, SYSERR != coreAPI->unregisterClientHandler(CS_PROTO_gap_QUERY_START,
&csHandleRequestQueryStart));
GE_ASSERT(ectx, SYSERR != coreAPI->unregisterClientHandler(CS_PROTO_gap_QUERY_STOP,
Modified: GNUnet/src/include/gnunet_dht.h
===================================================================
--- GNUnet/src/include/gnunet_dht.h 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/include/gnunet_dht.h 2006-12-02 22:28:37 UTC (rev 3850)
@@ -62,32 +62,6 @@
#define equalsDHT_TableId(a,b) equalsHashCode512(a,b)
/**
- * TCP communication: client to gnunetd: join table.
- * All future communications via this socket are reserved
- * for either gnunetd requesting datastore operations or
- * the client sending a leave table message.
- */
-typedef struct {
-
- MESSAGE_HEADER header;
-
- DHT_TableId table;
-
-} CS_dht_request_join_MESSAGE;
-
-/**
- * TCP communication: client to gnunetd: leave table
- */
-typedef struct {
-
- MESSAGE_HEADER header;
-
- DHT_TableId table;
-
-} CS_dht_request_leave_MESSAGE;
-
-
-/**
* TCP communication: put <key,value>-mapping to table.
* Reply is an ACK.
*/
@@ -127,31 +101,6 @@
} CS_dht_request_get_MESSAGE;
/**
- * remove value. Reply is just an ACK.
- */
-typedef struct {
-
- MESSAGE_HEADER header;
-
- DHT_TableId table;
-
- unsigned long long timeout; /* nbo */
-
- HashCode512 key;
-
-} CS_dht_request_remove_MESSAGE;
-
-/**
- * gnunetd to client: iterate over all values. Reply is
- * a CS_dht_reply_results_MESSAGE message.
- */
-typedef struct {
-
- MESSAGE_HEADER header;
-
-} CS_dht_request_iterate_MESSAGE;
-
-/**
* TCP communication: Results for a request. Uses a separate message
* for each result; CS_dht_reply_results_MESSAGE maybe repeated many
* times (the total number is given in totalResults).
Modified: GNUnet/src/include/gnunet_dht_lib.h
===================================================================
--- GNUnet/src/include/gnunet_dht_lib.h 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/include/gnunet_dht_lib.h 2006-12-02 22:28:37 UTC (rev 3850)
@@ -38,41 +38,6 @@
#endif
/**
- * Join a table (start storing data for the table). Join
- * fails if the node is already joint with the particular
- * table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @param timeout how long to wait for other peers to respond to
- * the join request (has no impact on success or failure)
- * @param flags
- * @return SYSERR on error, OK on success
- */
-int DHT_LIB_join(Blockstore * store,
- struct GC_Configuration * cfg,
- struct GE_Context * ectx,
- const DHT_TableId * table);
-
-
-/**
- * Leave a table (stop storing data for the table). Leave
- * fails if the node is not joint with the table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @param timeout how long to wait for other peers to respond to
- * the leave request (has no impact on success or failure);
- * but only timeout time is available for migrating data, so
- * pick this value with caution.
- * @param flags maximum number of parallel puts for migration (0
- * implies 'use value from gnunet.conf').
- * @return SYSERR on error, OK on success
- */
-int DHT_LIB_leave(const DHT_TableId * table);
-
-
-/**
* Perform a synchronous GET operation on the DHT identified by
* 'table' using 'key' as the key; store the result in 'result'. If
* result->dataLength == 0 the result size is unlimited and
@@ -120,25 +85,6 @@
cron_t timeout,
const DataContainer * value);
-/**
- * Perform a synchronous remove operation. The peer does not have
- * to be part of the table!
- *
- * @param table table to use for the lookup
- * @param key the key to store
- * @param timeout how long to wait until this operation should
- * automatically time-out
- * @param value what to remove; NULL for all values matching the key
- * @return OK on success, SYSERR on error (or timeout)
- */
-int DHT_LIB_remove(struct GC_Configuration * cfg,
- struct GE_Context * ectx,
- const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- const DataContainer * value);
-
-
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: GNUnet/src/include/gnunet_dht_service.h
===================================================================
--- GNUnet/src/include/gnunet_dht_service.h 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/include/gnunet_dht_service.h 2006-12-02 22:28:37 UTC (rev 3850)
@@ -47,8 +47,6 @@
struct DHT_PUT_RECORD;
-struct DHT_REMOVE_RECORD;
-
/**
* DHT operation 'complete' (i.e timed out).
*/
@@ -60,36 +58,6 @@
typedef struct {
/**
- * Join a table (start storing data for the table). Join
- * fails if the node is already joint with the particular
- * table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @param timeout how long to wait for other peers to respond to
- * the join request (has no impact on success or failure)
- * @return SYSERR on error, OK on success
- */
- int (*join)(Blockstore * datastore,
- const DHT_TableId * table);
-
- /**
- * Leave a table (stop storing data for the table). Leave
- * fails if the node is not joint with the table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @param timeout how long to wait for other peers to respond to
- * the leave request (has no impact on success or failure);
- * but only timeout time is available for migrating data, so
- * pick this value with caution.
- * implies 'use value from gnunet.conf').
- * @return SYSERR on error, OK on success
- */
- int (*leave)(const DHT_TableId * table);
-
-
- /**
* Perform an asynchronous GET operation on the DHT identified by
* 'table' using 'key' as the key. The peer does not have to be part
* of the table (if so, we will attempt to locate a peer that is!)
@@ -143,32 +111,6 @@
*/
int (*put_stop)(struct DHT_PUT_RECORD * record);
- /**
- * Perform an asynchronous REMOVE operation on the DHT identified by
- * 'table' removing the binding of 'key' to 'value'. The peer does not
- * have to be part of the table (if so, we will attempt to locate a
- * peer that is!)
- *
- * @param table table to use for the lookup
- * @param key the key to look up
- * @param timeout how long to wait until this operation should
- * automatically time-out
- * @param callback function to call on successful completion
- * @param closure extra argument to callback
- * @return handle to stop the async remove
- */
- struct DHT_REMOVE_RECORD * (*remove_start)(const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- const DataContainer * value,
- DHT_OP_Complete callback,
- void * closure);
-
- /**
- * Stop async DHT-remove. Frees associated resources.
- */
- int (*remove_stop)(struct DHT_REMOVE_RECORD * record);
-
} DHT_ServiceAPI;
#if 0 /* keep Emacsens' auto-indent happy */
Modified: GNUnet/src/include/gnunet_protocols.h
===================================================================
--- GNUnet/src/include/gnunet_protocols.h 2006-12-02 22:01:36 UTC (rev 3849)
+++ GNUnet/src/include/gnunet_protocols.h 2006-12-02 22:28:37 UTC (rev 3850)
@@ -257,46 +257,26 @@
/* ********** CS DHT application messages ********** */
/**
- * client to CS: join table
- */
-#define CS_PROTO_dht_REQUEST_JOIN 72
-
-/**
- * client to CS: leave table
- */
-#define CS_PROTO_dht_REQUEST_LEAVE 73
-
-/**
* Client to CS or CS to client: get from table
*/
-#define CS_PROTO_dht_REQUEST_GET 74
+#define CS_PROTO_dht_REQUEST_GET 72
/**
* Client to CS or CS to client: put into table
*/
-#define CS_PROTO_dht_REQUEST_PUT 75
+#define CS_PROTO_dht_REQUEST_PUT 73
/**
- * Client to CS or CS to client: remove from table
- */
-#define CS_PROTO_dht_REQUEST_REMOVE 76
-
-/**
* Client to CS or CS to client: results from get
*/
-#define CS_PROTO_dht_REPLY_GET 77
+#define CS_PROTO_dht_REPLY_GET 74
/**
* Client to CS or CS to client: confirmed
*/
-#define CS_PROTO_dht_REPLY_ACK 78
+#define CS_PROTO_dht_REPLY_ACK 75
-/**
- * Client to CS: iterate over table
- */
-#define CS_PROTO_dht_REQUEST_ITERATE 79
-
/* ************* CS VPN messages ************* */
/**
@@ -408,6 +388,7 @@
#define P2P_PROTO_DHT_ASK_HELLO 46
#define P2P_PROTO_DHT_GET 47
#define P2P_PROTO_DHT_PUT 48
+#define P2P_PROTO_DHT_RESULT 49
/* ************* p2p VPN messages ************* */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3850 - in GNUnet/src: applications/dht/module applications/dht/tools applications/fs/module include,
grothoff <=