gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: handle communicator status, address a few FIXMEs


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: handle communicator status, address a few FIXMEs
Date: Sun, 21 Apr 2019 12:13:43 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new d97224045 handle communicator status, address a few FIXMEs
d97224045 is described below

commit d97224045fe41e824406f771e24c46fb89514942
Author: Christian Grothoff <address@hidden>
AuthorDate: Sun Apr 21 12:13:33 2019 +0200

    handle communicator status, address a few FIXMEs
---
 src/transport/gnunet-service-tng.c | 197 ++++++++++++++++++++++++++-----------
 1 file changed, 139 insertions(+), 58 deletions(-)

diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 62e9c0d8e..697c43f0d 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -1097,6 +1097,10 @@ struct DistanceVector
  */
 struct Queue;
 
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
 
 /**
  * Entry identifying transmission in one of our `struct
@@ -1125,6 +1129,11 @@ struct QueueEntry
    */
   struct Queue *queue;
 
+  /**
+   * Pending message this entry is for, or NULL for none.
+   */
+  struct PendingMessage *pm;
+
   /**
    * Message ID used for this message with the queue used for transmission.
    */
@@ -1584,6 +1593,16 @@ struct PendingMessage
    */
   struct Neighbour *target;
 
+  /**
+   * Set to non-NULL value if this message is currently being given to a
+   * communicator and we are awaiting that communicator's acknowledgement.
+   * Note that we must not retransmit a pending message while we're still
+   * in the process of giving it to a communicator. If a pending message
+   * is free'd while this entry is non-NULL, the @e qe reference to us
+   * should simply be set to NULL.
+   */
+  struct QueueEntry *qe;
+
   /**
    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
    */
@@ -2472,9 +2491,12 @@ transmit_on_queue (void *cls);
  * be called if the message queue is non-empty!
  *
  * @param queue the queue to do scheduling for
+ * @param inside_job set to #GNUNET_YES if called from
+ *            #transmit_on_queue() itself and NOT setting
+ *            the task means running immediately
  */
 static void
-schedule_transmit_on_queue (struct Queue *queue)
+schedule_transmit_on_queue (struct Queue *queue, int inside_job)
 {
   struct Neighbour *n = queue->neighbour;
   struct PendingMessage *pm = n->pending_msg_head;
@@ -2507,7 +2529,7 @@ schedule_transmit_on_queue (struct Queue *queue)
   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (
                                           pm->next_attempt),
                                         out_delay);
-  if (0 == out_delay.rel_value_us)
+  if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
     return; /* we should run immediately! */
   /* queue has changed since we were scheduled, reschedule again */
   queue->transmit_task =
@@ -2575,6 +2597,11 @@ free_queue (struct Queue *queue)
     GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
     queue->queue_length--;
     tc->details.communicator.total_queue_length--;
+    if (NULL != qe->pm)
+    {
+      GNUNET_assert (qe == qe->pm->qe);
+      qe->pm->qe = NULL;
+    }
     GNUNET_free (qe);
   }
   GNUNET_assert (0 == queue->queue_length);
@@ -2589,7 +2616,7 @@ free_queue (struct Queue *queue)
       GNUNET_NO);
     for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
          s = s->next_client)
-      schedule_transmit_on_queue (s);
+      schedule_transmit_on_queue (s, GNUNET_NO);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
@@ -2859,6 +2886,11 @@ free_pending_message (struct PendingMessage *pm)
                                 target->pending_msg_tail,
                                 pm);
   free_fragment_tree (pm);
+  if (NULL != pm->qe)
+  {
+    GNUNET_assert (pm == pm->qe->pm);
+    pm->qe->pm = NULL;
+  }
   GNUNET_free_non_null (pm->bpm);
   GNUNET_free (pm);
 }
@@ -3245,8 +3277,12 @@ queue_send_msg (struct Queue *queue,
     qe = GNUNET_new (struct QueueEntry);
     qe->mid = queue->mid_gen++;
     qe->queue = queue;
-    // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
-    // (also, note that pm may be NULL!)
+    if (NULL != pm)
+    {
+      qe->pm = pm;
+      GNUNET_assert (NULL == pm->qe);
+      pm->qe = qe;
+    }
     GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
     queue->queue_length++;
@@ -6037,6 +6073,60 @@ reliability_box_message (struct PendingMessage *pm)
 }
 
 
+/**
+ * Change the value of the `next_attempt` field of @a pm
+ * to @a next_attempt and re-order @a pm in the transmission
+ * list as required by the new timestamp.
+ *
+ * @param pm a pending message to update
+ * @param next_attempt timestamp to use
+ */
+static void
+update_pm_next_attempt (struct PendingMessage *pm,
+                        struct GNUNET_TIME_Absolute next_attempt)
+{
+  struct Neighbour *neighbour = pm->target;
+
+  pm->next_attempt = next_attempt;
+  if (NULL == pm->frag_parent)
+  {
+    struct PendingMessage *pos;
+
+    /* re-insert sort in neighbour list */
+    GNUNET_CONTAINER_MDLL_remove (neighbour,
+                                  neighbour->pending_msg_head,
+                                  neighbour->pending_msg_tail,
+                                  pm);
+    pos = neighbour->pending_msg_tail;
+    while ((NULL != pos) &&
+           (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+      pos = pos->prev_neighbour;
+    GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+                                        neighbour->pending_msg_head,
+                                        neighbour->pending_msg_tail,
+                                        pos,
+                                        pm);
+  }
+  else
+  {
+    /* re-insert sort in fragment list */
+    struct PendingMessage *fp = pm->frag_parent;
+    struct PendingMessage *pos;
+
+    GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm);
+    pos = fp->tail_frag;
+    while ((NULL != pos) &&
+           (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+      pos = pos->prev_frag;
+    GNUNET_CONTAINER_MDLL_insert_after (frag,
+                                        fp->head_frag,
+                                        fp->tail_frag,
+                                        pos,
+                                        pm);
+  }
+}
+
+
 /**
  * We believe we are ready to transmit a message on a queue. Double-checks
  * with the queue's "tracker_out" and then gives the message to the
@@ -6060,7 +6150,13 @@ transmit_on_queue (void *cls)
     /* no message pending, nothing to do here! */
     return;
   }
-  schedule_transmit_on_queue (queue);
+  if (NULL != pm->qe)
+  {
+    /* message still pending with communicator!
+       LOGGING-FIXME: Use stats? logging? Should this not be rare? */
+    return;
+  }
+  schedule_transmit_on_queue (queue, GNUNET_YES);
   if (NULL != queue->transmit_task)
     return; /* do it later */
   overhead = 0;
@@ -6081,7 +6177,7 @@ transmit_on_queue (void *cls)
   if (NULL == s)
   {
     /* Fragmentation failed, try next message... */
-    schedule_transmit_on_queue (queue);
+    schedule_transmit_on_queue (queue, GNUNET_NO);
     return;
   }
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
@@ -6089,7 +6185,7 @@ transmit_on_queue (void *cls)
   if (NULL == s)
   {
     /* Reliability boxing failed, try next message... */
-    schedule_transmit_on_queue (queue);
+    schedule_transmit_on_queue (queue, GNUNET_NO);
     return;
   }
 
@@ -6141,57 +6237,21 @@ transmit_on_queue (void *cls)
   }
   else
   {
-    /* message not finished, waiting for acknowledgement */
-    struct Neighbour *neighbour = pm->target;
-    /* Update time by which we might retransmit 's' based on queue
+    /* Message not finished, waiting for acknowledgement.
+       Update time by which we might retransmit 's' based on queue
        characteristics (i.e. RTT); it takes one RTT for the message to
        arrive and the ACK to come back in the best case; but the other
        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
        retransmitting.  Note that in the future this heuristic should
        likely be improved further (measure RTT stability, consider
        message urgency and size when delaying ACKs, etc.) */
-    s->next_attempt = GNUNET_TIME_relative_to_absolute (
-      GNUNET_TIME_relative_multiply (queue->rtt, 4));
-    if (s == pm)
-    {
-      struct PendingMessage *pos;
-
-      /* re-insert sort in neighbour list */
-      GNUNET_CONTAINER_MDLL_remove (neighbour,
-                                    neighbour->pending_msg_head,
-                                    neighbour->pending_msg_tail,
-                                    pm);
-      pos = neighbour->pending_msg_tail;
-      while ((NULL != pos) &&
-             (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
-        pos = pos->prev_neighbour;
-      GNUNET_CONTAINER_MDLL_insert_after (neighbour,
-                                          neighbour->pending_msg_head,
-                                          neighbour->pending_msg_tail,
-                                          pos,
-                                          pm);
-    }
-    else
-    {
-      /* re-insert sort in fragment list */
-      struct PendingMessage *fp = s->frag_parent;
-      struct PendingMessage *pos;
-
-      GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s);
-      pos = fp->tail_frag;
-      while ((NULL != pos) &&
-             (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
-        pos = pos->prev_frag;
-      GNUNET_CONTAINER_MDLL_insert_after (frag,
-                                          fp->head_frag,
-                                          fp->tail_frag,
-                                          pos,
-                                          s);
-    }
+    update_pm_next_attempt (s,
+                            GNUNET_TIME_relative_to_absolute (
+                              GNUNET_TIME_relative_multiply (queue->rtt, 4)));
   }
 
   /* finally, re-schedule queue transmission task itself */
-  schedule_transmit_on_queue (queue);
+  schedule_transmit_on_queue (queue, GNUNET_NO);
 }
 
 
@@ -6216,7 +6276,7 @@ tracker_update_out_cb (void *cls)
   }
   GNUNET_SCHEDULER_cancel (queue->transmit_task);
   queue->transmit_task = NULL;
-  schedule_transmit_on_queue (queue);
+  schedule_transmit_on_queue (queue, GNUNET_NO);
 }
 
 
@@ -6309,6 +6369,7 @@ handle_send_message_ack (void *cls,
 {
   struct TransportClient *tc = cls;
   struct QueueEntry *qe;
+  struct PendingMessage *pm;
 
   if (CT_COMMUNICATOR != tc->type)
   {
@@ -6352,7 +6413,8 @@ handle_send_message_ack (void *cls,
   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
       tc->details.communicator.total_queue_length)
   {
-    /* Communicator dropped below threshold, resume all queues */
+    /* Communicator dropped below threshold, resume all queues
+       incident with this client! */
     GNUNET_STATISTICS_update (
       GST_stats,
       "# Transmission throttled due to communicator queue limit",
@@ -6361,7 +6423,7 @@ handle_send_message_ack (void *cls,
     for (struct Queue *queue = tc->details.communicator.queue_head;
          NULL != queue;
          queue = queue->next_client)
-      schedule_transmit_on_queue (queue);
+      schedule_transmit_on_queue (queue, GNUNET_NO);
   }
   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
   {
@@ -6370,14 +6432,33 @@ handle_send_message_ack (void *cls,
                               "# Transmission throttled due to queue queue limit",
                               -1,
                               GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue);
+    schedule_transmit_on_queue (qe->queue, GNUNET_NO);
   }
 
-  /* TODO: we also should react on the status! */
-  // FIXME: this probably requires queue->pm = s assignment!
-  // FIXME: react to communicator status about transmission request. We got:
-  sma->status; // OK success, SYSERR failure
+  if (NULL != (pm = qe->pm))
+  {
+    struct Neighbour *n;
 
+    GNUNET_assert (qe == pm->qe);
+    pm->qe = NULL;
+    /* If waiting for this communicator may have blocked transmission
+       of pm on other queues for this neighbour, force schedule
+       transmit on queue for queues of the neighbour */
+    n = pm->target;
+    if (n->pending_msg_head == pm)
+    {
+      for (struct Queue *queue = n->queue_head; NULL != queue;
+           queue = queue->next_neighbour)
+        schedule_transmit_on_queue (queue, GNUNET_NO);
+    }
+    if (GNUNET_OK != ntohl (sma->status))
+    {
+      GNUNET_log (
+        GNUNET_ERROR_TYPE_INFO,
+        "Queue failed in transmission, will try retransmission immediately\n");
+      update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
+    }
+  }
   GNUNET_free (qe);
 }
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]