[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r32122 - in gnunet/src: include transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r32122 - in gnunet/src: include transport |
Date: |
Thu, 30 Jan 2014 17:58:20 +0100 |
Author: wachs
Date: 2014-01-30 17:58:20 +0100 (Thu, 30 Jan 2014)
New Revision: 32122
Modified:
gnunet/src/include/gnunet_transport_plugin.h
gnunet/src/transport/gnunet-service-transport.c
gnunet/src/transport/gnunet-service-transport_neighbours.c
gnunet/src/transport/gnunet-service-transport_neighbours.h
gnunet/src/transport/gnunet-service-transport_plugins.c
gnunet/src/transport/gnunet-service-transport_plugins.h
gnunet/src/transport/plugin_transport_tcp.c
gnunet/src/transport/transport_api.c
Log:
send receive delay rescheduling support
Modified: gnunet/src/include/gnunet_transport_plugin.h
===================================================================
--- gnunet/src/include/gnunet_transport_plugin.h 2014-01-30 15:59:28 UTC
(rev 32121)
+++ gnunet/src/include/gnunet_transport_plugin.h 2014-01-30 16:58:20 UTC
(rev 32122)
@@ -194,7 +194,16 @@
const struct GNUNET_PeerIdentity *peer,
size_t amount_recved);
+typedef void
+(*GNUNET_TRANSPORT_RegisterQuotaNotification) (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *plugin,
+ struct Session *session);
+typedef void
+(*GNUNET_TRANSPORT_UnregisterQuotaNotification) (void *cls,
+ const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session *session);
+
/**
* Function that returns a HELLO message.
*/
@@ -275,7 +284,10 @@
*/
GNUNET_TRANSPORT_UpdateAddressMetrics update_address_metrics;
+ GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_notification;
+ GNUNET_TRANSPORT_UnregisterQuotaNotification unregister_quota_notification;
+
/**
* What is the maximum number of connections that this transport
* should allow? Transports that do not have sessions (such as
@@ -484,6 +496,14 @@
const struct GNUNET_PeerIdentity
*peer,
struct Session *session);
+
+
+typedef void
+(*GNUNET_TRANSPORT_UpdateInboundDelay) (void *cls,
+ const struct GNUNET_PeerIdentity
*peer,
+ struct Session *session,
+ struct GNUNET_TIME_Relative delay);
+
/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
@@ -575,6 +595,8 @@
*/
GNUNET_TRANSPORT_UpdateSessionTimeout update_session_timeout;
+ GNUNET_TRANSPORT_UpdateInboundDelay update_inbound_delay;
+
/**
* Function that will be called whenever the transport service wants to
* notify the plugin that the inbound quota changed and that the plugin
Modified: gnunet/src/transport/gnunet-service-transport.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport.c 2014-01-30 15:59:28 UTC
(rev 32121)
+++ gnunet/src/transport/gnunet-service-transport.c 2014-01-30 16:58:20 UTC
(rev 32122)
@@ -934,6 +934,8 @@
NULL );
GST_manipulation_init (GST_cfg);
GST_plugins_load (&GST_manipulation_recv,
+ &GST_neighbours_register_quota_notification,
+ &GST_neighbours_unregister_quota_notification,
&plugin_env_address_change_notification,
&plugin_env_session_start,
&plugin_env_session_end,
Modified: gnunet/src/transport/gnunet-service-transport_neighbours.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.c 2014-01-30
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.c 2014-01-30
16:58:20 UTC (rev 32122)
@@ -508,6 +508,8 @@
static GNUNET_SCHEDULER_TaskIdentifier util_transmission_tk;
+static struct GNUNET_CONTAINER_MultiPeerMap *registered_quota_notifications;
+
/**
* Lookup a neighbour entry in the neighbours hash map.
*
@@ -1689,13 +1691,152 @@
}
+struct QuotaNotificationRequest
+{
+ struct GNUNET_PeerIdentity peer;
+ struct Session *session;
+ char *plugin;
+};
+
+struct QNR_LookContext
+{
+ struct GNUNET_PeerIdentity peer;
+ struct Session *session;
+ const char *plugin;
+
+ struct QuotaNotificationRequest *res;
+};
+
+static int
+find_notification_request (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
+{
+ struct QNR_LookContext *qnr_ctx = cls;
+ struct QuotaNotificationRequest *qnr = value;
+
+ if ((qnr->session == qnr_ctx->session) &&
+ (0 == memcmp (&qnr->peer, &qnr_ctx->peer, sizeof (struct GNUNET_PeerIdentity))) &&
+ (0 == strcmp(qnr_ctx->plugin, qnr->plugin)))
+ {
+ qnr_ctx->res = value;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+void
+GST_neighbours_register_quota_notification(void *cls,
+ const struct GNUNET_PeerIdentity *peer, const char *plugin,
+ struct Session *session)
+{
+ struct QuotaNotificationRequest *qnr;
+ struct QNR_LookContext qnr_ctx;
+
+ qnr_ctx.peer = (*peer);
+ qnr_ctx.plugin = plugin;
+ qnr_ctx.session = session;
+ qnr_ctx.res = NULL;
+ int res;
+
+ res = GNUNET_CONTAINER_multipeermap_get_multiple (registered_quota_notifications,
+ peer, &find_notification_request, &qnr_ctx);
+ if (NULL != qnr_ctx.res)
+ {
+ GNUNET_break(0);
+ return;
+ }
+
+ qnr = GNUNET_new (struct QuotaNotificationRequest);
+ qnr->peer = (*peer);
+ qnr->plugin = GNUNET_strdup (plugin);
+ qnr->session = session;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding notification for peer `%s' plugin `%s' session %p \n",
+ GNUNET_i2s (peer), plugin, session);
+
+ GNUNET_CONTAINER_multipeermap_put (registered_quota_notifications, peer,
+ qnr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+
+void
+GST_neighbours_unregister_quota_notification(void *cls,
+ const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session *session)
+{
+ struct QNR_LookContext qnr_ctx;
+ qnr_ctx.peer = (*peer);
+ qnr_ctx.plugin = plugin;
+ qnr_ctx.session = session;
+ qnr_ctx.res = NULL;
+ int res;
+
+ res = GNUNET_CONTAINER_multipeermap_iterate (registered_quota_notifications,
+ &find_notification_request, &qnr_ctx);
+ if (NULL == qnr_ctx.res)
+ {
+ GNUNET_break(0);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing notification for peer `%s' plugin `%s' session %p \n",
+ GNUNET_i2s (peer), plugin, session);
+
+ GNUNET_CONTAINER_multipeermap_remove (registered_quota_notifications, peer,
+ qnr_ctx.res);
+ GNUNET_free (qnr_ctx.res->plugin);
+ GNUNET_free (qnr_ctx.res);
+}
+
+static int
+notification_cb(void *cls, const struct GNUNET_PeerIdentity *key, void *value)
+{
+ struct NeighbourMapEntry *n = cls;
+ struct QuotaNotificationRequest *qnr = value;
+ struct GNUNET_TRANSPORT_PluginFunctions *papi;
+ struct GNUNET_TIME_Relative delay;
+ int do_forward;
+
+ papi = GST_plugins_find(qnr->plugin);
+ if (NULL == papi)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+
+ delay = GST_neighbours_calculate_receive_delay (key, 0, &do_forward);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New inbound delay for peer `%s' is %llu ms\n", GNUNET_i2s (key),
+ delay.rel_value_us / 1000);
+
+ if (NULL != papi->update_inbound_delay)
+ papi->update_inbound_delay (papi->cls, key, qnr->session, delay);
+ return GNUNET_OK;
+}
+
+static
+int
+free_notification_cb(void *cls, const struct GNUNET_PeerIdentity *key,
+ void *value)
+{
+ struct NeighbourMapEntry *n = cls;
+ struct QuotaNotificationRequest *qnr = value;
+
+ GNUNET_CONTAINER_multipeermap_remove (registered_quota_notifications, key,
+ qnr);
+ GNUNET_free(qnr);
+
+ return GNUNET_OK;
+}
+
static void
-inbound_bw_tracker_update (void *cls)
+inbound_bw_tracker_update(void *cls)
{
- struct Neighbour *n = cls;
+ struct NeighbourMapEntry *n = cls;
/* Quota was updated, tell plugins to update the time to receive next */
-
+ GNUNET_CONTAINER_multipeermap_get_multiple (registered_quota_notifications,
+ &n->id, &notification_cb, n);
}
@@ -3655,6 +3796,7 @@
disconnect_notify_cb = disconnect_cb;
neighbour_change_cb = peer_address_cb;
neighbours = GNUNET_CONTAINER_multipeermap_create (NEIGHBOUR_TABLE_SIZE,
GNUNET_NO);
+ registered_quota_notifications = GNUNET_CONTAINER_multipeermap_create
(NEIGHBOUR_TABLE_SIZE, GNUNET_NO);
util_transmission_tk = GNUNET_SCHEDULER_add_delayed
(UTIL_TRANSMISSION_INTERVAL,
utilization_transmission, NULL);
}
@@ -3723,6 +3865,10 @@
GNUNET_free (cur);
}
+ GNUNET_CONTAINER_multipeermap_iterate (registered_quota_notifications,
+ &free_notification_cb, NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (registered_quota_notifications);
+
neighbours = NULL;
callback_cls = NULL;
connect_notify_cb = NULL;
Modified: gnunet/src/transport/gnunet-service-transport_neighbours.h
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.h 2014-01-30
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.h 2014-01-30
16:58:20 UTC (rev 32122)
@@ -107,7 +107,16 @@
size_t msg_size, struct GNUNET_TIME_Relative timeout,
GST_NeighbourSendContinuation cont, void *cont_cls);
+void
+GST_neighbours_register_quota_notification (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *plugin,
+ struct Session *session);
+void
+GST_neighbours_unregister_quota_notification(void *cls,
+ const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session *session);
+
/**
* We have received a message from the given sender.
* How long should we delay before receiving more?
Modified: gnunet/src/transport/gnunet-service-transport_plugins.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_plugins.c 2014-01-30
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_plugins.c 2014-01-30
16:58:20 UTC (rev 32122)
@@ -94,6 +94,8 @@
*/
void
GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
+ GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_cb,
+ GNUNET_TRANSPORT_UnregisterQuotaNotification
unregister_quota_cb,
GNUNET_TRANSPORT_AddressNotification address_cb,
GNUNET_TRANSPORT_SessionStart session_start_cb,
GNUNET_TRANSPORT_SessionEnd session_end_cb,
@@ -142,6 +144,8 @@
plug->env.session_end = session_end_cb;
plug->env.get_address_type = address_type_cb;
plug->env.update_address_metrics = metric_update_cb;
+ plug->env.register_quota_notification = register_quota_cb;
+ plug->env.unregister_quota_notification = unregister_quota_cb;
plug->env.max_connections = tneigh;
plug->env.stats = GST_stats;
GNUNET_CONTAINER_DLL_insert (plugins_head, plugins_tail, plug);
Modified: gnunet/src/transport/gnunet-service-transport_plugins.h
===================================================================
--- gnunet/src/transport/gnunet-service-transport_plugins.h 2014-01-30
15:59:28 UTC (rev 32121)
+++ gnunet/src/transport/gnunet-service-transport_plugins.h 2014-01-30
16:58:20 UTC (rev 32122)
@@ -48,6 +48,8 @@
*/
void
GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
+ GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_cb,
+ GNUNET_TRANSPORT_UnregisterQuotaNotification
unregister_quota_cb,
GNUNET_TRANSPORT_AddressNotification address_cb,
GNUNET_TRANSPORT_SessionStart session_start_cb,
GNUNET_TRANSPORT_SessionEnd session_end_cb,
Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2014-01-30 15:59:28 UTC (rev
32121)
+++ gnunet/src/transport/plugin_transport_tcp.c 2014-01-30 16:58:20 UTC (rev
32122)
@@ -779,6 +779,8 @@
GNUNET_SERVER_notify_transmit_ready_cancel (session->transmit_handle);
session->transmit_handle = NULL;
}
+ plugin->env->unregister_quota_notification (plugin->env->cls,
+ &session->target, PLUGIN_NAME, session);
session->plugin->env->session_end (session->plugin->env->cls,
&session->target, session);
@@ -929,6 +931,8 @@
GNUNET_STATISTICS_update (plugin->env->stats,
gettext_noop ("# TCP sessions active"), 1, GNUNET_NO);
}
+ plugin->env->register_quota_notification (plugin->env->cls,
+ &address->peer, PLUGIN_NAME, session);
session->timeout_task = GNUNET_SCHEDULER_add_delayed (
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session);
return session;
@@ -1323,6 +1327,41 @@
}
/**
+ * Task to signal the server that we can continue
+ * receiving from the TCP client now.
+ *
+ * @param cls the `struct Session*`
+ * @param tc task context (unused)
+ */
+static void
+delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Session *session = cls;
+
+ session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ reschedule_session_timeout (session);
+
+ GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+}
+
+static void tcp_plugin_update_inbound_delay (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ struct Session *session,
+ struct GNUNET_TIME_Relative delay)
+{
+ if (GNUNET_SCHEDULER_NO_TASK == session->receive_delay_task)
+ return;
+
+ LOG(GNUNET_ERROR_TYPE_DEBUG,
+ "New inbound delay %llu us\n",delay.rel_value_us);
+
+ GNUNET_SCHEDULER_cancel (session->receive_delay_task);
+ session->receive_delay_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &delayed_done, session);
+}
+
+
+/**
* Create a new session to transmit data to the target
* This session will used to send data to this peer and the plugin will
* notify us by calling the env->session_end function
@@ -2102,24 +2141,7 @@
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
-/**
- * Task to signal the server that we can continue
- * receiving from the TCP client now.
- *
- * @param cls the `struct Session*`
- * @param tc task context (unused)
- */
-static void
-delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct Session *session = cls;
- session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
- reschedule_session_timeout (session);
-
- GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
-}
-
/**
* We've received data for this peer via TCP. Unbox,
* compute latency and forward.
@@ -2479,6 +2501,7 @@
api->string_to_address = &tcp_string_to_address;
api->get_network = &tcp_get_network;
api->update_session_timeout = &tcp_plugin_update_session_timeout;
+ api->update_inbound_delay = &tcp_plugin_update_inbound_delay;
plugin->service = service;
if (NULL != service)
{
Modified: gnunet/src/transport/transport_api.c
===================================================================
--- gnunet/src/transport/transport_api.c 2014-01-30 15:59:28 UTC (rev
32121)
+++ gnunet/src/transport/transport_api.c 2014-01-30 16:58:20 UTC (rev
32122)
@@ -430,6 +430,8 @@
delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
n->th->notify_size + n->traffic_overhead);
+ LOG(GNUNET_ERROR_TYPE_DEBUG,
+ "New outbound delay %llu us\n",delay.rel_value_us);
GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
n->hn, delay.rel_value_us);
schedule_transmission (n->h);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r32122 - in gnunet/src: include transport,
gnunet <=