gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3430 - in GNUnet: . src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r3430 - in GNUnet: . src/applications/dht/module
Date: Sat, 30 Sep 2006 22:16:23 -0700 (PDT)

Author: grothoff
Date: 2006-09-30 22:16:17 -0700 (Sat, 30 Sep 2006)
New Revision: 3430

Modified:
   GNUnet/src/applications/dht/module/cs.c
   GNUnet/src/applications/dht/module/datastore_dht_master.c
   GNUnet/src/applications/dht/module/datastore_dht_master.h
   GNUnet/src/applications/dht/module/datastore_memory.c
   GNUnet/src/applications/dht/module/datastore_memory_test.c
   GNUnet/src/applications/dht/module/dht.c
   GNUnet/src/applications/dht/module/dht.h
   GNUnet/todo
Log:
making dht/module compile

Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c     2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/cs.c     2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
 /*
       This file is part of GNUnet
+      Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -34,7 +35,7 @@
 /**
  * Global core API.
  */
-static CoreAPIForApplication * coreAPI = NULL;
+static CoreAPIForApplication * coreAPI;
 
 /**
  * Reference to the DHT service API.
@@ -49,7 +50,7 @@
   /**
    * Handle to access the client.
    */
-  ClientHandle handler;
+  struct ClientHandle * handler;
   /**
    * For which table is this client responsible?
    */
@@ -66,21 +67,21 @@
    * and results fields for sending a request to the client.
    * Released after the request has been processed.
    */
-  Semaphore * prerequest;
+  struct SEMAPHORE * prerequest;
 
   /**
    * Semaphore that is up'ed by the client handler whenever a reply
    * was received.  The client exit handler also needs to up this
    * semaphore to unblock threads that wait for replies.
    */
-  Semaphore * prereply;
+  struct SEMAPHORE * prereply;
 
   /**
    * Semaphore that is down'ed by the client handler before storing
    * the data from a reply.  The cs-functions need to up it
    * once they have prepared the handlers.
    */
-  Semaphore * postreply;
+  struct SEMAPHORE * postreply;
 
   /**
    * Function to call for results
@@ -101,21 +102,21 @@
 } DHT_CLIENT_TableHandlers;
 
 typedef struct {
-  ClientHandle client;
+  struct ClientHandle * client;
   struct DHT_PUT_RECORD * put_record;
   DHT_TableId table;
   unsigned int replicas; /* confirmed puts? */
 } DHT_CLIENT_PUT_RECORD;
 
 typedef struct {
-  ClientHandle client;
+  struct ClientHandle * client;
   struct DHT_REMOVE_RECORD * remove_record;
   DHT_TableId table;
   unsigned int replicas; /* confirmed dels? */
 } DHT_CLIENT_REMOVE_RECORD;
 
 typedef struct {
-  ClientHandle client;
+  struct ClientHandle * client;
   struct DHT_GET_RECORD * get_record;
   DHT_TableId table;
   unsigned int count;
@@ -147,8 +148,10 @@
 /**
  * Lock for accessing csHandlers.
  */
-static Mutex csLock;
+static struct MUTEX * csLock;
 
+static struct GE_Context * ectx;
+
 /* ******* implementation of Blockstore via TCP link ********** */
 
 /**
@@ -176,7 +179,7 @@
     return SYSERR;
   }
 
-  SEMAPHORE_DOWN(handlers->prerequest);
+  SEMAPHORE_DOWN(handlers->prerequest, YES);
   handlers->resultCallback = resultCallback;
   handlers->resultCallbackClosure = resCallbackClosure;
   handlers->status = 0;
@@ -205,7 +208,7 @@
   }
   FREE(req);
   SEMAPHORE_UP(handlers->postreply);
-  SEMAPHORE_DOWN(handlers->prereply);
+  SEMAPHORE_DOWN(handlers->prereply, YES);
   ret = handlers->status;
   SEMAPHORE_UP(handlers->prerequest);
   return ret;
@@ -230,7 +233,7 @@
 
   n = sizeof(CS_dht_request_put_MESSAGE) + ntohl(value->size);
   req = MALLOC(n);
-  SEMAPHORE_DOWN(handlers->prerequest);
+  SEMAPHORE_DOWN(handlers->prerequest, YES);
   handlers->status = 0;
   req->header.size = htons(n);
   req->header.type = htons(CS_PROTO_dht_REQUEST_PUT);
@@ -248,12 +251,12 @@
     return SYSERR;
   }
   FREE(req);
-  LOG(LOG_EVERYTHING,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "Sending STORE request to client!\n");
   SEMAPHORE_UP(handlers->postreply);
-  SEMAPHORE_DOWN(handlers->prereply);
+  SEMAPHORE_DOWN(handlers->prereply, YES);
   ret = handlers->status;
-  LOG(LOG_EVERYTHING,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "Client confirmed STORE request with status %d!\n",
       ret);
   SEMAPHORE_UP(handlers->prerequest);
@@ -278,7 +281,7 @@
   if (value != NULL)
     n += htonl(value->size);
   req = MALLOC(n);
-  SEMAPHORE_DOWN(handlers->prerequest);
+  SEMAPHORE_DOWN(handlers->prerequest, YES);
   handlers->status = 0;
   req->header.size = htons(n);
   req->header.type = htons(CS_PROTO_dht_REQUEST_REMOVE);
@@ -297,7 +300,7 @@
   }
   FREE(req);
   SEMAPHORE_UP(handlers->postreply);
-  SEMAPHORE_DOWN(handlers->prereply);
+  SEMAPHORE_DOWN(handlers->prereply, YES);
   ret = handlers->status;
   SEMAPHORE_UP(handlers->prerequest);
   return ret;
@@ -317,7 +320,7 @@
   DHT_CLIENT_TableHandlers * handlers = closure;
   int ret;
 
-  SEMAPHORE_DOWN(handlers->prerequest);
+  SEMAPHORE_DOWN(handlers->prerequest, YES);
   handlers->status = 0;
   handlers->resultCallback = processor;
   handlers->resultCallbackClosure = cls;
@@ -329,7 +332,7 @@
     return SYSERR;
   }
   SEMAPHORE_UP(handlers->postreply);
-  SEMAPHORE_DOWN(handlers->prereply);
+  SEMAPHORE_DOWN(handlers->prereply, YES);
   ret = handlers->status;
   SEMAPHORE_UP(handlers->prerequest);
   return ret;
@@ -337,7 +340,7 @@
 
 /* *********************** CS handlers *********************** */
 
-static int sendAck(ClientHandle client,
+static int sendAck(struct ClientHandle * client,
                   DHT_TableId * table,
                   int value) {
   CS_dht_reply_ack_MESSAGE msg;
@@ -353,8 +356,8 @@
 /**
  * CS handler for joining existing DHT-table.
  */
-static int csJoin(ClientHandle client,
-                  const CS_MESSAGE_HEADER * message) {
+static int csJoin(struct ClientHandle * client,
+                  const MESSAGE_HEADER * message) {
   DHT_CLIENT_TableHandlers * ptr;
   CS_dht_request_join_MESSAGE * req;
   int ret;
@@ -364,7 +367,7 @@
     return SYSERR;
   }
   req = (CS_dht_request_join_MESSAGE*) message;
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   ptr = MALLOC(sizeof(DHT_CLIENT_TableHandlers));
   ptr->store = MALLOC(sizeof(Blockstore));
   ptr->store->iterate = &tcp_iterate;
@@ -394,15 +397,15 @@
   ret = sendAck(client,
                &req->table,
                ret);
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   return ret;
 }
 
 /**
  * CS handler for leaving DHT-table.
  */
-static int csLeave(ClientHandle client,
-                   const CS_MESSAGE_HEADER * message) {
+static int csLeave(struct ClientHandle * client,
+                   const MESSAGE_HEADER * message) {
 
   CS_dht_request_leave_MESSAGE * req;
   int i;
@@ -413,10 +416,10 @@
     return SYSERR;
   }
   req = (CS_dht_request_leave_MESSAGE*) message;
-  LOG(LOG_EVERYTHING,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "Client leaving request received!\n");
 
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   for (i=0;i<csHandlersCount;i++) {
     ptr = csHandlers[i];
     if ( (equalsHashCode512(&ptr->table,
@@ -425,12 +428,12 @@
       GROW(csHandlers,
           csHandlersCount,
           csHandlersCount-1);
-      MUTEX_UNLOCK(&csLock);
+      MUTEX_UNLOCK(csLock);
 
       /* release clients waiting on this DHT */
       ptr->status = SYSERR;
       SEMAPHORE_UP(ptr->prereply);
-      SEMAPHORE_DOWN(ptr->prerequest);
+      SEMAPHORE_DOWN(ptr->prerequest, YES);
       SEMAPHORE_DESTROY(ptr->prerequest);
       SEMAPHORE_DESTROY(ptr->prereply);
       SEMAPHORE_DESTROY(ptr->postreply);
@@ -441,7 +444,7 @@
                     OK);
     }
   }
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
       _("`%s' failed: table not found!\n"),
       "CS_DHT_LEAVE");
@@ -457,7 +460,7 @@
   GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
       "Signaling client put completion: %d\n",
       record->replicas);
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   dhtAPI->put_stop(record->put_record);
   if (OK != sendAck(record->client,
                    &record->table,
@@ -475,15 +478,15 @@
           putRecordsSize-1);
       break;
     }
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   FREE(record);
 }
 
 /**
  * CS handler for inserting <key,value>-pair into DHT-table.
  */
-static int csPut(ClientHandle client,
-                const CS_MESSAGE_HEADER * message) {
+static int csPut(struct ClientHandle * client,
+                const MESSAGE_HEADER * message) {
   CS_dht_request_put_MESSAGE * req;
   DataContainer * data;
   DHT_CLIENT_PUT_RECORD * ptr;
@@ -513,12 +516,12 @@
   ptr->table = req->table;
   ptr->put_record = NULL;
 
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   GROW(putRecords,
        putRecordsSize,
        putRecordsSize+1);
   putRecords[putRecordsSize-1] = ptr;
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
       "Starting DHT put\n");
   ptr->put_record = dhtAPI->put_start(&req->table,
@@ -542,7 +545,7 @@
        _("sendAck failed.  Terminating connection to client.\n"));
     coreAPI->terminateClientConnection(record->client);
   }
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   for (i=removeRecordsSize-1;i>=0;i--)
     if (removeRecords[i] == record) {
       removeRecords[i] = removeRecords[removeRecordsSize-1];
@@ -551,13 +554,13 @@
           removeRecordsSize-1);
       break;
     }
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
 
   FREE(record);
 }
 
 struct CSRemoveClosure {
-  ClientHandle client;
+  struct ClientHandle * client;
   CS_dht_request_remove_MESSAGE * message;
 };
 
@@ -568,7 +571,7 @@
   CS_dht_request_remove_MESSAGE * req;
   DataContainer * data;
   DHT_CLIENT_REMOVE_RECORD * ptr;
-  ClientHandle client;
+  struct ClientHandle * client;
   unsigned int size;
 
   req = cpc->message;
@@ -592,12 +595,12 @@
   ptr->replicas = 0;
   ptr->table = req->table;
   ptr->remove_record = NULL;
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   GROW(removeRecords,
        removeRecordsSize,
        removeRecordsSize+1);
   removeRecords[removeRecordsSize-1] = ptr;
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   ptr->remove_record = dhtAPI->remove_start(&req->table,
                                            &req->key,
                                            ntohll(req->timeout),
@@ -611,8 +614,8 @@
 /**
  * CS handler for inserting <key,value>-pair into DHT-table.
  */
-static int csRemove(ClientHandle client,
-                   const CS_MESSAGE_HEADER * message) {
+static int csRemove(struct ClientHandle * client,
+                   const MESSAGE_HEADER * message) {
   struct CSRemoveClosure * cpc;
 
   if (ntohs(message->size) < sizeof(CS_dht_request_remove_MESSAGE)) {
@@ -625,10 +628,11 @@
         message,
         ntohs(message->size));
   cpc->client = client;
-  addCronJob((CronJob)&csRemoveJob,
-            0,
-            0,
-            cpc);
+  cron_add_job(coreAPI->cron,
+              (CronJob)&csRemoveJob,
+              0,
+              0,
+              cpc);
   return OK;
 }
 
@@ -690,7 +694,7 @@
       coreAPI->terminateClientConnection(record->client);
     }
   }
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   for (i=getRecordsSize-1;i>=0;i--)
     if (getRecords[i] == record) {
       getRecords[i] = getRecords[getRecordsSize-1];
@@ -699,12 +703,12 @@
           getRecordsSize-1);
       break;
     }
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   FREE(record);
 }
 
 struct CSGetClosure {
-  ClientHandle client;
+  struct ClientHandle * client;
   CS_dht_request_get_MESSAGE * message;
 };
 
@@ -714,7 +718,7 @@
 static int csGetJob(struct CSGetClosure * cpc) {
   CS_dht_request_get_MESSAGE * req;
   DHT_CLIENT_GET_RECORD * ptr;
-  ClientHandle client;
+  struct ClientHandle * client;
   unsigned int keyCount;
 
   client = cpc->client;
@@ -728,12 +732,12 @@
   ptr->table = req->table;
   ptr->get_record = NULL;
 
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   GROW(getRecords,
        getRecordsSize,
        getRecordsSize+1);
   getRecords[getRecordsSize-1] = ptr;
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   ptr->get_record = dhtAPI->get_start(&req->table,
                                      ntohl(req->type),
                                      keyCount,
@@ -749,8 +753,8 @@
 /**
  * CS handler for inserting <key,value>-pair into DHT-table.
  */
-static int csGet(ClientHandle client,
-                const CS_MESSAGE_HEADER * message) {
+static int csGet(struct ClientHandle * client,
+                const MESSAGE_HEADER * message) {
   struct CSGetClosure * cpc;
 
   if (ntohs(message->size) != sizeof(CS_dht_request_get_MESSAGE)) {
@@ -764,10 +768,11 @@
         message,
         ntohs(message->size));
   cpc->client = client;
-  addCronJob((CronJob)&csGetJob,
-            0,
-            0,
-            cpc);
+  cron_add_job(coreAPI->cron,
+              (CronJob)&csGetJob,
+              0,
+              0,
+              cpc);
   return OK;
 }
 
@@ -776,8 +781,8 @@
  * the status value in status and up's the semaphore to signal
  * that we received a reply.
  */
-static int csACK(ClientHandle client,
-                const CS_MESSAGE_HEADER * message) {
+static int csACK(struct ClientHandle * client,
+                const MESSAGE_HEADER * message) {
   DHT_CLIENT_TableHandlers * ptr;
   CS_dht_reply_ack_MESSAGE * req;
   int i;
@@ -787,23 +792,23 @@
     return SYSERR;
   }
   req =(CS_dht_reply_ack_MESSAGE*) message;
-  LOG(LOG_EVERYTHING,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "`%s' received from client.\n",
       "CS_dht_reply_ack_MESSAGE");
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   for (i=0;i<csHandlersCount;i++) {
     ptr = csHandlers[i];
     if ( (ptr->handler == client) &&
         (equalsHashCode512(&ptr->table,
                            &req->table)) ) {
-      SEMAPHORE_DOWN(ptr->postreply);
+      SEMAPHORE_DOWN(ptr->postreply, YES);
       ptr->status = ntohl(req->status);
       SEMAPHORE_UP(ptr->prereply);
-      MUTEX_UNLOCK(&csLock);
+      MUTEX_UNLOCK(csLock);
       return OK;
     }
   }
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   GE_LOG(ectx, GE_ERROR | GE_BULK | GE_USER,
       _("Failed to deliver `%s' message.\n"),
       "CS_dht_reply_ack_MESSAGE");
@@ -815,8 +820,8 @@
  * and passes on the new result.  If all results have been
  * collected, signals using the semaphore.
  */
-static int csResults(ClientHandle client,
-                    const CS_MESSAGE_HEADER * message) {
+static int csResults(struct ClientHandle * client,
+                    const MESSAGE_HEADER * message) {
   CS_dht_reply_results_MESSAGE * req;
   DHT_CLIENT_TableHandlers * ptr;
   unsigned int dataLength;
@@ -832,17 +837,17 @@
     GE_BREAK(ectx, 0);
     return SYSERR;
   }
-  LOG(LOG_EVERYTHING,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "`%s' received from client.\n",
       "CS_dht_reply_results_MESSAGE");
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   for (i=0;i<csHandlersCount;i++) {
     if ( (csHandlers[i]->handler == client) &&
         (equalsHashCode512(&csHandlers[i]->table,
                            &req->table)) ) {
       ptr = csHandlers[i];
-      SEMAPHORE_DOWN(ptr->postreply);
-      LOG(LOG_EVERYTHING,
+      SEMAPHORE_DOWN(ptr->postreply, YES);
+      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
          "`%s' received result '%.*s'!\n",
          __FUNCTION__,
          dataLength - sizeof(DataContainer),
@@ -852,11 +857,11 @@
                          &req->data,                   
                          ptr->resultCallbackClosure);
       ptr->status++;
-      MUTEX_UNLOCK(&csLock);
+      MUTEX_UNLOCK(csLock);
       return OK;
     }
   }
-  MUTEX_UNLOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
   GE_LOG(ectx, GE_ERROR | GE_BULK | GE_USER,
       _("Failed to deliver `%s' message.\n"),
       "CS_dht_reply_results_MESSAGE");
@@ -867,14 +872,13 @@
  * CS handler for handling exiting client.  Triggers
  * csLeave for all tables that rely on this client.
  */
-static void csClientExit(ClientHandle client) {
+static void csClientExit(struct ClientHandle * client) {
   int i;
   DHT_CLIENT_GET_RECORD * gr;
   DHT_CLIENT_PUT_RECORD * pr;
   DHT_CLIENT_REMOVE_RECORD * rr;
-  int haveCron;
 
-  MUTEX_LOCK(&csLock);
+  MUTEX_LOCK(csLock);
   for (i=0;i<csHandlersCount;i++) {
     if (csHandlers[i]->handler == client) {
       CS_dht_request_leave_MESSAGE message;
@@ -887,18 +891,18 @@
       i--;
     }
   }
-  haveCron = isCronRunning();
-  MUTEX_UNLOCK(&csLock);
-  if (YES == haveCron)
-    suspendCron();
-  MUTEX_LOCK(&csLock);
+  MUTEX_UNLOCK(csLock);
+  cron_suspend(coreAPI->cron,
+              YES);
+  MUTEX_LOCK(csLock);
   for (i=0;i<getRecordsSize;i++) {
     if (getRecords[i]->client == client) {
       gr = getRecords[i];
 
-      delCronJob(&cs_get_abort,
-                0,
-                gr);
+      cron_del_job(coreAPI->cron,
+                  &cs_get_abort,
+                  0,
+                  gr);
       dhtAPI->get_stop(gr->get_record);
       getRecords[i] = getRecords[getRecordsSize-1];
       GROW(getRecords,
@@ -910,9 +914,10 @@
     if (putRecords[i]->client == client) {
       pr = putRecords[i];
 
-      delCronJob(&cs_put_abort,
-                0,
-                pr);
+      cron_del_job(coreAPI->cron,
+                  &cs_put_abort,
+                  0,
+                  pr);
       dhtAPI->put_stop(pr->put_record);
       putRecords[i] = putRecords[putRecordsSize-1];
       GROW(putRecords,
@@ -924,9 +929,10 @@
     if (removeRecords[i]->client == client) {
       rr = removeRecords[i];
 
-      delCronJob((CronJob) &cs_remove_abort,
-                0,
-                rr);
+      cron_del_job(coreAPI->cron,
+                  (CronJob) &cs_remove_abort,
+                  0,
+                  rr);
       dhtAPI->remove_stop(rr->remove_record);
       removeRecords[i] = removeRecords[removeRecordsSize-1];
       GROW(removeRecords,
@@ -934,14 +940,15 @@
           removeRecordsSize-1);
     }
   }
-  MUTEX_UNLOCK(&csLock);
-  if (YES == haveCron)
-    resumeCron();
+  MUTEX_UNLOCK(csLock);
+  cron_resume_jobs(coreAPI->cron,
+                  YES);
 }
 
 int initialize_module_dht(CoreAPIForApplication * capi) {
   int status;
 
+  ectx = capi->ectx;
   dhtAPI = capi->requestService("dht");
   if (dhtAPI == NULL)
     return SYSERR;
@@ -957,7 +964,7 @@
       CS_PROTO_dht_REPLY_GET,
       CS_PROTO_dht_REPLY_ACK);
   status = OK;
-  MUTEX_CREATE_RECURSIVE(&csLock);
+  csLock = MUTEX_CREATE(YES);
   if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_JOIN,
                                             &csJoin))
     status = SYSERR;
@@ -1018,23 +1025,26 @@
     status = SYSERR;
 
   while (putRecordsSize > 0) {
-    delCronJob(&cs_put_abort,
-              0,
-              putRecords[0]);
+    cron_del_job(coreAPI->cron,
+                &cs_put_abort,
+                0,
+                putRecords[0]);
     cs_put_abort(putRecords[0]);
   }
 
   while (removeRecordsSize > 0) {
-    delCronJob((CronJob) &cs_remove_abort,
-              0,
-              removeRecords[0]);
+    cron_del_job(coreAPI->cron,
+                (CronJob) &cs_remove_abort,
+                0,
+                removeRecords[0]);
     cs_remove_abort(removeRecords[0]);
   }
 
   while (getRecordsSize > 0) {
-    delCronJob(&cs_get_abort,
-              0,
-              getRecords[0]);
+    cron_del_job(coreAPI->cron,
+                &cs_get_abort,
+                0,
+                getRecords[0]);
     cs_get_abort(getRecords[0]);
   }
 
@@ -1044,7 +1054,7 @@
   coreAPI->releaseService(dhtAPI);
   dhtAPI = NULL;
   coreAPI = NULL;
-  MUTEX_DESTROY(&csLock);
+  MUTEX_DESTROY(csLock);
   return status;
 }
 

Modified: GNUnet/src/applications/dht/module/datastore_dht_master.c
===================================================================
--- GNUnet/src/applications/dht/module/datastore_dht_master.c   2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_dht_master.c   2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
  /*
       This file is part of GNUnet
+      Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -38,6 +39,7 @@
 #include "platform.h"
 
 #include "gnunet_core.h"
+#include "gnunet_util_cron.h"
 #include "datastore_dht_master.h"
 
 typedef struct {
@@ -59,7 +61,9 @@
  * @brief the per-table data
  */
 typedef struct {
-  Mutex lock;
+  struct GE_Context * ectx;
+  struct MUTEX * lock;
+  struct CronManager * cron;
   size_t max_memory;
   HT_Entry * first;
 } MemoryDatastore;
@@ -80,16 +84,16 @@
                  const HashCode512 * keys,
                  DataProcessor resultCallback,
                  void * resCallbackClosure) {
-  MemoryDatastore * ds = (MemoryDatastore*) closure;
+  MemoryDatastore * ds = closure;
   HT_Entry * pos;
   int count;
   int i;
   DataContainer * data;
 
-  GE_ASSERT(ectx, keyCount == 1);
+  GE_ASSERT(ds->ectx, keyCount == 1);
   if (ds == NULL)
     return SYSERR;
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   pos = ds->first;
   while (pos != NULL) {
     if (equalsHashCode512(&keys[0], &pos->key)) {
@@ -123,12 +127,12 @@
        FREE(data);
       }
       FREENONNULL(perm);
-      MUTEX_UNLOCK(&ds->lock);
+      MUTEX_UNLOCK(ds->lock);
       return count;
     }
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return 0;
 }
 
@@ -155,7 +159,7 @@
       != sizeof(HashCode512))
     return SYSERR;
 
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   pos = ds->first;
   while (pos != NULL) {
     if (equalsHashCode512(key, &pos->key)) {
@@ -163,11 +167,11 @@
        if (equalsHashCode512(&pos->values[i].hash,
                              (HashCode512*)&value[1])) {
          pos->values[i].lastRefreshTime = get_time();
-         MUTEX_UNLOCK(&ds->lock);
+         MUTEX_UNLOCK(ds->lock);
          return OK; /* already present */
        }
       if (ds->max_memory < sizeof(MasterEntry)) {
-       MUTEX_UNLOCK(&ds->lock);
+       MUTEX_UNLOCK(ds->lock);
        return NO;
       }
       ds->max_memory -= sizeof(MasterEntry);
@@ -178,14 +182,14 @@
       memcpy(&pos->values[pos->count-1].hash,
             &value[1],
             sizeof(HashCode512));
-      MUTEX_UNLOCK(&ds->lock);
+      MUTEX_UNLOCK(ds->lock);
       return OK;
     } /* end key match */
     pos = pos->next;
   }
   /* no key matched, create fresh entry */
   if (ds->max_memory < sizeof(HT_Entry) + sizeof(MasterEntry)) {
-    MUTEX_UNLOCK(&ds->lock);
+    MUTEX_UNLOCK(ds->lock);
     return NO;
   }
   ds->max_memory -= sizeof(HT_Entry) + sizeof(MasterEntry);
@@ -199,7 +203,7 @@
   pos->values[0].lastRefreshTime = get_time();
   pos->next = ds->first;
   ds->first = pos;
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return OK;
 }
 
@@ -224,7 +228,7 @@
        != sizeof(HashCode512)) )
     return SYSERR;
 
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   prev = NULL;
   pos = ds->first;
   while (pos != NULL) {
@@ -247,7 +251,7 @@
              FREE(pos);
              ds->max_memory += sizeof(HT_Entry);       
            }
-           MUTEX_UNLOCK(&ds->lock);
+           MUTEX_UNLOCK(ds->lock);
            return OK;
          }
        }
@@ -265,13 +269,13 @@
        FREE(pos);
        ds->max_memory += sizeof(HT_Entry);
       }
-      MUTEX_UNLOCK(&ds->lock);
+      MUTEX_UNLOCK(ds->lock);
       return OK;
     }
     prev = pos;
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return SYSERR; /* not found */
 }
 
@@ -294,7 +298,7 @@
   if (ds == NULL)
     return SYSERR;
 
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   pos = ds->first;
   ret = 0;
   cont = MALLOC(sizeof(HashCode512) + sizeof(DataContainer));
@@ -309,7 +313,7 @@
        if (OK != processor(&pos->key,
                            cont,
                            cls)) {
-         MUTEX_UNLOCK(&ds->lock);
+         MUTEX_UNLOCK(ds->lock);
          FREE(cont);
          return ret;
        }
@@ -317,7 +321,7 @@
     }
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   FREE(cont);
   return SYSERR;
 }
@@ -329,7 +333,7 @@
   cron_t now;
 
   prev = NULL;
-  MUTEX_LOCK(&store->lock);
+  MUTEX_LOCK(store->lock);
   now = get_time();
   pos = store->first;
   while (pos != NULL) {
@@ -355,32 +359,36 @@
     prev = pos;
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&store->lock);
+  MUTEX_UNLOCK(store->lock);
 }
 
 /**
  * Create a DHT Datastore (in memory)
  * @param max_memory do not use more than max_memory memory.
  */
-Blockstore * create_datastore_dht_master(size_t max_memory) {
+Blockstore * create_datastore_dht_master(struct GE_Context * ectx,
+                                        size_t max_memory) {
   Blockstore * res;
   MemoryDatastore * md;
 
   md = MALLOC(sizeof(MemoryDatastore));
   md->max_memory = max_memory;
   md->first = NULL;
-  MUTEX_CREATE_RECURSIVE(&md->lock);
-
+  md->lock = MUTEX_CREATE(YES);
+  md->cron = cron_create(ectx);
+  md->ectx = ectx;
   res = MALLOC(sizeof(Blockstore));
   res->get  = &lookup;
   res->put   = &store;
   res->del  = &ds_remove;
   res->iterate = &iterate;
   res->closure = md;
-  addCronJob((CronJob) &expirationJob,
-            5 * cronMINUTES,
-            5 * cronMINUTES,
-            md);
+  cron_add_job(md->cron,
+              (CronJob) &expirationJob,
+              5 * cronMINUTES,
+              5 * cronMINUTES,
+              md);
+  cron_start(md->cron);
   return res;
 }
 
@@ -393,17 +401,14 @@
   MemoryDatastore * md;
   HT_Entry * pos;
   HT_Entry * next;
-  int icr;
 
   md  = ds->closure;
-  icr = isCronRunning();
-  if (icr)
-    suspendCron();
-  delCronJob((CronJob) &expirationJob,
-            5 * cronMINUTES,
-            md);
-  if (icr)
-    resumeCron();
+  cron_stop(md->cron);
+  cron_del_job(md->cron,
+              (CronJob) &expirationJob,
+              5 * cronMINUTES,
+              md);
+  cron_destroy(md->cron);
 
   pos = md->first;
   while (pos != NULL) {
@@ -414,7 +419,7 @@
     FREE(pos);
     pos = next;
   }
-  MUTEX_DESTROY(&md->lock);
+  MUTEX_DESTROY(md->lock);
   FREE(md);
   FREE(ds);
 }

Modified: GNUnet/src/applications/dht/module/datastore_dht_master.h
===================================================================
--- GNUnet/src/applications/dht/module/datastore_dht_master.h   2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_dht_master.h   2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
  /*
       This file is part of GNUnet
+      Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -28,12 +29,14 @@
 #define DHT_DATASTORE_MASTER_H
 
 #include "gnunet_blockstore.h"
+#include "gnunet_util.h"
 
 /**
  * Create a DHT Master Datastore
  * @param max_memory do not use more than max_memory memory.
  */
-Blockstore * create_datastore_dht_master(size_t max_memory);
+Blockstore * create_datastore_dht_master(struct GE_Context * ectx,
+                                        size_t max_memory);
 
 /**
  * Destroy a DHT Master Datastore (in memory)

Modified: GNUnet/src/applications/dht/module/datastore_memory.c
===================================================================
--- GNUnet/src/applications/dht/module/datastore_memory.c       2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_memory.c       2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
  /*
       This file is part of GNUnet
+      Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -48,7 +49,7 @@
  * @brief the per-table data
  */
 typedef struct {
-  Mutex lock;
+  struct MUTEX * lock;
   size_t max_memory;
   HT_Entry * first;
 } MemoryDatastore;
@@ -81,7 +82,7 @@
 
   if ( (ds == NULL) || (keyCount != 1) )
     return SYSERR;
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   pos = ds->first;
   while (pos != NULL) {
     if (equalsHashCode512(&keys[0], &pos->key)) {
@@ -89,15 +90,15 @@
        if (OK != resultCallback(&pos->key,
                                 pos->values[i],
                                 resCallbackClosure)) {
-         MUTEX_UNLOCK(&ds->lock);
+         MUTEX_UNLOCK(ds->lock);
          return SYSERR;
        }
-      MUTEX_UNLOCK(&ds->lock);
+      MUTEX_UNLOCK(ds->lock);
       return pos->count;
     }
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return 0;
 }
 
@@ -113,7 +114,7 @@
                 const HashCode512 * key,
                 const DataContainer * value,
                 unsigned int prio) {
-  MemoryDatastore * ds = (MemoryDatastore*) closure;
+  MemoryDatastore * ds = closure;
   HT_Entry * pos;
   unsigned int size;
 
@@ -121,12 +122,12 @@
     return SYSERR;
 
   size = ntohl(value->size);
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   pos = ds->first;
   while (pos != NULL) {
     if (equalsHashCode512(key, &pos->key)) {
       if (ds->max_memory < size) {
-       MUTEX_UNLOCK(&ds->lock);
+       MUTEX_UNLOCK(ds->lock);
        return NO;
       }
       ds->max_memory -= size;
@@ -138,14 +139,14 @@
       memcpy(pos->values[pos->count-1],
             value,
             size);
-      MUTEX_UNLOCK(&ds->lock);
+      MUTEX_UNLOCK(ds->lock);
       return OK;
     } /* end key match */
     pos = pos->next;
   }
   /* no key matched, create fresh entry */
   if (ds->max_memory < sizeof(HT_Entry) + size) {
-    MUTEX_UNLOCK(&ds->lock);
+    MUTEX_UNLOCK(ds->lock);
     return NO;
   }
   ds->max_memory -= sizeof(HT_Entry) + size;
@@ -159,7 +160,7 @@
         size);
   pos->next = ds->first;
   ds->first = pos;
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return OK;
 }
 
@@ -182,7 +183,7 @@
   if (ds == NULL)
     return SYSERR;
   size = ntohl(value->size);
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   prev = NULL;
   pos = ds->first;
   while (pos != NULL) {
@@ -207,7 +208,7 @@
              FREE(pos);
              ds->max_memory += sizeof(HT_Entry);       
            }
-           MUTEX_UNLOCK(&ds->lock);
+           MUTEX_UNLOCK(ds->lock);
            return OK;
          }
        }
@@ -228,13 +229,13 @@
        FREE(pos);
        ds->max_memory += sizeof(HT_Entry);
       }
-      MUTEX_UNLOCK(&ds->lock);
+      MUTEX_UNLOCK(ds->lock);
       return OK;
     }
     prev = pos;
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return SYSERR; /* not found */
 }
 
@@ -256,7 +257,7 @@
   if (ds == NULL)
     return SYSERR;
 
-  MUTEX_LOCK(&ds->lock);
+  MUTEX_LOCK(ds->lock);
   pos = ds->first;
   ret = 0;
   while (pos != NULL) {
@@ -266,13 +267,13 @@
        if (OK != processor(&pos->key,
                            pos->values[i],
                            cls)) {
-         MUTEX_UNLOCK(&ds->lock);
+         MUTEX_UNLOCK(ds->lock);
          return ret;
        }
     }
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&ds->lock);
+  MUTEX_UNLOCK(ds->lock);
   return SYSERR;
 }
 
@@ -287,7 +288,7 @@
   md = MALLOC(sizeof(MemoryDatastore));
   md->max_memory = max_memory;
   md->first = NULL;
-  MUTEX_CREATE_RECURSIVE(&md->lock);
+  md->lock = MUTEX_CREATE(YES);
 
   res = MALLOC(sizeof(Blockstore));
   res->get = &lookup;
@@ -319,7 +320,7 @@
     FREE(pos);
     pos = next;
   }
-  MUTEX_DESTROY(&md->lock);
+  MUTEX_DESTROY(md->lock);
   FREE(md);
   FREE(ds);
 }

Modified: GNUnet/src/applications/dht/module/datastore_memory_test.c
===================================================================
--- GNUnet/src/applications/dht/module/datastore_memory_test.c  2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/datastore_memory_test.c  2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
  /*
       This file is part of GNUnet
+      Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published

Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c    2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/dht.c    2006-10-01 05:16:17 UTC (rev 3430)
@@ -60,7 +60,7 @@
 #define DEBUG_DHT YES
 
 #if DEBUG_DHT
-#define ENTER() LOG(LOG_EVERYTHING, "Entering method %s at %s:%d.\n", __FUNCTION__, __FILE__, __LINE__)
+#define ENTER() GE_LOG(ectx, GE_REQUEST | GE_DEVELOPER | GE_DEBUG, "Entering method %s at %s:%d.\n", __FUNCTION__, __FILE__, __LINE__)
 #else
 #define ENTER() do {} while (0)
 #endif
@@ -219,7 +219,7 @@
    * Signal used to return from findNodes when timeout has
    * expired.
    */
-  Semaphore * signal;
+  struct SEMAPHORE * signal;
 
   /**
    * Number of entries in matches.
@@ -262,7 +262,7 @@
   /**
    * Lock for accessing this struct.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 } FindNodesContext;
 
 /**
@@ -329,7 +329,7 @@
   /**
    * Lock for accessing this struct.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 
   /**
    * Callback to call on the k nodes.
@@ -393,7 +393,7 @@
   /**
    * Lock for concurrent access to the record.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 
 } DHT_GET_RECORD;
 
@@ -448,7 +448,7 @@
   /**
    * Lock for concurrent access to the record.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 
 } DHT_PUT_RECORD;
 
@@ -507,7 +507,7 @@
   /**
    * Lock for concurrent access to the record.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 
 } DHT_REMOVE_RECORD;
 
@@ -544,7 +544,7 @@
   /**
    * Lock for accessing this struct.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 } RPC_DHT_FindValue_Context;
 
 typedef struct {
@@ -569,7 +569,7 @@
   /**
    * Lock for accessing this struct.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 } RPC_DHT_store_Context;
 
 typedef struct {
@@ -594,7 +594,7 @@
   /**
    * Lock for accessing this struct.
    */
-  Mutex lock;
+  struct MUTEX * lock;
 } RPC_DHT_remove_Context;
 
 /**
@@ -631,12 +631,14 @@
 /**
  * Global core API.
  */
-static CoreAPIForApplication * coreAPI = NULL;
+static CoreAPIForApplication * coreAPI;
 
+static struct GE_Context * ectx;
+
 /**
  * RPC API
  */
-static RPC_ServiceAPI * rpcAPI = NULL;
+static RPC_ServiceAPI * rpcAPI;
 
 /**
  * The buckets (Kademlia style routing table).
@@ -666,7 +668,7 @@
 /**
  * Mutex to synchronize access to tables.
  */
-static Mutex * lock;
+static struct MUTEX * lock;
 
 /**
  * Handle for the masterTable datastore that is used by this node
@@ -685,6 +687,8 @@
 
 static unsigned int abortTableSize;
 
+#define hostIdentityEquals(a,b) (0 == memcmp(a,b,sizeof(PeerIdentity)))
+
 /* *********************** CODE! ********************* */
 
 #if DEBUG_DHT
@@ -692,8 +696,9 @@
   unsigned int i;
 
   MUTEX_LOCK(lock);
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-      "DHT ROUTING TABLE:\n");
+  GE_LOG(ectx, 
+        GE_DEBUG | GE_REQUEST | GE_USER,
+        "DHT ROUTING TABLE:\n");
   for (i=0;i<bucketCount;i++) {
     if (buckets[i].peers != NULL) {
       PeerInfo * pos = NULL;
@@ -711,15 +716,16 @@
          hash2enc(&pos->tables[j],
                   &tabs[j]);
        
-       GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-           "[%4d: %3d-%3d]: %s with %u tables (%s, %s, %s)\n",
-           i,
-           buckets[i].bstart, buckets[i].bend,
-           &enc,
-           pos->tableCount,
-           &tabs[0],
-           &tabs[1],
-           &tabs[2]);
+       GE_LOG(ectx,
+              GE_DEBUG | GE_REQUEST | GE_USER,
+              "[%4d: %3d-%3d]: %s with %u tables (%s, %s, %s)\n",
+              i,
+              buckets[i].bstart, buckets[i].bend,
+              &enc,
+              pos->tableCount,
+              &tabs[0],
+              &tabs[1],
+              &tabs[2]);
        pos = vectorGetNext(buckets[i].peers);
       }
     }
@@ -1189,7 +1195,7 @@
   ENTER();
   now = get_time();
   param = RPC_paramNew();
-  MUTEX_LOCK(&fnc->lock);
+  MUTEX_LOCK(fnc->lock);
   if (equalsHashCode512(&fnc->key,
                        &coreAPI->myIdentity->hashPubKey)) {
     table = getLocalTableData(&fnc->table);
@@ -1221,7 +1227,7 @@
                        rel,
                        (RPC_Complete) &create_find_nodes_rpc_complete_callback,
                        fnc);
-  MUTEX_UNLOCK(&fnc->lock);
+  MUTEX_UNLOCK(fnc->lock);
   RPC_paramFree(param);
 }
 
@@ -1247,7 +1253,7 @@
   if (fnc == NULL)
     return;
   /* update k-best list */
-  MUTEX_LOCK(&fnc->lock);
+  MUTEX_LOCK(fnc->lock);
   pos = findPeerInfo(responder);
   /* does the peer support the table in question? */
   if (! equalsHashCode512(&fnc->table,
@@ -1257,7 +1263,7 @@
                            &pos->tables[i]))
        break;
     if (i == -1) {
-      MUTEX_UNLOCK(&fnc->lock);
+      MUTEX_UNLOCK(fnc->lock);
       return; /* peer does not support table in question */
     }
   }
@@ -1280,7 +1286,7 @@
   /* trigger transitive request searching for more nodes! */
   create_find_nodes_rpc(responder,
                        fnc);
-  MUTEX_UNLOCK(&fnc->lock);
+  MUTEX_UNLOCK(fnc->lock);
 }
 
 /**
@@ -1320,7 +1326,7 @@
   MUTEX_UNLOCK(lock);
 
   /* peer not in RPC buckets; try PINGing via RPC */
-  MUTEX_LOCK(&fnc->lock);
+  MUTEX_LOCK(fnc->lock);
   GROW(fnc->rpc,
        fnc->rpcRepliesExpected,
        fnc->rpcRepliesExpected+1);
@@ -1339,7 +1345,7 @@
                        (RPC_Complete) &ping_reply_handler,
                        fnc);
   vectorFree(request_param);
-  MUTEX_UNLOCK(&fnc->lock);
+  MUTEX_UNLOCK(fnc->lock);
 }
 
 /**
@@ -1449,12 +1455,12 @@
          &enc);
       return;
     }
-    MUTEX_LOCK(&record->lock);
+    MUTEX_LOCK(record->lock);
     if (record->callback != NULL)
       record->resultCallback(record->keys,
                             value,
                             record->resultClosure);
-    MUTEX_UNLOCK(&record->lock);
+    MUTEX_UNLOCK(record->lock);
     FREE(value);
   }
 }
@@ -1619,7 +1625,7 @@
   ret->resultsFound = 0;
   ret->callback = callback;
   ret->closure = closure;
-  MUTEX_CREATE_RECURSIVE(&ret->lock);
+  ret->lock = MUTEX_CREATE(YES);
   ret->rpc = NULL;
   ret->rpcRepliesExpected = 0;
   ret->kfnc = NULL;
@@ -1747,7 +1753,7 @@
 
   for (i=0;i<record->rpcRepliesExpected;i++)
     rpcAPI->RPC_stop(record->rpc[i]);
-  MUTEX_DESTROY(&record->lock);
+  MUTEX_DESTROY(record->lock);
   resultsFound = record->resultsFound;
   FREE(record);
 #if DEBUG_DHT
@@ -1859,7 +1865,7 @@
   fnc->rpcRepliesExpected = 0;
   fnc->rpcRepliesReceived = 0;
   fnc->async_handle = NULL;
-  MUTEX_CREATE_RECURSIVE(&fnc->lock);
+  fnc->lock = MUTEX_CREATE(YES);
 
   /* find peers in local peer-list that participate in
      the given table */
@@ -1947,7 +1953,7 @@
   for (i=fnc->rpcRepliesExpected-1;i>=0;i--)
     rpcAPI->RPC_stop(fnc->rpc[i]);
   SEMAPHORE_DESTROY(fnc->signal);
-  MUTEX_DESTROY(&fnc->lock);
+  MUTEX_DESTROY(fnc->lock);
 
   /* Finally perform callbacks on collected k-best nodes. */
   if (callback != NULL)
@@ -2007,7 +2013,7 @@
        &enc,
        "DHT_findValue");
 #endif
-    MUTEX_LOCK(&fnc->lock);
+    MUTEX_LOCK(fnc->lock);
     if (fnc->k > 0) {
       if (fnc->callback != NULL)
        fnc->callback(msg,
@@ -2015,7 +2021,7 @@
       fnc->k--;
       fnc->found++;
     }
-    MUTEX_UNLOCK(&fnc->lock);
+    MUTEX_UNLOCK(fnc->lock);
   }
   return OK;
 }
@@ -2082,7 +2088,7 @@
   fnc->rpcRepliesExpected = 0;
   fnc->rpcRepliesReceived = 0;
   fnc->found = 0;
-  MUTEX_CREATE_RECURSIVE(&fnc->lock);
+  fnc->lock = MUTEX_CREATE(YES);
   matches = MALLOC(sizeof(PeerIdentity) * fnc->k);
 
   /* find peers in local peer-list that participate in
@@ -2158,7 +2164,7 @@
   /* stop all async RPCs */
   for (i=fnc->rpcRepliesExpected-1;i>=0;i--)
     rpcAPI->RPC_stop(fnc->rpc[i]);
-  MUTEX_DESTROY(&fnc->lock);
+  MUTEX_DESTROY(fnc->lock);
 
   i = fnc->found;
   FREE(fnc);
@@ -2184,7 +2190,7 @@
 
   ENTER();
   processOptionalFields(responder, results);
-  MUTEX_LOCK(&record->lock);
+  MUTEX_LOCK(record->lock);
   pos = findPeerInfo(responder);
   pos->lastActivity = get_time();
 
@@ -2200,7 +2206,7 @@
         (dataLength != sizeof(PeerIdentity)) ) {
       EncName enc;
 
-      MUTEX_UNLOCK(&record->lock);
+      MUTEX_UNLOCK(record->lock);
       hash2enc(&responder->hashPubKey,
               &enc);
       GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
@@ -2210,7 +2216,7 @@
       return;
     }
   }
-  MUTEX_UNLOCK(&record->lock);
+  MUTEX_UNLOCK(record->lock);
 }
 
 /**
@@ -2351,7 +2357,7 @@
   memcpy(ret->value,
         value,
         ntohl(value->size));
-  MUTEX_CREATE_RECURSIVE(&ret->lock);
+  ret->lock = MUTEX_CREATE(YES);
   ret->rpc = NULL;
   ret->rpcRepliesExpected = 0;
   ret->kfnc = NULL;
@@ -2427,10 +2433,11 @@
   /* call OP_Complete callback after timeout! */
   addAbortJob(&dht_put_async_timeout,
              ret);
-  addCronJob(&dht_put_async_timeout,
-            timeout,
-            0,
-            ret);
+  cron_add_job(coreAPI->cron,
+              &dht_put_async_timeout,
+              timeout,
+              0,
+              ret);
   MUTEX_UNLOCK(lock);
   return ret;
 }
@@ -2447,7 +2454,10 @@
 
   /* cancel timeout cron job (if still live) */
   delAbortJob(&dht_put_async_timeout, record);
-  delCronJob(&dht_put_async_timeout, 0, record);
+  cron_del_job(coreAPI->cron,
+              &dht_put_async_timeout, 
+              0, 
+              record);
   /* abort findKNodes (if running) - it may cause
      the addition of additional RPCs otherwise! */
   if (record->kfnc != NULL)
@@ -2455,7 +2465,7 @@
 
   for (i=0;i<record->rpcRepliesExpected;i++)
     rpcAPI->RPC_stop(record->rpc[i]);
-  MUTEX_DESTROY(&record->lock);
+  MUTEX_DESTROY(record->lock);
   i = record->confirmed_stores;
   FREE(record->value);
   FREE(record);
@@ -2485,7 +2495,7 @@
 
   ENTER();
   processOptionalFields(responder, results);
-  MUTEX_LOCK(&record->lock);
+  MUTEX_LOCK(record->lock);
   pos = findPeerInfo(responder);
   pos->lastActivity = get_time();
   max = RPC_paramCount(results);
@@ -2500,7 +2510,7 @@
         (dataLength != sizeof(PeerIdentity)) ) {
       EncName enc;
 
-      MUTEX_UNLOCK(&record->lock);
+      MUTEX_UNLOCK(record->lock);
       hash2enc(&responder->hashPubKey,
               &enc);
       GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
@@ -2511,7 +2521,7 @@
     }
     record->confirmed_stores++;
   }
-  MUTEX_UNLOCK(&record->lock);
+  MUTEX_UNLOCK(record->lock);
 }
 
 /**
@@ -2629,7 +2639,7 @@
           value,
           ntohl(value->size));
   }
-  MUTEX_CREATE_RECURSIVE(&ret->lock);
+  ret->lock = MUTEX_CREATE(YES);
   ret->rpc = NULL;
   ret->rpcRepliesExpected = 0;
   ret->confirmed_stores = 0;
@@ -2713,7 +2723,7 @@
 
   for (i=0;i<record->rpcRepliesExpected;i++)
     rpcAPI->RPC_stop(record->rpc[i]);
-  MUTEX_DESTROY(&record->lock);
+  MUTEX_DESTROY(record->lock);
   i = record->confirmed_stores;
   FREE(record->value);
   FREE(record);
@@ -2910,9 +2920,9 @@
   ENTER();
   delAbortJob((CronJob) &rpc_DHT_findValue_abort,
              fw);
-  MUTEX_LOCK(&fw->lock);
+  MUTEX_LOCK(fw->lock);
   if (fw->done == YES) {
-    MUTEX_UNLOCK(&fw->lock);
+    MUTEX_UNLOCK(fw->lock);
     return;
   }
   dht_get_async_stop(fw->get_record);
@@ -2928,7 +2938,7 @@
     RPC_paramFree(results);
   }
   fw->done = YES;
-  MUTEX_UNLOCK(&fw->lock);
+  MUTEX_UNLOCK(fw->lock);
 }
 
 /**
@@ -2941,7 +2951,7 @@
                                      const DataContainer * value,
                                      RPC_DHT_FindValue_Context * fw) {
   ENTER();
-  MUTEX_LOCK(&fw->lock);
+  MUTEX_LOCK(fw->lock);
   GROW(fw->results,
        fw->count,
        fw->count+1);
@@ -2949,7 +2959,7 @@
   memcpy(fw->results[fw->count-1],
         value,
         ntohl(value->size));
-  MUTEX_UNLOCK(&fw->lock);
+  MUTEX_UNLOCK(fw->lock);
   return OK;
 }
 
@@ -3025,7 +3035,7 @@
 
   fw_context
     = MALLOC(sizeof(RPC_DHT_FindValue_Context));
-  MUTEX_CREATE_RECURSIVE(&fw_context->lock);
+  fw_context->lock = MUTEX_CREATE(YES);
   fw_context->count
     = 0;
   fw_context->done
@@ -3048,10 +3058,11 @@
                          fw_context);
   addAbortJob((CronJob)&rpc_DHT_findValue_abort,
              fw_context);
-  addCronJob((CronJob)&rpc_DHT_findValue_abort,
-            ntohll(*timeout),
-            0,
-            fw_context);
+  cron_add_job(coreAPI->cron,
+              (CronJob)&rpc_DHT_findValue_abort,
+              ntohll(*timeout),
+              0,
+              fw_context);
 }
 
 /**
@@ -3069,9 +3080,9 @@
   ENTER();
   delAbortJob(&rpc_DHT_store_abort,
              fw);
-  MUTEX_LOCK(&fw->lock);
+  MUTEX_LOCK(fw->lock);
   if (fw->done == YES) {
-    MUTEX_UNLOCK(&fw->lock);
+    MUTEX_UNLOCK(fw->lock);
     return;
   }
   dht_put_async_stop(fw->put_record);
@@ -3087,7 +3098,7 @@
     RPC_paramFree(results);
   }
   fw->done = YES;
-  MUTEX_UNLOCK(&fw->lock);
+  MUTEX_UNLOCK(fw->lock);
 }
 
 /**
@@ -3099,7 +3110,10 @@
 static void rpc_dht_store_callback(RPC_DHT_store_Context * fw) {
   RPC_Param * param;
   
-  delCronJob(&rpc_DHT_store_abort, 0, fw);
+  cron_del_job(coreAPI->cron,
+              &rpc_DHT_store_abort,
+              0, 
+              fw);
   delAbortJob(&rpc_DHT_store_abort, fw);
   param = RPC_paramNew();
   fw->callback(param,
@@ -3152,7 +3166,7 @@
 
   fw_context
     = MALLOC(sizeof(RPC_DHT_store_Context));
-  MUTEX_CREATE_RECURSIVE(&fw_context->lock);
+  fw_context->lock = MUTEX_CREATE(YES);
   MUTEX_LOCK(lock);
   ltd = getLocalTableData(table);
   if (ltd == NULL) {
@@ -3176,10 +3190,11 @@
                          fw_context);
   addAbortJob(&rpc_DHT_store_abort,
              fw_context);
-  addCronJob(&rpc_DHT_store_abort,
-            ntohll(*timeout),
-            0,
-            fw_context);
+  cron_add_job(coreAPI->cron,
+              &rpc_DHT_store_abort,
+              ntohll(*timeout),
+              0,
+              fw_context);
   FREE(value);
 }
 
@@ -3197,9 +3212,9 @@
   ENTER();
   delAbortJob((CronJob) &rpc_DHT_remove_abort,
              fw);
-  MUTEX_LOCK(&fw->lock);
+  MUTEX_LOCK(fw->lock);
   if (fw->done == YES) {
-    MUTEX_UNLOCK(&fw->lock);
+    MUTEX_UNLOCK(fw->lock);
     return;
   }
   dht_remove_async_stop(fw->remove_record);
@@ -3214,7 +3229,7 @@
                 fw->rpc_context);
   RPC_paramFree(results);
   fw->done = YES;
-  MUTEX_UNLOCK(&fw->lock);
+  MUTEX_UNLOCK(fw->lock);
 }
 
 /**
@@ -3226,7 +3241,10 @@
 static void rpc_dht_remove_callback(RPC_DHT_remove_Context * fw) {
   RPC_Param * param;
   
-  delCronJob(&rpc_DHT_store_abort, 0, fw);
+  cron_del_job(coreAPI->cron,
+              &rpc_DHT_store_abort, 
+              0, 
+              fw);
   delAbortJob(&rpc_DHT_store_abort, fw);
   param = RPC_paramNew();
 
@@ -3288,7 +3306,7 @@
                                       "value");
   fw_context
     = MALLOC(sizeof(RPC_DHT_remove_Context));
-  MUTEX_CREATE_RECURSIVE(&fw_context->lock);
+  fw_context->lock = MUTEX_CREATE(YES);
   MUTEX_LOCK(lock);
   ltd = getLocalTableData(table);
   if (ltd == NULL) {
@@ -3312,10 +3330,11 @@
                             fw_context);
   addAbortJob((CronJob)&rpc_DHT_remove_abort,
              fw_context);
-  addCronJob((CronJob)&rpc_DHT_remove_abort,
-            ntohll(*timeout),
-            0,
-            fw_context);
+  cron_add_job(coreAPI->cron,
+              (CronJob)&rpc_DHT_remove_abort,
+              ntohll(*timeout),
+              0,
+              fw_context);
   FREE(value);
 }
 
@@ -3354,9 +3373,10 @@
 #if DEBUG_DHT
   printRoutingTable();
   /* first, free resources from ASYNC calls started last time */
-  LOG(LOG_CRON,
-      "`%s' stops async requests from last cron round.\n",
-      __FUNCTION__);
+  GE_LOG(ectx,
+        GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+        "`%s' stops async requests from last cron round.\n",
+        __FUNCTION__);
 #endif
   now = get_time();
   for (i=putRecordsSize-1;i>=0;i--) {
@@ -3418,7 +3438,7 @@
         coreAPI->myIdentity,
         sizeof(PeerIdentity));
 #if DEBUG_DHT
-  LOG(LOG_CRON,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "`%s' issues DHT_PUTs to advertise tables this peer participates in.\n",
       __FUNCTION__);
 #endif
@@ -3452,7 +3472,7 @@
     for each table that we have joined gather OUR neighbours
   */
 #if DEBUG_DHT
-  LOG(LOG_CRON,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "`%s' issues findNodes for each table that we participate in.\n",
       __FUNCTION__);
 #endif
@@ -3478,7 +3498,7 @@
      b) if lastActivity is very very old, drop
   */
 #if DEBUG_DHT
-  LOG(LOG_CRON,
+  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
       "`%s' issues put to advertise tables that we participate in.\n",
       __FUNCTION__);
 #endif
@@ -3553,17 +3573,32 @@
  */
 DHT_ServiceAPI * provide_module_dht(CoreAPIForApplication * capi) {
   static DHT_ServiceAPI api;
-  unsigned int i;
+  unsigned long long i;
+  unsigned long long j;
 
   ENTER();
   coreAPI = capi;
+  ectx = capi->ectx;
   rpcAPI = capi->requestService("rpc");
   if (rpcAPI == NULL)
     return NULL;
-  i = getConfigurationInt("DHT",
-                         "BUCKETCOUNT");
-  if ( (i == 0) || (i > 512) )
-    i = 512;
+  if ( (-1 == GC_get_configuration_value_number(capi->cfg,
+                                               "DHT",
+                                               "BUCKETCOUNT",
+                                               1,
+                                               512,
+                                               512,
+                                               &i)) ||
+       (-1 == GC_get_configuration_value_number(capi->cfg,
+                                               "DHT",
+                                               "MASTER-TABLE-SIZE",
+                                               1,
+                                               65536 * 1024,
+                                               65536,
+                                               &j)) ) {
+    capi->releaseService(rpcAPI);
+    return NULL;
+  }
   GROW(buckets,
        bucketCount,
        i);
@@ -3573,6 +3608,7 @@
     buckets[i].peers = vectorNew(4);
   }
 
+
   rpcAPI->RPC_register("DHT_ping",
                       &rpc_DHT_ping);
   rpcAPI->RPC_register("DHT_findNode",
@@ -3595,18 +3631,16 @@
 
   memset(&masterTableId, 0, sizeof(HashCode512));
   /* join the master table */
-  i = getConfigurationInt("DHT",
-                         "MASTER-TABLE-SIZE");
-  if (i == 0)
-    i = 65536; /* 64k memory should suffice */
   masterTableDatastore
-    = create_datastore_dht_master(i);
+    = create_datastore_dht_master(ectx,
+                                 j);
   dht_join(masterTableDatastore,
           &masterTableId);
-  addCronJob(&dhtMaintainJob,
-            0,
-            DHT_MAINTAIN_FREQUENCY,
-            NULL);
+  cron_add_job(coreAPI->cron,
+              &dhtMaintainJob,
+              0,
+              DHT_MAINTAIN_FREQUENCY,
+              NULL);
   return &api;
 }
 
@@ -3628,14 +3662,16 @@
                               &rpc_DHT_store);
   rpcAPI->RPC_unregister_async("DHT_remove",
                               &rpc_DHT_remove);
-  delCronJob(&dhtMaintainJob,
-            DHT_MAINTAIN_FREQUENCY,
-            NULL);
+  cron_del_job(coreAPI->cron,
+              &dhtMaintainJob,
+              DHT_MAINTAIN_FREQUENCY,
+              NULL);
   /* stop existing / pending DHT operations */
   while (abortTableSize > 0) {
-    delCronJob(abortTable[0].job,
-              0,
-              abortTable[0].arg);
+    cron_del_job(coreAPI->cron,
+                abortTable[0].job,
+                0,
+                abortTable[0].arg);
     abortTable[0].job(abortTable[0].arg);
   }
   /* leave the master table */

Modified: GNUnet/src/applications/dht/module/dht.h
===================================================================
--- GNUnet/src/applications/dht/module/dht.h    2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/src/applications/dht/module/dht.h    2006-10-01 05:16:17 UTC (rev 3430)
@@ -1,5 +1,6 @@
 /*
      This file is part of GNUnet
+     Copyright (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-10-01 04:45:12 UTC (rev 3429)
+++ GNUnet/todo 2006-10-01 05:16:17 UTC (rev 3430)
@@ -15,7 +15,6 @@
 0.7.1 ['06] (aka "stabilization")
 - finish util refactoring: [RC]
   * Not compiling:
-    + dht/module
     + dht/tools
     + vpn
   * Testcases not compiling or passing:





reply via email to

[Prev in Thread] Current Thread [Next in Thread]