gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37644 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r37644 - gnunet/src/fs
Date: Sun, 31 Jul 2016 23:23:23 +0200

Author: grothoff
Date: 2016-07-31 23:23:23 +0200 (Sun, 31 Jul 2016)
New Revision: 37644

Modified:
   gnunet/src/fs/gnunet-service-fs.c
   gnunet/src/fs/gnunet-service-fs.h
   gnunet/src/fs/gnunet-service-fs_cp.c
   gnunet/src/fs/gnunet-service-fs_cp.h
   gnunet/src/fs/gnunet-service-fs_pe.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_pr.h
   gnunet/src/fs/gnunet-service-fs_push.c
Log:
converting FS to new MQ-based core API

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs.c   2016-07-31 21:23:23 UTC (rev 37644)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009-2014 GNUnet e.V.
+     Copyright (C) 2009-2014, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -177,7 +177,7 @@
 /**
  * Identity of this peer.
  */
-static struct GNUNET_PeerIdentity my_id;
+struct GNUNET_PeerIdentity GSF_my_id;
 
 
 /**
@@ -277,33 +277,26 @@
 
 
 /**
- * Handle P2P "PUT" message.
+ * Check P2P "PUT" message.
  *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
+ * @param cls closure with the `struct GSF_ConnectedPeer`
  * @param message the actual message
  * @return #GNUNET_OK to keep the connection open,
  *         #GNUNET_SYSERR to close it (signal serious error)
  */
 static int
-handle_p2p_put (void *cls,
-                const struct GNUNET_PeerIdentity *other,
-                const struct GNUNET_MessageHeader *message)
+check_p2p_put (void *cls,
+              const struct PutMessage *put)
 {
-  struct GSF_ConnectedPeer *cp;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received P2P PUT from %s\n",
-              GNUNET_i2s (other));
-  cp = GSF_peer_get_ (other);
-  if (NULL == cp)
+  enum GNUNET_BLOCK_Type type;
+  
+  type = ntohl (put->type);
+  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
   {
-    GNUNET_break (0);
-    return GNUNET_OK;
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  GSF_cover_content_count++;
-  return GSF_handle_p2p_content_ (cp, message);
+  return GNUNET_OK;
 }
 
 
@@ -324,7 +317,8 @@
 {
   struct GSF_PendingRequest *pr = cls;
 
-  if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer))
+  if (GNUNET_YES !=
+      GSF_pending_request_test_target_ (pr, peer))
   {
 #if INSANE_STATISTICS
     GNUNET_STATISTICS_update (GSF_stats,
@@ -333,7 +327,8 @@
 #endif
     return;
   }
-  GSF_plan_add_ (cp, pr);
+  GSF_plan_add_ (cp,
+                pr);
 }
 
 
@@ -347,10 +342,10 @@
  * @param pr the pending request we were processing
  * @param result final datastore lookup result
  */
-static void
-consider_forwarding (void *cls,
-                     struct GSF_PendingRequest *pr,
-                     enum GNUNET_BLOCK_EvaluationResult result)
+void
+GSF_consider_forwarding (void *cls,
+                        struct GSF_PendingRequest *pr,
+                        enum GNUNET_BLOCK_EvaluationResult result)
 {
   if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
     return;                     /* we're done... */
@@ -363,31 +358,44 @@
 
 
 /**
- * Handle P2P "GET" request.
+ * Check P2P "GET" request.
  *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
+ * @param cls closure
+ * @param gm the actual message
  * @return #GNUNET_OK to keep the connection open,
  *         #GNUNET_SYSERR to close it (signal serious error)
  */
 static int
-handle_p2p_get (void *cls,
-                const struct GNUNET_PeerIdentity *other,
-                const struct GNUNET_MessageHeader *message)
+check_p2p_get (void *cls,
+              const struct GetMessage *gm)
 {
-  struct GSF_PendingRequest *pr;
-
-  pr = GSF_handle_p2p_query_ (other,
-                              message);
-  if (NULL == pr)
-    return GNUNET_OK; /* exists, identical to existing request, or malformed */
-  GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
-  GSF_local_lookup_ (pr,
-                     &consider_forwarding,
-                     NULL);
-  return GNUNET_OK;
+  size_t msize;
+  unsigned int bm;
+  unsigned int bits;
+  size_t bfsize;
+  
+  msize = ntohs (gm->header.size);
+  bm = ntohl (gm->hash_bitmap);
+  bits = 0;
+  while (bm > 0)
+  {
+    if (1 == (bm & 1))
+      bits++;
+    bm >>= 1;
+  }
+  if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
+  /* bfsize must be power of 2, check! */
+  if (0 != ((bfsize - 1) & bfsize))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }  
+  return GNUNET_OK; 
 }
 
 
@@ -416,7 +424,8 @@
   prd = GSF_pending_request_get_data_ (pr);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Finished database lookup for local request `%s' with result %d\n",
-              GNUNET_h2s (&prd->query), result);
+              GNUNET_h2s (&prd->query),
+             result);
   if (0 == prd->anonymity_level)
   {
     switch (prd->type)
@@ -439,7 +448,7 @@
       break;
     }
   }
-  consider_forwarding (NULL, pr, result);
+  GSF_consider_forwarding (NULL, pr, result);
 }
 
 
@@ -538,7 +547,7 @@
   GSF_cadet_stop_server ();
   if (NULL != GSF_core)
   {
-    GNUNET_CORE_disconnect (GSF_core);
+    GNUNET_CORE_disconnecT (GSF_core);
     GSF_core = NULL;
   }
   if (NULL != GSF_ats)
@@ -575,80 +584,7 @@
 
 
 /**
- * Function called for each pending request whenever a new
- * peer connects, giving us a chance to decide about submitting
- * the existing request to the new peer.
- *
- * @param cls the `struct GSF_ConnectedPeer` of the new peer
- * @param key query for the request
- * @param pr handle to the pending request
- * @return #GNUNET_YES to continue to iterate
- */
-static int
-consider_peer_for_forwarding (void *cls,
-                              const struct GNUNET_HashCode *key,
-                              struct GSF_PendingRequest *pr)
-{
-  struct GSF_ConnectedPeer *cp = cls;
-  struct GNUNET_PeerIdentity pid;
-
-  if (GNUNET_YES !=
-      GSF_pending_request_test_active_ (pr))
-    return GNUNET_YES; /* request is not actually active, skip! */
-  GSF_connected_peer_get_identity_ (cp, &pid);
-  if (GNUNET_YES !=
-      GSF_pending_request_test_target_ (pr, &pid))
-  {
-    GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop ("# Loopback routes suppressed"),
-                              1,
-                              GNUNET_NO);
-    return GNUNET_YES;
-  }
-  GSF_plan_add_ (cp, pr);
-  return GNUNET_YES;
-}
-
-
-/**
- * Function called after the creation of a connected peer record is complete.
- *
- * @param cls closure (unused)
- * @param cp handle to the newly created connected peer record
- */
-static void
-connected_peer_cb (void *cls,
-                   struct GSF_ConnectedPeer *cp)
-{
-  if (NULL == cp)
-    return;
-  GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
-                                 cp);
-}
-
-
-/**
- * Method called whenever a given peer connects.
- *
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- */
-static void
-peer_connect_handler (void *cls,
-                      const struct GNUNET_PeerIdentity *peer)
-{
-  if (0 ==
-      GNUNET_CRYPTO_cmp_peer_identity (&my_id,
-                                       peer))
-    return;
-  GSF_peer_connect_handler_ (peer,
-                             &connected_peer_cb,
-                             NULL);
-}
-
-
-/**
- * Function called after GNUNET_CORE_connect has succeeded
+ * Function called after GNUNET_CORE_connecT has succeeded
  * (or failed for good).  Note that the private key of the
  * peer is intentionally not exposed here; if you need it,
  * your process should try to read the private key file
@@ -661,7 +597,7 @@
 peer_init_handler (void *cls,
                    const struct GNUNET_PeerIdentity *my_identity)
 {
-  if (0 != GNUNET_CRYPTO_cmp_peer_identity (&my_id,
+  if (0 != GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
                                             my_identity))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -681,18 +617,23 @@
 main_init (struct GNUNET_SERVER_Handle *server,
            const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  static const struct GNUNET_CORE_MessageHandler no_p2p_handlers[] = {
-    { NULL, 0, 0 }
+  GNUNET_MQ_hd_var_size (p2p_get,
+                        GNUNET_MESSAGE_TYPE_FS_GET,
+                        struct GetMessage);
+  GNUNET_MQ_hd_var_size (p2p_put,
+                        GNUNET_MESSAGE_TYPE_FS_PUT,
+                        struct PutMessage);
+  GNUNET_MQ_hd_fixed_size (p2p_migration_stop,
+                          GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+                          struct MigrationStopMessage);
+  struct GNUNET_MQ_MessageHandler no_p2p_handlers[] = {
+    GNUNET_MQ_handler_end ()
   };
-  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = {
-    { &handle_p2p_get,
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put,
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { &GSF_handle_p2p_migration_stop_,
-      GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
-      sizeof (struct MigrationStopMessage) },
-    { NULL, 0, 0 }
+  struct GNUNET_MQ_MessageHandler p2p_handlers[] = {
+    make_p2p_get_handler (NULL),
+    make_p2p_put_handler (NULL),
+    make_p2p_migration_stop_handler (NULL),
+    GNUNET_MQ_handler_end ()
   };
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
     { &GNUNET_FS_handle_index_start, NULL,
@@ -735,18 +676,17 @@
   GNUNET_free (keyfile);
   GNUNET_assert (NULL != pk);
   GNUNET_CRYPTO_eddsa_key_get_public (pk,
-                                      &my_id.public_key);
+                                      &GSF_my_id.public_key);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "I am peer %s\n",
-              GNUNET_i2s (&my_id));
+              GNUNET_i2s (&GSF_my_id));
   GSF_core
-    = GNUNET_CORE_connect (GSF_cfg, NULL,
+    = GNUNET_CORE_connecT (GSF_cfg,
+                          NULL,
                            &peer_init_handler,
-                           &peer_connect_handler,
-                           &GSF_peer_disconnect_handler_,
-                           NULL, GNUNET_NO,
-                           NULL, GNUNET_NO,
+                           &GSF_peer_connect_handler,
+                           &GSF_peer_disconnect_handler,
                           (GNUNET_YES == anon_p2p_off)
                           ? no_p2p_handlers
                           : p2p_handlers);
@@ -753,10 +693,12 @@
   if (NULL == GSF_core)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Failed to connect to `%s' service.\n"), "core");
+                _("Failed to connect to `%s' service.\n"),
+               "core");
     return GNUNET_SYSERR;
   }
-  GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_,
+  GNUNET_SERVER_disconnect_notify (server,
+                                  &GSF_client_disconnect_handler_,
                                    NULL);
   GNUNET_SERVER_add_handlers (server, handlers);
   cover_age_task =

Modified: gnunet/src/fs/gnunet-service-fs.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs.h   2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs.h   2016-07-31 21:23:23 UTC (rev 37644)
@@ -223,6 +223,10 @@
  */
 extern struct GNUNET_ATS_PerformanceHandle *GSF_ats;
 
+/**
+ * Identity of this peer.
+ */
+extern struct GNUNET_PeerIdentity GSF_my_id;
 
 /**
  * Typical priorities we're seeing from other peers right now.  Since
@@ -265,13 +269,29 @@
 
 
 /**
+ * Function to be called after we're done processing
+ * replies from the local lookup.  If the result status
+ * code indicates that there may be more replies, plan
+ * forwarding the request.
+ *
+ * @param cls closure (NULL)
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
+ */
+void
+GSF_consider_forwarding (void *cls,
+                        struct GSF_PendingRequest *pr,
+                        enum GNUNET_BLOCK_EvaluationResult result);
+
+
+/**
  * Test if the DATABASE (GET) load on this peer is too high
  * to even consider processing the query at
  * all.
  *
- * @return GNUNET_YES if the load is too high to do anything (load high)
- *         GNUNET_NO to process normally (load normal)
- *         GNUNET_SYSERR to process for free (load low)
+ * @return #GNUNET_YES if the load is too high to do anything (load high)
+ *         #GNUNET_NO to process normally (load normal)
+ *         #GNUNET_SYSERR to process for free (load low)
  */
 int
 GSF_test_get_load_too_high_ (uint32_t priority);

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2016-07-31 21:23:23 UTC (rev 37644)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2011 GNUnet e.V.
+     Copyright (C) 2011, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -78,36 +78,16 @@
   struct GNUNET_TIME_Absolute transmission_request_start_time;
 
   /**
-   * Timeout for this request.
+   * Envelope with the actual message.
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct GNUNET_MQ_Envelope *env;
 
   /**
-   * Task called on timeout, or 0 for none.
-   */
-  struct GNUNET_SCHEDULER_Task *timeout_task;
-
-  /**
-   * Function to call to get the actual message.
-   */
-  GSF_GetMessageCallback gmc;
-
-  /**
    * Peer this request targets.
    */
   struct GSF_ConnectedPeer *cp;
 
   /**
-   * Closure for @e gmc.
-   */
-  void *gmc_cls;
-
-  /**
-   * Size of the message to be transmitted.
-   */
-  size_t size;
-
-  /**
    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
    */
   int is_query;
@@ -147,9 +127,9 @@
   struct GSF_ConnectedPeer *cp;
 
   /**
-   * The PUT that was delayed.
+   * Envelope of the message that was delayed.
    */
-  struct PutMessage *pm;
+  struct GNUNET_MQ_Envelope *env;
 
   /**
    * Task for the delay.
@@ -235,11 +215,6 @@
   struct GSF_DelayedHandle *delayed_tail;
 
   /**
-   * Migration stop message in our queue, or NULL if we have none pending.
-   */
-  struct GSF_PeerTransmitHandle *migration_pth;
-
-  /**
    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
    */
   struct GNUNET_ATS_ReservationContext *rc;
@@ -256,9 +231,9 @@
 
   /**
    * Handle for an active request for transmission to this
-   * peer, or NULL (if core queue was full).
+   * peer.
    */
-  struct GNUNET_CORE_TransmitHandle *cth;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Increase in traffic preference still to be submitted
@@ -267,14 +242,6 @@
   uint64_t inc_preference;
 
   /**
-   * Set to 1 if we're currently in the process of calling
-   * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
-   * NULL, we should not call notify_transmit_ready for this
-   * handle right now).
-   */
-  unsigned int cth_in_progress;
-
-  /**
    * Number of entries in @e delayed_head DLL.
    */
   unsigned int delay_queue_size;
@@ -308,16 +275,6 @@
   int did_reserve;
 
   /**
-   * Function called when the creation of this record is complete.
-   */
-  GSF_ConnectedPeerCreationCallback creation_cb;
-
-  /**
-   * Closure for @e creation_cb
-   */
-  void *creation_cb_cls;
-
-  /**
    * Handle to the PEERSTORE iterate request for peer respect value
    */
   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
@@ -377,15 +334,10 @@
 /**
  * Core is ready to transmit to a peer, get the message.
  *
- * @param cls the `struct GSF_PeerTransmitHandle` of the message
- * @param size number of bytes core is willing to take
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param cp which peer to send a message to
  */
-static size_t
-peer_transmit_ready_cb (void *cls,
-                        size_t size,
-                        void *buf);
+static void
+peer_transmit (struct GSF_ConnectedPeer *cp);
 
 
 /**
@@ -418,8 +370,6 @@
   struct GNUNET_PeerIdentity target;
 
   cp = pth->cp;
-  if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
-    return;                     /* already done */
   GNUNET_assert (0 != cp->ppd.pid);
   GNUNET_PEER_resolve (cp->ppd.pid, &target);
 
@@ -449,18 +399,7 @@
                                            cp);
     return;
   }
-  GNUNET_assert (NULL == cp->cth);
-  cp->cth_in_progress++;
-  cp->cth =
-    GNUNET_CORE_notify_transmit_ready (GSF_core,
-                                       GNUNET_YES,
-                                       GNUNET_CORE_PRIO_BACKGROUND,
-                                      GNUNET_TIME_absolute_get_remaining (pth->timeout),
-                                       &target,
-                                       pth->size,
-                                      &peer_transmit_ready_cb, cp);
-  GNUNET_assert (NULL != cp->cth);
-  GNUNET_assert (0 < cp->cth_in_progress--);
+  peer_transmit (cp);
 }
 
 
@@ -467,34 +406,16 @@
 /**
  * Core is ready to transmit to a peer, get the message.
  *
- * @param cls the `struct GSF_PeerTransmitHandle` of the message
- * @param size number of bytes core is willing to take
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param cp which peer to send a message to
  */
-static size_t
-peer_transmit_ready_cb (void *cls,
-                        size_t size,
-                        void *buf)
+static void
+peer_transmit (struct GSF_ConnectedPeer *cp)
 {
-  struct GSF_ConnectedPeer *cp = cls;
   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
   struct GSF_PeerTransmitHandle *pos;
-  size_t ret;
 
-  cp->cth = NULL;
   if (NULL == pth)
-    return 0;
-  if (pth->size > size)
-  {
-    schedule_transmission (pth);
-    return 0;
-  }
-  if (NULL != pth->timeout_task)
-  {
-    GNUNET_SCHEDULER_cancel (pth->timeout_task);
-    pth->timeout_task = NULL;
-  }
+    return;
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                                cp->pth_tail,
                                pth);
@@ -512,14 +433,14 @@
   GNUNET_LOAD_update (cp->ppd.transmission_delay,
                       GNUNET_TIME_absolute_get_duration
                       (pth->transmission_request_start_time).rel_value_us);
-  ret = pth->gmc (pth->gmc_cls, size, buf);
+  GNUNET_MQ_send (cp->mq,
+                 pth->env);
+  GNUNET_free (pth);
   if (NULL != (pos = cp->pth_head))
   {
     GNUNET_assert (pos != pth);
     schedule_transmission (pos);
   }
-  GNUNET_free (pth);
-  return ret;
 }
 
 
@@ -578,23 +499,10 @@
   }
   cp->did_reserve = GNUNET_YES;
   pth = cp->pth_head;
-  if ( (NULL != pth) &&
-       (NULL == cp->cth) &&
-       (0 == cp->cth_in_progress) )
+  if (NULL != pth)
   {
     /* reservation success, try transmission now! */
-    cp->cth_in_progress++;
-    cp->cth =
-        GNUNET_CORE_notify_transmit_ready (GSF_core,
-                                           GNUNET_YES,
-                                           GNUNET_CORE_PRIO_BACKGROUND,
-                                           GNUNET_TIME_absolute_get_remaining (pth->timeout),
-                                           peer,
-                                           pth->size,
-                                           &peer_transmit_ready_cb,
-                                           cp);
-    GNUNET_assert (NULL != cp->cth);
-    GNUNET_assert (0 < cp->cth_in_progress--);
+    peer_transmit (cp);
   }
 }
 
@@ -614,11 +522,13 @@
   struct GSF_ConnectedPeer *cp = cls;
 
   GNUNET_assert (NULL != cp->respect_iterate_req);
-  if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
-    cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
+  if ( (NULL != record) &&
+       (sizeof (cp->disk_respect) == record->value_size))
+  {
+    cp->disk_respect = *((uint32_t *)record->value);
+    cp->ppd.respect += *((uint32_t *)record->value);
+  }
   GSF_push_start_ (cp);
-  if (NULL != cp->creation_cb)
-    cp->creation_cb (cp->creation_cb_cls, cp);
   if (NULL != record)
     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
   cp->respect_iterate_req = NULL;
@@ -626,25 +536,68 @@
 
 
 /**
+ * Function called for each pending request whenever a new
+ * peer connects, giving us a chance to decide about submitting
+ * the existing request to the new peer.
+ *
+ * @param cls the `struct GSF_ConnectedPeer` of the new peer
+ * @param key query for the request
+ * @param pr handle to the pending request
+ * @return #GNUNET_YES to continue to iterate
+ */
+static int
+consider_peer_for_forwarding (void *cls,
+                              const struct GNUNET_HashCode *key,
+                              struct GSF_PendingRequest *pr)
+{
+  struct GSF_ConnectedPeer *cp = cls;
+  struct GNUNET_PeerIdentity pid;
+
+  if (GNUNET_YES !=
+      GSF_pending_request_test_active_ (pr))
+    return GNUNET_YES; /* request is not actually active, skip! */
+  GSF_connected_peer_get_identity_ (cp, &pid);
+  if (GNUNET_YES !=
+      GSF_pending_request_test_target_ (pr, &pid))
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# Loopback routes suppressed"),
+                              1,
+                              GNUNET_NO);
+    return GNUNET_YES;
+  }
+  GSF_plan_add_ (cp, pr);
+  return GNUNET_YES;
+}
+
+
+/**
  * A peer connected to us.  Setup the connected peer
  * records.
  *
+ * @param cls NULL
  * @param peer identity of peer that connected
- * @param creation_cb callback function when the record is created.
- * @param creation_cb_cls closure for @creation_cb
+ * @param mq message queue for talking to @a peer
+ * @return our internal handle for the peer
  */
-void
-GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
-                           GSF_ConnectedPeerCreationCallback creation_cb,
-                           void *creation_cb_cls)
+void *
+GSF_peer_connect_handler (void *cls,
+                         const struct GNUNET_PeerIdentity *peer,
+                         struct GNUNET_MQ_Handle *mq)
 {
   struct GSF_ConnectedPeer *cp;
 
+  if (0 ==
+      GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
+                                       peer))
+    return NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Connected to peer %s\n",
               GNUNET_i2s (peer));
   cp = GNUNET_new (struct GSF_ConnectedPeer);
   cp->ppd.pid = GNUNET_PEER_intern (peer);
+  cp->ppd.peer = peer;
+  cp->mq = mq;
   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
   cp->rc =
       GNUNET_ATS_reserve_bandwidth (GSF_ats,
@@ -662,14 +615,17 @@
                          gettext_noop ("# peers connected"),
                          GNUNET_CONTAINER_multipeermap_size (cp_map),
                          GNUNET_NO);
-  cp->creation_cb = creation_cb;
-  cp->creation_cb_cls = creation_cb_cls;
-  cp->respect_iterate_req =
-      GNUNET_PEERSTORE_iterate (peerstore, "fs",
-                                peer, "respect",
+  cp->respect_iterate_req 
+    = GNUNET_PEERSTORE_iterate (peerstore,
+                               "fs",
+                                peer,
+                               "respect",
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &peer_respect_cb,
                                 cp);
+  GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
+                                 cp);
+  return cp;
 }
 
 
@@ -714,31 +670,18 @@
 
 
 /**
- * Handle P2P "MIGRATION_STOP" message.
+ * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. 
  *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param cls closure, the `struct GSF_ConnectedPeer`
+ * @param msm the actual message
  */
-int
-GSF_handle_p2p_migration_stop_ (void *cls,
-                                const struct GNUNET_PeerIdentity *other,
-                                const struct GNUNET_MessageHeader *message)
+void
+handle_p2p_migration_stop (void *cls,
+                          const struct MigrationStopMessage *msm)
 {
-  struct GSF_ConnectedPeer *cp;
-  const struct MigrationStopMessage *msm;
+  struct GSF_ConnectedPeer *cp = cls;
   struct GNUNET_TIME_Relative bt;
 
-  msm = (const struct MigrationStopMessage *) message;
-  cp = GSF_peer_get_ (other);
-  if (NULL == cp)
-  {
-    GNUNET_break (0);
-    return GNUNET_OK;
-  }
   GNUNET_STATISTICS_update (GSF_stats,
                             gettext_noop ("# migration stop messages received"),
                             1, GNUNET_NO);
@@ -745,7 +688,7 @@
   bt = GNUNET_TIME_relative_ntoh (msm->duration);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               _("Migration of content to peer `%s' blocked for %s\n"),
-              GNUNET_i2s (other),
+              GNUNET_i2s (cp->ppd.peer),
              GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
   if ( (NULL == cp->mig_revive_task) &&
@@ -756,50 +699,10 @@
         GNUNET_SCHEDULER_add_delayed (bt,
                                       &revive_migration, cp);
   }
-  return GNUNET_OK;
 }
 
 
 /**
- * Copy reply and free put message.
- *
- * @param cls the `struct PutMessage`
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
- */
-static size_t
-copy_reply (void *cls,
-            size_t buf_size,
-            void *buf)
-{
-  struct PutMessage *pm = cls;
-  size_t size;
-
-  if (NULL != buf)
-  {
-    GNUNET_assert (buf_size >= ntohs (pm->header.size));
-    size = ntohs (pm->header.size);
-    GNUNET_memcpy (buf, pm, size);
-    GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop ("# replies transmitted to other peers"),
-                              1,
-                              GNUNET_NO);
-  }
-  else
-  {
-    size = 0;
-    GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop ("# replies dropped"),
-                              1,
-                              GNUNET_NO);
-  }
-  GNUNET_free (pm);
-  return size;
-}
-
-
-/**
  * Free resources associated with the given peer request.
  *
  * @param peerreq request to free
@@ -886,13 +789,10 @@
                                cp->delayed_tail,
                                dh);
   cp->delay_queue_size--;
-  (void) GSF_peer_transmit_ (cp,
-                             GNUNET_NO,
-                             UINT32_MAX,
-                             REPLY_TIMEOUT,
-                             dh->msize,
-                             &copy_reply,
-                             dh->pm);
+  GSF_peer_transmit_ (cp,
+                     GNUNET_NO,
+                     UINT32_MAX,
+                     dh->env);
   GNUNET_free (dh);
 }
 
@@ -954,6 +854,7 @@
   struct PeerRequest *peerreq = cls;
   struct GSF_ConnectedPeer *cp = peerreq->cp;
   struct GSF_PendingRequestData *prd;
+  struct GNUNET_MQ_Envelope *env;
   struct PutMessage *pm;
   size_t msize;
 
@@ -1000,12 +901,14 @@
     GSF_cover_content_count -= (reply_anonymity_level - 1);
   }
 
-  pm = GNUNET_malloc (msize);
-  pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-  pm->header.size = htons (msize);
+  env = GNUNET_MQ_msg_extra (pm,
+                            data_len,
+                            GNUNET_MESSAGE_TYPE_FS_PUT);
   pm->type = htonl (type);
   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
-  GNUNET_memcpy (&pm[1], data, data_len);
+  GNUNET_memcpy (&pm[1],
+                data,
+                data_len);
   if ( (UINT32_MAX != reply_anonymity_level) &&
        (0 != reply_anonymity_level) &&
        (GNUNET_YES == GSF_enable_randomized_delays) )
@@ -1014,7 +917,7 @@
 
     dh = GNUNET_new (struct GSF_DelayedHandle);
     dh->cp = cp;
-    dh->pm = pm;
+    dh->env = env;
     dh->msize = msize;
     GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
                                  cp->delayed_tail,
@@ -1027,13 +930,10 @@
   }
   else
   {
-    (void) GSF_peer_transmit_ (cp,
-                               GNUNET_NO,
-                               UINT32_MAX,
-                               REPLY_TIMEOUT,
-                               msize,
-                               &copy_reply,
-                               pm);
+    GSF_peer_transmit_ (cp,
+                       GNUNET_NO,
+                       UINT32_MAX,
+                       env);
   }
   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
     return;
@@ -1265,23 +1165,20 @@
  * process replies properly.  Does not initiate forwarding or
  * local database lookups.
  *
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return pending request handle, NULL on error
+ * @param cls the other peer involved (sender of the message)
+ * @param gm the GET message
  */
-struct GSF_PendingRequest *
-GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
-                       const struct GNUNET_MessageHeader *message)
+void
+handle_p2p_get (void *cls,
+               const struct GetMessage *gm)
 {
+  struct GSF_ConnectedPeer *cps = cls;
   struct PeerRequest *peerreq;
   struct GSF_PendingRequest *pr;
   struct GSF_ConnectedPeer *cp;
-  struct GSF_ConnectedPeer *cps;
   const struct GNUNET_PeerIdentity *target;
   enum GSF_PendingRequestOptions options;
   uint16_t msize;
-  const struct GetMessage *gm;
   unsigned int bits;
   const struct GNUNET_PeerIdentity *opt;
   uint32_t bm;
@@ -1291,18 +1188,7 @@
   GNUNET_PEER_Id spid;
   const struct GSF_PendingRequestData *prd;
 
-  msize = ntohs (message->size);
-  if (msize < sizeof (struct GetMessage))
-  {
-    GNUNET_break_op (0);
-    return NULL;
-  }
-  GNUNET_STATISTICS_update (GSF_stats,
-                            gettext_noop
-                            ("# GET requests received (from other peers)"),
-                            1,
-                            GNUNET_NO);
-  gm = (const struct GetMessage *) message;
+  msize = ntohs (gm->header.size);
   tec.type = ntohl (gm->type);
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
@@ -1312,32 +1198,16 @@
       bits++;
     bm >>= 1;
   }
-  if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
-  {
-    GNUNET_break_op (0);
-    return NULL;
-  }
   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
-  /* bfsize must be power of 2, check! */
-  if (0 != ((bfsize - 1) & bfsize))
-  {
-    GNUNET_break_op (0);
-    return NULL;
-  }
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop
+                            ("# GET requests received (from other peers)"),
+                            1,
+                            GNUNET_NO);
   GSF_cover_query_count++;
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
-  cps = GSF_peer_get_ (other);
-  if (NULL == cps)
-  {
-    /* peer must have just disconnected */
-    GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop
-                              ("# requests dropped due to initiator not being connected"),
-                              1, GNUNET_NO);
-    return NULL;
-  }
   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
     cp = GSF_peer_get_ (&opt[bits++]);
   else
@@ -1352,24 +1222,24 @@
     else
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Failed to find peer `%s' in connection set. Dropping query.\n",
-                  GNUNET_i2s (other));
+                  GNUNET_i2s (cps->ppd.peer));
     GNUNET_STATISTICS_update (GSF_stats,
                               gettext_noop
                               ("# requests dropped due to missing reverse route"),
                               1,
                               GNUNET_NO);
-    return NULL;
+    return;
   }
   if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Peer `%s' has too many replies queued already. Dropping query.\n",
-                GNUNET_i2s (other));
+                GNUNET_i2s (cps->ppd.peer));
     GNUNET_STATISTICS_update (GSF_stats,
                               gettext_noop ("# requests dropped due to full reply queue"),
                               1,
                               GNUNET_NO);
-    return NULL;
+    return;
   }
   /* note that we can really only check load here since otherwise
    * peers could find out that we are overloaded by not being
@@ -1380,14 +1250,14 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Dropping query from `%s', this peer is too busy.\n",
-                GNUNET_i2s (other));
-    return NULL;
+                GNUNET_i2s (cps->ppd.peer));
+    return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received request for `%s' of type %u from peer `%s' with flags %u\n",
               GNUNET_h2s (&gm->query),
               (unsigned int) tec.type,
-              GNUNET_i2s (other),
+              GNUNET_i2s (cps->ppd.peer),
               (unsigned int) bm);
   target =
       (0 !=
@@ -1403,7 +1273,7 @@
      * so at best indirect the query */
     tec.priority = 0;
     options |= GSF_PRO_FORWARD_ONLY;
-    spid = GNUNET_PEER_intern (other);
+    spid = GNUNET_PEER_intern (cps->ppd.peer);
     GNUNET_assert (0 != spid);
   }
   tec.ttl = bound_ttl (ntohl (gm->ttl),
@@ -1412,11 +1282,12 @@
   ttl_decrement =
       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                     TTL_DECREMENT);
-  if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0))
+  if ( (tec.ttl < 0) &&
+       (((int32_t) (tec.ttl - ttl_decrement)) > 0) )
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
-                GNUNET_i2s (other),
+                GNUNET_i2s (cps->ppd.peer),
                 tec.ttl,
                 ttl_decrement);
     GNUNET_STATISTICS_update (GSF_stats,
@@ -1424,7 +1295,7 @@
                               ("# requests dropped due TTL underflow"), 1,
                               GNUNET_NO);
     /* integer underflow => drop (should be very rare)! */
-    return NULL;
+    return;
   }
   tec.ttl -= ttl_decrement;
 
@@ -1435,7 +1306,7 @@
                                               &test_exist_cb,
                                               &tec);
   if (GNUNET_YES == tec.finished)
-    return NULL; /* merged into existing request, we're done */
+    return; /* merged into existing request, we're done */
 
   peerreq = GNUNET_new (struct PeerRequest);
   peerreq->cp = cp;
@@ -1452,7 +1323,7 @@
                                     (uint32_t) tec.priority,
                                     tec.ttl,
                                     spid,
-                                    GNUNET_PEER_intern (other),
+                                    GNUNET_PEER_intern (cps->ppd.peer),
                                     NULL, 0,        /* replies_seen */
                                     &handle_p2p_reply,
                                     peerreq);
@@ -1472,47 +1343,14 @@
                             gettext_noop ("# P2P searches active"),
                             1,
                             GNUNET_NO);
-  return pr;
+  GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
+  GSF_local_lookup_ (pr,
+                     &GSF_consider_forwarding,
+                     NULL);
 }
 
 
 /**
- * Function called if there has been a timeout trying to satisfy
- * a transmission request.
- *
- * @param cls the `struct GSF_PeerTransmitHandle` of the request
- */
-static void
-peer_transmit_timeout (void *cls)
-{
-  struct GSF_PeerTransmitHandle *pth = cls;
-  struct GSF_ConnectedPeer *cp;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Timeout trying to transmit to other peer\n");
-  pth->timeout_task = NULL;
-  cp = pth->cp;
-  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
-                               cp->pth_tail,
-                               pth);
-  if (GNUNET_YES == pth->is_query)
-    GNUNET_assert (0 < cp->ppd.pending_queries--);
-  else if (GNUNET_NO == pth->is_query)
-    GNUNET_assert (0 < cp->ppd.pending_replies--);
-  GNUNET_LOAD_update (cp->ppd.transmission_delay,
-                      UINT64_MAX);
-  if (NULL != cp->cth)
-  {
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
-    cp->cth = NULL;
-  }
-  pth->gmc (pth->gmc_cls, 0, NULL);
-  GNUNET_assert (0 == cp->cth_in_progress);
-  GNUNET_free (pth);
-}
-
-
-/**
  * Transmit a message to the given peer as soon as possible.
  * If the peer disconnects before the transmission can happen,
  * the callback is invoked with a `NULL` @a buffer.
@@ -1520,19 +1358,15 @@
  * @param cp target peer
  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
  * @param priority how important is this request?
- * @param timeout when does this request timeout (call gmc with error)
+ * @param timeout when does this request timeout 
  * @param size number of bytes we would like to send to the peer
- * @param gmc function to call to get the message
- * @param gmc_cls closure for @a gmc
- * @return handle to cancel request
+ * @param env message to send
  */
-struct GSF_PeerTransmitHandle *
+void
 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
                     int is_query,
                     uint32_t priority,
-                    struct GNUNET_TIME_Relative timeout,
-                    size_t size,
-                    GSF_GetMessageCallback gmc, void *gmc_cls)
+                    struct GNUNET_MQ_Envelope *env)
 {
   struct GSF_PeerTransmitHandle *pth;
   struct GSF_PeerTransmitHandle *pos;
@@ -1540,10 +1374,7 @@
 
   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
-  pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  pth->gmc = gmc;
-  pth->gmc_cls = gmc_cls;
-  pth->size = size;
+  pth->env = env;
   pth->is_query = is_query;
   pth->priority = priority;
   pth->cp = cp;
@@ -1563,43 +1394,11 @@
     cp->ppd.pending_queries++;
   else if (GNUNET_NO == is_query)
     cp->ppd.pending_replies++;
-  pth->timeout_task
-    = GNUNET_SCHEDULER_add_delayed (timeout,
-                                    &peer_transmit_timeout,
-                                    pth);
   schedule_transmission (pth);
-  return pth;
 }
 
 
 /**
- * Cancel an earlier request for transmission.
- *
- * @param pth request to cancel
- */
-void
-GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
-{
-  struct GSF_ConnectedPeer *cp;
-
-  if (NULL != pth->timeout_task)
-  {
-    GNUNET_SCHEDULER_cancel (pth->timeout_task);
-    pth->timeout_task = NULL;
-  }
-  cp = pth->cp;
-  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
-                               cp->pth_tail,
-                               pth);
-  if (GNUNET_YES == pth->is_query)
-    GNUNET_assert (0 < cp->ppd.pending_queries--);
-  else if (GNUNET_NO == pth->is_query)
-    GNUNET_assert (0 < cp->ppd.pending_replies--);
-  GNUNET_free (pth);
-}
-
-
-/**
  * Report on receiving a reply; update the performance record of the given peer.
  *
  * @param cp responding peer (will be updated)
@@ -1683,7 +1482,9 @@
   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
                           sizeof (cp->ppd.respect),
                           GNUNET_TIME_UNIT_FOREVER_ABS,
-                          GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
+                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                         NULL,
+                         NULL);
   return GNUNET_OK;
 }
 
@@ -1693,26 +1494,30 @@
  * record.
  *
  * @param cls unused
- * @param peer identity of peer that connected
+ * @param peer identity of peer that disconnected
+ * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
  */
 void
-GSF_peer_disconnect_handler_ (void *cls,
-                              const struct GNUNET_PeerIdentity *peer)
+GSF_peer_disconnect_handler (void *cls,
+                            const struct GNUNET_PeerIdentity *peer,
+                            void *internal_cls)
 {
-  struct GSF_ConnectedPeer *cp;
+  struct GSF_ConnectedPeer *cp = internal_cls;
   struct GSF_PeerTransmitHandle *pth;
   struct GSF_DelayedHandle *dh;
 
-  cp = GSF_peer_get_ (peer);
   if (NULL == cp)
-    return;                     /* must have been disconnect from core with
-                                 * 'peer' == my_id, ignore */
-  flush_respect (NULL, peer, cp);
+    return;  /* must have been disconnect from core with
+             * 'peer' == my_id, ignore */
+  flush_respect (NULL,
+                peer,
+                cp);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
                                                        peer,
                                                       cp));
-  GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
+  GNUNET_STATISTICS_set (GSF_stats,
+                        gettext_noop ("# peers connected"),
                          GNUNET_CONTAINER_multipeermap_size (cp_map),
                          GNUNET_NO);
   if (NULL != cp->respect_iterate_req)
@@ -1720,11 +1525,6 @@
     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
     cp->respect_iterate_req = NULL;
   }
-  if (NULL != cp->migration_pth)
-  {
-    GSF_peer_transmit_cancel_ (cp->migration_pth);
-    cp->migration_pth = NULL;
-  }
   if (NULL != cp->rc)
   {
     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
@@ -1748,19 +1548,8 @@
           0,
           sizeof (cp->ppd.last_p2p_replies));
   GSF_push_stop_ (cp);
-  if (NULL != cp->cth)
-  {
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
-    cp->cth = NULL;
-  }
-  GNUNET_assert (0 == cp->cth_in_progress);
   while (NULL != (pth = cp->pth_head))
   {
-    if (pth->timeout_task != NULL)
-    {
-      GNUNET_SCHEDULER_cancel (pth->timeout_task);
-      pth->timeout_task = NULL;
-    }
     GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                                  cp->pth_tail,
                                  pth);
@@ -1768,7 +1557,6 @@
       GNUNET_assert (0 < cp->ppd.pending_queries--);
     else if (GNUNET_NO == pth->is_query)
       GNUNET_assert (0 < cp->ppd.pending_replies--);
-    pth->gmc (pth->gmc_cls, 0, NULL);
     GNUNET_free (pth);
   }
   while (NULL != (dh = cp->delayed_head))
@@ -1776,9 +1564,9 @@
     GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
                                  cp->delayed_tail,
                                  dh);
+    GNUNET_MQ_discard (dh->env);
     cp->delay_queue_size--;
     GNUNET_SCHEDULER_cancel (dh->delay_task);
-    GNUNET_free (dh->pm);
     GNUNET_free (dh);
   }
   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
@@ -1883,40 +1671,6 @@
 
 
 /**
- * Assemble a migration stop message for transmission.
- *
- * @param cls the `struct GSF_ConnectedPeer` to use
- * @param size number of bytes we're allowed to write to @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-create_migration_stop_message (void *cls,
-                               size_t size,
-                               void *buf)
-{
-  struct GSF_ConnectedPeer *cp = cls;
-  struct MigrationStopMessage msm;
-
-  cp->migration_pth = NULL;
-  if (NULL == buf)
-    return 0;
-  GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
-  msm.header.size = htons (sizeof (struct MigrationStopMessage));
-  msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
-  msm.reserved = htonl (0);
-  msm.duration =
-      GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
-                                 (cp->last_migration_block));
-  GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
-  GNUNET_STATISTICS_update (GSF_stats,
-                            gettext_noop ("# migration stop messages sent"),
-                            1, GNUNET_NO);
-  return sizeof (struct MigrationStopMessage);
-}
-
-
-/**
  * Ask a peer to stop migrating data to us until the given point
  * in time.
  *
@@ -1927,6 +1681,9 @@
 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
                            struct GNUNET_TIME_Absolute block_time)
 {
+  struct GNUNET_MQ_Envelope *env;
+  struct MigrationStopMessage *msm;
+  
   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1939,13 +1696,20 @@
               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
                                                      GNUNET_YES));
   cp->last_migration_block = block_time;
-  if (NULL != cp->migration_pth)
-    GSF_peer_transmit_cancel_ (cp->migration_pth);
-  cp->migration_pth =
-      GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
-                          GNUNET_TIME_UNIT_FOREVER_REL,
-                          sizeof (struct MigrationStopMessage),
-                          &create_migration_stop_message, cp);
+  env = GNUNET_MQ_msg (msm,
+                      GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+  msm->reserved = htonl (0);
+  msm->duration
+    = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
+                                (cp->last_migration_block));
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# migration stop messages sent"),
+                            1,
+                           GNUNET_NO);
+  GSF_peer_transmit_ (cp,
+                     GNUNET_SYSERR,
+                     UINT32_MAX,
+                     env);
 }
 
 
@@ -1998,24 +1762,6 @@
 
 
 /**
- * Iterator to free peer entries.
- *
- * @param cls closure, unused
- * @param key current key code
- * @param value value in the hash map (peer entry)
- * @return #GNUNET_YES (we should continue to iterate)
- */
-static int
-clean_peer (void *cls,
-           const struct GNUNET_PeerIdentity *key,
-           void *value)
-{
-  GSF_peer_disconnect_handler_ (NULL, key);
-  return GNUNET_YES;
-}
-
-
-/**
  * Shutdown peer management subsystem.
  */
 void
@@ -2024,9 +1770,6 @@
   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
                                          &flush_respect,
                                          NULL);
-  GNUNET_CONTAINER_multipeermap_iterate (cp_map,
-                                         &clean_peer,
-                                        NULL);
   GNUNET_SCHEDULER_cancel (fr_task);
   fr_task = NULL;
   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
@@ -2072,7 +1815,8 @@
 {
   if (NULL == cp_map)
     return;                     /* already cleaned up */
-  GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
+  GNUNET_CONTAINER_multipeermap_iterate (cp_map,
+                                        &clean_local_client,
                                          (void *) lc);
 }
 

Modified: gnunet/src/fs/gnunet-service-fs_cp.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.h        2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs_cp.h        2016-07-31 21:23:23 UTC (rev 37644)
@@ -120,11 +120,16 @@
   double avg_priority;
 
   /**
-   * The peer's identity.
+   * The peer's identity (interned version).
    */
   GNUNET_PEER_Id pid;
 
   /**
+   * The peer's identity (pointer).
+   */
+  const struct GNUNET_PeerIdentity *peer;
+  
+  /**
    * Respect rating for this peer
    */
   uint32_t respect;
@@ -185,17 +190,6 @@
 
 
 /**
- * Function called after the creation of a connected peer record is complete.
- *
- * @param cls closure
- * @param cp handle to the newly created connected peer record
- */
-typedef void
-(*GSF_ConnectedPeerCreationCallback) (void *cls,
-                                      struct GSF_ConnectedPeer *cp);
-
-
-/**
  * Handle to cancel a transmission request.
  */
 struct GSF_PeerTransmitHandle;
@@ -205,14 +199,15 @@
  * A peer connected to us.  Setup the connected peer
  * records.
  *
+ * @param cls NULL
  * @param peer identity of peer that connected
- * @param creation_cb callback function when the record is created.
- * @param creation_cb_cls closure for @creation_cb
+ * @param mq queue for sending messages to @a peer
+ * @return internal handle for the peer
  */
-void
-GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
-                           GSF_ConnectedPeerCreationCallback creation_cb,
-                           void *creation_cb_cls);
+void *
+GSF_peer_connect_handler (void *cls,
+                         const struct GNUNET_PeerIdentity *peer,
+                         struct GNUNET_MQ_Handle *mq);
 
 
 /**
@@ -242,33 +237,18 @@
  * the callback is invoked with a 'NULL' buffer.
  *
  * @param cp target peer
- * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
+ * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO)
  * @param priority how important is this request?
- * @param timeout when does this request timeout (call gmc with error)
- * @param size number of bytes we would like to send to the peer
- * @param gmc function to call to get the message
- * @param gmc_cls closure for gmc
- * @return handle to cancel request
+ * @param env envelope of message to send
  */
-struct GSF_PeerTransmitHandle *
+void
 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
                     int is_query,
                     uint32_t priority,
-                    struct GNUNET_TIME_Relative timeout,
-                    size_t size, GSF_GetMessageCallback gmc,
-                    void *gmc_cls);
+                   struct GNUNET_MQ_Envelope *env);
 
 
 /**
- * Cancel an earlier request for transmission.
- *
- * @param pth request to cancel
- */
-void
-GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth);
-
-
-/**
  * Report on receiving a reply; update the performance record of the given peer.
  *
  * @param cp responding peer (will be updated)
@@ -307,35 +287,25 @@
 
 
 /**
- * Handle P2P "MIGRATION_STOP" message.
+ * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
  *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param cls closure, the `struct GSF_ConnectedPeer`
+ * @param msm the actual message
  */
-int
-GSF_handle_p2p_migration_stop_ (void *cls,
-                                const struct GNUNET_PeerIdentity *other,
-                                const struct GNUNET_MessageHeader *message);
+void
+handle_p2p_migration_stop (void *cls,
+                          const struct MigrationStopMessage *message);
 
 
 /**
- * Handle P2P "QUERY" message.  Only responsible for creating the
- * request entry itself and setting up reply callback and cancellation
- * on peer disconnect.  Does NOT execute the actual request strategy
- * (planning) or local database operations.
+ * Handle P2P "QUERY" message.
  *
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return pending request handle, NULL on error
+ * @param cls the `struct GSF_ConnectedPeer` of the other sender
+ * @param gm the actual message
  */
-struct GSF_PendingRequest *
-GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
-                       const struct GNUNET_MessageHeader *message);
+void
+handle_p2p_get (void *cls,
+               const struct GetMessage *gm);
 
 
 /**
@@ -366,10 +336,12 @@
  *
  * @param cls unused
  * @param peer identity of peer that connected
+ * @param internal_cls our `struct GSF_ConnectedPeer` for @a peer
  */
 void
-GSF_peer_disconnect_handler_ (void *cls,
-                              const struct GNUNET_PeerIdentity *peer);
+GSF_peer_disconnect_handler (void *cls,
+                            const struct GNUNET_PeerIdentity *peer,
+                            void *internal_cls);
 
 
 /**

Modified: gnunet/src/fs/gnunet-service-fs_pe.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pe.c        2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs_pe.c        2016-07-31 21:23:23 UTC (rev 37644)
@@ -189,11 +189,6 @@
   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
 
   /**
-   * Current transmission request handle.
-   */
-  struct GSF_PeerTransmitHandle *pth;
-
-  /**
    * Peer for which this is the plan.
    */
   struct GSF_ConnectedPeer *cp;
@@ -202,6 +197,12 @@
    * Current task for executing the plan.
    */
   struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * Current message under transmission for the plan.
+   */
+  struct GNUNET_MQ_Envelope *env;
+
 };
 
 
@@ -241,15 +242,6 @@
 
 
 /**
- * Figure out when and how to transmit to the given peer.
- *
- * @param cls the `struct GSF_ConnectedPeer` for transmission
- */
-static void
-schedule_peer_transmission (void *cls);
-
-
-/**
  * Insert the given request plan into the heap with the appropriate weight.
  *
  * @param pp associated peer's plan
@@ -329,21 +321,22 @@
   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Earliest (re)transmission for `%s' in %us\n",
-              GNUNET_h2s (&prd->query), rp->transmission_counter);
+              GNUNET_h2s (&prd->query),
+             rp->transmission_counter);
   GNUNET_assert (rp->hn == NULL);
   if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
-    rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
+    rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+                                          rp,
+                                          rp->priority);
   else
     rp->hn =
-        GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
+        GNUNET_CONTAINER_heap_insert (pp->delay_heap,
+                                     rp,
                                       rp->earliest_transmission.abs_value_us);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
                                                                get_rp_key (rp),
                                                                rp));
-  if (NULL != pp->task)
-    GNUNET_SCHEDULER_cancel (pp->task);
-  pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
 #undef N
 }
 
@@ -383,75 +376,6 @@
 
 
 /**
- * Function called to get a message for transmission.
- *
- * @param cls closure
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
- */
-static size_t
-transmit_message_callback (void *cls,
-                           size_t buf_size,
-                           void *buf)
-{
-  struct PeerPlan *pp = cls;
-  struct GSF_RequestPlan *rp;
-  size_t msize;
-
-  pp->pth = NULL;
-  if (NULL == buf)
-  {
-    /* failed, try again... */
-    if (NULL != pp->task)
-      GNUNET_SCHEDULER_cancel (pp->task);
-
-    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
-    GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop
-                              ("# transmission failed (core has no bandwidth)"),
-                              1, GNUNET_NO);
-    return 0;
-  }
-  rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
-  if (NULL == rp)
-  {
-    if (NULL != pp->task)
-      GNUNET_SCHEDULER_cancel (pp->task);
-    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
-    return 0;
-  }
-  msize = GSF_pending_request_get_message_ (get_latest (rp),
-                                            buf_size,
-                                            buf);
-  if (msize > buf_size)
-  {
-    if (NULL != pp->task)
-      GNUNET_SCHEDULER_cancel (pp->task);
-    /* buffer to small (message changed), try again */
-    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
-    return 0;
-  }
-  /* remove from root, add again elsewhere... */
-  GNUNET_assert (rp ==
-                 GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
-  rp->hn = NULL;
-  rp->last_transmission = GNUNET_TIME_absolute_get ();
-  rp->transmission_counter++;
-  total_delay++;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Executing plan %p executed %u times, planning retransmission\n",
-              rp, rp->transmission_counter);
-  plan (pp, rp);
-  GNUNET_STATISTICS_update (GSF_stats,
-                            gettext_noop ("# query messages sent to other peers"),
-                            1,
-                            GNUNET_NO);
-  return msize;
-}
-
-
-/**
  * Figure out when and how to transmit to the given peer.
  *
  * @param cls the `struct PeerPlan`
@@ -461,15 +385,17 @@
 {
   struct PeerPlan *pp = cls;
   struct GSF_RequestPlan *rp;
-  size_t msize;
   struct GNUNET_TIME_Relative delay;
 
-  pp->task = NULL;
-  if (NULL != pp->pth)
+  if (NULL != pp->task)
   {
-    GSF_peer_transmit_cancel_ (pp->pth);
-    pp->pth = NULL;
+    pp->task = NULL;
   }
+  else
+  {
+    GNUNET_assert (NULL != pp->env);
+    pp->env = NULL;
+  }
   /* move ready requests to priority queue */
   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
          (0 == GNUNET_TIME_absolute_get_remaining
@@ -508,23 +434,40 @@
     return;
   }
 #if INSANE_STATISTICS
-  GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
-                            1, GNUNET_NO);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# query plans executed"),
+                            1,
+                           GNUNET_NO);
 #endif
   /* process from priority heap */
-  rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+  rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Executing query plan %p\n",
               rp);
   GNUNET_assert (NULL != rp);
-  msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
-  pp->pth =
-      GSF_peer_transmit_ (pp->cp, GNUNET_YES,
-                          rp->priority,
-                          GNUNET_TIME_UNIT_FOREVER_REL,
-                          msize,
-                          &transmit_message_callback, pp);
-  GNUNET_assert (NULL != pp->pth);
+  rp->hn = NULL;
+  rp->last_transmission = GNUNET_TIME_absolute_get ();
+  rp->transmission_counter++;
+  total_delay++;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Executing plan %p executed %u times, planning retransmission\n",
+              rp,
+             rp->transmission_counter);
+  GNUNET_assert (NULL == pp->env);
+  pp->env = GSF_pending_request_get_message_ (get_latest (rp));
+  GNUNET_MQ_notify_sent (pp->env,
+                        &schedule_peer_transmission,
+                        pp);
+  GSF_peer_transmit_ (pp->cp,
+                     GNUNET_YES,
+                     rp->priority,
+                     pp->env);
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# query messages sent to other peers"),
+                            1,
+                            GNUNET_NO);
+  plan (pp,
+       rp);
 }
 
 
@@ -646,6 +589,8 @@
                                                       id,
                                                       pp,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
+                                        pp);
   }
   mpc.merged = GNUNET_NO;
   mpc.pr = pr;
@@ -710,16 +655,16 @@
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (plans, id,
                                                        pp));
-  if (NULL != pp->pth)
-  {
-    GSF_peer_transmit_cancel_ (pp->pth);
-    pp->pth = NULL;
-  }
   if (NULL != pp->task)
   {
     GNUNET_SCHEDULER_cancel (pp->task);
     pp->task = NULL;
   }
+  if (NULL != pp->env)
+  {
+    GNUNET_MQ_send_cancel (pp->env);
+    pp->env = NULL;
+  }
   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
   {
     GNUNET_break (GNUNET_YES ==

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2016-07-31 21:23:23 UTC (rev 37644)
@@ -512,21 +512,17 @@
 
 /**
  * Generate the message corresponding to the given pending request for
- * transmission to other peers (or at least determine its size).
+ * transmission to other peers.
  *
  * @param pr request to generate the message for
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if `>` @a buf_size) or used
+ * @return envelope with the request message
  */
-size_t
-GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
-                                  size_t buf_size, void *buf)
+struct GNUNET_MQ_Envelope *
+GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
 {
-  char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+  struct GNUNET_MQ_Envelope *env;
   struct GetMessage *gm;
   struct GNUNET_PeerIdentity *ext;
-  size_t msize;
   unsigned int k;
   uint32_t bm;
   uint32_t prio;
@@ -535,11 +531,10 @@
   int64_t ttl;
   int do_route;
 
-  if (buf_size > 0)
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Building request message for `%s' of type %d\n",
-                GNUNET_h2s (&pr->public_data.query),
-                pr->public_data.type);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Building request message for `%s' of type %d\n",
+             GNUNET_h2s (&pr->public_data.query),
+             pr->public_data.type);
   k = 0;
   bm = 0;
   do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
@@ -559,13 +554,9 @@
     k++;
   }
   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
-  msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity);
-  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  if (buf_size < msize)
-    return msize;
-  gm = (struct GetMessage *) lbuf;
-  gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
-  gm->header.size = htons (msize);
+  env = GNUNET_MQ_msg_extra (gm,
+                            bf_size + k * sizeof (struct GNUNET_PeerIdentity),
+                            GNUNET_MESSAGE_TYPE_FS_GET);
   gm->type = htonl (pr->public_data.type);
   if (do_route)
     prio =
@@ -585,7 +576,7 @@
   gm->query = pr->public_data.query;
   ext = (struct GNUNET_PeerIdentity *) &gm[1];
   k = 0;
-  if (!do_route)
+  if (! do_route)
     GNUNET_PEER_resolve (pr->sender_pid,
                          &ext[k++]);
   if (NULL != pr->public_data.target)
@@ -595,8 +586,7 @@
                    GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
                                                               (char *) &ext[k],
                                                               bf_size));
-  GNUNET_memcpy (buf, gm, msize);
-  return msize;
+  return env;
 }
 
 
@@ -1699,18 +1689,14 @@
  * this content and possibly passes it on (to local clients or other
  * peers).  Does NOT perform migration (content caching at this peer).
  *
- * @param cp the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK if the message was well-formed,
- *         #GNUNET_SYSERR if the message was malformed (close connection,
- *         do not cache under any circumstances)
+ * @param cls the other peer involved
+ * @param put the actual message
  */
-int
-GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
-                         const struct GNUNET_MessageHeader *message)
+void
+handle_p2p_put (void *cls,
+               const struct PutMessage *put)
 {
-  const struct PutMessage *put;
+  struct GSF_ConnectedPeer *cp = cls;
   uint16_t msize;
   size_t dsize;
   enum GNUNET_BLOCK_Type type;
@@ -1721,13 +1707,11 @@
   double putl;
   struct PutMigrationContext *pmc;
 
-  msize = ntohs (message->size);
-  if (msize < sizeof (struct PutMessage))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  put = (const struct PutMessage *) message;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received P2P PUT from %s\n",
+              GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
+  GSF_cover_content_count++;
+  msize = ntohs (put->header.size);
   dsize = msize - sizeof (struct PutMessage);
   type = ntohl (put->type);
   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
@@ -1734,8 +1718,6 @@
   /* do not allow migrated content to live longer than 1 year */
   expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
                                         expiration);
-  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
-    return GNUNET_SYSERR;
   if (GNUNET_OK !=
       GNUNET_BLOCK_get_key (GSF_block_ctx,
                             type,
@@ -1744,7 +1726,7 @@
                             &query))
   {
     GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    return;
   }
   GNUNET_STATISTICS_update (GSF_stats,
                             gettext_noop ("# GAP PUT messages received"),
@@ -1786,11 +1768,19 @@
     GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
                          &pmc->origin);
     if (NULL ==
-        GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type,
-                              prq.priority, 1 /* anonymity */ ,
+        GNUNET_DATASTORE_put (GSF_dsh,
+                             0,
+                             &query,
+                             dsize,
+                             &put[1],
+                             type,
+                              prq.priority,
+                             1 /* anonymity */ ,
                               0 /* replication */ ,
-                              expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE,
-                              &put_migration_continuation, pmc))
+                              expiration, 1 + prq.priority,
+                             MAX_DATASTORE_QUEUE,
+                              &put_migration_continuation,
+                             pmc))
     {
       put_migration_continuation (pmc,
                                   GNUNET_SYSERR,
@@ -1802,7 +1792,8 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Choosing not to keep content `%s' (%d/%d)\n",
-                GNUNET_h2s (&query), active_to_migration,
+                GNUNET_h2s (&query),
+               active_to_migration,
                 test_put_load_too_high (prq.priority));
   }
   putl = GNUNET_LOAD_get_load (datastore_put_load);
@@ -1826,9 +1817,9 @@
                putl,
                active_to_migration,
                (GNUNET_NO == prq.request_found));
-    GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time));
+    GSF_block_peer_migration_ (cp,
+                              GNUNET_TIME_relative_to_absolute (block_time));
   }
-  return GNUNET_OK;
 }
 
 

Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h        2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs_pr.h        2016-07-31 21:23:23 UTC (rev 37644)
@@ -40,34 +40,34 @@
    */
   GSF_PRO_DEFAULTS = 0,
 
-    /**
-     * Request must only be processed locally.
-     */
+  /**
+   * Request must only be processed locally.
+   */
   GSF_PRO_LOCAL_ONLY = 1,
 
-    /**
-     * Request must only be forwarded (no routing)
-     */
+  /**
+   * Request must only be forwarded (no routing)
+   */
   GSF_PRO_FORWARD_ONLY = 2,
 
-    /**
-     * Request persists indefinitely (no expiration).
-     */
+  /**
+   * Request persists indefinitely (no expiration).
+   */
   GSF_PRO_REQUEST_NEVER_EXPIRES = 4,
 
-    /**
-     * Request is allowed to refresh bloomfilter and change mingle value.
-     */
+  /**
+   * Request is allowed to refresh bloomfilter and change mingle value.
+   */
   GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8,
 
-    /**
-     * Request priority is allowed to be exceeded.
-     */
+  /**
+   * Request priority is allowed to be exceeded.
+   */
   GSF_PRO_PRIORITY_UNLIMITED = 16,
 
-    /**
-     * Option mask for typical local requests.
-     */
+  /**
+   * Option mask for typical local requests.
+   */
   GSF_PRO_LOCAL_REQUEST =
       (GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES)
 };
@@ -288,17 +288,13 @@
 
 /**
  * Generate the message corresponding to the given pending request for
- * transmission to other peers (or at least determine its size).
+ * transmission to other peers.
  *
  * @param pr request to generate the message for
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if @a buf_size too small) or used
+ * @return envelope with the request message
  */
-size_t
-GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
-                                  size_t buf_size,
-                                  void *buf);
+struct GNUNET_MQ_Envelope *
+GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr);
 
 
 /**
@@ -344,16 +340,12 @@
  * this content and possibly passes it on (to local clients or other
  * peers).  Does NOT perform migration (content caching at this peer).
  *
- * @param cp the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK if the message was well-formed,
- *         #GNUNET_SYSERR if the message was malformed (close connection,
- *         do not cache under any circumstances)
+ * @param cls the other peer involved (sender)
+ * @param put the actual message
  */
-int
-GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
-                         const struct GNUNET_MessageHeader *message);
+void
+handle_p2p_put (void *cls,
+               const struct PutMessage *put);
 
 
 /**

Modified: gnunet/src/fs/gnunet-service-fs_push.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_push.c      2016-07-31 15:42:37 UTC (rev 37643)
+++ gnunet/src/fs/gnunet-service-fs_push.c      2016-07-31 21:23:23 UTC (rev 37644)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2011 GNUnet e.V.
+     Copyright (C) 2011, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -102,8 +102,7 @@
 
 
 /**
- * Information about a peer waiting for
- * migratable data.
+ * Information about a peer waiting for migratable data.
  */
 struct MigrationReadyPeer
 {
@@ -123,15 +122,9 @@
   struct GSF_ConnectedPeer *peer;
 
   /**
-   * Handle for current transmission request,
-   * or NULL for none.
+   * Envelope of the currently pushed message.
    */
-  struct GSF_PeerTransmitHandle *th;
-
-  /**
-   * Message we are trying to push right now (or NULL)
-   */
-  struct PutMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
 };
 
 
@@ -163,7 +156,7 @@
 /**
  * ID of task that collects blocks for migration.
  */
-static struct GNUNET_SCHEDULER_Task * mig_task;
+static struct GNUNET_SCHEDULER_Task *mig_task;
 
 /**
  * What is the maximum frequency at which we are allowed to
@@ -195,8 +188,11 @@
 static void
 delete_migration_block (struct MigrationReadyBlock *mb)
 {
-  GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
-  GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
+  GNUNET_CONTAINER_DLL_remove (mig_head,
+                              mig_tail,
+                              mb);
+  GNUNET_PEER_decrement_rcs (mb->target_list,
+                            MIGRATION_LIST_SIZE);
   mig_size--;
   GNUNET_free (mb);
 }
@@ -204,52 +200,14 @@
 
 /**
  * Find content for migration to this peer.
+ *
+ * @param cls a `struct MigrationReadyPeer *`
  */
 static void
-find_content (struct MigrationReadyPeer *mrp);
+find_content (void *cls);
 
 
 /**
- * Transmit the message currently scheduled for transmission.
- *
- * @param cls the `struct MigrationReadyPeer`
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
- */
-static size_t
-transmit_message (void *cls,
-                  size_t buf_size,
-                  void *buf)
-{
-  struct MigrationReadyPeer *peer = cls;
-  struct PutMessage *msg;
-  uint16_t msize;
-
-  peer->th = NULL;
-  msg = peer->msg;
-  peer->msg = NULL;
-  if (NULL == buf)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Failed to migrate content to another peer (disconnect)\n");
-    GNUNET_free (msg);
-    return 0;
-  }
-  msize = ntohs (msg->header.size);
-  GNUNET_assert (msize <= buf_size);
-  GNUNET_memcpy (buf, msg, msize);
-  GNUNET_free (msg);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Pushing %u bytes to %s\n",
-              msize,
-              GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
-  find_content (peer);
-  return msize;
-}
-
-
-/**
  * Send the given block to the given peer.
  *
  * @param peer target peer
@@ -257,31 +215,30 @@
  * @return #GNUNET_YES if the block was deleted (!)
  */
 static int
-transmit_content (struct MigrationReadyPeer *peer,
+transmit_content (struct MigrationReadyPeer *mrp,
                   struct MigrationReadyBlock *block)
 {
-  size_t msize;
   struct PutMessage *msg;
   unsigned int i;
   struct GSF_PeerPerformanceData *ppd;
   int ret;
 
-  ppd = GSF_get_peer_performance_data_ (peer->peer);
-  GNUNET_assert (NULL == peer->th);
-  msize = sizeof (struct PutMessage) + block->size;
-  msg = GNUNET_malloc (msize);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-  msg->header.size = htons (msize);
+  ppd = GSF_get_peer_performance_data_ (mrp->peer);
+  mrp->env = GNUNET_MQ_msg_extra (msg,
+                                 block->size,
+                                 GNUNET_MESSAGE_TYPE_FS_PUT);
   msg->type = htonl (block->type);
   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
-  GNUNET_memcpy (&msg[1], &block[1], block->size);
-  peer->msg = msg;
+  GNUNET_memcpy (&msg[1],
+                &block[1],
+                block->size);
   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
   {
     if (block->target_list[i] == 0)
     {
       block->target_list[i] = ppd->pid;
-      GNUNET_PEER_change_rc (block->target_list[i], 1);
+      GNUNET_PEER_change_rc (block->target_list[i],
+                            1);
       break;
     }
   }
@@ -294,15 +251,13 @@
   {
     ret = GNUNET_NO;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asking for transmission of %u bytes to %s for migration\n",
-              (unsigned int) msize,
-              GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
-  peer->th = GSF_peer_transmit_ (peer->peer,
-                                 GNUNET_NO, 0 /* priority */ ,
-                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                 msize,
-                                 &transmit_message, peer);
+  GNUNET_MQ_notify_sent (mrp->env,
+                        &find_content,
+                        mrp);
+  GSF_peer_transmit_ (mrp->peer,
+                     GNUNET_NO,
+                     0 /* priority */ ,
+                     mrp->env);
   return ret;
 }
 
@@ -330,12 +285,12 @@
  * Check if sending this block to this peer would
  * be a good idea.
  *
- * @param peer target peer
+ * @param mrp target peer
  * @param block the block
  * @return score (>= 0: feasible, negative: infeasible)
  */
 static long
-score_content (struct MigrationReadyPeer *peer,
+score_content (struct MigrationReadyPeer *mrp,
                struct MigrationReadyBlock *block)
 {
   unsigned int i;
@@ -344,14 +299,18 @@
   struct GNUNET_HashCode hc;
   uint32_t dist;
 
-  ppd = GSF_get_peer_performance_data_ (peer->peer);
+  ppd = GSF_get_peer_performance_data_ (mrp->peer);
   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
     if (block->target_list[i] == ppd->pid)
       return -1;
   GNUNET_assert (0 != ppd->pid);
-  GNUNET_PEER_resolve (ppd->pid, &id);
-  GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc);
-  dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc);
+  GNUNET_PEER_resolve (ppd->pid,
+                      &id);
+  GNUNET_CRYPTO_hash (&id,
+                     sizeof (struct GNUNET_PeerIdentity),
+                     &hc);
+  dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
+                                         &hc);
   /* closer distance, higher score: */
   return UINT32_MAX - dist;
 }
@@ -368,17 +327,18 @@
 /**
  * Find content for migration to this peer.
  *
- * @param mrp peer to find content for
+ * @param cls peer to find content for
  */
 static void
-find_content (struct MigrationReadyPeer *mrp)
+find_content (void *cls)
 {
+  struct MigrationReadyPeer *mrp = cls;
   struct MigrationReadyBlock *pos;
   long score;
   long best_score;
   struct MigrationReadyBlock *best;
 
-  GNUNET_assert (NULL == mrp->th);
+  mrp->env = NULL;
   best = NULL;
   best_score = -1;
   pos = mig_head;
@@ -423,7 +383,8 @@
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Preparing to push best content to peer\n");
-  transmit_content (mrp, best);
+  transmit_content (mrp,
+                   best);
 }
 
 
@@ -454,9 +415,12 @@
     return;
   if (mig_size >= MAX_MIGRATION_QUEUE)
     return;
-  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
-  delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
-  delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
+  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                        mig_size);
+  delay = GNUNET_TIME_relative_divide (delay,
+                                      MAX_MIGRATION_QUEUE);
+  delay = GNUNET_TIME_relative_max (delay,
+                                   min_migration_delay);
   if (GNUNET_NO == value_found)
   {
     /* wait at least 5s if the datastore is empty */
@@ -467,8 +431,9 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Scheduling gathering task (queue size: %u)\n",
               mig_size);
-  mig_task =
-      GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
+  mig_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                          &gather_migration_blocks,
+                                          NULL);
 }
 
 
@@ -549,14 +514,12 @@
   mig_size++;
   for (pos = peer_head; NULL != pos; pos = pos->next)
   {
-    if (NULL == pos->th)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Preparing to push best content to peer %s\n",
-                  GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
-      if (GNUNET_YES == transmit_content (pos, mb))
-        break;                  /* 'mb' was freed! */
-    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Preparing to push best content to peer %s\n",
+               GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
+    if (GNUNET_YES == transmit_content (pos,
+                                       mb))
+      break;                  /* 'mb' was freed! */
   }
   consider_gathering ();
 }
@@ -580,9 +543,11 @@
               "Asking datastore for content for replication (queue size: %u)\n",
               mig_size);
   value_found = GNUNET_NO;
-  mig_qe =
-    GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
-                                          &process_migration_content, NULL);
+  mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
+                                                0,
+                                                UINT_MAX,
+                                                &process_migration_content,
+                                                NULL);
   if (NULL == mig_qe)
     consider_gathering ();
 }
@@ -640,19 +605,11 @@
       break;
   if (NULL == pos)
     return;
+  if (NULL != pos->env)
+    GNUNET_MQ_send_cancel (pos->env);
   GNUNET_CONTAINER_DLL_remove (peer_head,
                                peer_tail,
                                pos);
-  if (NULL != pos->th)
-  {
-    GSF_peer_transmit_cancel_ (pos->th);
-    pos->th = NULL;
-  }
-  if (NULL != pos->msg)
-  {
-    GNUNET_free (pos->msg);
-    pos->msg = NULL;
-  }
   GNUNET_free (pos);
 }
 
@@ -664,16 +621,21 @@
 GSF_push_init_ ()
 {
   enabled =
-      GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
+    GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
+                                         "FS",
+                                         "CONTENT_PUSHING");
   if (GNUNET_YES != enabled)
     return;
 
   if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
+      GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
+                                          "fs",
+                                          "MIN_MIGRATION_DELAY",
                                            &min_migration_delay))
   {
     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
-                              "fs", "MIN_MIGRATION_DELAY",
+                              "fs",
+                              "MIN_MIGRATION_DELAY",
                               _("time required, content pushing disabled"));
     return;
   }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]