gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: handle transmission timeouts


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: handle transmission timeouts
Date: Mon, 21 Jan 2019 17:45:13 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 32b387070 handle transmission timeouts
32b387070 is described below

commit 32b38707097f8dc9f7f39c526f67414f24283eca
Author: Christian Grothoff <address@hidden>
AuthorDate: Mon Jan 21 17:45:07 2019 +0100

    handle transmission timeouts
---
 src/transport/gnunet-service-tng.c | 192 +++++++++++++++++++++++++++++++------
 1 file changed, 163 insertions(+), 29 deletions(-)

diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3673958ec..e205fa3d7 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -439,6 +439,11 @@ struct Neighbour
    */
   struct GNUNET_ATS_Session *session_tail;
 
+  /**
+   * Task run to cleanup pending messages that have exceeded their timeout.
+   */  
+  struct GNUNET_SCHEDULER_Task *timeout_task;
+
   /**
    * Quota at which CORE is allowed to transmit to this peer
    * according to ATS.
@@ -451,6 +456,11 @@ struct Neighbour
    */
   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
 
+  /**
+   * What is the earliest timeout of any message in @e pending_msg_tail?
+   */ 
+  struct GNUNET_TIME_Absolute earliest_timeout;
+  
 };
 
 
@@ -489,11 +499,22 @@ struct PendingMessage
    */
   struct TransportClient *client;
 
+  /**
+   * At what time should we give up on the transmission (and no longer retry)?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * What is the earliest time for us to retry transmission of this message?
+   */
+  struct GNUNET_TIME_Absolute next_attempt;
+  
   /**
    * Size of the original message.
    */
   uint32_t bytes_msg;
 
+  /* Followed by @e bytes_msg to transmit */
 };
 
 
@@ -592,7 +613,8 @@ struct TransportClient
     struct {
 
       /**
-       * Head of list of messages pending for this client.
+       * Head of list of messages pending for this client, sorted by 
+       * transmission time ("next_attempt" + possibly internal prioritization).
        */
       struct PendingMessage *pending_msg_head;
 
@@ -920,6 +942,8 @@ free_neighbour (struct Neighbour *neighbour)
                 GNUNET_CONTAINER_multipeermap_remove (neighbours,
                                                       &neighbour->pid,
                                                       neighbour));
+  if (NULL != neighbour->timeout_task)
+    GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
   GNUNET_free (neighbour);
 }
 
@@ -1272,6 +1296,50 @@ client_send_response (struct PendingMessage *pm,
 }
 
 
+/**
+ * Checks the message queue for a neighbour for messages that have timed
+ * out and purges them; re-arms itself for the earliest remaining timeout.
+ *
+ * @param cls a `struct Neighbour`
+ */
+static void
+check_queue_timeouts (void *cls)
+{
+  struct Neighbour *n = cls;
+  struct PendingMessage *pm;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Absolute earliest_timeout;
+
+  n->timeout_task = NULL;
+  earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+  now = GNUNET_TIME_absolute_get ();
+  for (struct PendingMessage *pos = n->pending_msg_head;
+       NULL != pos;
+       pos = pm)
+  {
+    pm = pos->next_neighbour; /* fetch successor before 'pos' is handed off */
+    if (pos->timeout.abs_value_us <= now.abs_value_us)
+    {
+      GNUNET_STATISTICS_update (GST_stats,
+                               "# messages dropped (timeout before confirmation)",
+                               1,
+                               GNUNET_NO);
+      client_send_response (pos, /* the expired entry, NOT its successor */
+                           GNUNET_NO,
+                           0);
+      continue;
+    }
+    earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
+                                                pos->timeout);
+  }
+  n->earliest_timeout = earliest_timeout;
+  if (NULL != n->pending_msg_head)
+    n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
+                                              &check_queue_timeouts,
+                                              n);
+}
+
+
 /**
  * Client asked for transmission to a peer.  Process the request.
  *
@@ -1316,10 +1384,14 @@ handle_client_send (void *cls,
                              GNUNET_NO);
     return;
   }
-  pm = GNUNET_new (struct PendingMessage);
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
   pm->client = tc;
   pm->target = target;
   pm->bytes_msg = bytes_msg;
+  pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
+  memcpy (&pm[1],
+         &obm[1],
+         bytes_msg);
   GNUNET_CONTAINER_MDLL_insert (neighbour,
                                target->pending_msg_head,
                                target->pending_msg_tail,
@@ -1328,10 +1400,16 @@ handle_client_send (void *cls,
                                tc->details.core.pending_msg_head,
                                tc->details.core.pending_msg_tail,
                                pm);
-  // FIXME: do the work, final continuation with call to:
-  client_send_response (pm,
-                       GNUNET_NO,
-                       0);
+  if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
+  {
+    target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
+    if (NULL != target->timeout_task)
+      GNUNET_SCHEDULER_cancel (target->timeout_task);
+    target->timeout_task 
+      = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
+                                &check_queue_timeouts,
+                                target);
+  }
 }
 
 
@@ -1652,45 +1730,37 @@ tracker_update_in_cb (void *cls)
  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
  */ 
 static void
-transmit_on_queue (void *cls)
-{
-  struct GNUNET_ATS_Session *queue = cls;
-
-  queue->transmit_task = NULL;
-  // FIXME: check if transmission is really ready
-  // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
-  // FIXME: re-schedule self
-}
+transmit_on_queue (void *cls);
 
 
 /**
- * Bandwidth tracker informs us that the delay until we
- * can transmit again changed.
+ * Schedule next run of #transmit_on_queue().  Does NOTHING if
+ * we should run immediately.  If no task was added afterwards,
+ * the caller is expected to transmit immediately.  This function
+ * must only be called if the message queue is non-empty (this is
+ * asserted)!
  *
- * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
- */
+ * @param queue the queue to do scheduling for
+ */ 
 static void
-tracker_update_out_cb (void *cls)
+schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
 {
-  struct GNUNET_ATS_Session *queue = cls;
   struct Neighbour *n = queue->neighbour;
   struct PendingMessage *pm = n->pending_msg_head;
   struct GNUNET_TIME_Relative out_delay;
   unsigned int wsize;
 
-  if (NULL == pm)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Bandwidth allocation updated for empty transmission queue `%s'\n",
-               queue->address);
-    return; /* no message pending, nothing to do here! */
-  }
+  GNUNET_assert (NULL != pm);
   wsize = (0 == queue->mtu)
     ? pm->bytes_msg /* FIXME: add overheads? */
     : queue->mtu;
   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
                                                  wsize);
-  GNUNET_SCHEDULER_cancel (queue->transmit_task);
+  out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
+                                       out_delay);
+  if (0 == out_delay.rel_value_us)
+    return; /* we should run immediately! */
+  /* queue has changed since we were scheduled, reschedule again */
   queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
                                                       &transmit_on_queue,
                                                       queue);
@@ -1709,6 +1779,69 @@ tracker_update_out_cb (void *cls)
 }
 
 
+/**
+ * We believe we are ready to transmit a message on a queue.  Re-checks
+ * the delay via #schedule_transmit_on_queue(); if a delayed task was
+ * scheduled we return and run again later.  The actual hand-off to the
+ * communicator is still a FIXME below; afterwards we re-schedule.
+ *
+ * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ */
+static void
+transmit_on_queue (void *cls)
+{
+  struct GNUNET_ATS_Session *queue = cls;
+  struct Neighbour *n = queue->neighbour;
+  struct PendingMessage *pm;
+
+  queue->transmit_task = NULL; /* we are running now; forget the task handle */
+  if (NULL == (pm = n->pending_msg_head))
+  {
+    /* no message pending, nothing to do here! */
+    return; 
+  }
+  schedule_transmit_on_queue (queue); /* leaves transmit_task NULL if we may send right now */
+  if (NULL != queue->transmit_task)
+    return; /* do it later */
+
+  // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
+  // FIXME: upon success, do (not here in continuation!)
+  if (0) /* placeholder: disabled until the transmission above is implemented */
+    {
+      client_send_response (pm,
+                           GNUNET_YES,
+                           0);
+    }
+  /* finally, re-schedule self */
+  schedule_transmit_on_queue (queue);
+}
+
+
+/**
+ * Bandwidth tracker informs us that the delay until we can transmit
+ * again changed.  Drops any pending transmit task and re-schedules.
+ *
+ * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ */
+static void
+tracker_update_out_cb (void *cls)
+{
+  struct GNUNET_ATS_Session *queue = cls;
+
+  if (NULL == queue->neighbour->pending_msg_head)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Bandwidth allocation updated for empty transmission queue `%s'\n",
+               queue->address);
+    return; /* no message pending, nothing to do here! */
+  }
+  if (NULL != queue->transmit_task) /* may be NULL: scheduling is skipped on zero delay */
+    GNUNET_SCHEDULER_cancel (queue->transmit_task);
+  queue->transmit_task = NULL;
+  schedule_transmit_on_queue (queue);
+}
+
+
 /**
  * Bandwidth tracker informs us that excessive outbound bandwidth was
  * allocated which is not being used.
@@ -1768,6 +1901,7 @@ handle_add_queue_message (void *cls,
   if (NULL == neighbour)
   {
     neighbour = GNUNET_new (struct Neighbour);
+    neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
     neighbour->pid = aqm->receiver;
     GNUNET_assert (GNUNET_OK ==
                   GNUNET_CONTAINER_multipeermap_put (neighbours,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]