[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r5517 - GNUnet/src/transports
From: |
gnunet |
Subject: |
[GNUnet-SVN] r5517 - GNUnet/src/transports |
Date: |
Sun, 19 Aug 2007 02:50:53 -0600 (MDT) |
Author: grothoff
Date: 2007-08-19 02:50:43 -0600 (Sun, 19 Aug 2007)
New Revision: 5517
Modified:
GNUnet/src/transports/http.c
Log:
getting http to pass gnunet-transport-check
Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c 2007-08-19 08:48:40 UTC (rev 5516)
+++ GNUnet/src/transports/http.c 2007-08-19 08:50:43 UTC (rev 5517)
@@ -24,19 +24,7 @@
* @author Christian Grothoff
*
* TODO:
- * - connection timeout (shutdown inactive connections)
- * => CURL and MHD can help do this, we mostly need
- * to make sure we clean up properly...
- * - proper connection re-establishment (i.e., if a GET times out or
- * dies otherwise, we need to re-start the TSession if the
- * core wants to keep using it!)
- * - free resources allocated for PUT inside of CURL
- * select loop (as soon as PUT is complete)
- * - bound the number of concurrent PUTs for a given
- * connection (to 1 + urgent?)
- * - why does valgrind show "conditional jump depends on uninit values"
- * for curl_multi_perform?
- * - where does the 1s loopback-ping latency come from?
+ * - test GETs (gnunet-transport-check does NOT!)
*/
#include "gnunet_util.h"
@@ -52,10 +40,19 @@
#define DEBUG_HTTP NO
/**
- * after how much time of the core not being associated with a http
+ * Disable GET (for debugging only!). Must be YES
+ * in production use!
+ */
+#define DO_GET YES
+
+/**
+ * After how much time of the core not being associated with a http
* connection anymore do we close it?
+ *
+ * Needs to be larger than SECONDS_INACTIVE_DROP in
+ * core's connection.s
*/
-#define HTTP_TIMEOUT (30 * cronSECONDS)
+#define HTTP_TIMEOUT (600 * cronSECONDS)
/**
* Default maximum size of the HTTP read and write buffer.
@@ -63,6 +60,13 @@
#define HTTP_BUF_SIZE (64 * 1024)
/**
+ * Text of the response sent back after the last bytes of a PUT
+ * request have been received (just to formally obey the HTTP
+ * protocol).
+ */
+#define HTTP_PUT_RESPONSE "Thank you!"
+
+/**
* Host-Address in a HTTP network.
*/
typedef struct
@@ -84,93 +88,141 @@
} HostAddress;
+/**
+ * Client-side data per PUT request.
+ */
struct HTTPPutData
{
+ /**
+ * This is a linked list.
+ */
struct HTTPPutData *next;
+ /**
+ * Handle to our CURL request.
+ */
+ CURL *curl_put;
+
+ /**
+ * Last time we made progress with the PUT.
+ */
+ cron_t last_activity;
+
+ /**
+ * The message we are sending.
+ */
char *msg;
- CURL *curl_put;
-
+ /**
+ * Size of msg.
+ */
unsigned int size;
+ /**
+ * Current position in msg.
+ */
unsigned int pos;
+ /**
+ * Are we done sending? Set to 1 after we
+ * completed sending and started to receive
+ * a response ("Thank you!") or once the
+ * timeout has been reached.
+ */
int done;
};
/**
- * Transport Session handle.
+ * Server-side data per PUT request.
*/
-typedef struct
+struct MHDPutData
{
+ /**
+ * This is a linked list.
+ */
+ struct MHDPutData *next;
/**
- * mutex for synchronized access to struct
+ * MHD connection handle for this request.
*/
- struct MUTEX *lock;
+ struct MHD_Connection * session;
/**
- * Read buffer for the header.
+ * Last time we received data on this PUT
+ * connection.
*/
- char rbuff1[sizeof (MESSAGE_HEADER)];
+ cron_t last_activity;
/**
- * The read buffer (used only for the actual data).
+ * Read buffer for the header (from PUT)
*/
+ char rbuff1[sizeof (MESSAGE_HEADER)];
+
+ /**
+ * The read buffer (used only receiving PUT data).
+ */
char *rbuff2;
-
+
/**
- * The write buffer.
+ * Number of valid bytes in rbuff1
*/
- char *wbuff;
-
+ unsigned int rpos1;
+
/**
- * Last time this connection was used
+ * Number of valid bytes in rbuff2
*/
- cron_t lastUse;
+ unsigned int rpos2;
+
/**
- * To whom are we talking to (set to our identity
- * if we are still waiting for the welcome message)
+ * Size of the rbuff2 buffer.
*/
- PeerIdentity sender;
+ unsigned int rsize2;
/**
- * number of users of this session
+ * Should we sent a response for this PUT yet?
*/
- unsigned int users;
+ int ready;
/**
- * Number of valid bytes in rbuff1
+ * Have we sent a response for this PUT yet?
*/
- unsigned int rpos1;
+ int done;
+};
+
+/**
+ * Transport Session handle.
+ */
+typedef struct
+{
+
/**
- * Number of valid bytes in rbuff2
+ * TSession for this session.
*/
- unsigned int rpos2;
+ TSession * tsession;
/**
- * Current size of the read buffer rbuff2.
+ * mutex for synchronized access to struct
*/
- unsigned int rsize2;
+ struct MUTEX *lock;
/**
- * Current write position in wbuff
+ * Last time this connection was used
*/
- unsigned int woff;
+ cron_t lastUse;
/**
- * Number of valid bytes in wbuff (starting at woff)
+ * To whom are we talking to (set to our identity
+ * if we are still waiting for the welcome message)
*/
- unsigned int wpos;
+ PeerIdentity sender;
/**
- * Size of the write buffer.
+ * number of users of this session
*/
- unsigned int wsize;
+ unsigned int users;
/**
* Has this session been destroyed?
@@ -178,16 +230,13 @@
int destroyed;
/**
- * Are we client or server?
+ * Are we client or server? Determines which of the
+ * structs in the union below is being used for this
+ * connection!
*/
int is_client;
/**
- * TSession for this session.
- */
- TSession *tsession;
-
- /**
* Data maintained for the http client-server connection
* (depends on if we are client or server).
*/
@@ -196,23 +245,92 @@
struct
{
+ /**
+ * Active PUT requests (linked list).
+ */
+ struct MHDPutData * puts;
+#if DO_GET
/**
* GET session response handle
*/
- struct MHD_Response *get;
+ struct MHD_Response * get;
+ /**
+ * The write buffer (for sending GET response)
+ */
+ char * wbuff;
+
+ /**
+ * What was the last time we were able to
+ * transmit data using the current get handle?
+ */
+ cron_t last_get_activity;
+
+ /**
+ * Current write position in wbuff
+ */
+ unsigned int woff;
+
+ /**
+ * Number of valid bytes in wbuff (starting at woff)
+ */
+ unsigned int wpos;
+
+ /**
+ * Size of the write buffer.
+ */
+ unsigned int wsize;
+#endif
+
} server;
struct
{
/**
+ * Address of the other peer.
+ */
+ HostAddress address;
+
+#if DO_GET
+ /**
+ * Last time the GET was active.
+ */
+ cron_t last_get_activity;
+
+ /**
* GET operation
*/
CURL *get;
/**
+ * Read buffer for the header (from GET).
+ */
+ char rbuff1[sizeof (MESSAGE_HEADER)];
+
+ /**
+ * The read buffer (used only receiving GET data).
+ */
+ char *rbuff2;
+
+ /**
+ * Number of valid bytes in rbuff1
+ */
+ unsigned int rpos1;
+
+ /**
+ * Number of valid bytes in rbuff2
+ */
+ unsigned int rpos2;
+
+ /**
+ * Current size of the read buffer rbuff2.
+ */
+ unsigned int rsize2;
+#endif
+
+ /**
* URL of the get and put operations.
*/
char *url;
@@ -243,6 +361,8 @@
static int stat_bytesDropped;
+static int signal_pipe[2];
+
static char *proxy;
/**
@@ -270,8 +390,14 @@
*/
static TSession **tsessions;
+/**
+ * Number of valid entries in tsessions.
+ */
static unsigned int tsessionCount;
+/**
+ * Sie of the tsessions array.
+ */
static unsigned int tsessionArrayLength;
/**
@@ -284,7 +410,6 @@
*/
static UPnP_ServiceAPI *upnp;
-
/**
* Lock for access to mutable state of the module,
* that is the configuration and the tsessions array.
@@ -299,7 +424,17 @@
*/
static struct MUTEX *httplock;
+
/**
+ * Signal select thread that its selector
+ * set may have changed.
+ */
+static void signal_select() {
+ static char c;
+ write(signal_pipe[1], &c, sizeof(c));
+}
+
+/**
* Check if we are allowed to connect to the given IP.
*/
static int
@@ -319,13 +454,30 @@
}
else
{
+#if DEBUG_HTTP
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Rejecting HTTP connection\n");
+#endif
return MHD_NO;
}
MUTEX_LOCK (httplock);
ret = check_ipv4_listed (filteredNetworks_, ip);
MUTEX_UNLOCK (httplock);
- if (YES == ret)
- return MHD_NO;
+ if (YES == ret)
+ {
+#if DEBUG_HTTP
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Rejecting HTTP connection\n");
+#endif
+ return MHD_NO;
+ }
+#if DEBUG_HTTP
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Accepting HTTP connection\n");
+#endif
return MHD_YES;
}
@@ -335,6 +487,9 @@
* For the core, aquiration means to call associate or
* connect. The number of disconnects must match the
* number of calls to connect+associate.
+ *
+ * Sessions are actually discarded in cleanup_connections.
+ *
*
* @param tsession the session that is closed
* @return OK on success, SYSERR if the operation failed
@@ -343,10 +498,6 @@
httpDisconnect (TSession * tsession)
{
HTTPSession *httpsession = tsession->internal;
- struct HTTPPutData *pos;
- struct HTTPPutData *next;
- int i;
-
if (httpsession == NULL)
{
FREE (tsession);
@@ -354,13 +505,19 @@
}
MUTEX_LOCK (httpsession->lock);
httpsession->users--;
- if (httpsession->users > 0)
- {
- MUTEX_UNLOCK (httpsession->lock);
- return OK;
- }
- httpsession->destroyed = YES;
MUTEX_UNLOCK (httpsession->lock);
+ return OK;
+}
+
+static void
+destroy_tsession(TSession * tsession) {
+ HTTPSession *httpsession = tsession->internal;
+ struct HTTPPutData *pos;
+ struct HTTPPutData *next;
+ struct MHDPutData * mpos;
+ struct MHDPutData * mnext;
+ int i;
+
MUTEX_LOCK (httplock);
for (i = 0; i < tsessionCount; i++)
{
@@ -373,31 +530,52 @@
MUTEX_UNLOCK (httplock);
if (httpsession->is_client)
{
+#if DO_GET
curl_multi_remove_handle (curl_multi, httpsession->cs.client.get);
+ signal_select();
curl_easy_cleanup (httpsession->cs.client.get);
+ GROW (httpsession->cs.client.rbuff2,
+ httpsession->cs.client.rsize2, 0);
+#endif
FREE (httpsession->cs.client.url);
pos = httpsession->cs.client.puts;
while (pos != NULL)
{
next = pos->next;
curl_multi_remove_handle (curl_multi, pos->curl_put);
+ signal_select();
curl_easy_cleanup (pos->curl_put);
FREE (pos->msg);
FREE (pos);
pos = next;
}
-
}
else
{
- MHD_destroy_response (httpsession->cs.server.get);
+#if DO_GET
+ GROW (httpsession->cs.server.wbuff,
+ httpsession->cs.server.wsize, 0);
+ if (httpsession->cs.server.get != NULL) {
+ MHD_destroy_response (httpsession->cs.server.get);
+ httpsession->cs.server.get = NULL;
+ }
+#endif
+ mpos = httpsession->cs.server.puts;
+ /* this should be NULL already, but just
+ in case it is not, we free it anyway... */
+ while (mpos != NULL) {
+ mnext = mpos->next;
+ GROW(mpos->rbuff2,
+ mpos->rsize2,
+ 0);
+ FREE(mpos);
+ mpos = mnext;
+ }
+
}
- GROW (httpsession->rbuff2, httpsession->rsize2, 0);
- GROW (httpsession->wbuff, httpsession->wsize, 0);
MUTEX_DESTROY (httpsession->lock);
FREE (httpsession);
FREE (tsession);
- return OK;
}
/**
@@ -479,8 +657,10 @@
(ntohs (hello->protocol) != HTTP_PROTOCOL_NUMBER) ||
(MHD_NO == acceptPolicyCallback (NULL,
(const struct sockaddr *) haddr,
- sizeof (IPaddr))))
+ sizeof (IPaddr)))) {
+ GE_BREAK_OP(NULL, 0);
return SYSERR; /* obviously invalid */
+ }
return OK;
}
@@ -557,10 +737,22 @@
return i;
}
+#if DO_GET
+/**
+ * Callback for processing GET requests if our side is the
+ * MHD HTTP server.
+ *
+ * @param cls the HTTP session
+ * @param pos read-offset in the stream
+ * @param buf where to write the data
+ * @param max how much data to write (at most)
+ * @return number of bytes written, 0 is allowed!
+ */
static int
contentReaderCallback (void *cls, size_t pos, char *buf, int max)
{
HTTPSession *session = cls;
+ cron_t now;
MUTEX_LOCK (session->lock);
if (session->destroyed)
@@ -568,18 +760,28 @@
MUTEX_UNLOCK (session->lock);
return -1;
}
- if (session->wpos < max)
- max = session->wpos;
- memcpy (buf, &session->wbuff[session->woff], max);
- session->wpos -= max;
- session->woff += max;
- session->lastUse = get_time ();
- if (session->wpos == 0)
- session->woff = 0;
+ if (session->cs.server.wpos < max)
+ max = session->cs.server.wpos;
+ memcpy (buf, &session->cs.server.wbuff[session->cs.server.woff], max);
+ session->cs.server.wpos -= max;
+ session->cs.server.woff += max;
+ now = get_time ();
+ session->lastUse = now;
+ session->cs.server.last_get_activity = now;
+ if (session->cs.server.wpos == 0)
+ session->cs.server.woff = 0;
MUTEX_UNLOCK (session->lock);
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP returns %u bytes in MHD GET handler.\n",
+ max);
+#endif
return max;
}
+#endif
+#if DO_GET
/**
* Notification that libmicrohttpd no longer needs the
* response object.
@@ -587,15 +789,18 @@
static void
contentReaderFreeCallback (void *cls)
{
- HTTPSession *session = cls;
+ HTTPSession * session = cls;
- session->destroyed = YES;
+ GE_ASSERT(NULL, session->cs.server.get == NULL);
}
+#endif
/**
- * Create a new session for an inbound connection on the given
- * socket. Adds the session to the array of sessions watched
- * by the select thread.
+ * Process GET or PUT request received via MHD. For
+ * GET, queue response that will send back our pending
+ * messages. For PUT, process incoming data and send
+ * to GNUnet core. In either case, check if a session
+ * already exists and create a new one if not.
*/
static int
accessHandlerCallback (void *cls,
@@ -606,8 +811,9 @@
const char *upload_data,
unsigned int *upload_data_size)
{
- TSession *tsession;
- struct MHD_Response *response;
+ TSession * tsession;
+ struct MHD_Response * response;
+ struct MHDPutData * put;
HTTPSession *httpSession;
HashCode512 client;
int i;
@@ -617,123 +823,192 @@
unsigned int cpy;
unsigned int poff;
+ /* convert URL to sender peer id */
if ((strlen (url) < 2) || (OK != enc2hash (&url[1], &client)))
{
+ /* invalid request */
+ GE_BREAK_OP(NULL, 0);
return MHD_NO;
}
/* check if we already have a session for this */
+ httpSession = NULL;
MUTEX_LOCK (httplock);
for (i = 0; i < tsessionCount; i++)
{
tsession = tsessions[i];
httpSession = tsession->internal;
- if (0 == memcmp (&httpSession->sender, &client, sizeof (HashCode512)))
+ if ( (0 == memcmp (&httpSession->sender, &client, sizeof (HashCode512)))
&&
+ (httpSession->is_client == NO) )
break;
tsession = NULL;
httpSession = NULL;
}
- if (tsession != NULL)
- {
- MUTEX_LOCK (httpSession->lock);
- httpSession->users++;
- MUTEX_UNLOCK (httpSession->lock);
- }
MUTEX_UNLOCK (httplock);
+ /* create new session if necessary */
if (httpSession == NULL)
{
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/MHD creates new session for request from `%s'.\n",
+ &url[1]);
+#endif
httpSession = MALLOC (sizeof (HTTPSession));
memset (httpSession, 0, sizeof (HTTPSession));
- httpSession->sender = *(coreAPI->myIdentity);
+ httpSession->sender.hashPubKey = client;
httpSession->lock = MUTEX_CREATE (YES);
- httpSession->users = 1; /* us only, core has not seen this tsession! */
- httpSession->lastUse = get_time ();
+ httpSession->users = 0; /* nobody yet */
tsession = MALLOC (sizeof (TSession));
memset (tsession, 0, sizeof (TSession));
tsession->ttype = HTTP_PROTOCOL_NUMBER;
tsession->internal = httpSession;
- tsession->peer = *(coreAPI->myIdentity);
+ tsession->peer.hashPubKey = client;
httpSession->tsession = tsession;
addTSession (tsession);
}
- if (0 == strcmp ("GET", method))
+ MUTEX_LOCK (httpSession->lock);
+ httpSession->lastUse = get_time ();
+#if DO_GET
+ if (0 == strcasecmp (MHD_HTTP_METHOD_GET, method))
{
- /* handle get */
- response = MHD_create_response_from_callback (-1,
- 64 * 1024,
- contentReaderCallback,
- httpSession,
- contentReaderFreeCallback);
- httpSession->cs.client.get = response;
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/MHD receives GET request from `%s'.\n",
+ &url[1]);
+#endif
+
+ /* handle get; create response object if we do not
+ have one already */
+ response = httpSession->cs.server.get;
+ if (response == NULL) {
+ response = MHD_create_response_from_callback (-1,
+ 64 * 1024,
+ contentReaderCallback,
+ httpSession,
+
contentReaderFreeCallback);
+ httpSession->cs.server.get = response;
+ }
+ httpSession->cs.server.last_get_activity = get_time();
MHD_queue_response (session, MHD_HTTP_OK, response);
+ MUTEX_UNLOCK (httpSession->lock);
+ return MHD_YES;
}
- else if (0 == strcmp ("PUT", method))
+#endif
+ if (0 == strcasecmp (MHD_HTTP_METHOD_PUT, method))
{
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/MHD receives PUT request from `%s' with %u bytes.\n",
+ &url[1],
+ *upload_data_size);
+#endif
+ put = httpSession->cs.server.puts;
+ while ( (put != NULL) &&
+ (put->session != session) )
+ put = put->next;
+ if (put == NULL) {
+ put = MALLOC(sizeof(struct MHDPutData));
+ memset(put, 0, sizeof(struct MHDPutData));
+ put->next = httpSession->cs.server.puts;
+ httpSession->cs.server.puts = put;
+ put->session = session;
+ }
+ put->last_activity = get_time();
+
/* handle put (upload_data!) */
- MUTEX_LOCK (httpSession->lock);
poff = 0;
have = *upload_data_size;
*upload_data_size = 0; /* we will always process everything */
+ if ( (have == 0) &&
+ (put->done == NO) &&
+ (put->ready == YES) ) {
+ put->done = YES;
+ /* end of upload, send response! */
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/MHD queues dummy response to completed PUT request.\n");
+#endif
+ response = MHD_create_response_from_data (strlen(HTTP_PUT_RESPONSE),
+ HTTP_PUT_RESPONSE,
+ MHD_NO,
+ MHD_NO);
+ MHD_queue_response (session, MHD_HTTP_OK, response);
+ MHD_destroy_response(response);
+ MUTEX_UNLOCK (httpSession->lock);
+ return MHD_YES;
+ }
while (have > 0)
{
- if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
+ put->ready = NO;
+ if (put->rpos1 < sizeof (MESSAGE_HEADER))
{
- cpy = sizeof (MESSAGE_HEADER) - httpSession->rpos1;
+ cpy = sizeof (MESSAGE_HEADER) - put->rpos1;
if (cpy > have)
cpy = have;
- memcpy (&httpSession->rbuff1[httpSession->rpos1],
+ memcpy (&put->rbuff1[put->rpos1],
&upload_data[poff], cpy);
- httpSession->rpos1 += cpy;
+ put->rpos1 += cpy;
have -= cpy;
poff += cpy;
- httpSession->rpos2 = 0;
+ put->rpos2 = 0;
}
- if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
+ if (put->rpos1 < sizeof (MESSAGE_HEADER))
break;
- hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
- GROW (httpSession->rbuff2,
- httpSession->rsize2,
+ hdr = (MESSAGE_HEADER *) put->rbuff1;
+ GROW (put->rbuff2,
+ put->rsize2,
ntohs (hdr->size) - sizeof (MESSAGE_HEADER));
- if (httpSession->rpos2 <
+ if (put->rpos2 <
ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
{
cpy =
ntohs (hdr->size) - sizeof (MESSAGE_HEADER) -
- httpSession->rpos2;
+ put->rpos2;
if (cpy > have)
cpy = have;
- memcpy (&httpSession->rbuff2[httpSession->rpos2],
+ memcpy (&put->rbuff2[put->rpos2],
&upload_data[poff], cpy);
have -= cpy;
poff += cpy;
- httpSession->rpos2 += cpy;
+ put->rpos2 += cpy;
}
- if (httpSession->rpos2 <
+ if (put->rpos2 <
ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
break;
mp = MALLOC (sizeof (P2P_PACKET));
- mp->msg = httpSession->rbuff2;
+ mp->msg = put->rbuff2;
mp->sender = httpSession->sender;
mp->tsession = httpSession->tsession;
mp->size = ntohs (hdr->size) - sizeof (MESSAGE_HEADER);
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/MHD passes %u bytes to core (received via PUT
request).\n",
+ mp->size);
+#endif
coreAPI->receive (mp);
- httpSession->rbuff2 = NULL;
- httpSession->rpos2 = 0;
- httpSession->rsize2 = 0;
- httpSession->rpos1 = 0;
+ put->rbuff2 = NULL;
+ put->rpos2 = 0;
+ put->rsize2 = 0;
+ put->rpos1 = 0;
+ put->ready = YES;
}
MUTEX_UNLOCK (httpSession->lock);
+ return MHD_YES;
}
- else
- {
- return MHD_NO; /* must be get or put! */
- }
- return MHD_YES;
+ MUTEX_UNLOCK (httpSession->lock);
+ GE_BREAK_OP(NULL, 0); /* invalid request */
+ return MHD_NO;
}
+#if DO_GET
/**
- * Process downloaded bits
+ * Process downloaded bits (from GET via CURL).
*/
static size_t
receiveContentCallback (void *ptr, size_t size, size_t nmemb, void *ctx)
@@ -746,58 +1021,66 @@
MESSAGE_HEADER *hdr;
P2P_PACKET *mp;
- printf ("Receiving %u bytes from GET\n", have);
+ httpSession->cs.client.last_get_activity = get_time();
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/CURL receives %u bytes as response to GET.\n",
+ size * nmemb);
+#endif
while (have > 0)
{
- if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
+ if (httpSession->cs.client.rpos1 < sizeof (MESSAGE_HEADER))
{
- cpy = sizeof (MESSAGE_HEADER) - httpSession->rpos1;
+ cpy = sizeof (MESSAGE_HEADER) - httpSession->cs.client.rpos1;
if (cpy > have)
cpy = have;
- memcpy (&httpSession->rbuff1[httpSession->rpos1],
+ memcpy (&httpSession->cs.client.rbuff1[httpSession->cs.client.rpos1],
&inbuf[poff], cpy);
- httpSession->rpos1 += cpy;
+ httpSession->cs.client.rpos1 += cpy;
have -= cpy;
poff += cpy;
- httpSession->rpos2 = 0;
+ httpSession->cs.client.rpos2 = 0;
}
- if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
- return size * nmemb;
- hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
- GROW (httpSession->rbuff2,
- httpSession->rsize2, ntohs (hdr->size) - sizeof (MESSAGE_HEADER));
+ if (httpSession->cs.client.rpos1 < sizeof (MESSAGE_HEADER))
+ break;
+ hdr = (MESSAGE_HEADER *) httpSession->cs.client.rbuff1;
+ GROW (httpSession->cs.client.rbuff2,
+ httpSession->cs.client.rsize2, ntohs (hdr->size) - sizeof
(MESSAGE_HEADER));
printf ("Expecting message of %u bytes via GET\n", ntohs (hdr->size));
- if (httpSession->rpos2 < ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
+ if (httpSession->cs.client.rpos2 < ntohs (hdr->size) - sizeof
(MESSAGE_HEADER))
{
cpy =
- ntohs (hdr->size) - sizeof (MESSAGE_HEADER) - httpSession->rpos2;
+ ntohs (hdr->size) - sizeof (MESSAGE_HEADER) -
httpSession->cs.client.rpos2;
if (cpy > have)
cpy = have;
- memcpy (&httpSession->rbuff2[httpSession->rpos2],
+ memcpy (&httpSession->cs.client.rbuff2[httpSession->cs.client.rpos2],
&inbuf[poff], cpy);
have -= cpy;
poff += cpy;
- httpSession->rpos2 += cpy;
+ httpSession->cs.client.rpos2 += cpy;
}
- if (httpSession->rpos2 < ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
- return size * nmemb;
+ if (httpSession->cs.client.rpos2 < ntohs (hdr->size) - sizeof
(MESSAGE_HEADER))
+ break;
mp = MALLOC (sizeof (P2P_PACKET));
- mp->msg = httpSession->rbuff2;
+ mp->msg = httpSession->cs.client.rbuff2;
mp->sender = httpSession->sender;
mp->tsession = httpSession->tsession;
mp->size = ntohs (hdr->size) - sizeof (MESSAGE_HEADER);
printf ("Passing message from GET to core!\n");
coreAPI->receive (mp);
- httpSession->rbuff2 = NULL;
- httpSession->rpos2 = 0;
- httpSession->rsize2 = 0;
- httpSession->rpos1 = 0;
+ httpSession->cs.client.rbuff2 = NULL;
+ httpSession->cs.client.rpos2 = 0;
+ httpSession->cs.client.rsize2 = 0;
+ httpSession->cs.client.rpos1 = 0;
}
return size * nmemb;
}
+#endif
/**
- * Provide bits for upload
+ * Provide bits for upload: we're using CURL for a PUT request
+ * and now need to provide data from the message we are transmitting.
*/
static size_t
sendContentCallback (void *ptr, size_t size, size_t nmemb, void *ctx)
@@ -805,87 +1088,80 @@
struct HTTPPutData *put = ctx;
size_t max = size * nmemb;
+ put->last_activity = get_time();
if (max > put->size - put->pos)
max = put->size - put->pos;
memcpy (ptr, &put->msg[put->pos], max);
put->pos += max;
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/CURL sends %u bytes in PUT request.\n",
+ max);
+#endif
return max;
}
#define CURL_EASY_SETOPT(c, a, b) do { ret = curl_easy_setopt(c, a, b); if
(ret != CURLE_OK) GE_LOG(coreAPI->ectx, GE_WARNING | GE_USER | GE_BULK, _("%s
failed at %s:%d: `%s'\n"), "curl_easy_setopt", __FILE__, __LINE__,
curl_easy_strerror(ret)); } while (0);
+static void
+create_session_url(HTTPSession * httpSession) {
+ char *url;
+ EncName enc;
+
+ url = httpSession->cs.client.url;
+ if (url == NULL) {
+ hash2enc (&httpSession->sender.hashPubKey, &enc);
+ url = MALLOC (64 + sizeof (EncName));
+ SNPRINTF (url,
+ 64 + sizeof (EncName),
+ "http://%u.%u.%u.%u:%u/%s",
+ PRIP (ntohl (*(int *) &httpSession->cs.client.address.ip.addr)),
+ ntohs (httpSession->cs.client.address.port), &enc);
+ httpSession->cs.client.url = url;
+ }
+}
+
+#if DO_GET
/**
- * Establish a connection to a remote node.
- *
- * @param hello the hello-Message for the target node
- * @param tsessionPtr the session handle that is set
- * @return OK on success, SYSERR if the operation failed
+ * Try to do a GET on the other peer of the given
+ * http session.
+ *
+ * @return OK on success, SYSERR on error
*/
static int
-httpConnect (const P2P_hello_MESSAGE * hello, TSession ** tsessionPtr,
- int may_reuse)
-{
- const HostAddress *haddr = (const HostAddress *) &hello[1];
- TSession *tsession;
- HTTPSession *httpSession;
+create_curl_get(HTTPSession * httpSession) {
CURL *curl_get;
CURLcode ret;
CURLMcode mret;
- char *url;
- EncName enc;
- int i;
- /* check if we have a session pending for this peer */
- tsession = NULL;
- MUTEX_LOCK (httplock);
- for (i = 0; i < tsessionCount; i++)
- {
- if (0 == memcmp (&hello->senderIdentity,
- &tsessions[i]->peer, sizeof (PeerIdentity)))
- {
- tsession = tsessions[i];
- break;
- }
- }
- if ((tsession != NULL) && (OK == httpAssociate (tsession)))
- {
- *tsessionPtr = tsession;
- MUTEX_UNLOCK (httplock);
- return OK;
- }
- MUTEX_UNLOCK (httplock);
-
- /* no session pending, initiate a new one! */
+ curl_get = httpSession->cs.client.get;
+ if (curl_get != NULL) {
+ curl_multi_remove_handle(curl_multi,
+ curl_get);
+ signal_select();
+ curl_easy_cleanup(curl_get);
+ httpSession->cs.client.get = NULL;
+ }
curl_get = curl_easy_init ();
if (curl_get == NULL)
return SYSERR;
-
- hash2enc (&hello->senderIdentity.hashPubKey, &enc);
- url = MALLOC (64 + sizeof (EncName));
- SNPRINTF (url,
- 64 + sizeof (EncName),
- "http://%u.%u.%u.%u:%u/%s",
- PRIP (ntohl (*(int *) &haddr->ip.addr)),
- ntohs (haddr->port), &enc);
-
/* create GET */
CURL_EASY_SETOPT (curl_get, CURLOPT_FAILONERROR, 1);
- CURL_EASY_SETOPT (curl_get, CURLOPT_URL, url);
+ CURL_EASY_SETOPT (curl_get, CURLOPT_URL, httpSession->cs.client.url);
if (strlen (proxy) > 0)
CURL_EASY_SETOPT (curl_get, CURLOPT_PROXY, proxy);
CURL_EASY_SETOPT (curl_get, CURLOPT_BUFFERSIZE, 32 * 1024);
- if (0 == strncmp (url, "http", 4))
+ if (0 == strncmp (httpSession->cs.client.url, "http", 4))
CURL_EASY_SETOPT (curl_get, CURLOPT_USERAGENT, "GNUnet-http");
CURL_EASY_SETOPT (curl_get, CURLOPT_CONNECTTIMEOUT, 150L);
CURL_EASY_SETOPT (curl_get, CURLOPT_TIMEOUT, 150L);
CURL_EASY_SETOPT (curl_get, CURLOPT_WRITEFUNCTION, &receiveContentCallback);
-
- httpSession = MALLOC (sizeof (HTTPSession));
- memset (httpSession, 0, sizeof (HTTPSession));
- httpSession->cs.client.url = url;
CURL_EASY_SETOPT (curl_get, CURLOPT_WRITEDATA, httpSession);
- if (ret != CURLE_OK)
- goto cleanup;
+ if (ret != CURLE_OK) {
+ curl_easy_cleanup(curl_get);
+ return SYSERR;
+ }
mret = curl_multi_add_handle (curl_multi, curl_get);
if (mret != CURLM_OK)
{
@@ -894,60 +1170,174 @@
_("%s failed at %s:%d: `%s'\n"),
"curl_multi_add_handle",
__FILE__, __LINE__, curl_multi_strerror (mret));
- goto cleanup;
+ curl_easy_cleanup(curl_get);
+ return SYSERR;
}
+ signal_select();
+ httpSession->cs.client.last_get_activity = get_time();
+ httpSession->cs.client.get = curl_get;
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/CURL initiated GET request.\n");
+#endif
+ return OK;
+}
+#endif
- /* create SESSION */
+/**
+ * Establish a connection to a remote node.
+ *
+ * @param hello the hello-Message for the target node
+ * @param tsessionPtr the session handle that is set
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int
+httpConnect (const P2P_hello_MESSAGE * hello, TSession ** tsessionPtr,
+ int may_reuse)
+{
+ const HostAddress *haddr = (const HostAddress *) &hello[1];
+ TSession *tsession;
+ HTTPSession *httpSession;
+ int i;
+
+ /* check if we have a session pending for this peer */
+ tsession = NULL;
+ if (may_reuse) {
+ MUTEX_LOCK (httplock);
+ for (i = 0; i < tsessionCount; i++)
+ {
+ if (0 == memcmp (&hello->senderIdentity,
+ &tsessions[i]->peer, sizeof (PeerIdentity)))
+ {
+ tsession = tsessions[i];
+ break;
+ }
+ }
+ if ((tsession != NULL) && (OK == httpAssociate (tsession)))
+ {
+ *tsessionPtr = tsession;
+ MUTEX_UNLOCK (httplock);
+ return OK;
+ }
+ MUTEX_UNLOCK (httplock);
+ }
+ /* no session pending, initiate a new one! */
+ httpSession = MALLOC (sizeof (HTTPSession));
+ memset (httpSession, 0, sizeof (HTTPSession));
httpSession->sender = hello->senderIdentity;
httpSession->lock = MUTEX_CREATE (YES);
httpSession->users = 1; /* us only, core has not seen this tsession! */
httpSession->lastUse = get_time ();
httpSession->is_client = YES;
- httpSession->cs.client.get = curl_get;
+ httpSession->cs.client.address = *haddr;
tsession = MALLOC (sizeof (TSession));
memset (tsession, 0, sizeof (TSession));
httpSession->tsession = tsession;
tsession->ttype = HTTP_PROTOCOL_NUMBER;
tsession->internal = httpSession;
+ create_session_url(httpSession);
+#if DO_GET
+ if (OK != create_curl_get(httpSession)) {
+ FREE(tsession);
+ FREE(httpSession);
+ return SYSERR;
+ }
+#endif
+ /* PUTs will be created as needed */
addTSession (tsession);
*tsessionPtr = tsession;
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/CURL initiated connection to `%s'.\n",
+ httpSession->cs.client.url);
+#endif
return OK;
-cleanup:
- curl_easy_cleanup (curl_get);
- FREE (url);
- FREE (proxy);
- FREE (httpSession);
- return SYSERR;
}
-static CURL *
+/**
+ * We received the "Thank you!" response to a PUT.
+ * Discard the data (not useful) and mark the PUT
+ * operation as completed.
+ */
+static size_t
+discardContentCallback(void * data,
+ size_t size,
+ size_t nmemb,
+ void * put_cls) {
+ struct HTTPPutData * put = put_cls;
+ /* this condition should pretty much always be
+ true; just checking here in case the PUT
+ response comes early somehow */
+ if (put->pos == put->size)
+ put->done = YES;
+ return size * nmemb;
+}
+
+/**
+ * Create a new PUT request for the given PUT data.
+ */
+static int
create_curl_put (HTTPSession * httpSession,
- struct HTTPPutData *put, unsigned int size)
+ struct HTTPPutData *put)
{
CURL *curl_put;
CURLcode ret;
+ CURLMcode mret;
+ long size;
+ /* we should have initiated a GET earlier,
+ so URL must not be NULL here */
+ GE_ASSERT(NULL,
+ httpSession->cs.client.url != NULL);
curl_put = curl_easy_init ();
if (curl_put == NULL)
- return NULL;
+ return SYSERR;
CURL_EASY_SETOPT (curl_put, CURLOPT_FAILONERROR, 1);
CURL_EASY_SETOPT (curl_put, CURLOPT_URL, httpSession->cs.client.url);
if (strlen (proxy) > 0)
CURL_EASY_SETOPT (curl_put, CURLOPT_PROXY, proxy);
- CURL_EASY_SETOPT (curl_put, CURLOPT_BUFFERSIZE, 32 * 1024);
+ CURL_EASY_SETOPT (curl_put, CURLOPT_BUFFERSIZE, put->size);
if (0 == strncmp (httpSession->cs.client.url, "http", 4))
CURL_EASY_SETOPT (curl_put, CURLOPT_USERAGENT, "GNUnet-http");
CURL_EASY_SETOPT (curl_put, CURLOPT_UPLOAD, 1);
+#if 0
+ CURL_EASY_SETOPT (curl_put, CURLOPT_VERBOSE, 1);
+#endif
CURL_EASY_SETOPT (curl_put, CURLOPT_CONNECTTIMEOUT, 150L);
- CURL_EASY_SETOPT (curl_put, CURLOPT_INFILESIZE_LARGE, size);
+ size = put->size;
+ CURL_EASY_SETOPT (curl_put, CURLOPT_INFILESIZE, size);
CURL_EASY_SETOPT (curl_put, CURLOPT_READFUNCTION, &sendContentCallback);
CURL_EASY_SETOPT (curl_put, CURLOPT_READDATA, put);
+ CURL_EASY_SETOPT (curl_put, CURLOPT_WRITEFUNCTION, &discardContentCallback);
+ CURL_EASY_SETOPT (curl_put, CURLOPT_WRITEDATA, put);
+ CURL_EASY_SETOPT (curl_put, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
if (ret != CURLE_OK)
{
curl_easy_cleanup (curl_put);
- return NULL;
+ return SYSERR;
}
- return curl_put;
+ mret = curl_multi_add_handle (curl_multi, curl_put);
+ if (mret != CURLM_OK)
+ {
+ GE_LOG (coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+ _("%s failed at %s:%d: `%s'\n"),
+ "curl_multi_add_handle",
+ __FILE__, __LINE__, curl_multi_strerror (mret));
+ MUTEX_UNLOCK (httplock);
+ return SYSERR;
+ }
+ signal_select();
+ put->curl_put = curl_put;
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/CURL initiated PUT request to `%s'.\n",
+ httpSession->cs.client.url);
+#endif
+ return OK;
}
/**
@@ -964,13 +1354,14 @@
{
HTTPSession *httpSession = tsession->internal;
struct HTTPPutData *putData;
- CURL *curl_put;
- CURLMcode mret;
MESSAGE_HEADER *hdr;
+#if DO_GET
char *tmp;
+#endif
if (httpSession->is_client)
{
+ /* we need to do a PUT (we are the client) */
if (size >= MAX_BUFFER_SIZE)
return SYSERR;
if (size == 0)
@@ -978,6 +1369,17 @@
GE_BREAK (NULL, 0);
return SYSERR;
}
+ if (important != YES) {
+ MUTEX_LOCK (httpSession->lock);
+ if (httpSession->cs.client.puts != NULL) {
+ /* do not queue more than one unimportant PUT at a time */
+ if (httpSession->cs.client.puts->done == YES)
+ signal_select(); /* do clean up now! */
+ MUTEX_UNLOCK (httpSession->lock);
+ return NO;
+ }
+ MUTEX_UNLOCK (httpSession->lock);
+ }
putData = MALLOC (sizeof (struct HTTPPutData));
memset (putData, 0, sizeof (struct HTTPPutData));
putData->msg = MALLOC (size + sizeof (MESSAGE_HEADER));
@@ -986,84 +1388,75 @@
hdr->type = htons (0);
memcpy (&putData->msg[sizeof (MESSAGE_HEADER)], msg, size);
putData->size = size + sizeof (MESSAGE_HEADER);
+ if (OK != create_curl_put (httpSession,
+ putData)) {
+ FREE (putData->msg);
+ FREE (putData);
+ return SYSERR;
+ }
MUTEX_LOCK (httpSession->lock);
- curl_put = create_curl_put (httpSession,
- putData, size + sizeof (MESSAGE_HEADER));
- if (curl_put == NULL)
- {
- MUTEX_UNLOCK (httpSession->lock);
- FREE (putData->msg);
- FREE (putData);
- return SYSERR;
- }
- putData->curl_put = curl_put;
putData->next = httpSession->cs.client.puts;
httpSession->cs.client.puts = putData;
MUTEX_UNLOCK (httpSession->lock);
- MUTEX_LOCK (httplock);
- mret = curl_multi_add_handle (curl_multi, curl_put);
- if (mret != CURLM_OK)
- {
- GE_LOG (coreAPI->ectx,
- GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
- _("%s failed at %s:%d: `%s'\n"),
- "curl_multi_add_handle",
- __FILE__, __LINE__, curl_multi_strerror (mret));
- putData->done = YES;
- MUTEX_UNLOCK (httplock);
- return SYSERR;
- }
- MUTEX_UNLOCK (httplock);
return OK;
}
+
+ /* httpSession->isClient == false, respond to a GET (we
+ hopefully have one or will have one soon) */
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP/MHD queues %u bytes to be sent as response to GET as soon as
possible.\n",
+ size);
+#endif
+#if DO_GET
+ MUTEX_LOCK (httpSession->lock);
+ if (httpSession->cs.server.wsize == 0)
+ GROW (httpSession->cs.server.wbuff, httpSession->cs.server.wsize,
HTTP_BUF_SIZE);
+ if (httpSession->cs.server.wpos + size > httpSession->cs.server.wsize)
+ {
+ /* need to grow or discard */
+ if (!important)
+ {
+ MUTEX_UNLOCK (httpSession->lock);
+ return NO;
+ }
+ tmp = MALLOC (httpSession->cs.server.wpos + size);
+ memcpy (tmp,
+ &httpSession->cs.server.wbuff[httpSession->cs.server.woff],
httpSession->cs.server.wpos);
+ FREE (httpSession->cs.server.wbuff);
+ httpSession->cs.server.wbuff = tmp;
+ httpSession->cs.server.wsize = httpSession->cs.server.wpos + size;
+ httpSession->cs.server.woff = 0;
+ httpSession->cs.server.wpos = httpSession->cs.server.wpos + size;
+ }
else
- { /* httpSession->isClient == false */
- MUTEX_LOCK (httpSession->lock);
- if (httpSession->wsize == 0)
- GROW (httpSession->wbuff, httpSession->wsize, HTTP_BUF_SIZE);
- if (httpSession->wpos + size > httpSession->wsize)
- {
- /* need to grow or discard */
- if (!important)
- {
- MUTEX_UNLOCK (httpSession->lock);
- return NO;
- }
- tmp = MALLOC (httpSession->wpos + size);
- memcpy (tmp,
- &httpSession->wbuff[httpSession->woff], httpSession->wpos);
- FREE (httpSession->wbuff);
- httpSession->wbuff = tmp;
- httpSession->wsize = httpSession->wpos + size;
- httpSession->woff = 0;
- httpSession->wpos = httpSession->wpos + size;
- }
- else
- {
- /* fits without growing */
- if (httpSession->wpos + httpSession->woff + size >
- httpSession->wsize)
- {
- /* need to compact first */
- memmove (httpSession->wbuff,
- &httpSession->wbuff[httpSession->woff],
- httpSession->wpos);
- httpSession->woff = 0;
- }
- /* append */
- memcpy (&httpSession->wbuff[httpSession->woff + httpSession->wpos],
- msg, size);
- httpSession->wpos += size;
- }
- MUTEX_UNLOCK (httpSession->lock);
- return OK;
+ {
+ /* fits without growing */
+ if (httpSession->cs.server.wpos + httpSession->cs.server.woff + size >
+ httpSession->cs.server.wsize)
+ {
+ /* need to compact first */
+ memmove (httpSession->cs.server.wbuff,
+ &httpSession->cs.server.wbuff[httpSession->cs.server.woff],
+ httpSession->cs.server.wpos);
+ httpSession->cs.server.woff = 0;
+ }
+ /* append */
+ memcpy (&httpSession->cs.server.wbuff[httpSession->cs.server.woff +
httpSession->cs.server.wpos],
+ msg, size);
+ httpSession->cs.server.wpos += size;
}
+ MUTEX_UNLOCK (httpSession->lock);
+#endif
+ return OK;
}
/**
* Function called to cleanup dead connections
* (completed PUTs, GETs that have timed out,
- * etc.).
+ * etc.). Also re-vives GETs that have timed out
+ * if we are still interested in the connection.
*/
static void
cleanup_connections ()
@@ -1072,24 +1465,44 @@
HTTPSession *s;
struct HTTPPutData *prev;
struct HTTPPutData *pos;
+ struct MHDPutData * mpos;
+ struct MHDPutData * mprev;
+ cron_t now;
MUTEX_LOCK (httplock);
+ now = get_time();
for (i = 0; i < tsessionCount; i++)
{
s = tsessions[i]->internal;
MUTEX_LOCK (s->lock);
if (s->is_client)
{
+ if ( (s->cs.client.puts == NULL) &&
+ (s->users == 0)
+#if DO_GET
+ && (s->cs.client.last_get_activity + HTTP_TIMEOUT < now)
+#endif
+ ) {
+ MUTEX_UNLOCK (s->lock);
+#if DO_GET
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP transport destroys old (%llu ms) unused client
session\n",
+ now - s->cs.client.last_get_activity);
+#endif
+#endif
+ destroy_tsession(tsessions[i]);
+ i--;
+ continue;
+ }
+
prev = NULL;
pos = s->cs.client.puts;
while (pos != NULL)
{
- /* FIXME: check if CURL has timed out
- the GET operation! If so, clean up!
- (and make sure we re-establish GET
- as needed!) */
-
-
+ if (pos->last_activity + HTTP_TIMEOUT < now)
+ pos->done = YES;
if (pos->done)
{
if (prev == NULL)
@@ -1098,6 +1511,7 @@
prev->next = pos->next;
FREE (pos->msg);
curl_multi_remove_handle (curl_multi, pos->curl_put);
+ signal_select();
curl_easy_cleanup (pos->curl_put);
FREE (pos);
if (prev == NULL)
@@ -1109,21 +1523,67 @@
prev = pos;
pos = pos->next;
}
+#if DO_GET
+ if ( (s->cs.client.last_get_activity + HTTP_TIMEOUT < now) &&
+ ( (s->users > 0) ||
+ (s->cs.client.puts != NULL) ) )
+ create_curl_get(s);
+#endif
}
- else
- {
- /* FIXME: add code to close MHD connection
- from the server side (timeout!); need
- to
- A) tell GET callback to return "end of transmission"
- B) destroy response object
- */
- }
+ else
+ {
+ mpos = s->cs.server.puts;
+ mprev = NULL;
+ while (mpos != NULL) {
+ if ( (mpos->done == YES) ||
+ (mpos->last_activity + HTTP_TIMEOUT < now) ) {
+ if (mprev == NULL)
+ s->cs.server.puts = mpos->next;
+ else
+ mprev->next = mpos->next;
+ GROW(mpos->rbuff2,
+ mpos->rsize2,
+ 0);
+ FREE(mpos);
+ if (mprev == NULL)
+ mpos = s->cs.server.puts;
+ else
+ mpos = mprev->next;
+ continue;
+ }
+ mprev = mpos;
+ mpos = mpos->next;
+ }
+
+ /* ! s->is_client */
+ if (
+#if DO_GET
+ (s->cs.server.last_get_activity + HTTP_TIMEOUT < now) &&
+#endif
+ (s->users == 0)
+ ) {
+ MUTEX_UNLOCK (s->lock);
+#if DO_GET
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP transport destroys old (%llu ms) unused server
session\n",
+ now - s->cs.server.last_get_activity);
+#endif
+#endif
+ destroy_tsession(tsessions[i]);
+ i--;
+ continue;
+ }
+ }
MUTEX_UNLOCK (s->lock);
}
MUTEX_UNLOCK (httplock);
}
+/**
+ * Thread that runs the CURL and MHD requests.
+ */
static void *
curl_runner (void *unused)
{
@@ -1134,7 +1594,16 @@
int max;
struct timeval tv;
int running;
+ unsigned long long timeout;
+ long ms;
+ int have_tv;
+ char buf[128]; /* for reading from pipe */
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP transport select thread started\n");
+#endif
while (YES == http_running)
{
max = 0;
@@ -1153,21 +1622,50 @@
}
if (mhd_daemon != NULL)
MHD_get_fdset (mhd_daemon, &rs, &ws, &es, &max);
- /* CURL requires a regular timeout... */
- tv.tv_sec = 0;
- tv.tv_usec = 1000;
- SELECT (max + 1, &rs, &ws, &es, &tv);
+ timeout = 0;
+ have_tv = MHD_get_timeout(mhd_daemon,
+ &timeout);
+ if ( (CURLM_OK == curl_multi_timeout(curl_multi, &ms)) &&
+ ( (ms < timeout) ||
+ (have_tv == MHD_NO) ) ) {
+ timeout = ms;
+ have_tv = MHD_YES;
+ }
+ FD_SET(signal_pipe[0], &rs);
+ if (max < signal_pipe[0])
+ max = signal_pipe[0];
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout % 1000) * 1000;
+ SELECT (max + 1, &rs, &ws, &es, (have_tv == MHD_YES) ? &tv : NULL);
if (YES != http_running)
break;
running = 0;
- curl_multi_perform (curl_multi, &running);
- if (mhd_daemon != NULL)
- MHD_run (mhd_daemon);
+ do {
+ mret = curl_multi_perform (curl_multi, &running);
+ } while ( (mret == CURLM_CALL_MULTI_PERFORM) &&
+ (http_running == YES) );
+ if ( FD_ISSET(signal_pipe[0], &rs))
+ read(signal_pipe[0], buf, sizeof(buf));
+ if ( (mret != CURLM_OK) &&
+ (mret != CURLM_CALL_MULTI_PERFORM) )
+ GE_LOG (coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+ _("%s failed at %s:%d: `%s'\n"),
+ "curl_multi_perform",
+ __FILE__, __LINE__, curl_multi_strerror (mret));
+ if (mhd_daemon != NULL)
+ MHD_run (mhd_daemon);
cleanup_connections ();
}
+#if DEBUG_HTTP
+ GE_LOG (coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "HTTP transport select thread exits.\n");
+#endif
return NULL;
}
+
/**
* Start the server process to receive inbound traffic.
* @return OK on success, SYSERR if the operation failed
@@ -1185,7 +1683,7 @@
port = getGNUnetHTTPPort ();
if ((mhd_daemon == NULL) && (port != 0))
{
- mhd_daemon = MHD_start_daemon (MHD_NO_FLAG,
+ mhd_daemon = MHD_start_daemon (MHD_USE_DEBUG,
port,
&acceptPolicyCallback,
NULL, &accessHandlerCallback, NULL,
@@ -1197,7 +1695,14 @@
128,
MHD_OPTION_END);
}
- http_running = YES;
+ if (0 != PIPE(signal_pipe)) {
+ MHD_stop_daemon(mhd_daemon);
+ curl_multi_cleanup(curl_multi);
+ curl_multi = NULL;
+ mhd_daemon = NULL;
+ return SYSERR;
+ }
+ http_running = YES;
curl_thread = PTHREAD_CREATE (&curl_runner, NULL, 32 * 1024);
if (curl_thread == NULL)
GE_DIE_STRERROR (coreAPI->ectx,
@@ -1217,8 +1722,11 @@
if ((http_running == NO) || (curl_multi == NULL))
return SYSERR;
http_running = NO;
+ signal_select();
PTHREAD_STOP_SLEEP (curl_thread);
PTHREAD_JOIN (curl_thread, &unused);
+ CLOSE(signal_pipe[0]);
+ CLOSE(signal_pipe[1]);
if (mhd_daemon != NULL)
{
MHD_stop_daemon (mhd_daemon);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r5517 - GNUnet/src/transports,
gnunet <=