gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5232 - in GNUnet/src/applications: datastore fs/module


From: gnunet
Subject: [GNUnet-SVN] r5232 - in GNUnet/src/applications: datastore fs/module
Date: Wed, 4 Jul 2007 16:36:37 -0600 (MDT)

Author: grothoff
Date: 2007-07-04 16:36:37 -0600 (Wed, 04 Jul 2007)
New Revision: 5232

Modified:
   GNUnet/src/applications/datastore/prefetch.c
   GNUnet/src/applications/fs/module/migration.c
Log:
do not duplicate functionality between prefetch and migration

Modified: GNUnet/src/applications/datastore/prefetch.c
===================================================================
--- GNUnet/src/applications/datastore/prefetch.c        2007-07-04 22:14:59 UTC (rev 5231)
+++ GNUnet/src/applications/datastore/prefetch.c        2007-07-04 22:36:37 UTC (rev 5232)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2003, 2004, 2006 Christian Grothoff (and other contributing authors)
+     (C) 2001, 2002, 2003, 2004, 2006, 2007 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -20,7 +20,7 @@
 
 /**
  * @file applications/datastore/prefetch.c
- * @brief This module is responsible for prefetching
+ * @brief This module is responsible for fetching
  *   content that can be pushed out into the network
  * @author Christian Grothoff, Igor Wronsky
  */
@@ -31,33 +31,11 @@
 
 #define DEBUG_PREFETCH NO
 
-/* use a 64-entry RCB buffer */
-#define RCB_SIZE 64
+static HashCode512 rkey;
 
-/* how many blocks to cache from on-demand files in a row */
-#define RCB_ONDEMAND_MAX 16
+static Datastore_Value * rvalue;
 
 /**
- * Buffer with pre-fetched, encoded random content for migration.
- */
-typedef struct {
-
-  HashCode512 key;
-
-  Datastore_Value * value;
-  /**
-   * 0 if we have never used this content with any peer.  Otherwise
-   * the value is set to the lowest 32 bit of the peer ID (to avoid
-   * sending it to the same peer twice).  After sending out the
-   * content twice, it is discarded.
-   */
-  int used;
-} ContentBuffer;
-
-
-static ContentBuffer randomContentBuffer[RCB_SIZE];
-
-/**
  * SQ-store handle
  */
 static SQstore_ServiceAPI * sq;
@@ -78,11 +56,6 @@
  */
 static struct MUTEX * lock;
 
-/**
- * Highest index in RCB that is valid.
- */
-static int rCBPos = 0;
-
 static struct PTHREAD * gather_thread;
 
 static struct GE_Context * ectx;
@@ -93,68 +66,19 @@
 static int acquire(const HashCode512 * key,
                   const Datastore_Value * value,
                   void * closure) {
-  int loadc;
-  int loadi;
-  int load;
-
   if (doneSignal)
     return SYSERR;
   SEMAPHORE_DOWN(acquireMoreSignal, YES);
   if (doneSignal)
     return SYSERR;
   MUTEX_LOCK(lock);
-  load = 0;
-  while (randomContentBuffer[rCBPos].value != NULL) {
-    rCBPos = (rCBPos + 1) % RCB_SIZE;
-    load++;
-    if (load > RCB_SIZE) {
-      GE_BREAK(ectx, 0);
-      MUTEX_UNLOCK(lock);
-      return SYSERR;
-    }
-  }
-#if DEBUG_PREFETCH
-  {
-    EncName enc;
-
-    hash2enc(key,
-            &enc);
-    GE_LOG(ectx,
-          GE_DEBUG | GE_BULK | GE_USER,
-          "Adding content `%s' of type %u/size %u/exp %llu to prefetch buffer 
(%u)\n",
-          &enc,
-          ntohl(value->type),
-          ntohl(value->size),
-          ntohll(value->expirationTime),
-          rCBPos);
-  }
-#endif
-  randomContentBuffer[rCBPos].key = *key;
-  randomContentBuffer[rCBPos].used = 0;
-  randomContentBuffer[rCBPos].value
-    = MALLOC(ntohl(value->size));
-  memcpy(randomContentBuffer[rCBPos].value,
+  GE_ASSERT(NULL, rvalue == NULL);
+  rkey = *key;
+  rvalue = MALLOC(ntohl(value->size));
+  memcpy(rvalue,
         value,
         ntohl(value->size));
   MUTEX_UNLOCK(lock);
-  loadi = os_disk_get_load(ectx,
-                          cfg);
-  loadc = os_cpu_get_load(ectx,
-                         cfg);
-  if (loadi > loadc)
-    load = loadi;
-  else
-    load = loadc;
-  if (load < 10)
-    load = 10;    /* never sleep less than 500 ms */
-  if (load > 100)
-    load = 100;   /* never sleep longer than 5 seconds */
-  if (doneSignal)
-    return SYSERR;
-  /* the higher the load, the longer the sleep */
-  PTHREAD_SLEEP(50 * cronMILLIS * load);
-  if (doneSignal)
-    return SYSERR;
   return OK;
 }
 
@@ -166,7 +90,7 @@
   while (doneSignal == NO) {
     sq->iterateMigrationOrder(&acquire,
                              NULL);
-    /* sleep here, too - otherwise we start looping immediately
+    /* sleep here - otherwise we may start looping immediately
        if there is no content in the DB! */
     load = os_cpu_get_load(ectx,
                           cfg);
@@ -191,65 +115,19 @@
              HashCode512 * key,
              Datastore_Value ** value,
              unsigned int type) {
-  unsigned int dist;
-  unsigned int minDist;
   int minIdx;
   int i;
 
-  minIdx = -1;
-  minDist = -1; /* max */
   MUTEX_LOCK(lock);
-  for (i=0;i<RCB_SIZE;i++) {
-    if (randomContentBuffer[i].value == NULL)
-      continue;
-    if (randomContentBuffer[i].used == *(int*) receiver)
-      continue; /* used this content for this peer already! */
-    if ( ( ( (type != ntohl(randomContentBuffer[i].value->type)) &&
-            (type != 0) ) ) ||
-        (sizeLimit < ntohl(randomContentBuffer[i].value->size)) )
-      continue;
-    if ( (ntohl(randomContentBuffer[i].value->type) == ONDEMAND_BLOCK) &&
-        (sizeLimit < 32768) )
-      continue; /* 32768 == ecrs/tree.h: DBLOCK_SIZE */
-    dist = distanceHashCode512(&randomContentBuffer[i].key,
-                              receiver);
-    if (dist < minDist) {
-      minIdx = i;
-      minDist = dist;
-    }
-  }
-  if (minIdx == -1) {
+  if (rvalue == NULL) {
     MUTEX_UNLOCK(lock);
-#if DEBUG_PREFETCH
-    GE_LOG(ectx,
-          GE_DEBUG | GE_REQUEST | GE_USER,
-          "Failed to find content in prefetch buffer\n");
-#endif
     return SYSERR;
   }
-#if DEBUG_PREFETCH
-    GE_LOG(ectx,
-          GE_DEBUG | GE_REQUEST | GE_USER,
-          "Found content in prefetch buffer (%u)\n",
-          minIdx);
-#endif
-  *key = randomContentBuffer[minIdx].key;
-  *value = randomContentBuffer[minIdx].value;
-
-  if ( (randomContentBuffer[minIdx].used == 0) &&
-       (0 != *(int*) receiver) ) {
-    /* re-use once more! */
-    randomContentBuffer[minIdx].used = *(int*) receiver;
-    randomContentBuffer[minIdx].value = MALLOC(ntohl((*value)->size));
-    memcpy(randomContentBuffer[minIdx].value,
-          *value,
-          ntohl((*value)->size));
-  } else {
-    randomContentBuffer[minIdx].used = 0;
-    randomContentBuffer[minIdx].value = NULL;
-    SEMAPHORE_UP(acquireMoreSignal);
-  }
+  *value = rvalue;
+  *key = rkey;
+  rvalue = NULL;
   MUTEX_UNLOCK(lock);
+  SEMAPHORE_UP(acquireMoreSignal);
   return OK;
 }
                                
@@ -259,17 +137,16 @@
   ectx = e;
   cfg = c;
   sq = s;
-  memset(randomContentBuffer,
-        0,
-        sizeof(ContentBuffer *)*RCB_SIZE);
-  acquireMoreSignal = SEMAPHORE_CREATE(RCB_SIZE);
+  acquireMoreSignal = SEMAPHORE_CREATE(1);
   doneSignal = NO;
   lock = MUTEX_CREATE(NO);
   gather_thread = PTHREAD_CREATE(&rcbAcquire,
                                 NULL,
                                 64*1024);
-  GE_ASSERT(NULL,
-           gather_thread != NULL);
+  if (gather_thread == NULL) 
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_ADMIN | GE_USER | GE_IMMEDIATE,
+                   "pthread_create");  
 }
 
 void donePrefetch() {
@@ -277,12 +154,14 @@
   void * unused;
 
   doneSignal = YES;
-  PTHREAD_STOP_SLEEP(gather_thread);
+  if (gather_thread != NULL) 
+    PTHREAD_STOP_SLEEP(gather_thread);
   SEMAPHORE_UP(acquireMoreSignal);
-  PTHREAD_JOIN(gather_thread, &unused);
+  if (gather_thread != NULL) 
+    PTHREAD_JOIN(gather_thread, &unused);
   SEMAPHORE_DESTROY(acquireMoreSignal);
-  for (i=0;i<RCB_SIZE;i++)
-    FREENONNULL(randomContentBuffer[i].value);
+  FREENONNULL(rvalue);
+  rvalue = NULL;
   MUTEX_DESTROY(lock);
   lock = NULL;
   sq = NULL;

Modified: GNUnet/src/applications/fs/module/migration.c
===================================================================
--- GNUnet/src/applications/fs/module/migration.c       2007-07-04 22:14:59 UTC (rev 5231)
+++ GNUnet/src/applications/fs/module/migration.c       2007-07-04 22:36:37 UTC (rev 5232)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
+     (C) 2001, 2002, 2003, 2004, 2005, 2007 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -47,11 +47,11 @@
 /**
  * How many migration records do we keep in memory
  * at the same time?  Each record is about 32k, so
- * 32 records will use about 1 MB of memory.
+ * 64 records will use about 2 MB of memory.
  * We might want to allow users to specify larger
  * values in the configuration file some day.
  */
-#define MAX_RECORDS 32
+#define MAX_RECORDS 64
 
 /**
  * Datastore service.
@@ -136,12 +136,15 @@
   int i;
   int j;
   int match;
+  unsigned int dist;
+  unsigned int minDist;
 
   index = coreAPI->computeIndex(receiver);
   MUTEX_LOCK(lock);
   entry = -1;
   discard_entry = -1;
   discard_match = -1;
+  minDist = -1; /* max */
   for (i=0;i<MAX_RECORDS;i++) {
     if (content[i].value == NULL) {
       discard_entry = i;
@@ -159,10 +162,13 @@
       }
     }
     if (match == 0) {
-      /* TODO: consider key proximity in matching as 
-        well! */
-      entry = i;
-      break;
+      dist = distanceHashCode512(&content[i]->key,
+                                &receiver->hashPubKey);
+      if (dist <= minDist) {
+       entry = i;
+       minDist = dist;
+       break;
+      }
     } else {
       if ( (content[i].sentCount > discard_match) ||
           (discard_match == -1) ) {
@@ -183,6 +189,7 @@
                                   &content[entry].key,
                                   &content[entry].value,
                                   0)) {
+      content[entry] = NULL; /* just to be sure...*/
       MUTEX_UNLOCK(lock);
 #if DEBUG_MIGRATION
       GE_LOG(ectx,
@@ -266,9 +273,8 @@
           "gap's tryMigrate returned %u\n",
           ret);
 #endif
-    if (ret != 0) {
-      content[entry].receiverIndices[content[entry].sentCount++] = index;
-    }
+    if (ret != 0) 
+      content[entry].receiverIndices[content[entry].sentCount++] = index;    
   }
   MUTEX_UNLOCK(lock);
   if ( (ret > 0)&&





reply via email to

[Prev in Thread] Current Thread [Next in Thread]