[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33799 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33799 - gnunet/src/transport |
Date: |
Mon, 23 Jun 2014 21:58:56 +0200 |
Author: grothoff
Date: 2014-06-23 21:58:56 +0200 (Mon, 23 Jun 2014)
New Revision: 33799
Modified:
gnunet/src/transport/plugin_transport_http_client.c
Log:
-add support for 'update_inbound_delay' to HTTP client, complete plugin
monitoring implementation
Modified: gnunet/src/transport/plugin_transport_http_client.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_client.c 2014-06-23 19:24:14 UTC
(rev 33798)
+++ gnunet/src/transport/plugin_transport_http_client.c 2014-06-23 19:58:56 UTC
(rev 33799)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2002-2013 Christian Grothoff (and other contributing authors)
+ (C) 2002-2014 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -22,6 +22,7 @@
* @file transport/plugin_transport_http_client.c
* @brief HTTP/S client transport plugin
* @author Matthias Wachs
+ * @author Christian Grothoff
*/
#if BUILD_HTTPS
@@ -237,7 +238,7 @@
uint32_t ats_address_network_type;
/**
- * Is the client PUT handle currently paused
+ * Is the client PUT handle currently paused?
*/
int put_paused;
@@ -394,9 +395,7 @@
info.is_inbound = GNUNET_SYSERR; /* hard to say */
info.num_msg_pending = session->msgs_in_queue;
info.num_bytes_pending = session->bytes_in_queue;
- /* info.receive_delay remains zero as this is not supported by UDP
- (cannot selectively not receive from 'some' peer while continuing
- to receive from others) */
+ info.receive_delay = session->next_receive;
info.session_timeout = session->timeout;
info.address = session->address;
plugin->sic (plugin->sic_cls,
@@ -406,11 +405,16 @@
/**
- * Increment session timeout due to activity for a session
+ * Increment session timeout due to activity for session @a s.
+ *
* @param s the session
*/
static void
-client_reschedule_session_timeout (struct Session *s);
+client_reschedule_session_timeout (struct Session *s)
+{
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+ s->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+}
/**
@@ -564,7 +568,9 @@
GNUNET_STATISTICS_update (plugin->env->stats,
stat_txt, msgbuf_size, GNUNET_NO);
GNUNET_free (stat_txt);
-
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
if (GNUNET_YES == s->put_tmp_disconnecting)
{
/* PUT connection is currently getting disconnected */
@@ -597,12 +603,10 @@
s->put_tmp_disconnected = GNUNET_NO;
GNUNET_break (s->client_put == NULL);
if (GNUNET_SYSERR == client_connect_put (s))
- {
return GNUNET_SYSERR;
- }
}
-
- client_schedule (s->plugin, GNUNET_YES);
+ client_schedule (s->plugin,
+ GNUNET_YES);
return msgbuf_size;
}
@@ -655,7 +659,11 @@
s->overhead = 0;
GNUNET_free (pos);
}
-
+ GNUNET_assert (0 == s->msgs_in_queue);
+ GNUNET_assert (0 == s->bytes_in_queue);
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_DOWN);
if (NULL != s->msg_tk)
{
GNUNET_SERVER_mst_destroy (s->msg_tk);
@@ -675,8 +683,8 @@
* @return #GNUNET_OK on success, #GNUNET_SYSERR on error
*/
static int
-http_client_session_disconnect (void *cls,
- struct Session *s)
+http_client_plugin_session_disconnect (void *cls,
+ struct Session *s)
{
struct HTTP_Client_Plugin *plugin = cls;
struct HTTP_Message *msg;
@@ -688,7 +696,9 @@
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p/connection %p: disconnecting PUT connection to peer
`%s'\n",
- s, s->client_put, GNUNET_i2s (&s->target));
+ s,
+ s->client_put,
+ GNUNET_i2s (&s->target));
/* remove curl handle from multi handle */
mret = curl_multi_remove_handle (plugin->curl_multi_handle, s->client_put);
@@ -801,7 +811,7 @@
struct HTTP_Client_Plugin *plugin = cls;
struct Session *session = value;
- http_client_session_disconnect (plugin, session);
+ http_client_plugin_session_disconnect (plugin, session);
return GNUNET_OK;
}
@@ -815,8 +825,8 @@
* @param target peer from which to disconnect
*/
static void
-http_client_peer_disconnect (void *cls,
- const struct GNUNET_PeerIdentity *target)
+http_client_plugin_peer_disconnect (void *cls,
+ const struct GNUNET_PeerIdentity *target)
{
struct HTTP_Client_Plugin *plugin = cls;
@@ -896,7 +906,12 @@
/**
- * FIXME.
+ * When we have nothing to transmit, we pause the HTTP PUT
+ * after a while (so that gnurl stops asking). This task
+ * is the delayed task that actually pauses the PUT.
+ *
+ * @param cls the `struct Session *` with the put
+ * @param tc scheduler context
*/
static void
client_put_disconnect (void *cls,
@@ -952,7 +967,9 @@
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p/connection %p: nothing to send, suspending\n",
s, s->client_put);
- s->put_disconnect_task = GNUNET_SCHEDULER_add_delayed
(PUT_DISCONNECT_TIMEOUT, &client_put_disconnect, s);
+ s->put_disconnect_task = GNUNET_SCHEDULER_add_delayed
(PUT_DISCONNECT_TIMEOUT,
+
&client_put_disconnect,
+ s);
s->put_paused = GNUNET_YES;
return CURL_READFUNC_PAUSE;
}
@@ -985,6 +1002,9 @@
s->overhead = 0;
GNUNET_free (msg);
}
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
GNUNET_asprintf (&stat_txt,
"# bytes currently in %s_client buffers",
plugin->protocol);
@@ -1024,10 +1044,19 @@
"Session %p/connection %p: Waking up GET handle\n",
s,
s->client_get);
- s->put_paused = GNUNET_NO;
+ if (GNUNET_YES == s->put_paused)
+ {
+ /* PUT connection was paused, unpause */
+ GNUNET_assert (s->put_disconnect_task != GNUNET_SCHEDULER_NO_TASK);
+ GNUNET_SCHEDULER_cancel (s->put_disconnect_task);
+ s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK;
+ s->put_paused = GNUNET_NO;
+ if (NULL != s->client_put)
+ curl_easy_pause (s->client_put, CURLPAUSE_CONT);
+ }
if (NULL != s->client_get)
- curl_easy_pause (s->client_get, CURLPAUSE_CONT);
-
+ curl_easy_pause (s->client_get,
+ CURLPAUSE_CONT);
}
@@ -1055,7 +1084,10 @@
atsi.value = s->ats_address_network_type;
GNUNET_break (s->ats_address_network_type != ntohl
(GNUNET_ATS_NET_UNSPECIFIED));
- delay = s->plugin->env->receive (plugin->env->cls, s->address, s, message);
+ delay = s->plugin->env->receive (plugin->env->cls,
+ s->address,
+ s,
+ message);
plugin->env->update_address_metrics (plugin->env->cls,
s->address, s,
&atsi, 1);
@@ -1064,12 +1096,12 @@
"# bytes received via %s_client",
plugin->protocol);
GNUNET_STATISTICS_update (plugin->env->stats,
- stat_txt, ntohs(message->size), GNUNET_NO);
+ stat_txt,
+ ntohs (message->size),
+ GNUNET_NO);
GNUNET_free (stat_txt);
- s->next_receive =
- GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay);
-
+ s->next_receive = GNUNET_TIME_relative_to_absolute (delay);
if (GNUNET_TIME_absolute_get ().abs_value_us < s->next_receive.abs_value_us)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1117,7 +1149,10 @@
* @return bytes read from stream
*/
static size_t
-client_receive (void *stream, size_t size, size_t nmemb, void *cls)
+client_receive (void *stream,
+ size_t size,
+ size_t nmemb,
+ void *cls)
{
struct Session *s = cls;
struct GNUNET_TIME_Absolute now;
@@ -1131,11 +1166,13 @@
if (now.abs_value_us < s->next_receive.abs_value_us)
{
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
- struct GNUNET_TIME_Relative delta =
- GNUNET_TIME_absolute_get_difference (now, s->next_receive);
+ struct GNUNET_TIME_Relative delta
+ = GNUNET_TIME_absolute_get_difference (now, s->next_receive);
+
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p / connection %p: No inbound bandwidth available! Next
read was delayed for %s\n",
- s, s->client_get,
+ s,
+ s->client_get,
GNUNET_STRINGS_relative_time_to_string (delta,
GNUNET_YES));
if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
@@ -1143,13 +1180,21 @@
GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
}
- s->recv_wakeup_task =
- GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s);
+ s->recv_wakeup_task
+ = GNUNET_SCHEDULER_add_delayed (delta,
+ &client_wake_up,
+ s);
return CURL_WRITEFUNC_PAUSE;
}
if (NULL == s->msg_tk)
- s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
- GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO, GNUNET_NO);
+ s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb,
+ s);
+ GNUNET_SERVER_mst_receive (s->msg_tk,
+ s,
+ stream,
+ len,
+ GNUNET_NO,
+ GNUNET_NO);
return len;
}
@@ -1161,7 +1206,8 @@
* @param tc gnunet scheduler task context
*/
static void
-client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+client_run (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
@@ -1172,7 +1218,8 @@
* @return #GNUNET_SYSERR for hard failure, #GNUNET_OK for ok
*/
static int
-client_schedule (struct HTTP_Client_Plugin *plugin, int now)
+client_schedule (struct HTTP_Client_Plugin *plugin,
+ int now)
{
fd_set rs;
fd_set ws;
@@ -1212,7 +1259,8 @@
if (mret != CURLM_OK)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"),
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("%s failed at %s:%d: `%s'\n"),
"curl_multi_timeout", __FILE__, __LINE__,
curl_multi_strerror (mret));
return GNUNET_SYSERR;
@@ -1237,18 +1285,17 @@
* Task performing curl operations
*
* @param cls plugin as closure
- * @param tc gnunet scheduler task context
+ * @param tc scheduler task context
*/
static void
-client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+client_run (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct HTTP_Client_Plugin *plugin = cls;
int running;
long http_statuscode;
CURLMcode mret;
- GNUNET_assert (cls != NULL);
-
plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
@@ -1267,7 +1314,7 @@
struct Session *s = NULL;
char *d = (char *) s;
- if (easy_h == NULL)
+ if (NULL == easy_h)
{
GNUNET_break (0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1353,7 +1400,7 @@
/* Disconnect other transmission direction and tell transport */
s->get.easyhandle = NULL;
s->get.s = NULL;
- http_client_session_disconnect (plugin, s);
+ http_client_plugin_session_disconnect (plugin, s);
}
}
}
@@ -1568,7 +1615,6 @@
static int
client_connect (struct Session *s)
{
-
struct HTTP_Client_Plugin *plugin = s->plugin;
int res = GNUNET_OK;
@@ -1584,8 +1630,9 @@
}
GNUNET_asprintf (&s->url, "%s/%s;%u",
- http_common_plugin_address_to_url (NULL, s->address->address,
- s->address->address_length),
+ http_common_plugin_address_to_url (NULL,
+ s->address->address,
+
s->address->address_length),
GNUNET_i2s_full (plugin->env->my_identity),
plugin->last_tag);
@@ -1613,7 +1660,6 @@
HTTP_STAT_STR_CONNECTIONS,
plugin->cur_connections,
GNUNET_NO);
-
/* Re-schedule since handles have changed */
if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
{
@@ -1633,8 +1679,8 @@
* @return the network type
*/
static enum GNUNET_ATS_Network_Type
-http_client_get_network (void *cls,
- struct Session *session)
+http_client_plugin_get_network (void *cls,
+ struct Session *session)
{
return ntohl (session->ats_address_network_type);
}
@@ -1673,7 +1719,7 @@
GNUNET_STRINGS_relative_time_to_string (HTTP_CLIENT_SESSION_TIMEOUT,
GNUNET_YES));
GNUNET_assert (GNUNET_OK ==
- http_client_session_disconnect (s->plugin,
+ http_client_plugin_session_disconnect (s->plugin,
s));
}
@@ -1732,7 +1778,6 @@
salen = sizeof (struct sockaddr_in6);
}
ats = plugin->env->get_address_type (plugin->env->cls, sa, salen);
- //fprintf (stderr, "Address %s is in %s\n", GNUNET_a2s (sa,salen),
GNUNET_ATS_print_network_type(ntohl(ats.value)));
GNUNET_free (sa);
}
else if (GNUNET_NO == res)
@@ -1783,6 +1828,9 @@
client_delete_session (s);
return NULL;
}
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP); /* or handshake? */
return s;
}
@@ -1811,19 +1859,6 @@
/**
- * Increment session timeout due to activity for session @a s.
- *
- * @param s the session
- */
-static void
-client_reschedule_session_timeout (struct Session *s)
-{
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
- s->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-}
-
-
-/**
* Another peer has suggested an address for this
* peer and transport plugin. Check that this could be a valid
* address. If so, consider adding it to the list
@@ -1907,9 +1942,11 @@
/* Optional parameters */
- if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg,
- plugin->name,
- "MAX_CONNECTIONS", &max_connections))
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg,
+ plugin->name,
+ "MAX_CONNECTIONS",
+ &max_connections))
max_connections = 128;
plugin->max_connections = max_connections;
@@ -1990,14 +2027,15 @@
}
/* proxy http tunneling */
- if (GNUNET_SYSERR == (plugin->proxy_use_httpproxytunnel =
GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg,
- plugin->name, "PROXY_HTTP_TUNNELING")))
+ plugin->proxy_use_httpproxytunnel
+ = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg,
+ plugin->name,
+ "PROXY_HTTP_TUNNELING");
+ if (GNUNET_SYSERR == plugin->proxy_use_httpproxytunnel)
plugin->proxy_use_httpproxytunnel = GNUNET_NO;
GNUNET_free_non_null (proxy_type);
}
-
-
return GNUNET_OK;
}
@@ -2046,6 +2084,38 @@
/**
+ * Function that will be called whenever the transport service wants to
+ * notify the plugin that the inbound quota changed and that the plugin
+ * should update it's delay for the next receive value
+ *
+ * @param cls closure
+ * @param peer which peer was the session for
+ * @param session which session is being updated
+ * @param delay new delay to use for receiving
+ */
+static void
+http_client_plugin_update_inbound_delay (void *cls,
+ const struct GNUNET_PeerIdentity
*peer,
+ struct Session *s,
+ struct GNUNET_TIME_Relative delay)
+{
+ s->next_receive = GNUNET_TIME_relative_to_absolute (delay);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "New inbound delay %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_NO));
+ if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task
+ = GNUNET_SCHEDULER_add_delayed (delay,
+ &client_wake_up,
+ s);
+ }
+}
+
+
+/**
* Return information about the given session to the
* monitor callback.
*
@@ -2131,16 +2201,17 @@
api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
api->cls = plugin;
api->send = &http_client_plugin_send;
- api->disconnect_session = &http_client_session_disconnect;
+ api->disconnect_session = &http_client_plugin_session_disconnect;
api->query_keepalive_factor = &http_client_query_keepalive_factor;
- api->disconnect_peer = &http_client_peer_disconnect;
+ api->disconnect_peer = &http_client_plugin_peer_disconnect;
api->check_address = &http_client_plugin_address_suggested;
api->get_session = &http_client_plugin_get_session;
api->address_to_string = &http_client_plugin_address_to_string;
api->string_to_address = &http_common_plugin_string_to_address;
api->address_pretty_printer = &http_common_plugin_address_pretty_printer;
- api->get_network = &http_client_get_network;
+ api->get_network = &http_client_plugin_get_network;
api->update_session_timeout = &http_client_plugin_update_session_timeout;
+ api->update_inbound_delay = &http_client_plugin_update_inbound_delay;
api->setup_monitor = &http_client_plugin_setup_monitor;
#if BUILD_HTTPS
plugin->name = "transport-https_client";
@@ -2150,7 +2221,6 @@
plugin->protocol = "http";
#endif
plugin->last_tag = 1;
- plugin->options = 0; /* Setup options */
if (GNUNET_SYSERR == client_configure_plugin (plugin))
{
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33799 - gnunet/src/transport,
gnunet <=