gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r11078 - in gnunet: . src/fs src/include


From: gnunet
Subject: [GNUnet-SVN] r11078 - in gnunet: . src/fs src/include
Date: Tue, 27 Apr 2010 15:40:04 +0200

Author: grothoff
Date: 2010-04-27 15:40:04 +0200 (Tue, 27 Apr 2010)
New Revision: 11078

Modified:
   gnunet/TODO
   gnunet/src/fs/fs.c
   gnunet/src/fs/fs.h
   gnunet/src/fs/fs_download.c
   gnunet/src/fs/fs_search.c
   gnunet/src/fs/fs_test_lib.c
   gnunet/src/fs/gnunet-download.c
   gnunet/src/include/gnunet_fs_service.h
Log:
bounded parallelism

Modified: gnunet/TODO
===================================================================
--- gnunet/TODO 2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/TODO 2010-04-27 13:40:04 UTC (rev 11078)
@@ -1,8 +1,9 @@
 0.9.0pre1:
 * FS: [CG]
-  - bound parallelism (# fs downloads)
-  - distinguish in performance tracking and event signalling between
-    downloads that are actually running and those that are merely in the queue
+  - search: availability probes [needed for full persistence support...]
+  - Allow checking of presence of search results and/or content via 
command-line tools
+    (add options to gnunet-search / gnunet-download to limit search to local 
peer) 
+    [needed for full persistence support...]
   - persistence support (publish, unindex, search, download)
   - gnunet-service-fs (hot-path routing, load-based routing, nitpicks)  
   - [gnunet-service-fs.c:208]: member 'LocalGetContext::results_bf_size' is 
never used
@@ -70,7 +71,6 @@
   - shutdown sequence?
 * FS: [CG]
   - datastore reservation (publishing)
-  - search: availability probes
   - location URIs (publish, search, download)
   - non-anonymous FS service (needs DHT)
     + DHT integration for search
@@ -116,8 +116,7 @@
   - convert documentation pages to books
   - update books (especially for developers)
   - create good Drupal theme for GNUnet
-  - make a NICE download page and figure out how to 
-    enable developers to publish TGZs nicely
+  - make a NICE download page and figure out how to enable developers to 
publish TGZs nicely
   - port "contact" page
   - add content type for "todo" items?
 * POSTGRES database backends: [CG]
@@ -151,8 +150,6 @@
     (Note: build library always, build service when libxml2/etc. are available)
 * FS: [CG]
   - Remove KBlocks in gnunet-unindex (see discussion with Kenneth Almquist on 
gnunet-devs in 9/2009)
-  - Allow checking of presence of search results and/or content via 
command-line tools
-    (add options to gnunet-search / gnunet-download to limit search to local 
peer)
 * PEERINFO: [CG]
   - expire 'ancient' HELLOs (those without valid addresses AND that 
     we have not 'used' (for their public keys) in a while; need a way

Modified: gnunet/src/fs/fs.c
===================================================================
--- gnunet/src/fs/fs.c  2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/src/fs/fs.c  2010-04-27 13:40:04 UTC (rev 11078)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008, 2009 Christian Grothoff 
(and other contributing authors)
+     (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008, 2009, 2010 Christian 
Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -45,20 +45,13 @@
       return;
     }
   qe->start (qe->cls, qe->client);
-  switch (qe->category)
-    {
-    case GNUNET_FS_QC_DOWNLOAD:
-      qe->h->active_downloads++;
-      break;
-    case GNUNET_FS_QC_PROBE:
-      qe->h->active_probes++;
-      break;
-    }
+  qe->start_times++;
+  qe->h->active_blocks += qe->blocks;
   qe->start_time = GNUNET_TIME_absolute_get ();
   GNUNET_CONTAINER_DLL_remove (qe->h->pending_head,
                               qe->h->pending_tail,
                               qe);
-  GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
+  GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head,
                                     qe->h->running_tail,
                                     qe->h->running_tail,
                                     qe);
@@ -76,15 +69,8 @@
 {
   qe->client = NULL;
   qe->stop (qe->cls);
-  switch (qe->category)
-    {
-    case GNUNET_FS_QC_DOWNLOAD:
-      qe->h->active_downloads--;
-      break;
-    case GNUNET_FS_QC_PROBE:
-      qe->h->active_probes--;
-      break;
-    }
+  qe->h->active_downloads--;
+  qe->h->active_blocks -= qe->blocks;
   qe->run_time = GNUNET_TIME_relative_add (qe->run_time,
                                           GNUNET_TIME_absolute_get_duration 
(qe->start_time));
   GNUNET_CONTAINER_DLL_remove (qe->h->running_head,
@@ -109,13 +95,54 @@
                   const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_FS_Handle *h = cls;
+  struct GNUNET_FS_QueueEntry *qe;
+  struct GNUNET_FS_QueueEntry *next;
+  struct GNUNET_TIME_Relative run_time;
+  struct GNUNET_TIME_Relative restart_at;
+  struct GNUNET_TIME_Relative rst;
+  struct GNUNET_TIME_Absolute end_time;
 
   h->queue_job = GNUNET_SCHEDULER_NO_TASK;
-  /* FIXME: stupid implementation that just starts everything follows... */
-  while (NULL != h->pending_head)
-    start_job (h->pending_head);
-  
-  /* FIXME: possibly re-schedule queue-job! */
+  next = h->pending_head;
+  while (NULL != (qe = next))
+    {
+      next = qe->next;
+      if (h->running_head == NULL)
+       {
+         start_job (qe);
+         continue;
+       }
+      if ( (qe->blocks + h->active_blocks <= h->max_parallel_requests) &&
+          (h->active_downloads + 1 <= h->max_parallel_downloads) )
+       {
+         start_job (qe);
+         continue;
+       }
+    }
+  if (h->pending_head == NULL)
+    return; /* no need to stop anything */
+  restart_at = GNUNET_TIME_UNIT_FOREVER_REL;
+  next = h->running_head;
+  while (NULL != (qe = next))
+    {
+      next = qe->next;
+      /* FIXME: might be faster/simpler to do this calculation only once
+        when we start a job (OTOH, this would allow us to dynamically
+        and easily adjust qe->blocks over time, given the right API...) */
+      run_time = GNUNET_TIME_relative_multiply (h->avg_block_latency,
+                                               qe->blocks * qe->start_times);
+      end_time = GNUNET_TIME_absolute_add (qe->start_time,
+                                          run_time);
+      rst = GNUNET_TIME_absolute_get_remaining (end_time);
+      restart_at = GNUNET_TIME_relative_min (rst, restart_at);
+      if (rst.value > 0)
+       continue;       
+      stop_job (qe);
+    }
+  h->queue_job = GNUNET_SCHEDULER_add_delayed (h->sched,
+                                              restart_at,
+                                              &process_job_queue,
+                                              h);
 }
 
 /**
@@ -125,7 +152,7 @@
  * @param start function to call to begin the job
  * @param stop function to call to pause the job, or on dequeue (if the job 
was running)
  * @param cls closure for start and stop
- * @param cat category of the job
+ * @param blocks number of blocks this jobs uses
  * @return queue handle
  */
 struct GNUNET_FS_QueueEntry *
@@ -133,7 +160,7 @@
                  GNUNET_FS_QueueStart start,
                  GNUNET_FS_QueueStop stop,
                  void *cls,
-                 enum GNUNET_FS_QueueCategory cat)
+                 unsigned int blocks)
 {
   struct GNUNET_FS_QueueEntry *qe;
 
@@ -143,7 +170,7 @@
   qe->stop = stop;
   qe->cls = cls;
   qe->queue_time = GNUNET_TIME_absolute_get ();
-  qe->category = cat;
+  qe->blocks = blocks;
   GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
                                     h->pending_tail,
                                     h->pending_tail,
@@ -166,21 +193,22 @@
 void
 GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh)
 {
+  struct GNUNET_FS_Handle *h;
+
+  h = qh->h;
   if (qh->client != NULL)    
-    {
-      if (qh->h->queue_job != GNUNET_SCHEDULER_NO_TASK)
-       GNUNET_SCHEDULER_cancel (qh->h->sched,
-                                qh->h->queue_job);
-      qh->h->queue_job 
-       = GNUNET_SCHEDULER_add_now (qh->h->sched,
-                                   &process_job_queue,
-                                   qh->h);
-      stop_job (qh);
-    }
-  GNUNET_CONTAINER_DLL_remove (qh->h->pending_head,
-                              qh->h->pending_tail,
+    stop_job (qh);    
+  GNUNET_CONTAINER_DLL_remove (h->pending_head,
+                              h->pending_tail,
                               qh);
   GNUNET_free (qh);
+  if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (h->sched,
+                            h->queue_job);
+  h->queue_job 
+    = GNUNET_SCHEDULER_add_now (h->sched,
+                               &process_job_queue,
+                               h);
 }
 
 
@@ -207,7 +235,9 @@
 {
   struct GNUNET_FS_Handle *ret;
   struct GNUNET_CLIENT_Connection *client;
-  
+  enum GNUNET_FS_OPTIONS opt;
+  va_list ap;
+
   client = GNUNET_CLIENT_connect (sched,
                                  "fs",
                                  cfg);
@@ -221,7 +251,29 @@
   ret->upcb_cls = upcb_cls;
   ret->client = client;
   ret->flags = flags;
-  // FIXME: process varargs!
+  ret->max_parallel_downloads = 1;
+  ret->max_parallel_requests = 1;
+  ret->avg_block_latency = GNUNET_TIME_UNIT_MINUTES; /* conservative starting 
point */
+  va_start (ap, flags);  
+  while (GNUNET_FS_OPTIONS_END != (opt = va_arg (ap, enum GNUNET_FS_OPTIONS)))
+    {
+      switch (opt)
+       {
+       case GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM:
+         ret->max_parallel_downloads = va_arg (ap, unsigned int);
+         break;
+       case GNUNET_FS_OPTIONS_REQUEST_PARALLELISM:
+         ret->max_parallel_requests = va_arg (ap, unsigned int);
+         break;
+       default:
+         GNUNET_break (0);
+         GNUNET_free (ret->client_name);
+         GNUNET_free (ret);
+         va_end (ap);
+         return NULL;
+       }
+    }
+  va_end (ap);
   // FIXME: setup receive-loop with client
 
   // FIXME: deserialize state; use client-name to find master-directory!

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/src/fs/fs.h  2010-04-27 13:40:04 UTC (rev 11078)
@@ -462,23 +462,7 @@
  */
 typedef void (*GNUNET_FS_QueueStop)(void *cls);
 
-/**
- * Categories of jobs in the FS queue.
- */
-enum GNUNET_FS_QueueCategory 
-  {
-    /**
-     * File download.
-     */
-    GNUNET_FS_QC_DOWNLOAD,
 
-    /**
-     * Availability probe (related to search).
-     */
-    GNUNET_FS_QC_PROBE
-
-  };
-
 /**
  * Entry in the job queue.
  */
@@ -536,10 +520,15 @@
   struct GNUNET_TIME_Relative run_time;
 
   /**
-   * What type of job is this?
+   * How many blocks do the active downloads have?
    */
-  enum GNUNET_FS_QueueCategory category;
+  unsigned int blocks;
 
+  /**
+   * How often have we (re)started this download?
+   */
+  unsigned int start_times;
+
 };
 
 
@@ -552,6 +541,7 @@
  * @param start function to call to begin the job
  * @param stop function to call to pause the job, or on dequeue (if the job 
was running)
  * @param cls closure for start and stop
+ * @param blocks number of blocks this download has
  * @return queue handle
  */
 struct GNUNET_FS_QueueEntry *
@@ -559,7 +549,7 @@
                  GNUNET_FS_QueueStart start,
                  GNUNET_FS_QueueStop stop,
                  void *cls,
-                 enum GNUNET_FS_QueueCategory cat);
+                 unsigned int blocks);
 
 
 /**
@@ -632,10 +622,10 @@
   GNUNET_SCHEDULER_TaskIdentifier queue_job;
 
   /**
-   * How many downloads probing availability of search results do we
-   * have running right now?
+   * Average time we take for a single request to be satisfied.
+   * FIXME: not yet calcualted properly...
    */
-  unsigned int active_probes;
+  struct GNUNET_TIME_Relative avg_block_latency;
 
   /**
    * How many actual downloads do we have running right now?
@@ -643,10 +633,25 @@
   unsigned int active_downloads;
 
   /**
+   * How many blocks do the active downloads have?
+   */
+  unsigned int active_blocks;
+
+  /**
    * General flags.
    */
   enum GNUNET_FS_Flags flags;
 
+  /**
+   * Maximum number of parallel downloads.
+   */
+  unsigned int max_parallel_downloads;
+
+  /**
+   * Maximum number of parallel requests.
+   */
+  unsigned int max_parallel_requests;
+
 };
 
 
@@ -1189,6 +1194,11 @@
   struct GNUNET_CLIENT_TransmitHandle *th;
 
   /**
+   * Our entry in the job queue.
+   */
+  struct GNUNET_FS_QueueEntry *job_queue;
+
+  /**
    * Identity of the peer having the content, or all-zeros
    * if we don't know of such a peer.
    */

Modified: gnunet/src/fs/fs_download.c
===================================================================
--- gnunet/src/fs/fs_download.c 2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/src/fs/fs_download.c 2010-04-27 13:40:04 UTC (rev 11078)
@@ -23,12 +23,9 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - handle recursive downloads (need directory & 
- *   fs-level download-parallelism management)
  * - location URI suppport (can wait, easy)
- * - check if blocks exist already (can wait, easy)
+ * - persistence (can wait)
  * - check if iblocks can be computed from existing blocks (can wait, hard)
- * - persistence (can wait)
  */
 #include "platform.h"
 #include "gnunet_constants.h"
@@ -990,7 +987,11 @@
                                      "truncate",
                                      dc->filename);
        }
-
+      if (dc->job_queue != NULL)
+       {
+         GNUNET_FS_dequeue_ (dc->job_queue);
+         dc->job_queue = NULL;
+       }
       if (is_recursive_download (dc))
        full_recursive_download (dc);
       if (dc->child_head == NULL)
@@ -1273,7 +1274,74 @@
 }
 
 
+
 /**
+ * We're allowed to ask the FS service for our blocks.  Start the download.
+ *
+ * @param cls the 'struct GNUNET_FS_DownloadContext'
+ * @param client handle to use for communcation with FS (we must destroy it!)
+ */
+static void
+activate_fs_download (void *cls,
+                     struct GNUNET_CLIENT_Connection *client)
+{
+  struct GNUNET_FS_DownloadContext *dc = cls;
+  struct GNUNET_FS_ProgressInfo pi;
+
+  GNUNET_assert (NULL != client);
+  dc->client = client;
+  GNUNET_CLIENT_receive (client,
+                        &receive_results,
+                        dc,
+                        GNUNET_TIME_UNIT_FOREVER_REL);
+  pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
+  make_download_status (&pi, dc);
+  dc->client_info = dc->h->upcb (dc->h->upcb_cls,
+                                &pi);
+  GNUNET_CONTAINER_multihashmap_iterate (dc->active,
+                                        &retry_entry,
+                                        dc);
+  if ( (dc->th == NULL) &&
+       (dc->client != NULL) )
+    dc->th = GNUNET_CLIENT_notify_transmit_ready (dc->client,
+                                                 sizeof (struct SearchMessage),
+                                                 
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                                                 GNUNET_NO,
+                                                 &transmit_download_request,
+                                                 dc);
+}
+
+
+/**
+ * We must stop to ask the FS service for our blocks.  Pause the download.
+ *
+ * @param cls the 'struct GNUNET_FS_DownloadContext'
+ * @param client handle to use for communcation with FS (we must destroy it!)
+ */
+static void
+deactivate_fs_download (void *cls)
+{
+  struct GNUNET_FS_DownloadContext *dc = cls;
+  struct GNUNET_FS_ProgressInfo pi;
+  
+  if (NULL != dc->th)
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
+      dc->th = NULL;
+    }
+  if (NULL != dc->client)
+    {
+      GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO);
+      dc->client = NULL;
+    }
+  pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE;
+  make_download_status (&pi, dc);
+  dc->client_info = dc->h->upcb (dc->h->upcb_cls,
+                                &pi);
+}
+
+
+/**
  * Download parts of a file.  Note that this will store
  * the blocks at the respective offset in the given file.  Also, the
  * download is still using the blocking of the underlying FS
@@ -1318,7 +1386,6 @@
 {
   struct GNUNET_FS_ProgressInfo pi;
   struct GNUNET_FS_DownloadContext *dc;
-  struct GNUNET_CLIENT_Connection *client;
 
   GNUNET_assert (GNUNET_FS_uri_test_chk (uri));
   if ( (offset + length < offset) ||
@@ -1401,26 +1468,15 @@
   pi.value.download.specifics.start.meta = meta;
   dc->client_info = dc->h->upcb (dc->h->upcb_cls,
                                 &pi);
-
-  
-  // FIXME: bound parallelism here
-  client = GNUNET_CLIENT_connect (h->sched,
-                                 "fs",
-                                 h->cfg);
-  GNUNET_assert (NULL != client);
-  dc->client = client;
-  GNUNET_CLIENT_receive (client,
-                        &receive_results,
-                        dc,
-                        GNUNET_TIME_UNIT_FOREVER_REL);
-  pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
-  make_download_status (&pi, dc);
-  dc->client_info = dc->h->upcb (dc->h->upcb_cls,
-                                &pi);
   schedule_block_download (dc, 
                           &dc->uri->data.chk.chk,
                           0, 
-                          1 /* 0 == CHK, 1 == top */);
+                          1 /* 0 == CHK, 1 == top */); 
+  dc->job_queue = GNUNET_FS_queue_ (h, 
+                                   &activate_fs_download,
+                                   &deactivate_fs_download,
+                                   dc,
+                                   (length + DBLOCK_SIZE-1) / DBLOCK_SIZE);
   return dc;
 }
 
@@ -1455,6 +1511,11 @@
 {
   struct GNUNET_FS_ProgressInfo pi;
 
+  if (dc->job_queue != NULL)
+    {
+      GNUNET_FS_dequeue_ (dc->job_queue);
+      dc->job_queue = NULL;
+    }
   while (NULL != dc->child_head)
     GNUNET_FS_download_stop (dc->child_head, 
                             do_delete);
@@ -1472,13 +1533,6 @@
   if (GNUNET_SCHEDULER_NO_TASK != dc->task)
     GNUNET_SCHEDULER_cancel (dc->h->sched,
                             dc->task);
-  if (NULL != dc->th)
-    {
-      GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
-      dc->th = NULL;
-    }
-  if (NULL != dc->client)
-    GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO);
   GNUNET_CONTAINER_multihashmap_iterate (dc->active,
                                         &free_entry,
                                         NULL);

Modified: gnunet/src/fs/fs_search.c
===================================================================
--- gnunet/src/fs/fs_search.c   2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/src/fs/fs_search.c   2010-04-27 13:40:04 UTC (rev 11078)
@@ -1022,19 +1022,10 @@
   GNUNET_FS_uri_destroy (sr->uri);
   GNUNET_CONTAINER_meta_data_destroy (sr->meta);
   if (sr->probe_ctx != NULL)
-    {
-      GNUNET_FS_download_stop (sr->probe_ctx, GNUNET_YES);
-      h->active_probes--;
-      /* FIXME: trigger starting of new
-        probes here!? Maybe not -- could
-        cause new probes to be immediately
-        stopped again... */
-    }
+    GNUNET_FS_download_stop (sr->probe_ctx, GNUNET_YES);    
   if (sr->probe_cancel_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              sr->probe_cancel_task);
-    }
+    GNUNET_SCHEDULER_cancel (h->sched,
+                            sr->probe_cancel_task);    
   GNUNET_free (sr);
   return GNUNET_OK;
 }

Modified: gnunet/src/fs/fs_test_lib.c
===================================================================
--- gnunet/src/fs/fs_test_lib.c 2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/src/fs/fs_test_lib.c 2010-04-27 13:40:04 UTC (rev 11078)
@@ -208,6 +208,9 @@
                                         daemon,
                                         GNUNET_SCHEDULER_REASON_PREREQ_DONE);
       break;
+    case GNUNET_FS_STATUS_DOWNLOAD_ACTIVE:
+    case GNUNET_FS_STATUS_DOWNLOAD_INACTIVE:
+      break;
       /* FIXME: monitor data correctness during download progress */
       /* FIXME: do performance reports given sufficient verbosity */
       /* FIXME: advance timeout task to "immediate" on error */

Modified: gnunet/src/fs/gnunet-download.c
===================================================================
--- gnunet/src/fs/gnunet-download.c     2010-04-27 12:35:37 UTC (rev 11077)
+++ gnunet/src/fs/gnunet-download.c     2010-04-27 13:40:04 UTC (rev 11078)
@@ -138,6 +138,9 @@
                                           NULL,
                                           GNUNET_SCHEDULER_REASON_PREREQ_DONE);
       break;      
+    case GNUNET_FS_STATUS_DOWNLOAD_ACTIVE:
+    case GNUNET_FS_STATUS_DOWNLOAD_INACTIVE:
+      break;
     default:
       fprintf (stderr,
               _("Unexpected status: %d\n"),

Modified: gnunet/src/include/gnunet_fs_service.h
===================================================================
--- gnunet/src/include/gnunet_fs_service.h      2010-04-27 12:35:37 UTC (rev 
11077)
+++ gnunet/src/include/gnunet_fs_service.h      2010-04-27 13:40:04 UTC (rev 
11078)
@@ -1476,8 +1476,16 @@
      * followed by an "unsigned int" giving the desired maximum number
      * of parallel downloads).
      */
-    GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM = 1
+    GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM = 1,
 
+    /**
+     * Maximum number of requests that should be pending at a given
+     * point in time (invidivual downloads may go above this, but
+     * if we are above this threshold, we should not activate any
+     * additional downloads.
+     */
+    GNUNET_FS_OPTIONS_REQUEST_PARALLELISM = 2
+
   };
 
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]