gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r35327 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r35327 - gnunet/src/fs
Date: Sun, 1 Mar 2015 02:12:22 +0100

Author: grothoff
Date: 2015-03-01 02:12:22 +0100 (Sun, 01 Mar 2015)
New Revision: 35327

Modified:
   gnunet/src/fs/gnunet-service-fs_cp.c
Log:
count number of pending replies and refuse to process queries if queue gets too big

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2015-02-28 21:12:03 UTC (rev 35326)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2015-03-01 01:12:22 UTC (rev 35327)
@@ -86,7 +86,7 @@
   /**
    * Task called on timeout, or 0 for none.
    */
-  struct GNUNET_SCHEDULER_Task * timeout_task;
+  struct GNUNET_SCHEDULER_Task *timeout_task;
 
   /**
    * Function to call to get the actual message.
@@ -155,7 +155,7 @@
   /**
    * Task for the delay.
    */
-  struct GNUNET_SCHEDULER_Task * delay_task;
+  struct GNUNET_SCHEDULER_Task *delay_task;
 
   /**
    * Size of the message.
@@ -184,7 +184,7 @@
   /**
    * Task for asynchronous stopping of this request.
    */
-  struct GNUNET_SCHEDULER_Task * kill_task;
+  struct GNUNET_SCHEDULER_Task *kill_task;
 
 };
 
@@ -209,7 +209,7 @@
   /**
    * Task scheduled to revive migration to this peer.
    */
-  struct GNUNET_SCHEDULER_Task * mig_revive_task;
+  struct GNUNET_SCHEDULER_Task *mig_revive_task;
 
   /**
    * Messages (replies, queries, content migration) we would like to
@@ -248,7 +248,7 @@
   /**
    * Task scheduled if we need to retry bandwidth reservation later.
    */
-  struct GNUNET_SCHEDULER_Task * rc_delay_task;
+  struct GNUNET_SCHEDULER_Task *rc_delay_task;
 
   /**
    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
@@ -276,6 +276,11 @@
   unsigned int cth_in_progress;
 
   /**
+   * Number of entries in @e delayed_head DLL.
+   */
+  unsigned int delay_queue_size;
+
+  /**
    * Respect rating for this peer on disk.
    */
   uint32_t disk_respect;
@@ -298,8 +303,8 @@
   unsigned int last_request_times_off;
 
   /**
-   * GNUNET_YES if we did successfully reserve 32k bandwidth,
-   * GNUNET_NO if not.
+   * #GNUNET_YES if we did successfully reserve 32k bandwidth,
+   * #GNUNET_NO if not.
    */
   int did_reserve;
 
@@ -439,10 +444,13 @@
   GNUNET_assert (NULL == cp->cth);
   cp->cth_in_progress++;
   cp->cth =
-    GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
+    GNUNET_CORE_notify_transmit_ready (GSF_core,
+                                       GNUNET_YES,
                                        GNUNET_CORE_PRIO_BACKGROUND,
                                       GNUNET_TIME_absolute_get_remaining
-                                      (pth->timeout), &target, pth->size,
+                                      (pth->timeout),
+                                       &target,
+                                       pth->size,
                                       &peer_transmit_ready_cb, cp);
   GNUNET_assert (NULL != cp->cth);
   GNUNET_assert (0 < cp->cth_in_progress--);
@@ -458,7 +466,9 @@
  * @return number of bytes copied to @a buf
  */
 static size_t
-peer_transmit_ready_cb (void *cls, size_t size, void *buf)
+peer_transmit_ready_cb (void *cls,
+                        size_t size,
+                        void *buf)
 {
   struct GSF_ConnectedPeer *cp = cls;
   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
@@ -478,7 +488,9 @@
     GNUNET_SCHEDULER_cancel (pth->timeout_task);
     pth->timeout_task = NULL;
   }
-  GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                               cp->pth_tail,
+                               pth);
   if (GNUNET_YES == pth->is_query)
   {
     cp->ppd.last_request_times[(cp->last_request_times_off++) %
@@ -511,7 +523,8 @@
  * @param tc scheduler context
  */
 static void
-retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+retry_reservation (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GSF_ConnectedPeer *cp = cls;
   struct GNUNET_PeerIdentity target;
@@ -519,7 +532,9 @@
   GNUNET_PEER_resolve (cp->ppd.pid, &target);
   cp->rc_delay_task = NULL;
   cp->rc =
-    GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
+    GNUNET_ATS_reserve_bandwidth (GSF_ats,
+                                  &target,
+                                  DBLOCK_SIZE,
                                  &ats_reserve_callback, cp);
 }
 
@@ -736,7 +751,9 @@
  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
  */
 static size_t
-copy_reply (void *cls, size_t buf_size, void *buf)
+copy_reply (void *cls,
+            size_t buf_size,
+            void *buf)
 {
   struct PutMessage *pm = cls;
   size_t size;
@@ -845,15 +862,23 @@
   struct GSF_DelayedHandle *dh = cls;
   struct GSF_ConnectedPeer *cp = dh->cp;
 
-  GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+  GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+                               cp->delayed_tail,
+                               dh);
+  cp->delay_queue_size--;
   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
   {
     GNUNET_free (dh->pm);
     GNUNET_free (dh);
     return;
   }
-  (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
-                             dh->msize, &copy_reply, dh->pm);
+  (void) GSF_peer_transmit_ (cp,
+                             GNUNET_NO,
+                             UINT32_MAX,
+                             REPLY_TIMEOUT,
+                             dh->msize,
+                             &copy_reply,
+                             dh->pm);
   GNUNET_free (dh);
 }
 
@@ -967,8 +992,9 @@
   pm->type = htonl (type);
   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
   memcpy (&pm[1], data, data_len);
-  if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
-      (GNUNET_YES == GSF_enable_randomized_delays))
+  if ( (UINT32_MAX != reply_anonymity_level) &&
+       (0 != reply_anonymity_level) &&
+       (GNUNET_YES == GSF_enable_randomized_delays) )
   {
     struct GSF_DelayedHandle *dh;
 
@@ -976,15 +1002,24 @@
     dh->cp = cp;
     dh->pm = pm;
     dh->msize = msize;
-    GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
+    GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
+                                 cp->delayed_tail,
+                                 dh);
+    cp->delay_queue_size++;
     dh->delay_task =
         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
-                                      &transmit_delayed_now, dh);
+                                      &transmit_delayed_now,
+                                      dh);
   }
   else
   {
-    (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
-                               &copy_reply, pm);
+    (void) GSF_peer_transmit_ (cp,
+                               GNUNET_NO,
+                               UINT32_MAX,
+                               REPLY_TIMEOUT,
+                               msize,
+                               &copy_reply,
+                               pm);
   }
   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
     return;
@@ -1164,7 +1199,6 @@
   enum GNUNET_BLOCK_Type type;
   GNUNET_PEER_Id spid;
 
-  GNUNET_assert (other != NULL);
   msize = ntohs (message->size);
   if (msize < sizeof (struct GetMessage))
   {
@@ -1173,7 +1207,8 @@
   }
   GNUNET_STATISTICS_update (GSF_stats,
                             gettext_noop
-                            ("# GET requests received (from other peers)"), 1,
+                            ("# GET requests received (from other peers)"),
+                            1,
                             GNUNET_NO);
   gm = (const struct GetMessage *) message;
   type = ntohl (gm->type);
@@ -1219,25 +1254,36 @@
   {
     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to find RETURN-TO peer `%4s' in connection set. 
Dropping query.\n",
+                  "Failed to find RETURN-TO peer `%s' in connection set. 
Dropping query.\n",
                   GNUNET_i2s (&opt[bits - 1]));
 
     else
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to find peer `%4s' in connection set. Dropping 
query.\n",
+                  "Failed to find peer `%s' in connection set. Dropping 
query.\n",
                   GNUNET_i2s (other));
-#if INSANE_STATISTICS
     GNUNET_STATISTICS_update (GSF_stats,
                               gettext_noop
                               ("# requests dropped due to missing reverse 
route"),
-                              1, GNUNET_NO);
-#endif
+                              1,
+                              GNUNET_NO);
     return NULL;
   }
+  if (cp->ppd.pending_replies + cp->delay_queue_size > 128)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Peer `%s' has too many replies queued already. Dropping 
query.\n",
+                GNUNET_i2s (other));
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# requests dropped due to full 
reply queue"),
+                              1,
+                              GNUNET_NO);
+    return NULL;
+  }
   /* note that we can really only check load here since otherwise
    * peers could find out that we are overloaded by not being
    * disconnected after sending us a malformed query... */
-  priority = bound_priority (ntohl (gm->priority), cps);
+  priority = bound_priority (ntohl (gm->priority),
+                             cps);
   if (priority < 0)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1246,7 +1292,7 @@
     return NULL;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received request for `%s' of type %u from peer `%4s' with flags 
%u\n",
+              "Received request for `%s' of type %u from peer `%s' with flags 
%u\n",
               GNUNET_h2s (&gm->query),
               (unsigned int) type,
               GNUNET_i2s (other),
@@ -1359,7 +1405,9 @@
               "Timeout trying to transmit to other peer\n");
   pth->timeout_task = NULL;
   cp = pth->cp;
-  GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                               cp->pth_tail,
+                               pth);
   if (GNUNET_YES == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_queries--);
   else if (GNUNET_NO == pth->is_query)
@@ -1419,13 +1467,18 @@
     prev = pos;
     pos = pos->next;
   }
-  GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
+  GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
+                                     cp->pth_tail,
+                                     prev,
+                                     pth);
   if (GNUNET_YES == is_query)
     cp->ppd.pending_queries++;
   else if (GNUNET_NO == is_query)
     cp->ppd.pending_replies++;
-  pth->timeout_task =
-      GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
+  pth->timeout_task
+    = GNUNET_SCHEDULER_add_delayed (timeout,
+                                    &peer_transmit_timeout,
+                                    pth);
   schedule_transmission (pth);
   return pth;
 }
@@ -1447,7 +1500,9 @@
     pth->timeout_task = NULL;
   }
   cp = pth->cp;
-  GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                               cp->pth_tail,
+                               pth);
   if (GNUNET_YES == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_queries--);
   else if (GNUNET_NO == pth->is_query)
@@ -1614,13 +1669,22 @@
       GNUNET_SCHEDULER_cancel (pth->timeout_task);
       pth->timeout_task = NULL;
     }
-    GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+    GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                                 cp->pth_tail,
+                                 pth);
+    if (GNUNET_YES == pth->is_query)
+      GNUNET_assert (0 < cp->ppd.pending_queries--);
+    else if (GNUNET_NO == pth->is_query)
+      GNUNET_assert (0 < cp->ppd.pending_replies--);
     pth->gmc (pth->gmc_cls, 0, NULL);
     GNUNET_free (pth);
   }
   while (NULL != (dh = cp->delayed_head))
   {
-    GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+    GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+                                 cp->delayed_tail,
+                                 dh);
+    cp->delay_queue_size--;
     GNUNET_SCHEDULER_cancel (dh->delay_task);
     GNUNET_free (dh->pm);
     GNUNET_free (dh);
@@ -1631,6 +1695,8 @@
     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
     cp->mig_revive_task = NULL;
   }
+  GNUNET_break (0 == cp->ppd.pending_queries);
+  GNUNET_break (0 == cp->ppd.pending_replies);
   GNUNET_free (cp);
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]