gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r32122 - in gnunet/src: include transport


From: gnunet
Subject: [GNUnet-SVN] r32122 - in gnunet/src: include transport
Date: Thu, 30 Jan 2014 17:58:20 +0100

Author: wachs
Date: 2014-01-30 17:58:20 +0100 (Thu, 30 Jan 2014)
New Revision: 32122

Modified:
   gnunet/src/include/gnunet_transport_plugin.h
   gnunet/src/transport/gnunet-service-transport.c
   gnunet/src/transport/gnunet-service-transport_neighbours.c
   gnunet/src/transport/gnunet-service-transport_neighbours.h
   gnunet/src/transport/gnunet-service-transport_plugins.c
   gnunet/src/transport/gnunet-service-transport_plugins.h
   gnunet/src/transport/plugin_transport_tcp.c
   gnunet/src/transport/transport_api.c
Log:
send receive delay rescheduling support


Modified: gnunet/src/include/gnunet_transport_plugin.h
===================================================================
--- gnunet/src/include/gnunet_transport_plugin.h        2014-01-30 15:59:28 UTC 
(rev 32121)
+++ gnunet/src/include/gnunet_transport_plugin.h        2014-01-30 16:58:20 UTC 
(rev 32122)
@@ -194,7 +194,16 @@
                                    const struct GNUNET_PeerIdentity *peer,
                                    size_t amount_recved);
 
+typedef void
+(*GNUNET_TRANSPORT_RegisterQuotaNotification) (void *cls,
+                                           const struct GNUNET_PeerIdentity 
*peer,
+                                           const char *plugin,
+                                           struct Session *session);
 
+typedef void
+(*GNUNET_TRANSPORT_UnregisterQuotaNotification) (void *cls,
+    const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session 
*session);
+
 /**
  * Function that returns a HELLO message.
  */
@@ -275,7 +284,10 @@
    */
   GNUNET_TRANSPORT_UpdateAddressMetrics update_address_metrics;
 
+  GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_notification;
 
+  GNUNET_TRANSPORT_UnregisterQuotaNotification unregister_quota_notification;
+
   /**
    * What is the maximum number of connections that this transport
    * should allow?  Transports that do not have sessions (such as
@@ -484,6 +496,14 @@
                                           const struct GNUNET_PeerIdentity 
*peer,
                                           struct Session *session);
 
+
+
+typedef void
+(*GNUNET_TRANSPORT_UpdateInboundDelay) (void *cls,
+                                          const struct GNUNET_PeerIdentity 
*peer,
+                                          struct Session *session,
+                                          struct GNUNET_TIME_Relative delay);
+
 /**
  * Function called for a quick conversion of the binary address to
  * a numeric address.  Note that the caller must not free the
@@ -575,6 +595,8 @@
    */
   GNUNET_TRANSPORT_UpdateSessionTimeout update_session_timeout;
 
+  GNUNET_TRANSPORT_UpdateInboundDelay update_inbound_delay;
+
   /**
    * Function that will be called whenever the transport service wants to
    * notify the plugin that the inbound quota changed and that the plugin

Modified: gnunet/src/transport/gnunet-service-transport.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport.c     2014-01-30 15:59:28 UTC 
(rev 32121)
+++ gnunet/src/transport/gnunet-service-transport.c     2014-01-30 16:58:20 UTC 
(rev 32122)
@@ -934,6 +934,8 @@
       NULL );
   GST_manipulation_init (GST_cfg);
   GST_plugins_load (&GST_manipulation_recv,
+      &GST_neighbours_register_quota_notification,
+      &GST_neighbours_unregister_quota_notification,
       &plugin_env_address_change_notification,
       &plugin_env_session_start,
       &plugin_env_session_end,

Modified: gnunet/src/transport/gnunet-service-transport_neighbours.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.c  2014-01-30 
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.c  2014-01-30 
16:58:20 UTC (rev 32122)
@@ -508,6 +508,8 @@
 static GNUNET_SCHEDULER_TaskIdentifier util_transmission_tk;
 
 
+static struct GNUNET_CONTAINER_MultiPeerMap *registered_quota_notifications;
+
 /**
  * Lookup a neighbour entry in the neighbours hash map.
  *
@@ -1689,13 +1691,152 @@
 
 }
 
+struct QuotaNotificationRequest
+{
+  struct GNUNET_PeerIdentity peer;
+  struct Session *session;
+  char *plugin;
+};
+
+struct QNR_LookContext
+{
+  struct GNUNET_PeerIdentity peer;
+  struct Session *session;
+  const char *plugin;
+
+  struct QuotaNotificationRequest *res;
+};
+
+static int
+find_notification_request (void *cls, const struct GNUNET_PeerIdentity *key, 
void *value)
+{
+  struct QNR_LookContext *qnr_ctx = cls;
+  struct QuotaNotificationRequest *qnr = value;
+
+  if ((qnr->session == qnr_ctx->session) &&
+      (0 == memcmp (&qnr->peer, &qnr_ctx->peer, sizeof (struct 
GNUNET_PeerIdentity))) &&
+      (0 == strcmp(qnr_ctx->plugin, qnr->plugin)))
+  {
+    qnr_ctx->res = value;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+void
+GST_neighbours_register_quota_notification(void *cls,
+    const struct GNUNET_PeerIdentity *peer, const char *plugin,
+    struct Session *session)
+{
+  struct QuotaNotificationRequest *qnr;
+  struct QNR_LookContext qnr_ctx;
+
+  qnr_ctx.peer = (*peer);
+  qnr_ctx.plugin = plugin;
+  qnr_ctx.session = session;
+  qnr_ctx.res = NULL;
+  int res;
+
+  res = GNUNET_CONTAINER_multipeermap_get_multiple 
(registered_quota_notifications,
+      peer, &find_notification_request, &qnr_ctx);
+  if (NULL != qnr_ctx.res)
+  {
+    GNUNET_break(0);
+    return;
+  }
+
+  qnr = GNUNET_new (struct QuotaNotificationRequest);
+  qnr->peer =  (*peer);
+  qnr->plugin = GNUNET_strdup (plugin);
+  qnr->session = session;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      "Adding notification for peer `%s' plugin `%s' session %p \n",
+      GNUNET_i2s (peer), plugin, session);
+
+  GNUNET_CONTAINER_multipeermap_put (registered_quota_notifications, peer,
+      qnr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+
+void
+GST_neighbours_unregister_quota_notification(void *cls,
+    const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session 
*session)
+{
+  struct QNR_LookContext qnr_ctx;
+  qnr_ctx.peer = (*peer);
+  qnr_ctx.plugin = plugin;
+  qnr_ctx.session = session;
+  qnr_ctx.res = NULL;
+  int res;
+
+  res = GNUNET_CONTAINER_multipeermap_iterate (registered_quota_notifications,
+      &find_notification_request, &qnr_ctx);
+  if (NULL == qnr_ctx.res)
+  {
+    GNUNET_break(0);
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      "Removing notification for peer `%s' plugin `%s' session %p \n",
+      GNUNET_i2s (peer), plugin, session);
+
+  GNUNET_CONTAINER_multipeermap_remove (registered_quota_notifications, peer,
+      qnr_ctx.res);
+  GNUNET_free (qnr_ctx.res->plugin);
+  GNUNET_free (qnr_ctx.res);
+}
+
+static int
+notification_cb(void *cls, const struct GNUNET_PeerIdentity *key, void *value)
+{
+  struct NeighbourMapEntry *n = cls;
+  struct QuotaNotificationRequest *qnr = value;
+  struct GNUNET_TRANSPORT_PluginFunctions *papi;
+  struct GNUNET_TIME_Relative delay;
+  int do_forward;
+
+  papi = GST_plugins_find(qnr->plugin);
+  if (NULL == papi)
+  {
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
+
+  delay = GST_neighbours_calculate_receive_delay (key, 0, &do_forward);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      "New inbound delay for peer `%s' is %llu ms\n", GNUNET_i2s (key),
+      delay.rel_value_us / 1000);
+
+  if (NULL != papi->update_inbound_delay)
+    papi->update_inbound_delay (papi->cls, key, qnr->session, delay);
+  return GNUNET_OK;
+}
+
+static
+int
+free_notification_cb(void *cls, const struct GNUNET_PeerIdentity *key,
+    void *value)
+{
+  struct NeighbourMapEntry *n = cls;
+  struct QuotaNotificationRequest *qnr = value;
+
+  GNUNET_CONTAINER_multipeermap_remove (registered_quota_notifications, key,
+      qnr);
+  GNUNET_free(qnr);
+
+  return GNUNET_OK;
+}
+
 static void
-inbound_bw_tracker_update (void *cls)
+inbound_bw_tracker_update(void *cls)
 {
-  struct Neighbour *n = cls;
+  struct NeighbourMapEntry *n = cls;
 
   /* Quota was updated, tell plugins to update the time to receive next */
-
+  GNUNET_CONTAINER_multipeermap_get_multiple (registered_quota_notifications,
+      &n->id, &notification_cb, n);
 }
 
 
@@ -3655,6 +3796,7 @@
   disconnect_notify_cb = disconnect_cb;
   neighbour_change_cb = peer_address_cb;
   neighbours = GNUNET_CONTAINER_multipeermap_create (NEIGHBOUR_TABLE_SIZE, 
GNUNET_NO);
+  registered_quota_notifications = GNUNET_CONTAINER_multipeermap_create 
(NEIGHBOUR_TABLE_SIZE, GNUNET_NO);
   util_transmission_tk = GNUNET_SCHEDULER_add_delayed 
(UTIL_TRANSMISSION_INTERVAL,
       utilization_transmission, NULL);
 }
@@ -3723,6 +3865,10 @@
     GNUNET_free (cur);
   }
 
+  GNUNET_CONTAINER_multipeermap_iterate (registered_quota_notifications,
+      &free_notification_cb, NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (registered_quota_notifications);
+
   neighbours = NULL;
   callback_cls = NULL;
   connect_notify_cb = NULL;

Modified: gnunet/src/transport/gnunet-service-transport_neighbours.h
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.h  2014-01-30 
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.h  2014-01-30 
16:58:20 UTC (rev 32122)
@@ -107,7 +107,16 @@
                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
                      GST_NeighbourSendContinuation cont, void *cont_cls);
 
+void
+GST_neighbours_register_quota_notification (void *cls,
+                                           const struct GNUNET_PeerIdentity 
*peer,
+                                           const char *plugin,
+                                           struct Session *session);
 
+void
+GST_neighbours_unregister_quota_notification(void *cls,
+    const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session 
*session);
+
 /**
  * We have received a message from the given sender.
  * How long should we delay before receiving more?

Modified: gnunet/src/transport/gnunet-service-transport_plugins.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_plugins.c     2014-01-30 
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_plugins.c     2014-01-30 
16:58:20 UTC (rev 32122)
@@ -94,6 +94,8 @@
  */
 void
 GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
+                  GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_cb,
+                  GNUNET_TRANSPORT_UnregisterQuotaNotification 
unregister_quota_cb,
                   GNUNET_TRANSPORT_AddressNotification address_cb,
                   GNUNET_TRANSPORT_SessionStart session_start_cb,
                   GNUNET_TRANSPORT_SessionEnd session_end_cb,
@@ -142,6 +144,8 @@
     plug->env.session_end = session_end_cb;
     plug->env.get_address_type = address_type_cb;
     plug->env.update_address_metrics = metric_update_cb;
+    plug->env.register_quota_notification = register_quota_cb;
+    plug->env.unregister_quota_notification = unregister_quota_cb;
     plug->env.max_connections = tneigh;
     plug->env.stats = GST_stats;
     GNUNET_CONTAINER_DLL_insert (plugins_head, plugins_tail, plug);

Modified: gnunet/src/transport/gnunet-service-transport_plugins.h
===================================================================
--- gnunet/src/transport/gnunet-service-transport_plugins.h     2014-01-30 
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_plugins.h     2014-01-30 
16:58:20 UTC (rev 32122)
@@ -48,6 +48,8 @@
  */
 void
 GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
+                  GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_cb,
+                  GNUNET_TRANSPORT_UnregisterQuotaNotification 
unregister_quota_cb,
                   GNUNET_TRANSPORT_AddressNotification address_cb,
                   GNUNET_TRANSPORT_SessionStart session_start_cb,
                   GNUNET_TRANSPORT_SessionEnd session_end_cb,

Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2014-01-30 15:59:28 UTC (rev 
32121)
+++ gnunet/src/transport/plugin_transport_tcp.c 2014-01-30 16:58:20 UTC (rev 
32122)
@@ -779,6 +779,8 @@
     GNUNET_SERVER_notify_transmit_ready_cancel (session->transmit_handle);
     session->transmit_handle = NULL;
   }
+  plugin->env->unregister_quota_notification (plugin->env->cls,
+      &session->target, PLUGIN_NAME, session);
   session->plugin->env->session_end (session->plugin->env->cls,
       &session->target, session);
 
@@ -929,6 +931,8 @@
     GNUNET_STATISTICS_update (plugin->env->stats,
         gettext_noop ("# TCP sessions active"), 1, GNUNET_NO);
   }
+  plugin->env->register_quota_notification (plugin->env->cls,
+      &address->peer, PLUGIN_NAME, session);
   session->timeout_task = GNUNET_SCHEDULER_add_delayed (
       GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session);
   return session;
@@ -1323,6 +1327,41 @@
 }
 
 /**
+ * Task to signal the server that we can continue
+ * receiving from the TCP client now.
+ *
+ * @param cls the `struct Session*`
+ * @param tc task context (unused)
+ */
+static void
+delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Session *session = cls;
+
+  session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
+  reschedule_session_timeout (session);
+
+  GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+}
+
+static void tcp_plugin_update_inbound_delay (void *cls,
+                                      const struct GNUNET_PeerIdentity *peer,
+                                      struct Session *session,
+                                      struct GNUNET_TIME_Relative delay)
+{
+  if (GNUNET_SCHEDULER_NO_TASK == session->receive_delay_task)
+    return;
+
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "New inbound delay %llu us\n",delay.rel_value_us);
+
+  GNUNET_SCHEDULER_cancel (session->receive_delay_task);
+  session->receive_delay_task = GNUNET_SCHEDULER_add_delayed (delay,
+      &delayed_done, session);
+}
+
+
+/**
  * Create a new session to transmit data to the target
  * This session will used to send data to this peer and the plugin will
  * notify us by calling the env->session_end function
@@ -2102,24 +2141,7 @@
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
-/**
- * Task to signal the server that we can continue
- * receiving from the TCP client now.
- *
- * @param cls the `struct Session*`
- * @param tc task context (unused)
- */
-static void
-delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Session *session = cls;
 
-  session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
-  reschedule_session_timeout (session);
-
-  GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
-}
-
 /**
  * We've received data for this peer via TCP.  Unbox,
  * compute latency and forward.
@@ -2479,6 +2501,7 @@
   api->string_to_address = &tcp_string_to_address;
   api->get_network = &tcp_get_network;
   api->update_session_timeout = &tcp_plugin_update_session_timeout;
+  api->update_inbound_delay = &tcp_plugin_update_inbound_delay;
   plugin->service = service;
   if (NULL != service)
   {

Modified: gnunet/src/transport/transport_api.c
===================================================================
--- gnunet/src/transport/transport_api.c        2014-01-30 15:59:28 UTC (rev 
32121)
+++ gnunet/src/transport/transport_api.c        2014-01-30 16:58:20 UTC (rev 
32122)
@@ -430,6 +430,8 @@
 
   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
       n->th->notify_size + n->traffic_overhead);
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "New outbound delay %llu us\n",delay.rel_value_us);
   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
       n->hn, delay.rel_value_us);
   schedule_transmission (n->h);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]