gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r16419 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r16419 - gnunet/src/transport
Date: Sun, 7 Aug 2011 08:04:52 +0200

Author: grothoff
Date: 2011-08-07 08:04:52 +0200 (Sun, 07 Aug 2011)
New Revision: 16419

Modified:
   gnunet/src/transport/gnunet-service-transport_neighbours.c
Log:
towards neighbour management

Modified: gnunet/src/transport/gnunet-service-transport_neighbours.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.c  2011-08-06 
20:43:50 UTC (rev 16418)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.c  2011-08-07 
06:04:52 UTC (rev 16419)
@@ -26,7 +26,15 @@
 #include "platform.h"
 #include "gnunet-service-transport_neighbours.h"
 #include "gnunet-service-transport.h"
+#include "gnunet_constants.h"
 
+
+/**
+ * Size of the neighbour hash map.
+ */
+#define NEIGHBOUR_TABLE_SIZE 256
+
+
 // TODO:
 // - have a way to access the currently 'connected' session
 //   (for sending and to notice disconnect of it!)
@@ -34,8 +42,500 @@
 //   (for CostReport/TrafficReport callbacks)
 
 
+struct NeighbourMapEntry;
 
 /**
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
+ */
+struct MessageQueue
+{
+
+  /**
+   * This is a doubly linked list.
+   */
+  struct MessageQueue *next;
+
+  /**
+   * This is a doubly linked list.
+   */
+  struct MessageQueue *prev;
+
+  /**
+   * The message(s) we want to transmit, GNUNET_MessageHeader(s)
+   * stuck together in memory.  Allocated at the end of this struct.
+   */
+  const char *message_buf;
+
+  /**
+   * Size of the message buf
+   */
+  size_t message_buf_size;
+
+  /**
+   * Client responsible for queueing the message; used to check that a
+   * client has no two messages pending for the same target and to
+   * notify the client of a successful transmission; NULL if this is
+   * an internal message.
+   */
+  struct TransportClient *client;
+
+  /**
+   * At what time should we fail?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * Internal message of the transport system that should not be
+   * included in the usual SEND-SEND_OK transmission confirmation
+   * traffic management scheme.  Typically, "internal_msg" will
+   * be set whenever "client" is NULL (but it is not strictly
+   * required).
+   */
+  int internal_msg;
+
+  /**
+   * How important is the message?
+   */
+  unsigned int priority;
+
+};
+
+
+
+/**
+ * Entry in neighbours. 
+ */
+struct NeighbourMapEntry
+{
+
+  /**
+   * Head of list of messages we would like to send to this peer;
+   * must contain at most one message per client.
+   */
+  struct MessageQueue *messages_head;
+
+  /**
+   * Tail of list of messages we would like to send to this peer; must
+   * contain at most one message per client.
+   */
+  struct MessageQueue *messages_tail;
+
+  /**
+   * Context for peerinfo iteration.
+   * NULL after we are done processing peerinfo's information.
+   */
+  struct GNUNET_PEERINFO_IteratorContext *piter;
+
+  /**
+   * Performance data for the peer.
+   */
+  struct GNUNET_TRANSPORT_ATS_Information *ats;
+
+  /**
+   * Public key for this peer.  Valid only if the respective flag is set below.
+   */
+  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+  /**
+   * Identity of this neighbour.
+   */
+  struct GNUNET_PeerIdentity id;
+
+  /**
+   * ID of task scheduled to run when this peer is about to
+   * time out (will free resources associated with the peer).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * ID of task scheduled to run when we should retry transmitting
+   * the head of the message queue.  Actually triggered when the
+   * transmission is timing out (we trigger instantly when we have
+   * a chance of success).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
+  /**
+   * How long until we should consider this peer dead (if we don't
+   * receive another message in the meantime)?
+   */
+  struct GNUNET_TIME_Absolute peer_timeout;
+
+  /**
+   * Tracker for inbound bandwidth.
+   */
+  struct GNUNET_BANDWIDTH_Tracker in_tracker;
+
+  /**
+   * The latency we have seen for this particular address for
+   * this particular peer.  This latency may have been calculated
+   * over multiple transports.  This value reflects how long it took
+   * us to receive a response when SENDING via this particular
+   * transport/neighbour/address combination!
+   *
+   * FIXME: we need to periodically send PINGs to update this
+   * latency (at least more often than the current "huge" (11h?)
+   * update interval).
+   */
+  struct GNUNET_TIME_Relative latency;
+
+  /**
+   * How often has the other peer (recently) violated the inbound
+   * traffic limit?  Incremented by 10 per violation, decremented by 1
+   * per non-violation (for each time interval).
+   */
+  unsigned int quota_violation_count;
+
+  /**
+   * DV distance to this peer (1 if no DV is used).
+   */
+  uint32_t distance;
+
+  /**
+   * Have we seen an PONG from this neighbour in the past (and
+   * not had a disconnect since)?
+   */
+  int received_pong;
+
+  /**
+   * Do we have a valid public key for this neighbour?
+   */
+  int public_key_valid;
+
+  /**
+   * Are we already in the process of disconnecting this neighbour?
+   */
+  int in_disconnect;
+
+};
+
+
+/**
+ * All known neighbours and their HELLOs.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
+
+/**
+ * Closure for connect_notify_cb and disconnect_notify_cb
+ */
+static void *callback_cls;
+
+/**
+ * Function to call when we connected to a neighbour.
+ */
+static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
+
+/**
+ * Function to call when we disconnected from a neighbour.
+ */
+static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
+
+
+#if 0
+/**
+ * Check the ready list for the given neighbour and if a plugin is
+ * ready for transmission (and if we have a message), do so!
+ *
+ * @param neighbour target peer for which to transmit
+ */
+static void
+try_transmission_to_peer (struct NeighbourMapEntry *n)
+{
+  struct ReadyList *rl;
+  struct MessageQueue *mq;
+  struct GNUNET_TIME_Relative timeout;
+  ssize_t ret;
+  int force_address;
+
+  if (n->messages_head == NULL)
+    {
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Transmission queue for `%4s' is empty\n",
+                 GNUNET_i2s (&n->id));
+#endif
+      return;                     /* nothing to do */
+    }
+  rl = NULL;
+  mq = n->messages_head;
+  force_address = GNUNET_YES;
+  if (mq->specific_address == NULL)
+    {
+      /* TODO: ADD ATS */
+      mq->specific_address = get_preferred_ats_address(n);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# transport selected peer 
address freely"),
+                               1,
+                               GNUNET_NO);
+      force_address = GNUNET_NO;
+    }
+  if (mq->specific_address == NULL)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# transport failed to selected 
peer address"),
+                               1,
+                               GNUNET_NO);
+      timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+      if (timeout.rel_value == 0)
+       {
+#if DEBUG_TRANSPORT
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "No destination address available to transmit message of 
size %u to peer `%4s'\n",
+                     mq->message_buf_size,
+                     GNUNET_i2s (&mq->neighbour_id));
+#endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# bytes in message queue for 
other peers"),
+                                   - (int64_t) mq->message_buf_size,
+                                   GNUNET_NO);
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# bytes discarded (no 
destination address available)"),
+                                   mq->message_buf_size,
+                                   GNUNET_NO);
+         if (mq->client != NULL)
+           transmit_send_ok (mq->client, n, &n->id, GNUNET_NO);
+         GNUNET_CONTAINER_DLL_remove (n->messages_head,
+                                      n->messages_tail,
+                                      mq);
+         GNUNET_free (mq);
+         return;               /* nobody ready */
+       }
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# message delivery deferred (no 
address)"),
+                               1,
+                               GNUNET_NO);
+      if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (n->retry_task);
+      n->retry_task = GNUNET_SCHEDULER_add_delayed (timeout,
+                                                   &retry_transmission_task,
+                                                   n);
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "No validated destination address available to transmit 
message of size %u to peer `%4s', will wait %llums to find an address.\n",
+                 mq->message_buf_size,
+                 GNUNET_i2s (&mq->neighbour_id),
+                 timeout.rel_value);
+#endif
+      /* FIXME: might want to trigger peerinfo lookup here
+        (unless that's already pending...) */
+      return;
+    }
+  GNUNET_CONTAINER_DLL_remove (n->messages_head,
+                              n->messages_tail,
+                              mq);
+  if (mq->specific_address->connected == GNUNET_NO)
+    mq->specific_address->connect_attempts++;
+  rl = mq->specific_address->ready_list;
+  mq->plugin = rl->plugin;
+  if (!mq->internal_msg)
+    mq->specific_address->in_transmit = GNUNET_YES;
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
+              mq->message_buf_size,
+              GNUNET_i2s (&n->id),
+             (mq->specific_address->addr != NULL)
+             ? a2s (mq->plugin->short_name,
+                    mq->specific_address->addr,
+                    mq->specific_address->addrlen)
+             : "<inbound>",
+             rl->plugin->short_name);
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# bytes in message queue for other 
peers"),
+                           - (int64_t) mq->message_buf_size,
+                           GNUNET_NO);
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# bytes pending with plugins"),
+                           mq->message_buf_size,
+                           GNUNET_NO);
+
+  GNUNET_CONTAINER_DLL_insert (n->cont_head,
+                               n->cont_tail,
+                               mq);
+
+  ret = rl->plugin->api->send (rl->plugin->api->cls,
+                              &mq->neighbour_id,
+                              mq->message_buf,
+                              mq->message_buf_size,
+                              mq->priority,
+                              GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                              mq->specific_address->session,
+                              mq->specific_address->addr,
+                              mq->specific_address->addrlen,
+                              force_address,
+                              &transmit_send_continuation, mq);
+  if (ret == -1)
+    {
+      /* failure, but 'send' would not call continuation in this case,
+        so we need to do it here! */
+      transmit_send_continuation (mq,
+                                 &mq->neighbour_id,
+                                 GNUNET_SYSERR);
+    }
+}
+
+
+/**
+ * Send the specified message to the specified peer.
+ *
+ * @param client source of the transmission request (can be NULL)
+ * @param peer_address ForeignAddressList where we should send this message
+ * @param priority how important is the message
+ * @param timeout how long do we have to transmit?
+ * @param message_buf message(s) to send GNUNET_MessageHeader(s)
+ * @param message_buf_size total size of all messages in message_buf
+ * @param is_internal is this an internal message; these are pre-pended and
+ *                    also do not count for plugins being "ready" to transmit
+ * @param neighbour handle to the neighbour for transmission
+ */
+static void
+transmit_to_peer (struct TransportClient *client,
+                  struct ForeignAddressList *peer_address,
+                  unsigned int priority,
+                 struct GNUNET_TIME_Relative timeout,
+                  const char *message_buf,
+                  size_t message_buf_size,
+                  int is_internal, struct NeighbourMapEntry *neighbour)
+{
+  struct MessageQueue *mq;
+
+#if EXTRA_CHECKS
+  if (client != NULL)
+    {
+      /* check for duplicate submission */
+      mq = neighbour->messages_head;
+      while (NULL != mq)
+        {
+          if (mq->client == client)
+            {
+              /* client transmitted to same peer twice
+                 before getting SEND_OK! */
+              GNUNET_break (0);
+              return;
+            }
+          mq = mq->next;
+        }
+    }
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# bytes in message queue for other 
peers"),
+                           message_buf_size,
+                           GNUNET_NO);
+  mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
+  mq->specific_address = peer_address;
+  mq->client = client;
+  /* FIXME: this memcpy can be up to 7% of our total runtime! */
+  memcpy (&mq[1], message_buf, message_buf_size);
+  mq->message_buf = (const char*) &mq[1];
+  mq->message_buf_size = message_buf_size;
+  memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct 
GNUNET_PeerIdentity));
+  mq->internal_msg = is_internal;
+  mq->priority = priority;
+  mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+  if (is_internal)
+    GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+                                neighbour->messages_tail,
+                                mq);
+  else
+    GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+                                      neighbour->messages_tail,
+                                      neighbour->messages_tail,
+                                      mq);
+  try_transmission_to_peer (neighbour);
+}
+
+
+/**
+ * Create a fresh entry in our neighbour list for the given peer.
+ * Will try to transmit our current HELLO to the new neighbour.
+ * Do not call this function directly, use 'setup_peer_check_blacklist.
+ *
+ * @param peer the peer for which we create the entry
+ * @param do_hello should we schedule transmitting a HELLO
+ * @return the new neighbour list entry
+ */
+static struct NeighbourMapEntry *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer,
+                    int do_hello)
+{
+  struct NeighbourMapEntry *n;
+  struct TransportPlugin *tp;
+  struct ReadyList *rl;
+
+  GNUNET_assert (0 != memcmp (peer,
+                             &my_identity,
+                             sizeof (struct GNUNET_PeerIdentity)));
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Setting up state for neighbour `%4s'\n",
+             GNUNET_i2s (peer));
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# active neighbours"),
+                           1,
+                           GNUNET_NO);
+  n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
+  n->id = *peer;
+  n->peer_timeout =
+    GNUNET_TIME_relative_to_absolute
+    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+                                GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+                                MAX_BANDWIDTH_CARRY_S);
+  tp = plugins;
+  while (tp != NULL)
+    {
+      if ((tp->api->send != NULL) && (!is_blacklisted(peer, tp)))
+        {
+          rl = GNUNET_malloc (sizeof (struct ReadyList));
+         rl->neighbour = n;
+          rl->next = n->plugins;
+          n->plugins = rl;
+          rl->plugin = tp;
+          rl->addresses = NULL;
+        }
+      tp = tp->next;
+    }
+  n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+  n->distance = -1;
+  n->timeout_task = GNUNET_SCHEDULER_add_delayed 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                  &neighbour_timeout_task, n);
+  GNUNET_CONTAINER_multihashmap_put (neighbours,
+                                    &n->id.hashPubKey,
+                                    n,
+                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  if (do_hello)
+    {
+      GNUNET_STATISTICS_update (stats,
+                                gettext_noop ("# peerinfo new neighbor iterate 
requests"),
+                                1,
+                                GNUNET_NO);
+      GNUNET_STATISTICS_update (stats,
+                                gettext_noop ("# outstanding peerinfo iterate 
requests"),
+                                1,
+                                GNUNET_NO);
+      n->piter = GNUNET_PEERINFO_iterate (peerinfo, peer,
+                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                         &add_hello_for_peer, n);
+
+      GNUNET_STATISTICS_update (stats,
+                                gettext_noop ("# HELLO's sent to new 
neighbors"),
+                                1,
+                                GNUNET_NO);
+      if (NULL != our_hello)
+       transmit_to_peer (NULL, NULL, 0,
+                         HELLO_ADDRESS_EXPIRATION,
+                         (const char *) our_hello, 
GNUNET_HELLO_size(our_hello),
+                         GNUNET_NO, n);
+    }
+  return n;
+}
+#endif
+
+
+/**
  * Initialize the neighbours subsystem.
  *
  * @param cls closure for callbacks
@@ -47,15 +547,54 @@
                      GNUNET_TRANSPORT_NotifyConnect connect_cb,
                      GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
 {
+  callback_cls = cls;
+  connect_notify_cb = connect_cb;
+  disconnect_notify_cb = disconnect_cb;
+  neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
 }
 
 
 /**
+ * Disconnect from the given neighbour.
+ *
+ * @param cls unused
+ * @param key hash of neighbour's public key (not used)
+ * @param value the 'struct NeighbourMapEntry' of the neighbour
+ */
+static int
+disconnect_all_neighbours (void *cls,
+                          const GNUNET_HashCode *key,
+                          void *value)
+{
+  struct NeighbourMapEntry *n = value;
+
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Disconnecting peer `%4s', %s\n",
+             GNUNET_i2s(&n->id),
+             "SHUTDOWN_TASK");
+#endif
+  // FIXME:
+  // disconnect_neighbour (n);
+  n++; 
+  return GNUNET_OK;
+}
+
+
+/**
  * Cleanup the neighbours subsystem.
  */
 void
 GST_neighbours_stop ()
 {
+  GNUNET_CONTAINER_multihashmap_iterate (neighbours,
+                                        &disconnect_all_neighbours,
+                                        NULL);
+  GNUNET_CONTAINER_multihashmap_destroy (neighbours);
+  neighbours = NULL;
+  callback_cls = NULL;
+  connect_notify_cb = NULL;
+  disconnect_notify_cb = NULL;
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]