[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3430 - in GNUnet: . src/applications/dht/module
From: grothoff
Subject: [GNUnet-SVN] r3430 - in GNUnet: . src/applications/dht/module
Date: Sat, 30 Sep 2006 22:16:23 -0700 (PDT)
Author: grothoff
Date: 2006-09-30 22:16:17 -0700 (Sat, 30 Sep 2006)
New Revision: 3430
Modified:
GNUnet/src/applications/dht/module/cs.c
GNUnet/src/applications/dht/module/datastore_dht_master.c
GNUnet/src/applications/dht/module/datastore_dht_master.h
GNUnet/src/applications/dht/module/datastore_memory.c
GNUnet/src/applications/dht/module/datastore_memory_test.c
GNUnet/src/applications/dht/module/dht.c
GNUnet/src/applications/dht/module/dht.h
GNUnet/todo
Log:
making dht/module compile
Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/cs.c 2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -34,7 +35,7 @@
/**
* Global core API.
*/
-static CoreAPIForApplication * coreAPI = NULL;
+static CoreAPIForApplication * coreAPI;
/**
* Reference to the DHT service API.
@@ -49,7 +50,7 @@
/**
* Handle to access the client.
*/
- ClientHandle handler;
+ struct ClientHandle * handler;
/**
* For which table is this client responsible?
*/
@@ -66,21 +67,21 @@
* and results fields for sending a request to the client.
* Released after the request has been processed.
*/
- Semaphore * prerequest;
+ struct SEMAPHORE * prerequest;
/**
* Semaphore that is up'ed by the client handler whenever a reply
* was received. The client exit handler also needs to up this
* semaphore to unblock threads that wait for replies.
*/
- Semaphore * prereply;
+ struct SEMAPHORE * prereply;
/**
* Semaphore that is down'ed by the client handler before storing
* the data from a reply. The cs-functions need to up it
* once they have prepared the handlers.
*/
- Semaphore * postreply;
+ struct SEMAPHORE * postreply;
/**
* Function to call for results
@@ -101,21 +102,21 @@
} DHT_CLIENT_TableHandlers;
typedef struct {
- ClientHandle client;
+ struct ClientHandle * client;
struct DHT_PUT_RECORD * put_record;
DHT_TableId table;
unsigned int replicas; /* confirmed puts? */
} DHT_CLIENT_PUT_RECORD;
typedef struct {
- ClientHandle client;
+ struct ClientHandle * client;
struct DHT_REMOVE_RECORD * remove_record;
DHT_TableId table;
unsigned int replicas; /* confirmed dels? */
} DHT_CLIENT_REMOVE_RECORD;
typedef struct {
- ClientHandle client;
+ struct ClientHandle * client;
struct DHT_GET_RECORD * get_record;
DHT_TableId table;
unsigned int count;
@@ -147,8 +148,10 @@
/**
* Lock for accessing csHandlers.
*/
-static Mutex csLock;
+static struct MUTEX * csLock;
+static struct GE_Context * ectx;
+
/* ******* implementation of Blockstore via TCP link ********** */
/**
@@ -176,7 +179,7 @@
return SYSERR;
}
- SEMAPHORE_DOWN(handlers->prerequest);
+ SEMAPHORE_DOWN(handlers->prerequest, YES);
handlers->resultCallback = resultCallback;
handlers->resultCallbackClosure = resCallbackClosure;
handlers->status = 0;
@@ -205,7 +208,7 @@
}
FREE(req);
SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply);
+ SEMAPHORE_DOWN(handlers->prereply, YES);
ret = handlers->status;
SEMAPHORE_UP(handlers->prerequest);
return ret;
@@ -230,7 +233,7 @@
n = sizeof(CS_dht_request_put_MESSAGE) + ntohl(value->size);
req = MALLOC(n);
- SEMAPHORE_DOWN(handlers->prerequest);
+ SEMAPHORE_DOWN(handlers->prerequest, YES);
handlers->status = 0;
req->header.size = htons(n);
req->header.type = htons(CS_PROTO_dht_REQUEST_PUT);
@@ -248,12 +251,12 @@
return SYSERR;
}
FREE(req);
- LOG(LOG_EVERYTHING,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"Sending STORE request to client!\n");
SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply);
+ SEMAPHORE_DOWN(handlers->prereply, YES);
ret = handlers->status;
- LOG(LOG_EVERYTHING,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"Client confirmed STORE request with status %d!\n",
ret);
SEMAPHORE_UP(handlers->prerequest);
@@ -278,7 +281,7 @@
if (value != NULL)
n += htonl(value->size);
req = MALLOC(n);
- SEMAPHORE_DOWN(handlers->prerequest);
+ SEMAPHORE_DOWN(handlers->prerequest, YES);
handlers->status = 0;
req->header.size = htons(n);
req->header.type = htons(CS_PROTO_dht_REQUEST_REMOVE);
@@ -297,7 +300,7 @@
}
FREE(req);
SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply);
+ SEMAPHORE_DOWN(handlers->prereply, YES);
ret = handlers->status;
SEMAPHORE_UP(handlers->prerequest);
return ret;
@@ -317,7 +320,7 @@
DHT_CLIENT_TableHandlers * handlers = closure;
int ret;
- SEMAPHORE_DOWN(handlers->prerequest);
+ SEMAPHORE_DOWN(handlers->prerequest, YES);
handlers->status = 0;
handlers->resultCallback = processor;
handlers->resultCallbackClosure = cls;
@@ -329,7 +332,7 @@
return SYSERR;
}
SEMAPHORE_UP(handlers->postreply);
- SEMAPHORE_DOWN(handlers->prereply);
+ SEMAPHORE_DOWN(handlers->prereply, YES);
ret = handlers->status;
SEMAPHORE_UP(handlers->prerequest);
return ret;
@@ -337,7 +340,7 @@
/* *********************** CS handlers *********************** */
-static int sendAck(ClientHandle client,
+static int sendAck(struct ClientHandle * client,
DHT_TableId * table,
int value) {
CS_dht_reply_ack_MESSAGE msg;
@@ -353,8 +356,8 @@
/**
* CS handler for joining existing DHT-table.
*/
-static int csJoin(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csJoin(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
DHT_CLIENT_TableHandlers * ptr;
CS_dht_request_join_MESSAGE * req;
int ret;
@@ -364,7 +367,7 @@
return SYSERR;
}
req = (CS_dht_request_join_MESSAGE*) message;
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
ptr = MALLOC(sizeof(DHT_CLIENT_TableHandlers));
ptr->store = MALLOC(sizeof(Blockstore));
ptr->store->iterate = &tcp_iterate;
@@ -394,15 +397,15 @@
ret = sendAck(client,
&req->table,
ret);
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
return ret;
}
/**
* CS handler for leaving DHT-table.
*/
-static int csLeave(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csLeave(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
CS_dht_request_leave_MESSAGE * req;
int i;
@@ -413,10 +416,10 @@
return SYSERR;
}
req = (CS_dht_request_leave_MESSAGE*) message;
- LOG(LOG_EVERYTHING,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"Client leaving request received!\n");
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
for (i=0;i<csHandlersCount;i++) {
ptr = csHandlers[i];
if ( (equalsHashCode512(&ptr->table,
@@ -425,12 +428,12 @@
GROW(csHandlers,
csHandlersCount,
csHandlersCount-1);
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
/* release clients waiting on this DHT */
ptr->status = SYSERR;
SEMAPHORE_UP(ptr->prereply);
- SEMAPHORE_DOWN(ptr->prerequest);
+ SEMAPHORE_DOWN(ptr->prerequest, YES);
SEMAPHORE_DESTROY(ptr->prerequest);
SEMAPHORE_DESTROY(ptr->prereply);
SEMAPHORE_DESTROY(ptr->postreply);
@@ -441,7 +444,7 @@
OK);
}
}
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
_("`%s' failed: table not found!\n"),
"CS_DHT_LEAVE");
@@ -457,7 +460,7 @@
GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
"Signaling client put completion: %d\n",
record->replicas);
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
dhtAPI->put_stop(record->put_record);
if (OK != sendAck(record->client,
&record->table,
@@ -475,15 +478,15 @@
putRecordsSize-1);
break;
}
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
FREE(record);
}
/**
* CS handler for inserting <key,value>-pair into DHT-table.
*/
-static int csPut(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csPut(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
CS_dht_request_put_MESSAGE * req;
DataContainer * data;
DHT_CLIENT_PUT_RECORD * ptr;
@@ -513,12 +516,12 @@
ptr->table = req->table;
ptr->put_record = NULL;
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
GROW(putRecords,
putRecordsSize,
putRecordsSize+1);
putRecords[putRecordsSize-1] = ptr;
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
"Starting DHT put\n");
ptr->put_record = dhtAPI->put_start(&req->table,
@@ -542,7 +545,7 @@
_("sendAck failed. Terminating connection to client.\n"));
coreAPI->terminateClientConnection(record->client);
}
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
for (i=removeRecordsSize-1;i>=0;i--)
if (removeRecords[i] == record) {
removeRecords[i] = removeRecords[removeRecordsSize-1];
@@ -551,13 +554,13 @@
removeRecordsSize-1);
break;
}
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
FREE(record);
}
struct CSRemoveClosure {
- ClientHandle client;
+ struct ClientHandle * client;
CS_dht_request_remove_MESSAGE * message;
};
@@ -568,7 +571,7 @@
CS_dht_request_remove_MESSAGE * req;
DataContainer * data;
DHT_CLIENT_REMOVE_RECORD * ptr;
- ClientHandle client;
+ struct ClientHandle * client;
unsigned int size;
req = cpc->message;
@@ -592,12 +595,12 @@
ptr->replicas = 0;
ptr->table = req->table;
ptr->remove_record = NULL;
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
GROW(removeRecords,
removeRecordsSize,
removeRecordsSize+1);
removeRecords[removeRecordsSize-1] = ptr;
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
ptr->remove_record = dhtAPI->remove_start(&req->table,
&req->key,
ntohll(req->timeout),
@@ -611,8 +614,8 @@
/**
* CS handler for inserting <key,value>-pair into DHT-table.
*/
-static int csRemove(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csRemove(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
struct CSRemoveClosure * cpc;
if (ntohs(message->size) < sizeof(CS_dht_request_remove_MESSAGE)) {
@@ -625,10 +628,11 @@
message,
ntohs(message->size));
cpc->client = client;
- addCronJob((CronJob)&csRemoveJob,
- 0,
- 0,
- cpc);
+ cron_add_job(coreAPI->cron,
+ (CronJob)&csRemoveJob,
+ 0,
+ 0,
+ cpc);
return OK;
}
@@ -690,7 +694,7 @@
coreAPI->terminateClientConnection(record->client);
}
}
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
for (i=getRecordsSize-1;i>=0;i--)
if (getRecords[i] == record) {
getRecords[i] = getRecords[getRecordsSize-1];
@@ -699,12 +703,12 @@
getRecordsSize-1);
break;
}
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
FREE(record);
}
struct CSGetClosure {
- ClientHandle client;
+ struct ClientHandle * client;
CS_dht_request_get_MESSAGE * message;
};
@@ -714,7 +718,7 @@
static int csGetJob(struct CSGetClosure * cpc) {
CS_dht_request_get_MESSAGE * req;
DHT_CLIENT_GET_RECORD * ptr;
- ClientHandle client;
+ struct ClientHandle * client;
unsigned int keyCount;
client = cpc->client;
@@ -728,12 +732,12 @@
ptr->table = req->table;
ptr->get_record = NULL;
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
GROW(getRecords,
getRecordsSize,
getRecordsSize+1);
getRecords[getRecordsSize-1] = ptr;
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
ptr->get_record = dhtAPI->get_start(&req->table,
ntohl(req->type),
keyCount,
@@ -749,8 +753,8 @@
/**
* CS handler for inserting <key,value>-pair into DHT-table.
*/
-static int csGet(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csGet(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
struct CSGetClosure * cpc;
if (ntohs(message->size) != sizeof(CS_dht_request_get_MESSAGE)) {
@@ -764,10 +768,11 @@
message,
ntohs(message->size));
cpc->client = client;
- addCronJob((CronJob)&csGetJob,
- 0,
- 0,
- cpc);
+ cron_add_job(coreAPI->cron,
+ (CronJob)&csGetJob,
+ 0,
+ 0,
+ cpc);
return OK;
}
@@ -776,8 +781,8 @@
* the status value in status and up's the semaphore to signal
* that we received a reply.
*/
-static int csACK(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csACK(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
DHT_CLIENT_TableHandlers * ptr;
CS_dht_reply_ack_MESSAGE * req;
int i;
@@ -787,23 +792,23 @@
return SYSERR;
}
req =(CS_dht_reply_ack_MESSAGE*) message;
- LOG(LOG_EVERYTHING,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"`%s' received from client.\n",
"CS_dht_reply_ack_MESSAGE");
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
for (i=0;i<csHandlersCount;i++) {
ptr = csHandlers[i];
if ( (ptr->handler == client) &&
(equalsHashCode512(&ptr->table,
&req->table)) ) {
- SEMAPHORE_DOWN(ptr->postreply);
+ SEMAPHORE_DOWN(ptr->postreply, YES);
ptr->status = ntohl(req->status);
SEMAPHORE_UP(ptr->prereply);
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
return OK;
}
}
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
GE_LOG(ectx, GE_ERROR | GE_BULK | GE_USER,
_("Failed to deliver `%s' message.\n"),
"CS_dht_reply_ack_MESSAGE");
@@ -815,8 +820,8 @@
* and passes on the new result. If all results have been
* collected, signals using the semaphore.
*/
-static int csResults(ClientHandle client,
- const CS_MESSAGE_HEADER * message) {
+static int csResults(struct ClientHandle * client,
+ const MESSAGE_HEADER * message) {
CS_dht_reply_results_MESSAGE * req;
DHT_CLIENT_TableHandlers * ptr;
unsigned int dataLength;
@@ -832,17 +837,17 @@
GE_BREAK(ectx, 0);
return SYSERR;
}
- LOG(LOG_EVERYTHING,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"`%s' received from client.\n",
"CS_dht_reply_results_MESSAGE");
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
for (i=0;i<csHandlersCount;i++) {
if ( (csHandlers[i]->handler == client) &&
(equalsHashCode512(&csHandlers[i]->table,
&req->table)) ) {
ptr = csHandlers[i];
- SEMAPHORE_DOWN(ptr->postreply);
- LOG(LOG_EVERYTHING,
+ SEMAPHORE_DOWN(ptr->postreply, YES);
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"`%s' received result '%.*s'!\n",
__FUNCTION__,
dataLength - sizeof(DataContainer),
@@ -852,11 +857,11 @@
&req->data,
ptr->resultCallbackClosure);
ptr->status++;
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
return OK;
}
}
- MUTEX_UNLOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
GE_LOG(ectx, GE_ERROR | GE_BULK | GE_USER,
_("Failed to deliver `%s' message.\n"),
"CS_dht_reply_results_MESSAGE");
@@ -867,14 +872,13 @@
* CS handler for handling exiting client. Triggers
* csLeave for all tables that rely on this client.
*/
-static void csClientExit(ClientHandle client) {
+static void csClientExit(struct ClientHandle * client) {
int i;
DHT_CLIENT_GET_RECORD * gr;
DHT_CLIENT_PUT_RECORD * pr;
DHT_CLIENT_REMOVE_RECORD * rr;
- int haveCron;
- MUTEX_LOCK(&csLock);
+ MUTEX_LOCK(csLock);
for (i=0;i<csHandlersCount;i++) {
if (csHandlers[i]->handler == client) {
CS_dht_request_leave_MESSAGE message;
@@ -887,18 +891,18 @@
i--;
}
}
- haveCron = isCronRunning();
- MUTEX_UNLOCK(&csLock);
- if (YES == haveCron)
- suspendCron();
- MUTEX_LOCK(&csLock);
+ MUTEX_UNLOCK(csLock);
+ cron_suspend(coreAPI->cron,
+ YES);
+ MUTEX_LOCK(csLock);
for (i=0;i<getRecordsSize;i++) {
if (getRecords[i]->client == client) {
gr = getRecords[i];
- delCronJob(&cs_get_abort,
- 0,
- gr);
+ cron_del_job(coreAPI->cron,
+ &cs_get_abort,
+ 0,
+ gr);
dhtAPI->get_stop(gr->get_record);
getRecords[i] = getRecords[getRecordsSize-1];
GROW(getRecords,
@@ -910,9 +914,10 @@
if (putRecords[i]->client == client) {
pr = putRecords[i];
- delCronJob(&cs_put_abort,
- 0,
- pr);
+ cron_del_job(coreAPI->cron,
+ &cs_put_abort,
+ 0,
+ pr);
dhtAPI->put_stop(pr->put_record);
putRecords[i] = putRecords[putRecordsSize-1];
GROW(putRecords,
@@ -924,9 +929,10 @@
if (removeRecords[i]->client == client) {
rr = removeRecords[i];
- delCronJob((CronJob) &cs_remove_abort,
- 0,
- rr);
+ cron_del_job(coreAPI->cron,
+ (CronJob) &cs_remove_abort,
+ 0,
+ rr);
dhtAPI->remove_stop(rr->remove_record);
removeRecords[i] = removeRecords[removeRecordsSize-1];
GROW(removeRecords,
@@ -934,14 +940,15 @@
removeRecordsSize-1);
}
}
- MUTEX_UNLOCK(&csLock);
- if (YES == haveCron)
- resumeCron();
+ MUTEX_UNLOCK(csLock);
+ cron_resume_jobs(coreAPI->cron,
+ YES);
}
int initialize_module_dht(CoreAPIForApplication * capi) {
int status;
+ ectx = capi->ectx;
dhtAPI = capi->requestService("dht");
if (dhtAPI == NULL)
return SYSERR;
@@ -957,7 +964,7 @@
CS_PROTO_dht_REPLY_GET,
CS_PROTO_dht_REPLY_ACK);
status = OK;
- MUTEX_CREATE_RECURSIVE(&csLock);
+ csLock = MUTEX_CREATE(YES);
if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_JOIN,
&csJoin))
status = SYSERR;
@@ -1018,23 +1025,26 @@
status = SYSERR;
while (putRecordsSize > 0) {
- delCronJob(&cs_put_abort,
- 0,
- putRecords[0]);
+ cron_del_job(coreAPI->cron,
+ &cs_put_abort,
+ 0,
+ putRecords[0]);
cs_put_abort(putRecords[0]);
}
while (removeRecordsSize > 0) {
- delCronJob((CronJob) &cs_remove_abort,
- 0,
- removeRecords[0]);
+ cron_del_job(coreAPI->cron,
+ (CronJob) &cs_remove_abort,
+ 0,
+ removeRecords[0]);
cs_remove_abort(removeRecords[0]);
}
while (getRecordsSize > 0) {
- delCronJob(&cs_get_abort,
- 0,
- getRecords[0]);
+ cron_del_job(coreAPI->cron,
+ &cs_get_abort,
+ 0,
+ getRecords[0]);
cs_get_abort(getRecords[0]);
}
@@ -1044,7 +1054,7 @@
coreAPI->releaseService(dhtAPI);
dhtAPI = NULL;
coreAPI = NULL;
- MUTEX_DESTROY(&csLock);
+ MUTEX_DESTROY(csLock);
return status;
}
Modified: GNUnet/src/applications/dht/module/datastore_dht_master.c
===================================================================
--- GNUnet/src/applications/dht/module/datastore_dht_master.c 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_dht_master.c 2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -38,6 +39,7 @@
#include "platform.h"
#include "gnunet_core.h"
+#include "gnunet_util_cron.h"
#include "datastore_dht_master.h"
typedef struct {
@@ -59,7 +61,9 @@
* @brief the per-table data
*/
typedef struct {
- Mutex lock;
+ struct GE_Context * ectx;
+ struct MUTEX * lock;
+ struct CronManager * cron;
size_t max_memory;
HT_Entry * first;
} MemoryDatastore;
@@ -80,16 +84,16 @@
const HashCode512 * keys,
DataProcessor resultCallback,
void * resCallbackClosure) {
- MemoryDatastore * ds = (MemoryDatastore*) closure;
+ MemoryDatastore * ds = closure;
HT_Entry * pos;
int count;
int i;
DataContainer * data;
- GE_ASSERT(ectx, keyCount == 1);
+ GE_ASSERT(ds->ectx, keyCount == 1);
if (ds == NULL)
return SYSERR;
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
pos = ds->first;
while (pos != NULL) {
if (equalsHashCode512(&keys[0], &pos->key)) {
@@ -123,12 +127,12 @@
FREE(data);
}
FREENONNULL(perm);
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return count;
}
pos = pos->next;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return 0;
}
@@ -155,7 +159,7 @@
!= sizeof(HashCode512))
return SYSERR;
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
pos = ds->first;
while (pos != NULL) {
if (equalsHashCode512(key, &pos->key)) {
@@ -163,11 +167,11 @@
if (equalsHashCode512(&pos->values[i].hash,
(HashCode512*)&value[1])) {
pos->values[i].lastRefreshTime = get_time();
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK; /* already present */
}
if (ds->max_memory < sizeof(MasterEntry)) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return NO;
}
ds->max_memory -= sizeof(MasterEntry);
@@ -178,14 +182,14 @@
memcpy(&pos->values[pos->count-1].hash,
&value[1],
sizeof(HashCode512));
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
} /* end key match */
pos = pos->next;
}
/* no key matched, create fresh entry */
if (ds->max_memory < sizeof(HT_Entry) + sizeof(MasterEntry)) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return NO;
}
ds->max_memory -= sizeof(HT_Entry) + sizeof(MasterEntry);
@@ -199,7 +203,7 @@
pos->values[0].lastRefreshTime = get_time();
pos->next = ds->first;
ds->first = pos;
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
}
@@ -224,7 +228,7 @@
!= sizeof(HashCode512)) )
return SYSERR;
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
prev = NULL;
pos = ds->first;
while (pos != NULL) {
@@ -247,7 +251,7 @@
FREE(pos);
ds->max_memory += sizeof(HT_Entry);
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
}
}
@@ -265,13 +269,13 @@
FREE(pos);
ds->max_memory += sizeof(HT_Entry);
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
}
prev = pos;
pos = pos->next;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return SYSERR; /* not found */
}
@@ -294,7 +298,7 @@
if (ds == NULL)
return SYSERR;
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
pos = ds->first;
ret = 0;
cont = MALLOC(sizeof(HashCode512) + sizeof(DataContainer));
@@ -309,7 +313,7 @@
if (OK != processor(&pos->key,
cont,
cls)) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
FREE(cont);
return ret;
}
@@ -317,7 +321,7 @@
}
pos = pos->next;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
FREE(cont);
return SYSERR;
}
@@ -329,7 +333,7 @@
cron_t now;
prev = NULL;
- MUTEX_LOCK(&store->lock);
+ MUTEX_LOCK(store->lock);
now = get_time();
pos = store->first;
while (pos != NULL) {
@@ -355,32 +359,36 @@
prev = pos;
pos = pos->next;
}
- MUTEX_UNLOCK(&store->lock);
+ MUTEX_UNLOCK(store->lock);
}
/**
* Create a DHT Datastore (in memory)
* @param max_memory do not use more than max_memory memory.
*/
-Blockstore * create_datastore_dht_master(size_t max_memory) {
+Blockstore * create_datastore_dht_master(struct GE_Context * ectx,
+ size_t max_memory) {
Blockstore * res;
MemoryDatastore * md;
md = MALLOC(sizeof(MemoryDatastore));
md->max_memory = max_memory;
md->first = NULL;
- MUTEX_CREATE_RECURSIVE(&md->lock);
-
+ md->lock = MUTEX_CREATE(YES);
+ md->cron = cron_create(ectx);
+ md->ectx = ectx;
res = MALLOC(sizeof(Blockstore));
res->get = &lookup;
res->put = &store;
res->del = &ds_remove;
res->iterate = &iterate;
res->closure = md;
- addCronJob((CronJob) &expirationJob,
- 5 * cronMINUTES,
- 5 * cronMINUTES,
- md);
+ cron_add_job(md->cron,
+ (CronJob) &expirationJob,
+ 5 * cronMINUTES,
+ 5 * cronMINUTES,
+ md);
+ cron_start(md->cron);
return res;
}
@@ -393,17 +401,14 @@
MemoryDatastore * md;
HT_Entry * pos;
HT_Entry * next;
- int icr;
md = ds->closure;
- icr = isCronRunning();
- if (icr)
- suspendCron();
- delCronJob((CronJob) &expirationJob,
- 5 * cronMINUTES,
- md);
- if (icr)
- resumeCron();
+ cron_stop(md->cron);
+ cron_del_job(md->cron,
+ (CronJob) &expirationJob,
+ 5 * cronMINUTES,
+ md);
+ cron_destroy(md->cron);
pos = md->first;
while (pos != NULL) {
@@ -414,7 +419,7 @@
FREE(pos);
pos = next;
}
- MUTEX_DESTROY(&md->lock);
+ MUTEX_DESTROY(md->lock);
FREE(md);
FREE(ds);
}
Modified: GNUnet/src/applications/dht/module/datastore_dht_master.h
===================================================================
--- GNUnet/src/applications/dht/module/datastore_dht_master.h 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_dht_master.h 2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -28,12 +29,14 @@
#define DHT_DATASTORE_MASTER_H
#include "gnunet_blockstore.h"
+#include "gnunet_util.h"
/**
* Create a DHT Master Datastore
* @param max_memory do not use more than max_memory memory.
*/
-Blockstore * create_datastore_dht_master(size_t max_memory);
+Blockstore * create_datastore_dht_master(struct GE_Context * ectx,
+ size_t max_memory);
/**
* Destroy a DHT Master Datastore (in memory)
Modified: GNUnet/src/applications/dht/module/datastore_memory.c
===================================================================
--- GNUnet/src/applications/dht/module/datastore_memory.c 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_memory.c 2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -48,7 +49,7 @@
* @brief the per-table data
*/
typedef struct {
- Mutex lock;
+ struct MUTEX * lock;
size_t max_memory;
HT_Entry * first;
} MemoryDatastore;
@@ -81,7 +82,7 @@
if ( (ds == NULL) || (keyCount != 1) )
return SYSERR;
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
pos = ds->first;
while (pos != NULL) {
if (equalsHashCode512(&keys[0], &pos->key)) {
@@ -89,15 +90,15 @@
if (OK != resultCallback(&pos->key,
pos->values[i],
resCallbackClosure)) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return SYSERR;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return pos->count;
}
pos = pos->next;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return 0;
}
@@ -113,7 +114,7 @@
const HashCode512 * key,
const DataContainer * value,
unsigned int prio) {
- MemoryDatastore * ds = (MemoryDatastore*) closure;
+ MemoryDatastore * ds = closure;
HT_Entry * pos;
unsigned int size;
@@ -121,12 +122,12 @@
return SYSERR;
size = ntohl(value->size);
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
pos = ds->first;
while (pos != NULL) {
if (equalsHashCode512(key, &pos->key)) {
if (ds->max_memory < size) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return NO;
}
ds->max_memory -= size;
@@ -138,14 +139,14 @@
memcpy(pos->values[pos->count-1],
value,
size);
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
} /* end key match */
pos = pos->next;
}
/* no key matched, create fresh entry */
if (ds->max_memory < sizeof(HT_Entry) + size) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return NO;
}
ds->max_memory -= sizeof(HT_Entry) + size;
@@ -159,7 +160,7 @@
size);
pos->next = ds->first;
ds->first = pos;
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
}
@@ -182,7 +183,7 @@
if (ds == NULL)
return SYSERR;
size = ntohl(value->size);
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
prev = NULL;
pos = ds->first;
while (pos != NULL) {
@@ -207,7 +208,7 @@
FREE(pos);
ds->max_memory += sizeof(HT_Entry);
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
}
}
@@ -228,13 +229,13 @@
FREE(pos);
ds->max_memory += sizeof(HT_Entry);
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return OK;
}
prev = pos;
pos = pos->next;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return SYSERR; /* not found */
}
@@ -256,7 +257,7 @@
if (ds == NULL)
return SYSERR;
- MUTEX_LOCK(&ds->lock);
+ MUTEX_LOCK(ds->lock);
pos = ds->first;
ret = 0;
while (pos != NULL) {
@@ -266,13 +267,13 @@
if (OK != processor(&pos->key,
pos->values[i],
cls)) {
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return ret;
}
}
pos = pos->next;
}
- MUTEX_UNLOCK(&ds->lock);
+ MUTEX_UNLOCK(ds->lock);
return SYSERR;
}
@@ -287,7 +288,7 @@
md = MALLOC(sizeof(MemoryDatastore));
md->max_memory = max_memory;
md->first = NULL;
- MUTEX_CREATE_RECURSIVE(&md->lock);
+ md->lock = MUTEX_CREATE(YES);
res = MALLOC(sizeof(Blockstore));
res->get = &lookup;
@@ -319,7 +320,7 @@
FREE(pos);
pos = next;
}
- MUTEX_DESTROY(&md->lock);
+ MUTEX_DESTROY(md->lock);
FREE(md);
FREE(ds);
}
Modified: GNUnet/src/applications/dht/module/datastore_memory_test.c
===================================================================
--- GNUnet/src/applications/dht/module/datastore_memory_test.c 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_memory_test.c 2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/dht.c 2006-10-01 05:16:17 UTC (rev 3430)
@@ -60,7 +60,7 @@
#define DEBUG_DHT YES
#if DEBUG_DHT
-#define ENTER() LOG(LOG_EVERYTHING, "Entering method %s at %s:%d.\n", __FUNCTION__, __FILE__, __LINE__)
+#define ENTER() GE_LOG(ectx, GE_REQUEST | GE_DEVELOPER | GE_DEBUG, "Entering method %s at %s:%d.\n", __FUNCTION__, __FILE__, __LINE__)
#else
#define ENTER() do {} while (0)
#endif
@@ -219,7 +219,7 @@
* Signal used to return from findNodes when timeout has
* expired.
*/
- Semaphore * signal;
+ struct SEMAPHORE * signal;
/**
* Number of entries in matches.
@@ -262,7 +262,7 @@
/**
* Lock for accessing this struct.
*/
- Mutex lock;
+ struct MUTEX * lock;
} FindNodesContext;
/**
@@ -329,7 +329,7 @@
/**
* Lock for accessing this struct.
*/
- Mutex lock;
+ struct MUTEX * lock;
/**
* Callback to call on the k nodes.
@@ -393,7 +393,7 @@
/**
* Lock for concurrent access to the record.
*/
- Mutex lock;
+ struct MUTEX * lock;
} DHT_GET_RECORD;
@@ -448,7 +448,7 @@
/**
* Lock for concurrent access to the record.
*/
- Mutex lock;
+ struct MUTEX * lock;
} DHT_PUT_RECORD;
@@ -507,7 +507,7 @@
/**
* Lock for concurrent access to the record.
*/
- Mutex lock;
+ struct MUTEX * lock;
} DHT_REMOVE_RECORD;
@@ -544,7 +544,7 @@
/**
* Lock for accessing this struct.
*/
- Mutex lock;
+ struct MUTEX * lock;
} RPC_DHT_FindValue_Context;
typedef struct {
@@ -569,7 +569,7 @@
/**
* Lock for accessing this struct.
*/
- Mutex lock;
+ struct MUTEX * lock;
} RPC_DHT_store_Context;
typedef struct {
@@ -594,7 +594,7 @@
/**
* Lock for accessing this struct.
*/
- Mutex lock;
+ struct MUTEX * lock;
} RPC_DHT_remove_Context;
/**
@@ -631,12 +631,14 @@
/**
* Global core API.
*/
-static CoreAPIForApplication * coreAPI = NULL;
+static CoreAPIForApplication * coreAPI;
+static struct GE_Context * ectx;
+
/**
* RPC API
*/
-static RPC_ServiceAPI * rpcAPI = NULL;
+static RPC_ServiceAPI * rpcAPI;
/**
* The buckets (Kademlia style routing table).
@@ -666,7 +668,7 @@
/**
* Mutex to synchronize access to tables.
*/
-static Mutex * lock;
+static struct MUTEX * lock;
/**
* Handle for the masterTable datastore that is used by this node
@@ -685,6 +687,8 @@
static unsigned int abortTableSize;
+#define hostIdentityEquals(a,b) (0 == memcmp(a,b,sizeof(PeerIdentity)))
+
/* *********************** CODE! ********************* */
#if DEBUG_DHT
@@ -692,8 +696,9 @@
unsigned int i;
MUTEX_LOCK(lock);
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "DHT ROUTING TABLE:\n");
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "DHT ROUTING TABLE:\n");
for (i=0;i<bucketCount;i++) {
if (buckets[i].peers != NULL) {
PeerInfo * pos = NULL;
@@ -711,15 +716,16 @@
hash2enc(&pos->tables[j],
&tabs[j]);
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "[%4d: %3d-%3d]: %s with %u tables (%s, %s, %s)\n",
- i,
- buckets[i].bstart, buckets[i].bend,
- &enc,
- pos->tableCount,
- &tabs[0],
- &tabs[1],
- &tabs[2]);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "[%4d: %3d-%3d]: %s with %u tables (%s, %s, %s)\n",
+ i,
+ buckets[i].bstart, buckets[i].bend,
+ &enc,
+ pos->tableCount,
+ &tabs[0],
+ &tabs[1],
+ &tabs[2]);
pos = vectorGetNext(buckets[i].peers);
}
}
@@ -1189,7 +1195,7 @@
ENTER();
now = get_time();
param = RPC_paramNew();
- MUTEX_LOCK(&fnc->lock);
+ MUTEX_LOCK(fnc->lock);
if (equalsHashCode512(&fnc->key,
&coreAPI->myIdentity->hashPubKey)) {
table = getLocalTableData(&fnc->table);
@@ -1221,7 +1227,7 @@
rel,
(RPC_Complete) &create_find_nodes_rpc_complete_callback,
fnc);
- MUTEX_UNLOCK(&fnc->lock);
+ MUTEX_UNLOCK(fnc->lock);
RPC_paramFree(param);
}
@@ -1247,7 +1253,7 @@
if (fnc == NULL)
return;
/* update k-best list */
- MUTEX_LOCK(&fnc->lock);
+ MUTEX_LOCK(fnc->lock);
pos = findPeerInfo(responder);
/* does the peer support the table in question? */
if (! equalsHashCode512(&fnc->table,
@@ -1257,7 +1263,7 @@
&pos->tables[i]))
break;
if (i == -1) {
- MUTEX_UNLOCK(&fnc->lock);
+ MUTEX_UNLOCK(fnc->lock);
return; /* peer does not support table in question */
}
}
@@ -1280,7 +1286,7 @@
/* trigger transitive request searching for more nodes! */
create_find_nodes_rpc(responder,
fnc);
- MUTEX_UNLOCK(&fnc->lock);
+ MUTEX_UNLOCK(fnc->lock);
}
/**
@@ -1320,7 +1326,7 @@
MUTEX_UNLOCK(lock);
/* peer not in RPC buckets; try PINGing via RPC */
- MUTEX_LOCK(&fnc->lock);
+ MUTEX_LOCK(fnc->lock);
GROW(fnc->rpc,
fnc->rpcRepliesExpected,
fnc->rpcRepliesExpected+1);
@@ -1339,7 +1345,7 @@
(RPC_Complete) &ping_reply_handler,
fnc);
vectorFree(request_param);
- MUTEX_UNLOCK(&fnc->lock);
+ MUTEX_UNLOCK(fnc->lock);
}
/**
@@ -1449,12 +1455,12 @@
&enc);
return;
}
- MUTEX_LOCK(&record->lock);
+ MUTEX_LOCK(record->lock);
if (record->callback != NULL)
record->resultCallback(record->keys,
value,
record->resultClosure);
- MUTEX_UNLOCK(&record->lock);
+ MUTEX_UNLOCK(record->lock);
FREE(value);
}
}
@@ -1619,7 +1625,7 @@
ret->resultsFound = 0;
ret->callback = callback;
ret->closure = closure;
- MUTEX_CREATE_RECURSIVE(&ret->lock);
+ ret->lock = MUTEX_CREATE(YES);
ret->rpc = NULL;
ret->rpcRepliesExpected = 0;
ret->kfnc = NULL;
@@ -1747,7 +1753,7 @@
for (i=0;i<record->rpcRepliesExpected;i++)
rpcAPI->RPC_stop(record->rpc[i]);
- MUTEX_DESTROY(&record->lock);
+ MUTEX_DESTROY(record->lock);
resultsFound = record->resultsFound;
FREE(record);
#if DEBUG_DHT
@@ -1859,7 +1865,7 @@
fnc->rpcRepliesExpected = 0;
fnc->rpcRepliesReceived = 0;
fnc->async_handle = NULL;
- MUTEX_CREATE_RECURSIVE(&fnc->lock);
+ fnc->lock = MUTEX_CREATE(YES);
/* find peers in local peer-list that participate in
the given table */
@@ -1947,7 +1953,7 @@
for (i=fnc->rpcRepliesExpected-1;i>=0;i--)
rpcAPI->RPC_stop(fnc->rpc[i]);
SEMAPHORE_DESTROY(fnc->signal);
- MUTEX_DESTROY(&fnc->lock);
+ MUTEX_DESTROY(fnc->lock);
/* Finally perform callbacks on collected k-best nodes. */
if (callback != NULL)
@@ -2007,7 +2013,7 @@
&enc,
"DHT_findValue");
#endif
- MUTEX_LOCK(&fnc->lock);
+ MUTEX_LOCK(fnc->lock);
if (fnc->k > 0) {
if (fnc->callback != NULL)
fnc->callback(msg,
@@ -2015,7 +2021,7 @@
fnc->k--;
fnc->found++;
}
- MUTEX_UNLOCK(&fnc->lock);
+ MUTEX_UNLOCK(fnc->lock);
}
return OK;
}
@@ -2082,7 +2088,7 @@
fnc->rpcRepliesExpected = 0;
fnc->rpcRepliesReceived = 0;
fnc->found = 0;
- MUTEX_CREATE_RECURSIVE(&fnc->lock);
+ fnc->lock = MUTEX_CREATE(YES);
matches = MALLOC(sizeof(PeerIdentity) * fnc->k);
/* find peers in local peer-list that participate in
@@ -2158,7 +2164,7 @@
/* stop all async RPCs */
for (i=fnc->rpcRepliesExpected-1;i>=0;i--)
rpcAPI->RPC_stop(fnc->rpc[i]);
- MUTEX_DESTROY(&fnc->lock);
+ MUTEX_DESTROY(fnc->lock);
i = fnc->found;
FREE(fnc);
@@ -2184,7 +2190,7 @@
ENTER();
processOptionalFields(responder, results);
- MUTEX_LOCK(&record->lock);
+ MUTEX_LOCK(record->lock);
pos = findPeerInfo(responder);
pos->lastActivity = get_time();
@@ -2200,7 +2206,7 @@
(dataLength != sizeof(PeerIdentity)) ) {
EncName enc;
- MUTEX_UNLOCK(&record->lock);
+ MUTEX_UNLOCK(record->lock);
hash2enc(&responder->hashPubKey,
&enc);
GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
@@ -2210,7 +2216,7 @@
return;
}
}
- MUTEX_UNLOCK(&record->lock);
+ MUTEX_UNLOCK(record->lock);
}
/**
@@ -2351,7 +2357,7 @@
memcpy(ret->value,
value,
ntohl(value->size));
- MUTEX_CREATE_RECURSIVE(&ret->lock);
+ ret->lock = MUTEX_CREATE(YES);
ret->rpc = NULL;
ret->rpcRepliesExpected = 0;
ret->kfnc = NULL;
@@ -2427,10 +2433,11 @@
/* call OP_Complete callback after timeout! */
addAbortJob(&dht_put_async_timeout,
ret);
- addCronJob(&dht_put_async_timeout,
- timeout,
- 0,
- ret);
+ cron_add_job(coreAPI->cron,
+ &dht_put_async_timeout,
+ timeout,
+ 0,
+ ret);
MUTEX_UNLOCK(lock);
return ret;
}
@@ -2447,7 +2454,10 @@
/* cancel timeout cron job (if still live) */
delAbortJob(&dht_put_async_timeout, record);
- delCronJob(&dht_put_async_timeout, 0, record);
+ cron_del_job(coreAPI->cron,
+ &dht_put_async_timeout,
+ 0,
+ record);
/* abort findKNodes (if running) - it may cause
the addition of additional RPCs otherwise! */
if (record->kfnc != NULL)
@@ -2455,7 +2465,7 @@
for (i=0;i<record->rpcRepliesExpected;i++)
rpcAPI->RPC_stop(record->rpc[i]);
- MUTEX_DESTROY(&record->lock);
+ MUTEX_DESTROY(record->lock);
i = record->confirmed_stores;
FREE(record->value);
FREE(record);
@@ -2485,7 +2495,7 @@
ENTER();
processOptionalFields(responder, results);
- MUTEX_LOCK(&record->lock);
+ MUTEX_LOCK(record->lock);
pos = findPeerInfo(responder);
pos->lastActivity = get_time();
max = RPC_paramCount(results);
@@ -2500,7 +2510,7 @@
(dataLength != sizeof(PeerIdentity)) ) {
EncName enc;
- MUTEX_UNLOCK(&record->lock);
+ MUTEX_UNLOCK(record->lock);
hash2enc(&responder->hashPubKey,
&enc);
GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
@@ -2511,7 +2521,7 @@
}
record->confirmed_stores++;
}
- MUTEX_UNLOCK(&record->lock);
+ MUTEX_UNLOCK(record->lock);
}
/**
@@ -2629,7 +2639,7 @@
value,
ntohl(value->size));
}
- MUTEX_CREATE_RECURSIVE(&ret->lock);
+ ret->lock = MUTEX_CREATE(YES);
ret->rpc = NULL;
ret->rpcRepliesExpected = 0;
ret->confirmed_stores = 0;
@@ -2713,7 +2723,7 @@
for (i=0;i<record->rpcRepliesExpected;i++)
rpcAPI->RPC_stop(record->rpc[i]);
- MUTEX_DESTROY(&record->lock);
+ MUTEX_DESTROY(record->lock);
i = record->confirmed_stores;
FREE(record->value);
FREE(record);
@@ -2910,9 +2920,9 @@
ENTER();
delAbortJob((CronJob) &rpc_DHT_findValue_abort,
fw);
- MUTEX_LOCK(&fw->lock);
+ MUTEX_LOCK(fw->lock);
if (fw->done == YES) {
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
return;
}
dht_get_async_stop(fw->get_record);
@@ -2928,7 +2938,7 @@
RPC_paramFree(results);
}
fw->done = YES;
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
}
/**
@@ -2941,7 +2951,7 @@
const DataContainer * value,
RPC_DHT_FindValue_Context * fw) {
ENTER();
- MUTEX_LOCK(&fw->lock);
+ MUTEX_LOCK(fw->lock);
GROW(fw->results,
fw->count,
fw->count+1);
@@ -2949,7 +2959,7 @@
memcpy(fw->results[fw->count-1],
value,
ntohl(value->size));
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
return OK;
}
@@ -3025,7 +3035,7 @@
fw_context
= MALLOC(sizeof(RPC_DHT_FindValue_Context));
- MUTEX_CREATE_RECURSIVE(&fw_context->lock);
+ fw_context->lock = MUTEX_CREATE(YES);
fw_context->count
= 0;
fw_context->done
@@ -3048,10 +3058,11 @@
fw_context);
addAbortJob((CronJob)&rpc_DHT_findValue_abort,
fw_context);
- addCronJob((CronJob)&rpc_DHT_findValue_abort,
- ntohll(*timeout),
- 0,
- fw_context);
+ cron_add_job(coreAPI->cron,
+ (CronJob)&rpc_DHT_findValue_abort,
+ ntohll(*timeout),
+ 0,
+ fw_context);
}
/**
@@ -3069,9 +3080,9 @@
ENTER();
delAbortJob(&rpc_DHT_store_abort,
fw);
- MUTEX_LOCK(&fw->lock);
+ MUTEX_LOCK(fw->lock);
if (fw->done == YES) {
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
return;
}
dht_put_async_stop(fw->put_record);
@@ -3087,7 +3098,7 @@
RPC_paramFree(results);
}
fw->done = YES;
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
}
/**
@@ -3099,7 +3110,10 @@
static void rpc_dht_store_callback(RPC_DHT_store_Context * fw) {
RPC_Param * param;
- delCronJob(&rpc_DHT_store_abort, 0, fw);
+ cron_del_job(coreAPI->cron,
+ &rpc_DHT_store_abort,
+ 0,
+ fw);
delAbortJob(&rpc_DHT_store_abort, fw);
param = RPC_paramNew();
fw->callback(param,
@@ -3152,7 +3166,7 @@
fw_context
= MALLOC(sizeof(RPC_DHT_store_Context));
- MUTEX_CREATE_RECURSIVE(&fw_context->lock);
+ fw_context->lock = MUTEX_CREATE(YES);
MUTEX_LOCK(lock);
ltd = getLocalTableData(table);
if (ltd == NULL) {
@@ -3176,10 +3190,11 @@
fw_context);
addAbortJob(&rpc_DHT_store_abort,
fw_context);
- addCronJob(&rpc_DHT_store_abort,
- ntohll(*timeout),
- 0,
- fw_context);
+ cron_add_job(coreAPI->cron,
+ &rpc_DHT_store_abort,
+ ntohll(*timeout),
+ 0,
+ fw_context);
FREE(value);
}
@@ -3197,9 +3212,9 @@
ENTER();
delAbortJob((CronJob) &rpc_DHT_remove_abort,
fw);
- MUTEX_LOCK(&fw->lock);
+ MUTEX_LOCK(fw->lock);
if (fw->done == YES) {
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
return;
}
dht_remove_async_stop(fw->remove_record);
@@ -3214,7 +3229,7 @@
fw->rpc_context);
RPC_paramFree(results);
fw->done = YES;
- MUTEX_UNLOCK(&fw->lock);
+ MUTEX_UNLOCK(fw->lock);
}
/**
@@ -3226,7 +3241,10 @@
static void rpc_dht_remove_callback(RPC_DHT_remove_Context * fw) {
RPC_Param * param;
- delCronJob(&rpc_DHT_store_abort, 0, fw);
+ cron_del_job(coreAPI->cron,
+ &rpc_DHT_store_abort,
+ 0,
+ fw);
delAbortJob(&rpc_DHT_store_abort, fw);
param = RPC_paramNew();
@@ -3288,7 +3306,7 @@
"value");
fw_context
= MALLOC(sizeof(RPC_DHT_remove_Context));
- MUTEX_CREATE_RECURSIVE(&fw_context->lock);
+ fw_context->lock = MUTEX_CREATE(YES);
MUTEX_LOCK(lock);
ltd = getLocalTableData(table);
if (ltd == NULL) {
@@ -3312,10 +3330,11 @@
fw_context);
addAbortJob((CronJob)&rpc_DHT_remove_abort,
fw_context);
- addCronJob((CronJob)&rpc_DHT_remove_abort,
- ntohll(*timeout),
- 0,
- fw_context);
+ cron_add_job(coreAPI->cron,
+ (CronJob)&rpc_DHT_remove_abort,
+ ntohll(*timeout),
+ 0,
+ fw_context);
FREE(value);
}
@@ -3354,9 +3373,10 @@
#if DEBUG_DHT
printRoutingTable();
/* first, free resources from ASYNC calls started last time */
- LOG(LOG_CRON,
- "`%s' stops async requests from last cron round.\n",
- __FUNCTION__);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+ "`%s' stops async requests from last cron round.\n",
+ __FUNCTION__);
#endif
now = get_time();
for (i=putRecordsSize-1;i>=0;i--) {
@@ -3418,7 +3438,7 @@
coreAPI->myIdentity,
sizeof(PeerIdentity));
#if DEBUG_DHT
- LOG(LOG_CRON,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"`%s' issues DHT_PUTs to advertise tables this peer participates in.\n",
__FUNCTION__);
#endif
@@ -3452,7 +3472,7 @@
for each table that we have joined gather OUR neighbours
*/
#if DEBUG_DHT
- LOG(LOG_CRON,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"`%s' issues findNodes for each table that we participate in.\n",
__FUNCTION__);
#endif
@@ -3478,7 +3498,7 @@
b) if lastActivity is very very old, drop
*/
#if DEBUG_DHT
- LOG(LOG_CRON,
+ GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"`%s' issues put to advertise tables that we participate in.\n",
__FUNCTION__);
#endif
@@ -3553,17 +3573,32 @@
*/
DHT_ServiceAPI * provide_module_dht(CoreAPIForApplication * capi) {
static DHT_ServiceAPI api;
- unsigned int i;
+ unsigned long long i;
+ unsigned long long j;
ENTER();
coreAPI = capi;
+ ectx = capi->ectx;
rpcAPI = capi->requestService("rpc");
if (rpcAPI == NULL)
return NULL;
- i = getConfigurationInt("DHT",
- "BUCKETCOUNT");
- if ( (i == 0) || (i > 512) )
- i = 512;
+ if ( (-1 == GC_get_configuration_value_number(capi->cfg,
+ "DHT",
+ "BUCKETCOUNT",
+ 1,
+ 512,
+ 512,
+ &i)) ||
+ (-1 == GC_get_configuration_value_number(capi->cfg,
+ "DHT",
+ "MASTER-TABLE-SIZE",
+ 1,
+ 65536 * 1024,
+ 65536,
+ &j)) ) {
+ capi->releaseService(rpcAPI);
+ return NULL;
+ }
GROW(buckets,
bucketCount,
i);
@@ -3573,6 +3608,7 @@
buckets[i].peers = vectorNew(4);
}
+
rpcAPI->RPC_register("DHT_ping",
&rpc_DHT_ping);
rpcAPI->RPC_register("DHT_findNode",
@@ -3595,18 +3631,16 @@
memset(&masterTableId, 0, sizeof(HashCode512));
/* join the master table */
- i = getConfigurationInt("DHT",
- "MASTER-TABLE-SIZE");
- if (i == 0)
- i = 65536; /* 64k memory should suffice */
masterTableDatastore
- = create_datastore_dht_master(i);
+ = create_datastore_dht_master(ectx,
+ j);
dht_join(masterTableDatastore,
&masterTableId);
- addCronJob(&dhtMaintainJob,
- 0,
- DHT_MAINTAIN_FREQUENCY,
- NULL);
+ cron_add_job(coreAPI->cron,
+ &dhtMaintainJob,
+ 0,
+ DHT_MAINTAIN_FREQUENCY,
+ NULL);
return &api;
}
@@ -3628,14 +3662,16 @@
&rpc_DHT_store);
rpcAPI->RPC_unregister_async("DHT_remove",
&rpc_DHT_remove);
- delCronJob(&dhtMaintainJob,
- DHT_MAINTAIN_FREQUENCY,
- NULL);
+ cron_del_job(coreAPI->cron,
+ &dhtMaintainJob,
+ DHT_MAINTAIN_FREQUENCY,
+ NULL);
/* stop existing / pending DHT operations */
while (abortTableSize > 0) {
- delCronJob(abortTable[0].job,
- 0,
- abortTable[0].arg);
+ cron_del_job(coreAPI->cron,
+ abortTable[0].job,
+ 0,
+ abortTable[0].arg);
abortTable[0].job(abortTable[0].arg);
}
/* leave the master table */
Modified: GNUnet/src/applications/dht/module/dht.h
===================================================================
--- GNUnet/src/applications/dht/module/dht.h 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/dht.h 2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/todo 2006-10-01 05:16:17 UTC (rev 3430)
@@ -15,7 +15,6 @@
0.7.1 ['06] (aka "stabilization")
- finish util refactoring: [RC]
* Not compiling:
- + dht/module
+ dht/tools
+ vpn
* Testcases not compiling or passing:
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3430 - in GNUnet: . src/applications/dht/module,
grothoff <=