gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r11353 - gnunet/src/datastore


From: gnunet
Subject: [GNUnet-SVN] r11353 - gnunet/src/datastore
Date: Wed, 12 May 2010 16:22:36 +0200

Author: grothoff
Date: 2010-05-12 16:22:36 +0200 (Wed, 12 May 2010)
New Revision: 11353

Modified:
   gnunet/src/datastore/Makefile.am
   gnunet/src/datastore/datastore_api.c
   gnunet/src/datastore/perf_datastore_api.c
   gnunet/src/datastore/test_datastore_api.c
   gnunet/src/datastore/test_datastore_api_management.c
Log:
stuff

Modified: gnunet/src/datastore/Makefile.am
===================================================================
--- gnunet/src/datastore/Makefile.am    2010-05-12 14:17:29 UTC (rev 11352)
+++ gnunet/src/datastore/Makefile.am    2010-05-12 14:22:36 UTC (rev 11353)
@@ -64,7 +64,7 @@
  perf_datastore_api \
  perf_plugin_datastore
 
-#TESTS = $(check_PROGRAMS)
+TESTS = $(check_PROGRAMS)
 
 test_datastore_api_SOURCES = \
  test_datastore_api.c

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2010-05-12 14:17:29 UTC (rev 
11352)
+++ gnunet/src/datastore/datastore_api.c        2010-05-12 14:22:36 UTC (rev 
11353)
@@ -20,11 +20,13 @@
 
 /**
  * @file datastore/datastore_api.c
- * @brief Management for the datastore for files stored on a GNUnet node
+ * @brief Management for the datastore for files stored on a GNUnet node.  
Implements
+ *        a priority queue for requests (with timeouts).
  * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_arm_service.h"
+#include "gnunet_constants.h"
 #include "gnunet_datastore_service.h"
 #include "datastore.h"
 
@@ -50,6 +52,29 @@
   struct GNUNET_DATASTORE_Handle *h;
 
   /**
+   * Response processor (NULL if we are not waiting for a response).
+   * This struct should be used for the closure, function-specific
+   * arguments can be passed via 'client_ctx'.
+   */
+  GNUNET_CLIENT_MessageHandler response_proc;
+  
+  /**
+   * Specific context (variable argument that
+   * can be used by the response processor).
+   */
+  void *client_ctx;
+
+  /**
+   * Function to call after transmission of the request.
+   */
+  GNUNET_DATASTORE_ContinuationWithStatus cont;
+   
+  /**
+   * Closure for 'cont'.
+   */
+  void *cont_cls;
+
+  /**
    * Task for timeout signalling.
    */
   GNUNET_SCHEDULER_TaskIdentifier task;
@@ -72,34 +97,23 @@
 
   /**
    * Number of bytes in the request message following
-   * this struct.
+   * this struct.  32-bit value for nicer memory
+   * access (and overall struct alignment).
    */
-  uint16_t message_size;
+  uint32_t message_size;
 
   /**
    * Has this message been transmitted to the service?
    * Only ever GNUNET_YES for the head of the queue.
+   * Note that the overall struct should end at a 
+   * multiple of 64 bits.
    */
-  int16_t was_transmitted;
+  int32_t was_transmitted;
 
-  /**
-   * Response processor (NULL if we are not waiting for a response).
-   * This struct should be used for the closure, function-specific
-   * arguments can be passed via 'client_ctx'.
-   */
-  GNUNET_CLIENT_MessageHandler response_proc;
-  
-  /**
-   * Specific context (variable argument that
-   * can be used by the response processor).
-   */
-  void *client_ctx;
-
 };
 
 /**
- * Handle to the datastore service.  Followed
- * by 65536 bytes used for storing messages.
+ * Handle to the datastore service. 
  */
 struct GNUNET_DATASTORE_Handle
 {
@@ -120,6 +134,11 @@
   struct GNUNET_CLIENT_Connection *client;
 
   /**
+   * Current transmit handle.
+   */
+  struct GNUNET_CLIENT_TransmitHandle *th;
+
+  /**
    * Current head of priority queue.
    */
   struct QueueEntry *queue_head;
@@ -130,6 +149,17 @@
   struct QueueEntry *queue_tail;
 
   /**
+   * Task for trying to reconnect.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+
+  /**
+   * How quickly should we retry?  Used for exponential back-off on
+   * connect-errors.
+   */
+  struct GNUNET_TIME_Relative retry_time;
+
+  /**
    * Number of entries in the queue.
    */
   unsigned int queue_size;
@@ -214,6 +244,9 @@
 
   if (h->client != NULL)
     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+  if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (h->sched,
+                            h->reconnect_task);
   h->client = NULL;
   while (NULL != (qe = h->queue_head))
     {
@@ -245,135 +278,275 @@
 }
 
 
-#if 0
 /**
- * Type of a function to call when we receive a message
- * from the service.  This specific function is used
- * to handle messages of type "struct StatusMessage".
+ * A request has timed out (before being transmitted to the service).
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param cls the 'struct QueueEntry'
+ * @param tc scheduler context
  */
-static void 
-with_status_response_handler (void *cls,
-                             const struct
-                             GNUNET_MessageHeader * msg)
+static void
+timeout_queue_entry (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_DATASTORE_Handle *h = cls;
-  GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc;
-  const struct StatusMessage *sm;
-  const char *emsg;
-  int status;
+  struct QueueEntry *qe = cls;
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
 
-  h->message_size = 0;
-  if (msg == NULL)
+  qe->task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (qe->was_transmitted == GNUNET_NO);
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                              h->queue_tail,
+                              qe);
+  if (qe->cont != NULL)
+    qe->cont (qe->cont_cls, GNUNET_NO, _("timeout"));
+  if (qe->response_proc != NULL)
+    qe->response_proc (qe, NULL);
+  GNUNET_free (qe);
+}
+
+
+/**
+ * Create a new entry for our priority queue (and possibly discard other 
entires if
+ * the queue is getting too long).
+ *
+ * @param h handle to the datastore
+ * @param msize size of the message to queue
+ * @param queue_priority priority of the entry
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout timeout for the operation
+ * @param cont continuation to call when done with request transmission (can 
be NULL)
+ * @param cont_cls closure for cont
+ * @param response_proc function to call with replies (can be NULL)
+ * @param client_ctx client context (NOT a closure for response_proc)
+ * @return NULL if the queue is full (and this entry was dropped)
+ */
+static struct QueueEntry *
+make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
+                 size_t msize,
+                 unsigned int queue_priority,
+                 unsigned int max_queue_size,
+                 struct GNUNET_TIME_Relative timeout,
+                 GNUNET_DATASTORE_ContinuationWithStatus cont,
+                 void *cont_cls,
+                 GNUNET_CLIENT_MessageHandler response_proc,            
+                 void *client_ctx)
+{
+  struct QueueEntry *ret;
+  struct QueueEntry *pos;
+  unsigned int c;
+
+  c = 0;
+  pos = h->queue_head;
+  while ( (pos != NULL) &&
+         (c < max_queue_size) &&
+         (pos->priority >= queue_priority) )
     {
-      h->response_proc = NULL;
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      cont (h->response_proc_cls, 
-           GNUNET_SYSERR,
-           _("Timeout trying to read response from datastore service"));
-      return;
+      c++;
+      pos = pos->next;
     }
-  if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
-       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
+  if (c >= max_queue_size)
+    return NULL;
+  if (pos == NULL)
     {
-      GNUNET_break (0);
-      h->response_proc = NULL;
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      cont (h->response_proc_cls, 
-           GNUNET_SYSERR,
-           _("Error reading response from datastore service"));
-      return;
+      /* append at the tail */
+      pos = h->queue_tail;
     }
-  sm = (const struct StatusMessage*) msg;
-  status = ntohl(sm->status);
-  emsg = NULL;
-  if (ntohs(msg->size) > sizeof(struct StatusMessage))
+  else
     {
-      emsg = (const char*) &sm[1];
-      if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
+      pos = pos->prev; 
+      /* do not insert at HEAD if HEAD query was already
+        transmitted and we are still receiving replies! */
+      if ( (pos == NULL) &&
+          (h->queue_head->was_transmitted) )
+       pos = h->queue_head;
+    }
+  ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize);
+  GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
+                                    h->queue_tail,
+                                    pos,
+                                    ret);
+  ret->h = h;
+  ret->response_proc = response_proc;
+  ret->client_ctx = client_ctx;
+  ret->cont = cont;
+  ret->cont_cls = cont_cls;  
+  ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
+                                           timeout,
+                                           &timeout_queue_entry,
+                                           ret);
+  ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+  ret->priority = queue_priority;
+  ret->max_queue = max_queue_size;
+  ret->message_size = msize;
+  ret->was_transmitted = GNUNET_NO;
+  h->queue_size++;
+  c++;
+  pos = ret->next;
+  while (pos != NULL) 
+    {
+      if (pos->max_queue < h->queue_size)
        {
-         GNUNET_break (0);
-         emsg = _("Invalid error message received from datastore service");
+         GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                                      h->queue_tail,
+                                      pos);
+         GNUNET_SCHEDULER_cancel (h->sched,
+                                  pos->task);
+         if (pos->cont != NULL)
+           pos->cont (pos->cont_cls, GNUNET_NO, _("Message queue full"));
+         if (pos->response_proc != NULL)
+           pos->response_proc (pos, NULL);
+         GNUNET_free (pos);
+         h->queue_size--;
+         break;
        }
-    }  
-  if ( (status == GNUNET_SYSERR) &&
-       (emsg == NULL) )
-    {
-      GNUNET_break (0);
-      emsg = _("Invalid error message received from datastore service");
+      pos = pos->next;
     }
-  h->response_proc = NULL;
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received status %d/%s\n",
-             status,
-             emsg);
-#endif
-  cont (h->response_proc_cls, 
-       status,
-       emsg);
+  return ret;
 }
 
 
 /**
- * Helper function that will initiate the
- * transmission of a message to the datastore
- * service.  The message must already be prepared
- * and stored in the buffer at the end of the
- * handle.  The message must be of a type that
- * expects a "StatusMessage" in response.
+ * Process entries in the queue (or do nothing if we are already
+ * doing so).
+ * 
+ * @param h handle to the datastore
+ */
+static void
+process_queue (struct GNUNET_DATASTORE_Handle *h);
+
+
+/**
+ * Try reconnecting to the datastore service.
  *
- * @param h handle to the service with prepared message
- * @param cont function to call with result
- * @param cont_cls closure
- * @param timeout timeout for the operation
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param tc scheduler context
  */
 static void
-transmit_for_status (struct GNUNET_DATASTORE_Handle *h,
-                    GNUNET_DATASTORE_ContinuationWithStatus cont,
-                    void *cont_cls,
-                    struct GNUNET_TIME_Relative timeout)
+try_reconnect (void *cls,
+              const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  const struct GNUNET_MessageHeader *hdr;
-  uint16_t msize;
+  struct GNUNET_DATASTORE_Handle *h = cls;
 
-  GNUNET_assert (cont != NULL);
-  hdr = (const struct GNUNET_MessageHeader*) &h[1];
-  msize = ntohs(hdr->size);
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u byte message of type %u to datastore service\n",
-             msize,
-             ntohs(hdr->type));
-#endif
-  GNUNET_assert (h->response_proc == NULL);
-  h->response_proc = cont;
-  h->response_proc_cls = cont_cls;
-  h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  h->message_size = msize;
-  if (GNUNET_OK !=
-      GNUNET_CLIENT_transmit_and_get_response (h->client,
-                                              hdr,                             
               
-                                              timeout,
-                                              GNUNET_YES,
-                                              &with_status_response_handler,   
                                       
-                                              h))
+  if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
+    h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
+  else
+    h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
+  if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
+    h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
+  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+  if (h->client == NULL)
+    return;
+  process_queue (h);
+}
+
+
+/**
+ * Disconnect from the service and then try reconnecting to the datastore 
service
+ * after some delay.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param tc scheduler context
+ */
+static void
+do_disconnect (struct GNUNET_DATASTORE_Handle *h)
+{
+  GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+  h->client = NULL;
+  h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
+                                                   h->retry_time,
+                                                   &try_reconnect,
+                                                   h);      
+}
+
+
+/**
+ * Transmit request from queue to datastore service.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param size number of bytes that can be copied to buf
+ * @param buf where to copy the drop message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_request (void *cls,
+                 size_t size, 
+                 void *buf)
+{
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct QueueEntry *qe;
+  size_t msize;
+
+  h->th = NULL;
+  if (NULL == (qe = h->queue_head))
+    return 0; /* no entry in queue */
+  if (buf == NULL)
     {
-      GNUNET_break (0);
-      h->response_proc = NULL;
-      h->message_size = 0;
-      cont (cont_cls,
-           GNUNET_SYSERR,
-           _("Not ready to transmit request to datastore service"));
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Failed to transmit request to database.\n"));
+      do_disconnect (h);
+      return 0;
     }
+  if (size < (msize = qe->message_size))
+    {
+      process_queue (h);
+      return 0;
+    }
+  memcpy (buf, &qe[1], msize);
+  qe->was_transmitted = GNUNET_YES;
+  if (qe->cont != NULL)
+    qe->cont (qe->cont_cls, GNUNET_OK, NULL);
+  GNUNET_SCHEDULER_cancel (h->sched,
+                          qe->task);
+  qe->task = GNUNET_SCHEDULER_NO_TASK;
+  if (qe->response_proc == NULL)
+    {
+      GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                                  h->queue_tail,
+                                  qe);
+      GNUNET_free (qe);
+      process_queue (h);
+    }
+  else
+    {
+      GNUNET_CLIENT_receive (h->client,
+                            qe->response_proc,
+                            qe,
+                            GNUNET_TIME_absolute_get_remaining (qe->timeout));
+    }  
+  return msize;
 }
 
 
 /**
+ * Process entries in the queue (or do nothing if we are already
+ * doing so).
+ * 
+ * @param h handle to the datastore
+ */
+static void
+process_queue (struct GNUNET_DATASTORE_Handle *h)
+{
+  struct QueueEntry *qe;
+
+  if (NULL == (qe = h->queue_head))
+    return; /* no entry in queue */
+  if (qe->was_transmitted == GNUNET_YES)
+    return; /* waiting for replies */
+  if (h->th != NULL)
+    return; /* request pending */
+  if (h->client == NULL)
+    return; /* waiting for reconnect */
+  h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
+                                              qe->message_size,
+                                              
GNUNET_TIME_absolute_get_remaining (qe->timeout),
+                                              GNUNET_YES,
+                                              &transmit_request,
+                                              h);
+}
+
+
+/**
  * Store an item in the datastore.  If the item is already present,
  * the priorities are summed up and the higher expiration time and
  * lower anonymity level is used.
@@ -388,6 +561,9 @@
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
  * @param expiration expiration time for the content
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
  * @param timeout timeout for the operation
  * @param cont continuation to call when done
  * @param cont_cls closure for cont
@@ -402,10 +578,13 @@
                       uint32_t priority,
                       uint32_t anonymity,
                       struct GNUNET_TIME_Absolute expiration,
+                     unsigned int queue_priority,
+                     unsigned int max_queue_size,
                       struct GNUNET_TIME_Relative timeout,
                      GNUNET_DATASTORE_ContinuationWithStatus cont,
                      void *cont_cls)
 {
+  struct QueueEntry *qe;
   struct DataMessage *dm;
   size_t msize;
 
@@ -417,7 +596,12 @@
 #endif
   msize = sizeof(struct DataMessage) + size;
   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  dm = (struct DataMessage*) &h[1];
+  qe = make_queue_entry (h, msize,
+                        queue_priority, max_queue_size, timeout,
+                        cont, cont_cls, NULL, NULL);
+  if (qe == NULL)
+    return;
+  dm = (struct DataMessage* ) &qe[1];
   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
   dm->header.size = htons(msize);
   dm->rid = htonl(rid);
@@ -429,11 +613,104 @@
   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
   dm->key = *key;
   memcpy (&dm[1], data, size);
-  transmit_for_status (h, cont, cont_cls, timeout);
+  process_queue (h);
 }
 
 
 /**
+ * Context for processing status messages.
+ */
+struct StatusContext
+{
+  /**
+   * Continuation to call with the status.
+   */
+  GNUNET_DATASTORE_ContinuationWithStatus cont;
+
+  /**
+   * Closure for cont.
+   */
+  void *cont_cls;
+
+};
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void 
+process_status_message (void *cls,
+                       const struct
+                       GNUNET_MessageHeader * msg)
+{
+  struct QueueEntry *qe = cls;
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
+  struct StatusContext *rc = qe->client_ctx;
+  const struct StatusMessage *sm;
+  const char *emsg;
+  int32_t status;
+
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                              h->queue_tail,
+                              qe);
+  GNUNET_free (qe);
+  if (msg == NULL)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Failed to receive response from database.\n"));
+      do_disconnect (h);
+      return;
+    }
+
+  if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
+       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
+    {
+      GNUNET_break (0);
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
+      do_disconnect (h);
+      rc->cont (rc->cont_cls, 
+               GNUNET_SYSERR,
+               _("Error reading response from datastore service"));
+      GNUNET_free (rc);
+      return;
+    }
+  sm = (const struct StatusMessage*) msg;
+  status = ntohl(sm->status);
+  emsg = NULL;
+  if (ntohs(msg->size) > sizeof(struct StatusMessage))
+    {
+      emsg = (const char*) &sm[1];
+      if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
+       {
+         GNUNET_break (0);
+         emsg = _("Invalid error message received from datastore service");
+       }
+    }  
+  if ( (status == GNUNET_SYSERR) &&
+       (emsg == NULL) )
+    {
+      GNUNET_break (0);
+      emsg = _("Invalid error message received from datastore service");
+    }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received status %d/%s\n",
+             (int) status,
+             emsg);
+#endif
+  rc->cont (rc->cont_cls, 
+           status,
+           emsg);
+  GNUNET_free (rc);  
+  process_queue (h);
+}
+
+
+/**
  * Reserve space in the datastore.  This function should be used
  * to avoid "out of space" failures during a longer sequence of "put"
  * operations (for example, when a file is being inserted).
@@ -441,27 +718,48 @@
  * @param h handle to the datastore
  * @param amount how much space (in bytes) should be reserved (for content 
only)
  * @param entries how many entries will be created (to calculate per-entry 
overhead)
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response (or before dying in 
queue)
  * @param cont continuation to call when done; "success" will be set to
  *             a positive reservation value if space could be reserved.
  * @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
  */
 void
 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
                          uint64_t amount,
                          uint32_t entries,
+                         unsigned int queue_priority,
+                         unsigned int max_queue_size,
+                         struct GNUNET_TIME_Relative timeout,
                          GNUNET_DATASTORE_ContinuationWithStatus cont,
-                         void *cont_cls,
-                         struct GNUNET_TIME_Relative timeout)
+                         void *cont_cls)
 {
+  struct QueueEntry *qe;
   struct ReserveMessage *rm;
+  struct StatusContext *scont;
 
-  rm = (struct ReserveMessage*) &h[1];
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Asked to reserve %llu bytes of data and %u entries'\n",
+             (unsigned long long) amount,
+             (unsigned int) entries);
+#endif
+  scont = GNUNET_malloc (sizeof (struct StatusContext));
+  scont->cont = cont;
+  scont->cont_cls = cont_cls;
+  qe = make_queue_entry (h, sizeof(struct ReserveMessage),
+                        queue_priority, max_queue_size, timeout,
+                        NULL, NULL, &process_status_message, scont);
+  if (qe == NULL)
+    return;
+  rm = (struct ReserveMessage*) &qe[1];
   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
   rm->header.size = htons(sizeof (struct ReserveMessage));
   rm->entries = htonl(entries);
   rm->amount = GNUNET_htonll(amount);
-  transmit_for_status (h, cont, cont_cls, timeout);
+  process_queue (h);
 }
 
 
@@ -473,24 +771,48 @@
  * @param h handle to the datastore
  * @param rid reservation ID (value of "success" in original continuation
  *        from the "reserve" function).
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
  */
 void
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
                                  int rid,
+                                 unsigned int queue_priority,
+                                 unsigned int max_queue_size,
+                                 struct GNUNET_TIME_Relative timeout,
                                  GNUNET_DATASTORE_ContinuationWithStatus cont,
-                                 void *cont_cls,
-                                 struct GNUNET_TIME_Relative timeout)
+                                 void *cont_cls)
 {
+  struct QueueEntry *qe;
   struct ReleaseReserveMessage *rrm;
+  struct StatusContext *scont;
 
-  rrm = (struct ReleaseReserveMessage*) &h[1];
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Asked to reserve %llu bytes of data and %u entries'\n",
+             (unsigned long long) amount,
+             (unsigned int) entries);
+#endif
+  scont = GNUNET_malloc (sizeof (struct StatusContext));
+  scont->cont = cont;
+  scont->cont_cls = cont_cls;
+  qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
+                        queue_priority, max_queue_size, timeout,
+                        NULL, NULL, &process_status_message, scont);
+  if (qe == NULL)
+    return;
+  rrm = (struct ReleaseReserveMessage*) &qe[1];
   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
   rrm->rid = htonl(rid);
-  transmit_for_status (h, cont, cont_cls, timeout);
+  process_queue (h);
 }
 
 
@@ -501,264 +823,334 @@
  * @param uid identifier for the value
  * @param priority how much to increase the priority of the value
  * @param expiration new expiration value should be MAX of existing and this 
argument
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
  */
 void
 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
                         unsigned long long uid,
                         uint32_t priority,
                         struct GNUNET_TIME_Absolute expiration,
+                        unsigned int queue_priority,
+                        unsigned int max_queue_size,
+                        struct GNUNET_TIME_Relative timeout,
                         GNUNET_DATASTORE_ContinuationWithStatus cont,
-                        void *cont_cls,
-                        struct GNUNET_TIME_Relative timeout)
+                        void *cont_cls)
 {
+  struct QueueEntry *qe;
   struct UpdateMessage *um;
+  struct StatusContext *scont;
 
-  um = (struct UpdateMessage*) &h[1];
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Asked to update entry %llu raising priority by %u and expiration 
to %llu\n",
+             uid,
+             (unsigned int) priority,
+             (unsigned long long) expiration.value);
+#endif
+  scont = GNUNET_malloc (sizeof (struct StatusContext));
+  scont->cont = cont;
+  scont->cont_cls = cont_cls;
+  qe = make_queue_entry (h, sizeof(struct UpdateMessage),
+                        queue_priority, max_queue_size, timeout,
+                        NULL, NULL, &process_status_message, scont);
+  if (qe == NULL)
+    return;
+  um = (struct UpdateMessage*) &qe[1];
   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
   um->header.size = htons(sizeof (struct UpdateMessage));
   um->priority = htonl(priority);
   um->expiration = GNUNET_TIME_absolute_hton(expiration);
   um->uid = GNUNET_htonll(uid);
-  transmit_for_status (h, cont, cont_cls, timeout);
+  process_queue (h);
 }
 
 
 /**
- * Helper function that will initiate the transmission of a message to
- * the datastore service.  The message must already be prepared and
- * stored in the buffer at the end of the handle.  The message must be
- * of a type that expects a "DataMessage" in response.
+ * Explicitly remove some content from the database.
+ * The "cont"inuation will be called with status
+ * "GNUNET_OK" if content was removed, "GNUNET_NO"
+ * if no matching entry was found and "GNUNET_SYSERR"
+ * on all other types of errors.
  *
- * @param h handle to the service with prepared message
- * @param cont function to call with result
- * @param cont_cls closure
- * @param timeout timeout for the operation
+ * @param h handle to the datastore
+ * @param key key for the value
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
+ * @param cont continuation to call when done
+ * @param cont_cls closure for cont
  */
-static void
-transmit_for_result (struct GNUNET_DATASTORE_Handle *h,
-                    GNUNET_DATASTORE_Iterator cont,
-                    void *cont_cls,
-                    struct GNUNET_TIME_Relative timeout);
+void
+GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
+                         const GNUNET_HashCode *key,
+                         uint32_t size, 
+                        const void *data,
+                        unsigned int queue_priority,
+                        unsigned int max_queue_size,
+                        struct GNUNET_TIME_Relative timeout,
+                        GNUNET_DATASTORE_ContinuationWithStatus cont,
+                        void *cont_cls)
+{
+  struct QueueEntry *qe;
+  struct DataMessage *dm;
+  size_t msize;
+  struct StatusContext *scont;
 
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Asked to remove %u bytes under key `%s'\n",
+             size,
+             GNUNET_h2s (key));
+#endif
+  scont = GNUNET_malloc (sizeof (struct StatusContext));
+  scont->cont = cont;
+  scont->cont_cls = cont_cls;
+  msize = sizeof(struct DataMessage) + size;
+  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  qe = make_queue_entry (h, msize,
+                        queue_priority, max_queue_size, timeout,
+                        NULL, NULL, &process_status_message, scont);
+  if (qe == NULL)
+    return;
+  dm = (struct DataMessage*) &qe[1];
+  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
+  dm->header.size = htons(msize);
+  dm->rid = htonl(0);
+  dm->size = htonl(size);
+  dm->type = htonl(0);
+  dm->priority = htonl(0);
+  dm->anonymity = htonl(0);
+  dm->uid = GNUNET_htonll(0);
+  dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
+  dm->key = *key;
+  memcpy (&dm[1], data, size);
+  process_queue (h);
+}
 
+
+
 /**
+ * Context for processing result messages.
+ */
+struct ResultContext
+{
+  /**
+   * Iterator to call with the result.
+   */
+  GNUNET_DATASTORE_Iterator iter;
+
+  /**
+   * Closure for iter.
+   */
+  void *iter_cls;
+
+  /**
+   * Automatically get the next result, or wait for a call to
+   * GNUNET_DATASTORE_get_next?  GNUNET_YES means we automatically
+   * get the next one (if there are more).
+   */
+  int get_next;
+
+};
+
+
+/**
  * Type of a function to call when we receive a message
- * from the service.  This specific function is used
- * to handle messages of type "struct DataMessage".
+ * from the service.
  *
  * @param cls closure
  * @param msg message received, NULL on timeout or fatal error
  */
 static void 
-with_result_response_handler (void *cls,
-                             const struct
-                             GNUNET_MessageHeader * msg)
+process_result_message (void *cls,
+                       const struct GNUNET_MessageHeader * msg)
 {
-  struct GNUNET_DATASTORE_Handle *h = cls;
-  GNUNET_DATASTORE_Iterator cont = h->response_proc;
+  struct QueueEntry *qe = cls;
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
+  struct ResultContext *rc = qe->client_ctx;
   const struct DataMessage *dm;
-  size_t msize;
-  struct GNUNET_TIME_Relative remaining;
 
   if (msg == NULL)
     {
 #if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Got disconnected from datastore\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Failed to receive response from datastore\n"));
 #endif
-      h->response_proc = NULL;
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      remaining = GNUNET_TIME_absolute_get_remaining (h->timeout);
-      if (remaining.value > 0)
-       {
-         transmit_for_result (h,
-                              cont,
-                              h->response_proc_cls,
-                              remaining);
-       }
-      else
-       {
-         h->message_size = 0;
-         cont (h->response_proc_cls, 
+      GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                                  h->queue_tail,
+                                  qe);
+      GNUNET_free (qe);
+      do_disconnect (h);
+      rc->iter (rc->iter_cls,
                NULL, 0, NULL, 0, 0, 0, 
-               GNUNET_TIME_UNIT_ZERO_ABS, 0);
-       }
+               GNUNET_TIME_UNIT_ZERO_ABS, 0);  
+      GNUNET_free (rc);
       return;
     }
-  h->message_size = 0;
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
-      h->response_proc = NULL;
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Received end of result set\n");
 #endif
-      cont (h->response_proc_cls, 
-           NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                                  h->queue_tail,
+                                  qe);
+      GNUNET_free (qe);
+      rc->iter (rc->iter_cls,
+               NULL, 0, NULL, 0, 0, 0, 
+               GNUNET_TIME_UNIT_ZERO_ABS, 0);  
+      GNUNET_free (rc);
+      process_queue (h);
       return;
     }
   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
-       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ) 
+       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
+       (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct 
DataMessage*)msg)->size)) )
     {
       GNUNET_break (0);
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      h->response_proc = NULL;
-      cont (h->response_proc_cls, 
-           NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                                  h->queue_tail,
+                                  qe);
+      GNUNET_free (qe);
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
+      do_disconnect (h);
+      rc->iter (rc->iter_cls,
+               NULL, 0, NULL, 0, 0, 0, 
+               GNUNET_TIME_UNIT_ZERO_ABS, 0);  
+      GNUNET_free (rc);
       return;
     }
   dm = (const struct DataMessage*) msg;
-  msize = ntohl(dm->size);
-  if (ntohs(msg->size) != msize + sizeof(struct DataMessage))
-    {
-      GNUNET_break (0);
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      h->response_proc = NULL;
-      cont (h->response_proc_cls, 
-           NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      return;
-    }
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received result %llu with type %u and size %u with key %s\n",
              (unsigned long long) GNUNET_ntohll(dm->uid),
              ntohl(dm->type),
-             msize,
+             ntohl(dm->size),
              GNUNET_h2s(&dm->key));
 #endif
-  cont (h->response_proc_cls, 
-       &dm->key,
-       msize,
-       &dm[1],
-       ntohl(dm->type),
-       ntohl(dm->priority),
-       ntohl(dm->anonymity),
-       GNUNET_TIME_absolute_ntoh(dm->expiration),      
-       GNUNET_ntohll(dm->uid));
+  rc->iter (rc->iter_cls,
+           &dm->key,
+           ntohl(dm->size),
+           &dm[1],
+           ntohl(dm->type),
+           ntohl(dm->priority),
+           ntohl(dm->anonymity),
+           GNUNET_TIME_absolute_ntoh(dm->expiration),  
+           GNUNET_ntohll(dm->uid));
+  if (rc->get_next == GNUNET_YES)
+    GNUNET_CLIENT_receive (h->client,
+                          qe->response_proc,
+                          qe,
+                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
 }
 
 
 /**
- * Function called to trigger obtaining the next result
- * from the datastore.
- * 
+ * Get a random value from the datastore.
+ *
  * @param h handle to the datastore
- * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
- *        iteration (with a final call to "iter" with key/data == NULL).
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
+ * @param iter function to call on a random value; it
+ *        will be called once with a value (if available)
+ *        and always once with a value of NULL.
+ * @param iter_cls closure for iter
  */
-void 
-GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
-                          int more)
+void
+GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
+                            unsigned int queue_priority,
+                            unsigned int max_queue_size,
+                            struct GNUNET_TIME_Relative timeout,
+                             GNUNET_DATASTORE_Iterator iter, 
+                            void *iter_cls)
 {
-  GNUNET_DATASTORE_Iterator cont;
+  struct QueueEntry *qe;
+  struct GNUNET_MessageHeader *m;
+  struct ResultContext *rcont;
 
-  if (GNUNET_YES == more)
-    {
-      GNUNET_CLIENT_receive (h->client,
-                            &with_result_response_handler,
-                            h,
-                            GNUNET_TIME_absolute_get_remaining (h->timeout));
-      return;
-    }
-  cont = h->response_proc;
-  h->response_proc = NULL;
-  GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-  h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-  cont (h->response_proc_cls, 
-       NULL, 0, NULL, 0, 0, 0, 
-       GNUNET_TIME_UNIT_ZERO_ABS, 0);
-}
-
-
-/**
- * Helper function that will initiate the transmission of a message to
- * the datastore service.  The message must already be prepared and
- * stored in the buffer at the end of the handle.  The message must be
- * of a type that expects a "DataMessage" in response.
- *
- * @param h handle to the service with prepared message
- * @param cont function to call with result
- * @param cont_cls closure
- * @param timeout timeout for the operation
- */
-static void
-transmit_for_result (struct GNUNET_DATASTORE_Handle *h,
-                    GNUNET_DATASTORE_Iterator cont,
-                    void *cont_cls,
-                    struct GNUNET_TIME_Relative timeout)
-{
-  const struct GNUNET_MessageHeader *hdr;
-  uint16_t msize;
-
-  GNUNET_assert (cont != NULL);
-  hdr = (const struct GNUNET_MessageHeader*) &h[1];
-  msize = ntohs(hdr->size);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u byte message of type %u to datastore service\n",
-             msize,
-             ntohs(hdr->type));
+             "Asked to get random entry in %llu ms\n",
+             (unsigned long long) timeout.value);
 #endif
-  GNUNET_assert (h->response_proc == NULL);
-  h->response_proc = cont;
-  h->response_proc_cls = cont_cls;
-  h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  h->message_size = msize;
-  if (GNUNET_OK !=
-      GNUNET_CLIENT_transmit_and_get_response (h->client,
-                                              hdr,
-                                              timeout,
-                                              GNUNET_YES,
-                                              &with_result_response_handler,
-                                              h))
-    {
-      GNUNET_break (0);
-      h->response_proc = NULL;
-      h->message_size = 0;
-      cont (h->response_proc_cls, 
-           NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    }
+  rcont = GNUNET_malloc (sizeof (struct ResultContext));
+  rcont->iter = iter;
+  rcont->iter_cls = iter_cls;
+  rcont->get_next = GNUNET_YES;
+  qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
+                        queue_priority, max_queue_size, timeout,
+                        NULL, NULL, &process_result_message, rcont);
+  if (qe == NULL)
+    return;
+  m = (struct GNUNET_MessageHeader*) &qe[1];
+  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
+  m->size = htons(sizeof (struct GNUNET_MessageHeader));
+  process_queue (h);
 }
 
 
+
 /**
  * Iterate over the results for a particular key
- * in the datastore.
+ * in the datastore.  The iterator will only be called
+ * once initially; if the first call did contain a
+ * result, further results can be obtained by calling
+ * "GNUNET_DATASTORE_get_next" with the given argument.
  *
  * @param h handle to the datastore
  * @param key maybe NULL (to match all entries)
  * @param type desired type, 0 for any
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
  * @param iter function to call on each matching value;
  *        will be called once with a NULL value at the end
  * @param iter_cls closure for iter
- * @param timeout how long to wait at most for a response
  */
 void
 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
                       const GNUNET_HashCode * key,
-                      enum GNUNET_BLOCK_Type type,
-                      GNUNET_DATASTORE_Iterator iter, void *iter_cls,
-                     struct GNUNET_TIME_Relative timeout)
+                     enum GNUNET_BLOCK_Type type,
+                     unsigned int queue_priority,
+                     unsigned int max_queue_size,
+                     struct GNUNET_TIME_Relative timeout,
+                      GNUNET_DATASTORE_Iterator iter, 
+                     void *iter_cls)
 {
+  struct QueueEntry *qe;
   struct GetMessage *gm;
+  struct ResultContext *rcont;
 
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to look for data under key `%s'\n",
+             "Asked to look for data of type %u under key `%s'\n",
+             (unsigned int) type,
              GNUNET_h2s (key));
 #endif
-  gm = (struct GetMessage*) &h[1];
+  rcont = GNUNET_malloc (sizeof (struct ResultContext));
+  rcont->iter = iter;
+  rcont->iter_cls = iter_cls;
+  rcont->get_next = GNUNET_NO;
+  qe = make_queue_entry (h, sizeof(struct GetMessage),
+                        queue_priority, max_queue_size, timeout,
+                        NULL, NULL, &process_result_message, rcont);
+  if (qe == NULL)
+    return;
+  gm = (struct GetMessage*) &qe[1];
   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
   gm->type = htonl(type);
   if (key != NULL)
@@ -770,80 +1162,47 @@
     {
       gm->header.size = htons(sizeof (struct GetMessage) - 
sizeof(GNUNET_HashCode));
     }
-  GNUNET_assert (h->response_proc == NULL);
-  transmit_for_result (h, iter, iter_cls, timeout);
+  process_queue (h);
 }
 
 
 /**
- * Get a random value from the datastore.
- *
+ * Function called to trigger obtaining the next result
+ * from the datastore.
+ * 
  * @param h handle to the datastore
- * @param iter function to call on a random value; it
- *        will be called exactly once; if no values
- *        are available, the value will be NULL.
- * @param iter_cls closure for iter
- * @param timeout how long to wait at most for a response
+ * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
+ *        iteration (with a final call to "iter" with key/data == NULL).
  */
-void
-GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
-                             GNUNET_DATASTORE_Iterator iter, void *iter_cls,
-                            struct GNUNET_TIME_Relative timeout)
+void 
+GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
+                          int more)
 {
-  struct GNUNET_MessageHeader *m;
+  struct QueueEntry *qe = h->queue_head;
+  struct ResultContext *rc = qe->client_ctx;
 
-  m = (struct GNUNET_MessageHeader*) &h[1];
-  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
-  m->size = htons(sizeof (struct GNUNET_MessageHeader));
-  GNUNET_assert (h->response_proc == NULL);
-  transmit_for_result (h, iter, iter_cls, timeout);
+  GNUNET_assert (NULL != qe);
+  GNUNET_assert (&process_result_message == qe->response_proc);
+  GNUNET_assert (rc->get_next == GNUNET_NO);
+  if (GNUNET_YES == more)
+    {     
+      GNUNET_CLIENT_receive (h->client,
+                            qe->response_proc,
+                            qe,
+                            GNUNET_TIME_absolute_get_remaining (qe->timeout));
+      return;
+    }
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                              h->queue_tail,
+                              qe);
+  GNUNET_free (qe);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  do_disconnect (h);
+  rc->iter (rc->iter_cls,
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);      
+  GNUNET_free (rc);
 }
 
 
-/**
- * Explicitly remove some content from the database.
- *
- * @param h handle to the datastore
- * @param key key for the value
- * @param size number of bytes in data
- * @param data content stored
- * @param cont continuation to call when done
- * @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
- */
-void
-GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
-                         const GNUNET_HashCode * key,
-                         uint32_t size, const void *data,
-                        GNUNET_DATASTORE_ContinuationWithStatus cont,
-                        void *cont_cls,
-                        struct GNUNET_TIME_Relative timeout)
-{
-  struct DataMessage *dm;
-  size_t msize;
-
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to remove %u bytes of data under key `%s'\n",
-             size,
-             GNUNET_h2s (key));
-#endif
-  msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  dm = (struct DataMessage*) &h[1];
-  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
-  dm->header.size = htons(msize);
-  dm->rid = htonl(0);
-  dm->size = htonl(size);
-  dm->type = htonl(0);
-  dm->priority = htonl(0);
-  dm->anonymity = htonl(0);
-  dm->uid = GNUNET_htonll(0);
-  dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
-  dm->key = *key;
-  memcpy (&dm[1], data, size);
-  transmit_for_status (h, cont, cont_cls, timeout);
-}
-#endif
-
 /* end of datastore_api.c */

Modified: gnunet/src/datastore/perf_datastore_api.c
===================================================================
--- gnunet/src/datastore/perf_datastore_api.c   2010-05-12 14:17:29 UTC (rev 
11352)
+++ gnunet/src/datastore/perf_datastore_api.c   2010-05-12 14:22:36 UTC (rev 
11353)
@@ -186,9 +186,9 @@
                           &crc->key,
                           crc->esize,
                           crc->data,
+                          1, 1, TIMEOUT,
                           &remove_next,
-                          crc,
-                          TIMEOUT);
+                          crc);
 }
 
 
@@ -275,16 +275,16 @@
                            GNUNET_TIME_relative_to_absolute 
                            (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS,
                                                            
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
-                           TIMEOUT,
+                           1, 1, TIMEOUT,
                            &check_success, 
                            crc);
       break;
     case RP_CUT:
       /* trim down below MAX_SIZE again */
       GNUNET_DATASTORE_get_random (datastore, 
+                                  1, 1, TIMEOUT,
                                   &delete_value,
-                                  crc,
-                                  TIMEOUT);
+                                  crc);
       break;
     case RP_REPORT:
       printf (

Modified: gnunet/src/datastore/test_datastore_api.c
===================================================================
--- gnunet/src/datastore/test_datastore_api.c   2010-05-12 14:17:29 UTC (rev 
11352)
+++ gnunet/src/datastore/test_datastore_api.c   2010-05-12 14:22:36 UTC (rev 
11353)
@@ -355,7 +355,7 @@
                            get_priority (crc->i),
                            get_anonymity (crc->i),
                            get_expiration (crc->i),
-                           TIMEOUT,
+                           1, 1, TIMEOUT,
                            &check_success,
                            crc);
       crc->i++;
@@ -374,9 +374,9 @@
       GNUNET_DATASTORE_get (datastore, 
                            &crc->key,
                            get_type (crc->i),
+                           1, 1, TIMEOUT,
                            &check_value,
-                           crc,
-                           TIMEOUT);
+                           crc);
       break;
     case RP_DEL:
       crc->i--;
@@ -390,9 +390,9 @@
       GNUNET_DATASTORE_get (datastore, 
                            &crc->key,
                            get_type (crc->i),
+                           1, 1, TIMEOUT,
                            &delete_value,
-                           crc,
-                           TIMEOUT);
+                           crc);
       break;
     case RP_DO_DEL:
 #if VERBOSE
@@ -414,9 +414,9 @@
                               &crc->key,
                               crc->size,
                               crc->data,
+                              1, 1, TIMEOUT,
                               &check_success,
-                              crc,
-                              TIMEOUT);
+                              crc);
       break;   
     case RP_DELVALIDATE:
       crc->i--;
@@ -430,18 +430,18 @@
       GNUNET_DATASTORE_get (datastore, 
                            &crc->key,
                            get_type (crc->i),
+                           1, 1, TIMEOUT,
                            &check_nothing,
-                           crc,
-                           TIMEOUT);
+                           crc);
       break;
     case RP_RESERVE:
       crc->phase = RP_PUT_MULTIPLE;
       GNUNET_DATASTORE_reserve (datastore,
                                128*1024,
                                2,
+                               1, 1, TIMEOUT,
                                &get_reserved,
-                               crc,
-                               TIMEOUT);
+                               crc);
       break;
     case RP_PUT_MULTIPLE:
       crc->phase = RP_PUT_MULTIPLE_NEXT;
@@ -454,7 +454,7 @@
                            get_priority (42),
                            get_anonymity (42),
                            get_expiration (42),
-                           TIMEOUT,
+                           1, 1, TIMEOUT,
                            &check_success,
                            crc);
       break;
@@ -469,7 +469,7 @@
                            get_priority (43),
                            get_anonymity (43),
                            get_expiration (43),
-                           TIMEOUT,
+                           1, 1, TIMEOUT,
                            &check_success,
                            crc);
       break;
@@ -477,9 +477,9 @@
       GNUNET_DATASTORE_get (datastore,
                            &crc->key, 
                            get_type (42),
+                           1, 1, TIMEOUT,
                            &check_multiple,
-                           crc,
-                           TIMEOUT);
+                           crc);
       break;
     case RP_GET_MULTIPLE_NEXT:
     case RP_GET_MULTIPLE_DONE:
@@ -492,17 +492,17 @@
                               crc->uid,
                               100,
                               get_expiration (42),
+                              1, 1, TIMEOUT,
                               &check_success,
-                              crc,
-                              TIMEOUT);
+                              crc);
       break;
     case RP_UPDATE_VALIDATE:
       GNUNET_DATASTORE_get (datastore,
                            &crc->key, 
                            get_type (42),
+                           1, 1, TIMEOUT,
                            &check_update,
-                           crc,
-                           TIMEOUT);   
+                           crc);   
       break;
     case RP_UPDATE_DONE:
       GNUNET_assert (0);

Modified: gnunet/src/datastore/test_datastore_api_management.c
===================================================================
--- gnunet/src/datastore/test_datastore_api_management.c        2010-05-12 
14:17:29 UTC (rev 11352)
+++ gnunet/src/datastore/test_datastore_api_management.c        2010-05-12 
14:22:36 UTC (rev 11353)
@@ -235,7 +235,7 @@
                            get_priority (crc->i),
                            get_anonymity (crc->i),
                            get_expiration (crc->i),
-                           TIMEOUT,
+                           1, 1, TIMEOUT,
                            &check_success,
                            crc);
       crc->i++;
@@ -259,9 +259,9 @@
       GNUNET_DATASTORE_get (datastore, 
                            &crc->key,
                            get_type (crc->i),
+                           1, 1, TIMEOUT,
                            &check_value,
-                           crc,
-                           TIMEOUT);
+                           crc);
       break;
     case RP_GET_FAIL:
 #if VERBOSE
@@ -274,9 +274,9 @@
       GNUNET_DATASTORE_get (datastore, 
                            &crc->key,
                            get_type (crc->i),
+                           1, 1, TIMEOUT,
                            &check_nothing,
-                           crc,
-                           TIMEOUT);
+                           crc);
       break;
     case RP_DONE:
       GNUNET_assert (0 == crc->i);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]