gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r11392 - in gnunet: . src/datastore src/fs


From: gnunet
Subject: [GNUnet-SVN] r11392 - in gnunet: . src/datastore src/fs
Date: Mon, 17 May 2010 09:17:48 +0200

Author: grothoff
Date: 2010-05-17 09:17:48 +0200 (Mon, 17 May 2010)
New Revision: 11392

Modified:
   gnunet/TODO
   gnunet/src/datastore/datastore_api.c
   gnunet/src/fs/fs.h
   gnunet/src/fs/fs_test_lib_data.conf
   gnunet/src/fs/gnunet-service-fs.c
Log:
lc stuff

Modified: gnunet/TODO
===================================================================
--- gnunet/TODO 2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/TODO 2010-05-17 07:17:48 UTC (rev 11392)
@@ -1,12 +1,55 @@
 0.9.0pre1:
 * FS: [CG]
-  - migration:
-    + on-demand encoding
-    + peer selection => how to consider latency/bw/etc?
-    + content transmission => how often the same block?
-    + testing
-  - gnunet-service-fs (hot-path routing, load-based routing, nitpicks)  
-    + active reply route caching design & implementation of service; gap extension!
+  - test migration
+  - TTL/priority calculations
+  - hot-path routing, load considerations
+  - statistics
+  - active reply route caching design & implementation of service; gap extension!
+  - Indexing:
+May 16 12:49:50 fs-13737 WARNING `open' failed on file `/home/grothoff/svn/gnunet/src/fs/H/' at disk.c:1253 with error: No such file or directory
+May 16 12:49:50 fs-13737 WARNING Could not access indexed file `ENUTBMBR' at offset 2064384: No such file or directory
+    NOTE: corrupted filename in open message
+    NOTE: odd directory name in open message
+
+==14995== 8 bytes in 1 blocks are definitely lost in loss record 1 of 12
+==14995==    at 0x4024C4C: malloc (vg_replace_malloc.c:195)
+==14995==    by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92)
+==14995==    by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61)
+==14995==    by 0x40519F5: GNUNET_DATASTORE_get_random (datastore_api.c:1102)
+==14995==    by 0x804ADCF: gather_migration_blocks (gnunet-service-fs.c:969)
+==14995==    by 0x40864C8: run_ready (scheduler.c:514)
+==14995==    by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642)
+==14995==    by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404)
+==14995==    by 0x804F725: main (gnunet-service-fs.c:3506)
+==14995== 
+==14995== 8 bytes in 1 blocks are definitely lost in loss record 2 of 12
+==14995==    at 0x4024C4C: malloc (vg_replace_malloc.c:195)
+==14995==    by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92)
+==14995==    by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61)
+==14995==    by 0x4051ACB: GNUNET_DATASTORE_get (datastore_api.c:1160)
+==14995==    by 0x804F39A: handle_start_search (gnunet-service-fs.c:3352)
+==14995==    by 0x4087F9A: GNUNET_SERVER_inject (server.c:653)
+==14995==    by 0x40880A8: process_client_buffer (server.c:714)
+==14995==    by 0x4088529: restart_processing (server.c:848)
+==14995==    by 0x40864C8: run_ready (scheduler.c:514)
+==14995==    by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642)
+==14995==    by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404)
+==14995==    by 0x804F725: main (gnunet-service-fs.c:3506)
+==14995== 
+==14995== 120 bytes in 15 blocks are definitely lost in loss record 5 of 12
+==14995==    at 0x4024C4C: malloc (vg_replace_malloc.c:195)
+==14995==    by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92)
+==14995==    by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61)
+==14995==    by 0x4050DA1: GNUNET_DATASTORE_put (datastore_api.c:695)
+==14995==    by 0x804DD79: handle_p2p_put (gnunet-service-fs.c:2591)
+==14995==    by 0x40588B8: main_notify_handler (core_api.c:468)
+==14995==    by 0x4067DAE: receive_task (client.c:499)
+==14995==    by 0x40864C8: run_ready (scheduler.c:514)
+==14995==    by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642)
+==14995==    by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404)
+==14995==    by 0x804F725: main (gnunet-service-fs.c:3506)
+==14995== 
+
 * TBENCH: [MW]
   - good to have for transport/DV evaluation! 
 * DV: [Nate]

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/datastore/datastore_api.c        2010-05-17 07:17:48 UTC (rev 11392)
@@ -255,20 +255,8 @@
     }
   while (NULL != (qe = h->queue_head))
     {
-      if (NULL != qe->response_proc)
-       {
-         qe->response_proc (qe, NULL);
-       }
-      else
-       {
-         GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                      h->queue_tail,
-                                      qe);
-         if (qe->task != GNUNET_SCHEDULER_NO_TASK)
-           GNUNET_SCHEDULER_cancel (h->sched,
-                                    qe->task);
-         GNUNET_free (qe);
-       }
+      GNUNET_assert (NULL != qe->response_proc);
+      qe->response_proc (qe, NULL);
     }
   if (GNUNET_YES == drop) 
     {
@@ -385,15 +373,8 @@
     {
       if (pos->max_queue < h->queue_size)
        {
-         GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                      h->queue_tail,
-                                      pos);
-         GNUNET_SCHEDULER_cancel (h->sched,
-                                  pos->task);
-         if (pos->response_proc != NULL)
-           pos->response_proc (pos, NULL);
-         GNUNET_free (pos);
-         h->queue_size--;
+         GNUNET_assert (pos->response_proc != NULL);
+         pos->response_proc (pos, NULL);
          break;
        }
       pos = pos->next;
@@ -565,6 +546,24 @@
 }
 
 
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
+  /* Retire 'qe': unlink it from h's queue, cancel its task if set, free it. */
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                              h->queue_tail,
+                              qe);
+  if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (h->sched,
+                              qe->task);
+      qe->task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  h->queue_size--; /* count is kept here; a caller decrementing too counts twice */
+  GNUNET_free (qe); /* 'qe' is invalid from here on */
+}
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -584,16 +583,7 @@
   const char *emsg;
   int32_t status;
 
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  if (qe->task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              qe->task);
-      qe->task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  GNUNET_free (qe);
+  free_queue_entry (qe);
   if (msg == NULL)
     {      
       if (NULL == h->client)
@@ -1018,10 +1008,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  _("Failed to receive response from datastore\n"));
 #endif
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
+      free_queue_entry (qe);
       do_disconnect (h);
       rc->iter (rc->iter_cls,
                NULL, 0, NULL, 0, 0, 0, 
@@ -1036,10 +1023,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Received end of result set\n");
 #endif
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
+      free_queue_entry (qe);
       rc->iter (rc->iter_cls,
                NULL, 0, NULL, 0, 0, 0, 
                GNUNET_TIME_UNIT_ZERO_ABS, 0);  
@@ -1052,10 +1036,7 @@
        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct 
DataMessage*)msg)->size)) )
     {
       GNUNET_break (0);
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
+      free_queue_entry (qe);
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
       rc->iter (rc->iter_cls,
@@ -1226,10 +1207,7 @@
                             GNUNET_TIME_absolute_get_remaining (qe->timeout));
       return;
     }
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  GNUNET_free (qe);
+  free_queue_entry (qe);
   h->retry_time = GNUNET_TIME_UNIT_ZERO;
   do_disconnect (h);
   rc->iter (rc->iter_cls,
@@ -1253,13 +1231,8 @@
 
   h = qe->h;
   reconnect = qe->was_transmitted;
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  if (qe->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (h->sched,
-                            qe->task);
-  GNUNET_free (qe);
+  free_queue_entry (qe);
+  h->queue_size--;
   if (reconnect)
     {
       h->retry_time = GNUNET_TIME_UNIT_ZERO;

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/fs/fs.h  2010-05-17 07:17:48 UTC (rev 11392)
@@ -1,3 +1,4 @@
+
 /*
      This file is part of GNUnet.
      (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors)
@@ -42,6 +43,20 @@
 #define MAX_MIGRATION_QUEUE 32
 
 /**
+ * How many peers do we select as possible
+ * targets per block obtained for migration?
+ */
+#define MIGRATION_LIST_SIZE 4
+
+/**
+ * To how many peers do we forward each migration block ultimately?
+ * This number must be smaller or equal to MIGRATION_LIST_SIZE.  Using
+ * a smaller value allows for variation in available bandwidth (for
+ * migration) between the peers.
+ */
+#define MIGRATION_TARGET_COUNT 2
+
+/**
  * Ratio for moving average delay calculation.  The previous
  * average goes in with a factor of (n-1) into the calculation.
  * Must be > 0.

Modified: gnunet/src/fs/fs_test_lib_data.conf
===================================================================
--- gnunet/src/fs/fs_test_lib_data.conf 2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/fs/fs_test_lib_data.conf 2010-05-17 07:17:48 UTC (rev 11392)
@@ -53,7 +53,7 @@
 HOSTNAME = localhost
 #OPTIONS = -L DEBUG
 #DEBUG = YES
-#PREFIX = valgrind --tool=memcheck --leak-check=yes
+PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-fs
 #PREFIX = xterm -e gdb -x cmd --args 
 

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-05-17 07:17:48 UTC (rev 11392)
@@ -28,8 +28,7 @@
  * TODO:
  * - have non-zero preference / priority for requests we initiate!
  * - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high, validate_nblock
- * - add content migration support (forward from migration list)
+ * - implement: bound_priority, test_load_too_high
  * - statistics
  */
 #include "platform.h"
@@ -586,11 +585,22 @@
   struct GNUNET_TIME_Absolute expiration;
 
   /**
+   * Peers we would consider forwarding this
+   * block to.  Zero for empty entries.
+   */
+  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
+  /**
    * Size of the block.
    */
   size_t size;
 
   /**
+   *  Number of targets already used.
+   */
+  unsigned int used_targets;
+
+  /**
    * Type of the block.
    */
   enum GNUNET_BLOCK_Type type;
@@ -684,6 +694,25 @@
  */
 static int active_migration;
 
+
+/**
+ * Transmit messages by copying them to the target buffer
+ * "buf".  "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime.  In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+                 size_t size, void *buf);
+
+
 /* ******************* clean up functions ************************ */
 
 
@@ -698,16 +727,38 @@
   GNUNET_CONTAINER_DLL_remove (mig_head,
                               mig_tail,
                               mb);
+  GNUNET_PEER_decrement_rcs (mb->target_list,
+                            MIGRATION_LIST_SIZE);
   mig_size--;
   GNUNET_free (mb);
 }
 
 
 /**
+ * Test whether peer P1 is strictly closer to a key than peer P2.
+ *
+ * @param key key the distances are measured against
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2, GNUNET_NO otherwise
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+          const struct GNUNET_PeerIdentity *p1,
+          const struct GNUNET_PeerIdentity *p2)
+{
+  int cmp = GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, &p2->hashPubKey, key);
+  /* three-way result: only a negative value means P1's hash is the closer one */
+  return (cmp < 0) ? GNUNET_YES : GNUNET_NO;
+}
+
+
+/**
  * Consider migrating content to a given peer.
  *
- * @param cls not used
- * @param key ID of the peer (not used)
+ * @param cls 'struct MigrationReadyBlock*' to select
+ *            targets for (or NULL for none)
+ * @param key ID of the peer 
  * @param value 'struct ConnectedPeer' of the peer
  * @return GNUNET_YES (always continue iteration)
  */
@@ -716,17 +767,92 @@
                    const GNUNET_HashCode *key,
                    void *value)
 {
+  struct MigrationReadyBlock *mb = cls;
   struct ConnectedPeer *cp = value;
+  struct MigrationReadyBlock *pos;
+  struct GNUNET_PeerIdentity cppid;
+  struct GNUNET_PeerIdentity otherpid;
+  struct GNUNET_PeerIdentity worstpid;
+  size_t msize;
+  unsigned int i;
+  unsigned int repl;
   
+  /* consider 'cp' as a migration target for mb */
+  if (mb != NULL)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &cppid);
+      repl = MIGRATION_LIST_SIZE;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (mb->target_list[i] == 0)
+           {
+             mb->target_list[i] = cp->pid;
+             GNUNET_PEER_change_rc (mb->target_list[i], 1);
+             repl = MIGRATION_LIST_SIZE;
+             break;
+           }
+         GNUNET_PEER_resolve (mb->target_list[i],
+                              &otherpid);
+         if ( (repl == MIGRATION_LIST_SIZE) &&
+              is_closer (&mb->query,
+                         &cppid,
+                         &otherpid)) 
+           {
+             repl = i;
+             worstpid = otherpid;
+           }
+         else if ( (repl != MIGRATION_LIST_SIZE) &&
+                   (is_closer (&mb->query,
+                               &worstpid,
+                               &otherpid) ) )
+           {
+             repl = i;
+             worstpid = otherpid;
+           }       
+       }
+      if (repl != MIGRATION_LIST_SIZE) 
+       {
+         GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+         mb->target_list[repl] = cp->pid;
+         GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+       }
+    }
+
+  /* consider scheduling transmission to cp for content migration */
   if (cp->cth != NULL)
-    return GNUNET_YES; /* or what? */
-  /* FIXME: not implemented! */
+    return GNUNET_YES; 
+  msize = 0;
+  pos = mig_head;
+  while (pos != NULL)
+    {
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (cp->pid == pos->target_list[i])
+           {
+             if (msize == 0)
+               msize = pos->size;
+             else
+               msize = GNUNET_MIN (msize,
+                                   pos->size);
+             break;
+           }
+       }
+      pos = pos->next;
+    }
+  if (msize == 0)
+    return GNUNET_YES; /* no content available */
+  cp->cth 
+    = GNUNET_CORE_notify_transmit_ready (core,
+                                        0, GNUNET_TIME_UNIT_FOREVER_REL,
+                                        (const struct GNUNET_PeerIdentity*) 
key,
+                                        msize + sizeof (struct PutMessage),
+                                        &transmit_to_peer,
+                                        cp);
   return GNUNET_YES;
 }
 
 
-
-
 /**
  * Task that is run periodically to obtain blocks for content
  * migration
@@ -740,6 +866,32 @@
 
 
 /**
+ * Schedule the task that gathers blocks for migration, unless
+ * 'mig_qe' is still set or 'mig_task' is already scheduled.  The
+ * delay scales with the queue fill level: it is
+ * mig_size / MAX_MIGRATION_QUEUE seconds, at least min_migration_delay.
+ */
+static void
+consider_migration_gathering ()
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if ( (mig_qe != NULL) || (mig_task != GNUNET_SCHEDULER_NO_TASK) )
+    return;
+  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                        mig_size);
+  delay = GNUNET_TIME_relative_divide (delay,
+                                      MAX_MIGRATION_QUEUE);
+  delay = GNUNET_TIME_relative_max (delay,
+                                   min_migration_delay);
+  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          delay,
+                                          &gather_migration_blocks,
+                                          NULL);
+}
+
+
+/**
  * Process content offered for migration.
  *
  * @param cls closure
@@ -765,26 +917,25 @@
                           expiration, uint64_t uid)
 {
   struct MigrationReadyBlock *mb;
-  struct GNUNET_TIME_Relative delay;
   
   if (key == NULL)
     {
       mig_qe = NULL;
       if (mig_size < MAX_MIGRATION_QUEUE)  
-       {
-         delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                                mig_size);
-         delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
-                                              MAX_MIGRATION_QUEUE);
-         delay = GNUNET_TIME_relative_max (delay,
-                                           min_migration_delay);
-         mig_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                  delay,
-                                                  &gather_migration_blocks,
-                                                  NULL);
-       }
+       consider_migration_gathering ();
       return;
     }
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+    {
+      if (GNUNET_OK !=
+         GNUNET_FS_handle_on_demand_block (key, size, data,
+                                           type, priority, anonymity,
+                                           expiration, uid, 
+                                           &process_migration_content,
+                                           NULL))
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      return;
+    }
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
   mb->expiration = expiration;
@@ -796,10 +947,9 @@
                                     mig_tail,
                                     mb);
   mig_size++;
-  if (mig_size == 1)
-    GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                          &consider_migration,
-                                          NULL);
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &consider_migration,
+                                        mb);
   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
@@ -978,7 +1128,8 @@
                      uint32_t distance)
 {
   struct ConnectedPeer *cp;
-
+  struct MigrationReadyBlock *pos;
+  
   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
   cp->pid = GNUNET_PEER_intern (peer);
   GNUNET_break (GNUNET_OK ==
@@ -986,8 +1137,13 @@
                                                   &peer->hashPubKey,
                                                   cp,
                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  if (mig_size > 0)
-    (void) consider_migration (NULL, &peer->hashPubKey, cp);
+
+  pos = mig_head;
+  while (NULL != pos)
+    {
+      (void) consider_migration (pos, &peer->hashPubKey, cp);
+      pos = pos->next;
+    }
 }
 
 
@@ -1031,6 +1187,8 @@
   struct ConnectedPeer *cp;
   struct PendingMessage *pm;
   unsigned int i;
+  struct MigrationReadyBlock *pos;
+  struct MigrationReadyBlock *next;
 
   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
                                              &peer->hashPubKey,
@@ -1052,6 +1210,31 @@
                GNUNET_CONTAINER_multihashmap_remove (connected_peers,
                                                      &peer->hashPubKey,
                                                      cp));
+  /* remove this peer from migration considerations; schedule
+     alternatives */
+  next = mig_head;
+  while (NULL != (pos = next))
+    {
+      next = pos->next;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (pos->target_list[i] == cp->pid)
+           {
+             GNUNET_PEER_change_rc (pos->target_list[i], -1);
+             pos->target_list[i] = 0;
+             if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size 
(connected_peers))
+               {
+                 delete_migration_block (pos);
+                 consider_migration_gathering ();
+               }
+             GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                                    &consider_migration,
+                                                    pos);
+             break;
+           }
+       }
+    }
+
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
@@ -1231,7 +1414,7 @@
 
 
 /**
- * Transmit the given message by copying it to the target buffer
+ * Transmit messages by copying them to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
  * for writing in the meantime.  In that case, do nothing
  * (the disconnect or shutdown handler will take care of the rest).
@@ -1251,7 +1434,11 @@
   char *cbuf = buf;
   struct GNUNET_PeerIdentity pid;
   struct PendingMessage *pm;
+  struct MigrationReadyBlock *mb;
+  struct MigrationReadyBlock *next;
+  struct PutMessage migm;
   size_t msize;
+  unsigned int i;
  
   cp->cth = NULL;
   if (NULL == buf)
@@ -1283,6 +1470,44 @@
                                                   &transmit_to_peer,
                                                   cp);
     }
+  else
+    {      
+      next = mig_head;
+      while (NULL != (mb = next))
+       {
+         next = mb->next;
+         for (i=0;i<MIGRATION_LIST_SIZE;i++)
+           {
+             if ( (cp->pid == mb->target_list[i]) &&
+                  (mb->size + sizeof (migm) <= size) )
+               {
+                 GNUNET_PEER_change_rc (mb->target_list[i], -1);
+                 mb->target_list[i] = 0;
+                 mb->used_targets++;
+                 migm.header.size = htons (sizeof (migm) + mb->size);
+                 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+                 migm.type = htonl (mb->type);
+                 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+                 memcpy (&cbuf[msize], &migm, sizeof (migm));
+                 msize += sizeof (migm);
+                 size -= sizeof (migm);
+                 memcpy (&cbuf[msize], &mb[1], mb->size);
+                 msize += mb->size;
+                 size -= mb->size;               
+                 break;
+               }
+           }
+         if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+              (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size 
(connected_peers)) )
+           {
+             delete_migration_block (mb);
+             consider_migration_gathering ();
+           }
+       }
+      consider_migration (NULL, 
+                         &pid.hashPubKey,
+                         cp);
+    }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Transmitting %u bytes to peer %u\n",
@@ -1330,20 +1555,19 @@
   cp->pending_requests++;
   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
     destroy_pending_message (cp->pending_messages_tail, 0);  
+  GNUNET_PEER_resolve (cp->pid, &pid);
+  if (NULL != cp->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+  /* need to schedule transmission */
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              
cp->pending_messages_head->priority,
+                                              MAX_TRANSMIT_DELAY,
+                                              &pid,
+                                              cp->pending_messages_head->msize,
+                                              &transmit_to_peer,
+                                              cp);
   if (cp->cth == NULL)
     {
-      /* need to schedule transmission */
-      GNUNET_PEER_resolve (cp->pid, &pid);
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  
cp->pending_messages_head->priority,
-                                                  MAX_TRANSMIT_DELAY,
-                                                  &pid,
-                                                  
cp->pending_messages_head->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
-    }
-  if (cp->cth == NULL)
-    {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Failed to schedule transmission with core!\n");
@@ -2095,9 +2319,14 @@
        }
       else
        {
+         if (NULL != prq->sender->last_client_replies
+             [(prq->sender->last_client_replies_woff) % 
CS2P_SUCCESS_LIST_SIZE])
+           GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+                                      [(prq->sender->last_client_replies_woff) 
% CS2P_SUCCESS_LIST_SIZE]);
          prq->sender->last_client_replies
            [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
            = pr->client_request_list->client_list->client;
+         GNUNET_SERVER_client_keep 
(pr->client_request_list->client_list->client);
        }
     }
   GNUNET_CRYPTO_hash (prq->data,
@@ -2255,12 +2484,10 @@
       memcpy (&pm[1], prq->data, prq->size);
       add_to_pending_messages_for_peer (cp, reply, pr);
     }
-  // FIXME: implement hot-path routing statistics keeping!
   return GNUNET_YES;
 }
 
 
-
 /**
  * Continuation called to notify client about result of the
  * operation.
@@ -2586,18 +2813,13 @@
       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
-  prq.type = type;
-  prq.priority = priority;  
-  process_reply (&prq, key, pr);
-
   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
     {
       if (pr->qe != NULL)
        GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
-      return;
     }
-  if ( (pr->client_request_list == NULL) &&
+  else if ( (pr->client_request_list == NULL) &&
        ( (GNUNET_YES == test_load_too_high()) ||
         (pr->results_found > 5 + 2 * pr->priority) ) )
     {
@@ -2611,10 +2833,12 @@
                                GNUNET_NO);
       if (pr->qe != NULL)
        GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
-      return;
     }
-  if (pr->qe != NULL)
+  else if (pr->qe != NULL)
     GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+  prq.type = type;
+  prq.priority = priority;  
+  process_reply (&prq, key, pr);
 }
 
 
@@ -3221,11 +3445,7 @@
     }
   /* FIXME: distinguish between sending and storing in options? */
   if (active_migration) 
-    {
-      mig_task = GNUNET_SCHEDULER_add_now (sched,
-                                          &gather_migration_blocks,
-                                          NULL);
-    }
+    consider_migration_gathering ();
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]