gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33778 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r33778 - gnunet/src/transport
Date: Mon, 23 Jun 2014 13:26:59 +0200

Author: grothoff
Date: 2014-06-23 13:26:59 +0200 (Mon, 23 Jun 2014)
New Revision: 33778

Modified:
   gnunet/src/transport/plugin_transport_tcp.c
   gnunet/src/transport/plugin_transport_udp.c
   gnunet/src/transport/plugin_transport_unix.c
Log:
-towards having the monitoring API supported by TCP

Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2014-06-23 11:11:40 UTC (rev 
33777)
+++ gnunet/src/transport/plugin_transport_tcp.c 2014-06-23 11:26:59 UTC (rev 
33778)
@@ -272,6 +272,11 @@
   struct GNUNET_SERVER_TransmitHandle *transmit_handle;
 
   /**
+   * Address of the other peer.
+   */
+  struct GNUNET_HELLO_Address *address;
+
+  /**
    * ID of task used to delay receiving more to throttle sender.
    */
   GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
@@ -281,26 +286,28 @@
    */
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
-  struct GNUNET_HELLO_Address *address;
-
   /**
-   * Address of the other peer (either based on our 'connect'
-   * call or on our 'accept' call).
-   *
-   * struct IPv4TcpAddress or struct IPv6TcpAddress
+   * When will this session time out?
    */
-  //void *addr;
+  struct GNUNET_TIME_Absolute timeout;
+
   /**
-   * Length of @e addr.
-   */
-  //size_t addrlen;
-  /**
    * Last activity on this connection.  Used to select preferred
    * connection.
    */
   struct GNUNET_TIME_Absolute last_activity;
 
   /**
+   * Number of bytes waiting for transmission to this peer.
+   */
+  unsigned long long bytes_in_queue;
+
+  /**
+   * Number of messages waiting for transmission to this peer.
+   */
+  unsigned int msgs_in_queue;
+
+  /**
    * Are we still expecting the welcome message? (#GNUNET_YES/#GNUNET_NO)
    */
   int expecting_welcome;
@@ -378,6 +385,16 @@
   struct GNUNET_RESOLVER_RequestHandle *ext_dns;
 
   /**
+   * Function to call about session status changes.
+   */
+  GNUNET_TRANSPORT_SessionInfoCallback sic;
+
+  /**
+   * Closure for @e sic.
+   */
+  void *sic_cls;
+
+  /**
    * How many more TCP sessions are we allowed to open right now?
    */
   unsigned long long max_connections;
@@ -410,7 +427,41 @@
 
 };
 
+
 /**
+ * If a session monitor is attached, notify it about the new
+ * session state.
+ *
+ * @param plugin our plugin
+ * @param session session that changed state
+ * @param state new state of the session
+ */
+static void
+notify_session_monitor (struct Plugin *plugin,
+                        struct Session *session,
+                        enum GNUNET_TRANSPORT_SessionState state)
+{
+  struct GNUNET_TRANSPORT_SessionInfo info;
+
+  if (NULL == plugin->sic)
+    return;
+  memset (&info, 0, sizeof (info));
+  info.state = state;
+  info.is_inbound = GNUNET_SYSERR; /* hard to say */
+  info.num_msg_pending = session->msgs_in_queue;
+  info.num_bytes_pending = session->bytes_in_queue;
+  /* info.receive_delay remains zero as this is not supported by UDP
+     (cannot selectively not receive from 'some' peer while continuing
+     to receive from others) */
+  info.session_timeout = session->timeout;
+  info.address = session->address;
+  plugin->sic (plugin->sic_cls,
+               session,
+               &info);
+}
+
+
+/**
  * Function called for a quick conversion of the binary address to
  * a numeric address.  Note that the caller must not free the
  * address and that the next call to this function is allowed
@@ -780,6 +831,7 @@
   {
     GNUNET_SCHEDULER_cancel (session->timeout_task);
     session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    session->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
   }
 
   if (GNUNET_YES
@@ -825,16 +877,26 @@
          : "Could not deliver message to `%4s', notifying.\n",
          GNUNET_i2s (&session->target));
     GNUNET_STATISTICS_update (session->plugin->env->stats,
-        gettext_noop ("# bytes currently in TCP buffers"),
-        -(int64_t) pm->message_size, GNUNET_NO);
-    GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop
-    ("# bytes discarded by TCP (disconnect)"), pm->message_size, GNUNET_NO);
-    GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
-        session->pending_messages_tail, pm);
+                              gettext_noop ("# bytes currently in TCP 
buffers"),
+                              -(int64_t) pm->message_size, GNUNET_NO);
+    GNUNET_STATISTICS_update (session->plugin->env->stats,
+                              gettext_noop ("# bytes discarded by TCP 
(disconnect)"),
+                              pm->message_size,
+                              GNUNET_NO);
+    GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+                                 session->pending_messages_tail,
+                                 pm);
+    GNUNET_assert (0 < session->msgs_in_queue);
+    session->msgs_in_queue--;
+    GNUNET_assert (pm->message_size <= session->bytes_in_queue);
+    session->bytes_in_queue -= pm->message_size;
     if (NULL != pm->transmit_cont)
-      pm->transmit_cont (pm->transmit_cont_cls, &session->target, 
GNUNET_SYSERR,
-          pm->message_size, 0);
-    GNUNET_free(pm);
+      pm->transmit_cont (pm->transmit_cont_cls,
+                         &session->target,
+                         GNUNET_SYSERR,
+                         pm->message_size,
+                         0);
+    GNUNET_free (pm);
   }
   if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK )
   {
@@ -881,10 +943,25 @@
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Session *s = cls;
+  struct GNUNET_TIME_Relative left;
 
   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+  if (0 != left.rel_value_us)
+  {
+    /* not actually our turn yet, but let's at least update
+       the monitor, it may think we're about to die ... */
+    notify_session_monitor (s->plugin,
+                            s,
+                            GNUNET_TRANSPORT_SS_UP);
+    s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+                                                    &session_timeout,
+                                                    s);
+    return;
+  }
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
-             "Session %p was idle for %s, disconnecting\n", s,
+             "Session %p was idle for %s, disconnecting\n",
+             s,
              GNUNET_STRINGS_relative_time_to_string 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
                                                      GNUNET_YES));
   /* call session destroy function */
@@ -901,14 +978,10 @@
 reschedule_session_timeout (struct Session *s)
 {
   GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
-  GNUNET_SCHEDULER_cancel (s->timeout_task);
-  s->timeout_task = GNUNET_SCHEDULER_add_delayed (
-      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, s);
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
-      "Timeout rescheduled for session %p set to %s\n", s,
-      GNUNET_STRINGS_relative_time_to_string 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES));
+  s->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
 }
 
+
 /**
  * Create a new session.  Also queues a welcome message.
  *
@@ -959,8 +1032,11 @@
   GNUNET_STATISTICS_update (plugin->env->stats,
       gettext_noop ("# bytes currently in TCP buffers"), pm->message_size,
       GNUNET_NO);
-  GNUNET_CONTAINER_DLL_insert(session->pending_messages_head,
-      session->pending_messages_tail, pm);
+  GNUNET_CONTAINER_DLL_insert (session->pending_messages_head,
+                               session->pending_messages_tail,
+                               pm);
+  session->msgs_in_queue++;
+  session->bytes_in_queue += pm->message_size;
   if (GNUNET_YES != is_nat)
   {
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -968,8 +1044,10 @@
   }
   plugin->env->register_quota_notification (plugin->env->cls,
       &address->peer, PLUGIN_NAME, session);
-  session->timeout_task = GNUNET_SCHEDULER_add_delayed (
-      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session);
+  session->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  session->timeout_task = GNUNET_SCHEDULER_add_delayed 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                        &session_timeout,
+                                                        session);
   return session;
 }
 
@@ -1012,9 +1090,9 @@
   plugin = session->plugin;
   if (NULL == buf)
   {
-    LOG(GNUNET_ERROR_TYPE_DEBUG,
-        "Timeout trying to transmit to peer `%4s', discarding message 
queue.\n",
-        GNUNET_i2s (&session->target));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Timeout trying to transmit to peer `%4s', discarding message 
queue.\n",
+         GNUNET_i2s (&session->target));
     /* timeout; cancel all messages that have already expired */
     hd = NULL;
     tl = NULL;
@@ -1023,13 +1101,19 @@
     while ((NULL != (pos = session->pending_messages_head))
         && (pos->timeout.abs_value_us <= now.abs_value_us))
     {
-      GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
-          session->pending_messages_tail, pos);
-      LOG(GNUNET_ERROR_TYPE_DEBUG,
-          "Failed to transmit %u byte message to `%4s'.\n", pos->message_size,
-          GNUNET_i2s (&session->target));
+      GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+                                   session->pending_messages_tail,
+                                   pos);
+      GNUNET_assert (0 < session->msgs_in_queue);
+      session->msgs_in_queue--;
+      GNUNET_assert (pos->message_size <= session->bytes_in_queue);
+      session->bytes_in_queue -= pos->message_size;
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Failed to transmit %u byte message to `%4s'.\n",
+           pos->message_size,
+           GNUNET_i2s (&session->target));
       ret += pos->message_size;
-      GNUNET_CONTAINER_DLL_insert_after(hd, tl, tl, pos);
+      GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
     }
     /* do this call before callbacks (so that if callbacks destroy
      * session, they have a chance to cancel actions done by this
@@ -1040,10 +1124,12 @@
      * the callbacks may abort the session */
     while (NULL != (pos = hd))
     {
-      GNUNET_CONTAINER_DLL_remove(hd, tl, pos);
-      if (pos->transmit_cont != NULL )
-        pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR,
-            pos->message_size, 0);
+      GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+      if (pos->transmit_cont != NULL)
+        pos->transmit_cont (pos->transmit_cont_cls,
+                            &pid,
+                            GNUNET_SYSERR,
+                            pos->message_size, 0);
       GNUNET_free(pos);
     }
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1062,10 +1148,16 @@
   {
     if (ret + pos->message_size > size)
       break;
-    GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
-        session->pending_messages_tail, pos);
+    GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+                                 session->pending_messages_tail,
+                                 pos);
+    GNUNET_assert (0 < session->msgs_in_queue);
+    session->msgs_in_queue--;
+    GNUNET_assert (pos->message_size <= session->bytes_in_queue);
+    session->bytes_in_queue -= pos->message_size;
     GNUNET_assert(size >= pos->message_size);
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "Transmitting message of type %u size %u\n",
+    LOG(GNUNET_ERROR_TYPE_DEBUG,
+        "Transmitting message of type %u size %u\n",
         ntohs (((struct GNUNET_MessageHeader * ) pos->msg)->type),
         pos->message_size);
     /* FIXME: this memcpy can be up to 7% of our total runtime */
@@ -1073,7 +1165,7 @@
     cbuf += pos->message_size;
     ret += pos->message_size;
     size -= pos->message_size;
-    GNUNET_CONTAINER_DLL_insert_tail(hd, tl, pos);
+    GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos);
   }
   /* schedule 'continuation' before callbacks so that callbacks that
    * cancel everything don't cause us to use a session that no longer
@@ -1085,10 +1177,13 @@
    * we should not use 'session' after this point */
   while (NULL != (pos = hd))
   {
-    GNUNET_CONTAINER_DLL_remove(hd, tl, pos);
-    if (pos->transmit_cont != NULL )
-      pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK,
-          pos->message_size, pos->message_size); /* FIXME: include TCP 
overhead */
+    GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+    if (pos->transmit_cont != NULL)
+      pos->transmit_cont (pos->transmit_cont_cls,
+                          &pid,
+                          GNUNET_OK,
+                          pos->message_size,
+                          pos->message_size); /* FIXME: include TCP overhead */
     GNUNET_free(pos);
   }
   GNUNET_assert(hd == NULL);
@@ -1253,11 +1348,12 @@
       "Asked to transmit %u bytes to `%s', added message to list.\n",
       msgbuf_size, GNUNET_i2s (&session->target));
 
-  if (GNUNET_YES
-      == GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap,
-          &session->target, session))
+  if (GNUNET_YES ==
+      GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap,
+                                                    &session->target,
+                                                    session))
   {
-    GNUNET_assert(NULL != session->client);
+    GNUNET_assert (NULL != session->client);
     GNUNET_SERVER_client_set_timeout (session->client,
         GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1265,9 +1361,11 @@
         GNUNET_NO);
 
     /* append pm to pending_messages list */
-    GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head,
-        session->pending_messages_tail, pm);
-
+    GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+                                      session->pending_messages_tail,
+                                      pm);
+    session->msgs_in_queue++;
+    session->bytes_in_queue += pm->message_size;
     process_pending_messages (session);
     return msgbuf_size;
   }
@@ -1283,17 +1381,25 @@
         GNUNET_NO);
 
     /* append pm to pending_messages list */
-    GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head,
-        session->pending_messages_tail, pm);
+    GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+                                      session->pending_messages_tail,
+                                      pm);
+    session->msgs_in_queue++;
+    session->bytes_in_queue += pm->message_size;
     return msgbuf_size;
   }
   else
   {
-    LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid session %p\n", session);
+    LOG(GNUNET_ERROR_TYPE_ERROR,
+        "Invalid session %p\n", session);
     if (NULL != cont)
-      cont (cont_cls, &session->target, GNUNET_SYSERR, pm->message_size, 0);
-    GNUNET_break(0);
-    GNUNET_free(pm);
+      cont (cont_cls,
+            &session->target,
+            GNUNET_SYSERR,
+            pm->message_size,
+            0);
+    GNUNET_break (0);
+    GNUNET_free (pm);
     return GNUNET_SYSERR; /* session does not exist here */
   }
 }
@@ -2346,9 +2452,10 @@
   size_t ret;
 
   tcp_probe_ctx->transmit_handle = NULL;
-  GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail,
-      tcp_probe_ctx);
-  if (buf == NULL )
+  GNUNET_CONTAINER_DLL_remove (plugin->probe_head,
+                               plugin->probe_tail,
+                               tcp_probe_ctx);
+  if (buf == NULL)
   {
     GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock);
     GNUNET_free(tcp_probe_ctx);
@@ -2402,8 +2509,9 @@
       sizeof(struct GNUNET_PeerIdentity));
   tcp_probe_ctx->plugin = plugin;
   tcp_probe_ctx->sock = sock;
-  GNUNET_CONTAINER_DLL_insert(plugin->probe_head, plugin->probe_tail,
-      tcp_probe_ctx);
+  GNUNET_CONTAINER_DLL_insert (plugin->probe_head,
+                               plugin->probe_tail,
+                               tcp_probe_ctx);
   tcp_probe_ctx->transmit_handle = GNUNET_CONNECTION_notify_transmit_ready (
       sock, ntohs (tcp_probe_ctx->message.header.size),
       GNUNET_TIME_UNIT_FOREVER_REL, &notify_send_probe, tcp_probe_ctx);
@@ -2433,6 +2541,62 @@
 
 
 /**
+ * Return information about the given session to the
+ * monitor callback.
+ *
+ * @param cls the `struct Plugin` with the monitor callback (`sic`)
+ * @param peer peer we send information about
+ * @param value our `struct Session` to send information about
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+send_session_info_iter (void *cls,
+                        const struct GNUNET_PeerIdentity *peer,
+                        void *value)
+{
+  struct Plugin *plugin = cls;
+  struct Session *session = value;
+
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_UP);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Begin monitoring sessions of a plugin.  There can only
+ * be one active monitor per plugin (i.e. if there are
+ * multiple monitors, the transport service needs to
+ * multiplex the generated events over all of them).
+ *
+ * @param cls closure of the plugin
+ * @param sic callback to invoke, NULL to disable monitor;
+ *            plugin will being by iterating over all active
+ *            sessions immediately and then enter monitor mode
+ * @param sic_cls closure for @a sic
+ */
+static void
+tcp_plugin_setup_monitor (void *cls,
+                          GNUNET_TRANSPORT_SessionInfoCallback sic,
+                          void *sic_cls)
+{
+  struct Plugin *plugin = cls;
+
+  plugin->sic = sic;
+  plugin->sic_cls = sic_cls;
+  if (NULL != sic)
+  {
+    GNUNET_CONTAINER_multipeermap_iterate (plugin->sessionmap,
+                                           &send_session_info_iter,
+                                           plugin);
+    /* signal end of first iteration */
+    sic (sic_cls, NULL, NULL);
+  }
+}
+
+
+/**
  * Entry point for the plugin.
  *
  * @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*'
@@ -2569,6 +2733,7 @@
   api->get_network = &tcp_get_network;
   api->update_session_timeout = &tcp_plugin_update_session_timeout;
   api->update_inbound_delay = &tcp_plugin_update_inbound_delay;
+  api->setup_monitor = &tcp_plugin_setup_monitor;
   plugin->service = service;
   if (NULL != service)
   {
@@ -2678,8 +2843,9 @@
     GNUNET_NAT_unregister (plugin->nat);
   while (NULL != (tcp_probe = plugin->probe_head))
   {
-    GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail,
-        tcp_probe);
+    GNUNET_CONTAINER_DLL_remove (plugin->probe_head,
+                                 plugin->probe_tail,
+                                 tcp_probe);
     GNUNET_CONNECTION_destroy (tcp_probe->sock);
     GNUNET_free(tcp_probe);
   }

Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2014-06-23 11:11:40 UTC (rev 
33777)
+++ gnunet/src/transport/plugin_transport_udp.c 2014-06-23 11:26:59 UTC (rev 
33778)
@@ -1573,6 +1573,7 @@
   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
+  s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
                                                   &session_timeout, s);
   return s;

Modified: gnunet/src/transport/plugin_transport_unix.c
===================================================================
--- gnunet/src/transport/plugin_transport_unix.c        2014-06-23 11:11:40 UTC 
(rev 33777)
+++ gnunet/src/transport/plugin_transport_unix.c        2014-06-23 11:26:59 UTC 
(rev 33778)
@@ -886,6 +886,7 @@
   session->target = address->peer;
   session->address = GNUNET_HELLO_address_copy (address);
   session->plugin = plugin;
+  session->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   session->timeout_task = GNUNET_SCHEDULER_add_delayed 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
                                                         &session_timeout,
                                                         session);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]