gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5088 - GNUnet/src/transports


From: gnunet
Subject: [GNUnet-SVN] r5088 - GNUnet/src/transports
Date: Sat, 16 Jun 2007 04:35:17 -0600 (MDT)

Author: grothoff
Date: 2007-06-16 04:35:16 -0600 (Sat, 16 Jun 2007)
New Revision: 5088

Modified:
   GNUnet/src/transports/Makefile.am
   GNUnet/src/transports/http.c
Log:
rudimentary http working

Modified: GNUnet/src/transports/Makefile.am
===================================================================
--- GNUnet/src/transports/Makefile.am   2007-06-16 09:53:54 UTC (rev 5087)
+++ GNUnet/src/transports/Makefile.am   2007-06-16 10:35:16 UTC (rev 5088)
@@ -36,7 +36,7 @@
   ip.c ip.h
 libgnunetip_la_LIBADD = \
  $(top_builddir)/src/util/libgnunetutil.la
- 
+
 libgnunetip6_la_SOURCES = \
   ip6.c ip6.h
 

Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c        2007-06-16 09:53:54 UTC (rev 5087)
+++ GNUnet/src/transports/http.c        2007-06-16 10:35:16 UTC (rev 5088)
@@ -25,8 +25,9 @@
  *
  * TODO:
  * - connection timeout (shutdown inactive connections)
- * - proper connection shutdown (free resources, especially
- *   check completed CURL puts)
+ * - proper connection re-establishment
+ * - nothing copies TO wbuff, only from (see FIXMEs)
+ * - free resources allocated for PUT!
  */
 
 #include "gnunet_util.h"
@@ -182,11 +183,6 @@
       CURL * get;
 
       /**
-       * PUT operation
-       */
-      CURL * put;
-
-      /**
        * URL of the get and put operations.
        */
       char * url;
@@ -197,6 +193,19 @@
 
 } HTTPSession;
 
+struct HTTPPutData {
+  struct HTTPPutData * next;
+
+  char * msg;
+
+  CURL * curl_put;
+
+  unsigned int size;
+
+  unsigned int pos;
+
+};
+
 /* *********** globals ************* */
 
 /**
@@ -224,8 +233,14 @@
  */
 static CURLM * curl_multi;
 
+/**
+ * Set to YES while the transport is running.
+ */
 static int http_running;
 
+/**
+ * Thread running libcurl activities.
+ */
 static struct PTHREAD * curl_thread;
 
 /**
@@ -248,6 +263,11 @@
 static UPnP_ServiceAPI * upnp;
 
 /**
+ * List of active PUT requests.
+ */
+static struct HTTPPutData * putHead;
+
+/**
  * Lock for access to mutable state of the module,
  * that is the configuration and the tsessions array.
  * Note that we ONLY need to synchronize access to
@@ -542,8 +562,9 @@
 
   if ( (strlen(url) < 2) ||
        (OK != enc2hash(&url[1],
-                      &client)) )
+                      &client)) ) {
     return MHD_NO;
+  }
 
   /* check if we already have a session for this */
   MUTEX_LOCK(httplock);
@@ -581,7 +602,6 @@
     httpSession->lastUse = get_time();
     httpSession->is_client = NO;
     httpSession->cs.client.get = NULL;
-    httpSession->cs.client.put = NULL;
     tsession = MALLOC(sizeof(TSession));
     tsession->ttype = HTTP_PROTOCOL_NUMBER;
     tsession->internal = httpSession;
@@ -589,7 +609,7 @@
     httpSession->tsession = tsession;
     addTSession(tsession);
   }
-  if (0 == strcmp("get", method)) {
+  if (0 == strcmp("GET", method)) {
     /* handle get */
     response = MHD_create_response_from_callback(-1,
                                                 contentReaderCallback,
@@ -599,7 +619,7 @@
     MHD_queue_response(session,
                       MHD_HTTP_OK,
                       response);
-  } else if (0 == strcmp("put", method)) {
+  } else if (0 == strcmp("PUT", method)) {
     /* handle put (upload_data!) */
     MUTEX_LOCK(httpSession->lock);
     poff = 0;
@@ -616,20 +636,16 @@
        httpSession->rpos1 += cpy;
        have -= cpy;
        poff += cpy;
+       httpSession->rpos2 = 0;
       }
       if (httpSession->rpos1 < sizeof(MESSAGE_HEADER))
        break;
       hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
       GROW(httpSession->rbuff2,
           httpSession->rsize2,
-          ntohs(hdr->size));
-      memcpy(httpSession->rbuff2,
-            httpSession->rbuff1,
-            sizeof(MESSAGE_HEADER));
-      GE_ASSERT(NULL,
-               httpSession->rpos2 <= ntohs(hdr->size));
-      if (httpSession->rpos2 < ntohs(hdr->size)) {
-       cpy = ntohs(hdr->size) - httpSession->rpos2;
+          ntohs(hdr->size) - sizeof(MESSAGE_HEADER));
+      if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER)) {
+       cpy = ntohs(hdr->size) - sizeof(MESSAGE_HEADER) - httpSession->rpos2;
        if (cpy > have)
          cpy = have;
        memcpy(&httpSession->rbuff2[httpSession->rpos2],
@@ -637,21 +653,25 @@
               cpy);
        have -= cpy;
        poff += cpy;
+       httpSession->rpos2 += cpy;
       }
-      if (httpSession->rpos2 < ntohs(hdr->size))
+      if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER))
        break;
       mp = MALLOC(sizeof(P2P_PACKET));
       mp->msg = httpSession->rbuff2;
       mp->sender = httpSession->sender;
       mp->tsession = httpSession->tsession;
+      mp->size = ntohs(hdr->size) - sizeof(MESSAGE_HEADER);
       coreAPI->receive(mp);
       httpSession->rbuff2 = NULL;
       httpSession->rpos2 = 0;
       httpSession->rsize2 = 0;
       httpSession->rpos1 = 0;
     }
-  } else
+    MUTEX_UNLOCK(httpSession->lock);
+  } else {
     return MHD_NO; /* must be get or put! */
+  }
   return MHD_YES;
 }
 
@@ -671,6 +691,8 @@
   MESSAGE_HEADER * hdr;
   P2P_PACKET * mp;
 
+  printf("Receiving %u bytes from GET\n",
+        have);
   while (have > 0) {
     if (httpSession->rpos1 < sizeof(MESSAGE_HEADER)) {
       cpy = sizeof(MESSAGE_HEADER) - httpSession->rpos1;
@@ -682,20 +704,18 @@
       httpSession->rpos1 += cpy;
       have -= cpy;
       poff += cpy;
+      httpSession->rpos2 = 0;
     }
     if (httpSession->rpos1 < sizeof(MESSAGE_HEADER))
       return size * nmemb;
     hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
     GROW(httpSession->rbuff2,
         httpSession->rsize2,
-        ntohs(hdr->size));
-    memcpy(httpSession->rbuff2,
-          httpSession->rbuff1,
-          sizeof(MESSAGE_HEADER));
-    GE_ASSERT(NULL,
-             httpSession->rpos2 <= ntohs(hdr->size));
-    if (httpSession->rpos2 < ntohs(hdr->size)) {
-      cpy = ntohs(hdr->size) - httpSession->rpos2;
+        ntohs(hdr->size) - sizeof(MESSAGE_HEADER));
+    printf("Expecting message of %u bytes via GET\n",
+          ntohs(hdr->size));
+    if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER)) {
+      cpy = ntohs(hdr->size) - sizeof(MESSAGE_HEADER) - httpSession->rpos2;
       if (cpy > have)
        cpy = have;
       memcpy(&httpSession->rbuff2[httpSession->rpos2],
@@ -703,13 +723,16 @@
             cpy);
       have -= cpy;
       poff += cpy;
+      httpSession->rpos2 += cpy;
     }
-    if (httpSession->rpos2 < ntohs(hdr->size))
+    if (httpSession->rpos2 < ntohs(hdr->size) - sizeof(MESSAGE_HEADER))
       return size * nmemb;
     mp = MALLOC(sizeof(P2P_PACKET));
     mp->msg = httpSession->rbuff2;
     mp->sender = httpSession->sender;
     mp->tsession = httpSession->tsession;
+    mp->size =  ntohs(hdr->size) - sizeof(MESSAGE_HEADER);
+    printf("Passing message from GET to core!\n");
     coreAPI->receive(mp);
     httpSession->rbuff2 = NULL;
     httpSession->rpos2 = 0;
@@ -727,27 +750,15 @@
                    size_t size,
                    size_t nmemb,
                    void * ctx) {
-  HTTPSession * httpSession = ctx;
+  struct HTTPPutData * put = ctx;
   size_t max = size * nmemb;
 
-  MUTEX_LOCK(httpSession->lock);
-  if (max > httpSession->wpos)
-    max = httpSession->wpos;
+  if (max > put->size - put->pos)
+    max = put->size - put->pos;
   memcpy(ptr,
-        &httpSession->wbuff[httpSession->woff],
+        &put->msg[put->pos],
         max);
-  httpSession->woff += max;
-  httpSession->wpos -= max;
-  if (httpSession->woff == httpSession->wpos) {
-    httpSession->woff = 0;
-    httpSession->wpos = 0;
-  }
-  if (max == 0) {
-    /* if we have nothing to sent, this will terminate
-       the session (CURL API requires this) */
-    httpSession->cs.client.put = NULL;
-  }
-  MUTEX_UNLOCK(httpSession->lock);
+  put->pos += max;
   return max;
 }
 
@@ -771,6 +782,9 @@
   char * url;
   EncName enc;
 
+  /* FIXME: check if we have a GET pending for
+     this peer, and if so, use that! */
+
   curl_get = curl_easy_init();
   if (curl_get == NULL)
     return SYSERR;
@@ -780,8 +794,9 @@
   url = MALLOC(64 + sizeof(EncName));
   SNPRINTF(url,
           64 + sizeof(EncName),
-          "http://%u.%u.%u.%u/%s",
+          "http://%u.%u.%u.%u:%u/%s",
           PRIP(ntohl(*(int*)&haddr->ip.addr)),
+          ntohs(haddr->port),
           &enc);
 
   /* create GET */
@@ -818,8 +833,6 @@
                   httpSession);
   if (ret != CURLE_OK)
     goto cleanup;
-
-  /* FIXME: should we queue here or wait until we have data!? */
   mret = curl_multi_add_handle(curl_multi, curl_get);
   if (mret != CURLM_OK) {
     GE_LOG(coreAPI->ectx,
@@ -848,7 +861,6 @@
   httpSession->lastUse = get_time();
   httpSession->is_client = YES;
   httpSession->cs.client.get = curl_get;
-  httpSession->cs.client.put = NULL;
   tsession = MALLOC(sizeof(TSession));
   httpSession->tsession = tsession;
   tsession->ttype = HTTP_PROTOCOL_NUMBER;
@@ -865,7 +877,9 @@
 }
 
 static CURL *
-create_curl_put(HTTPSession * httpSession) {
+create_curl_put(HTTPSession * httpSession,
+               struct HTTPPutData * put,
+               unsigned int size) {
   CURL * curl_put;
   CURLcode ret;
 
@@ -899,13 +913,13 @@
                   150L);
   CURL_EASY_SETOPT(curl_put,
                   CURLOPT_INFILESIZE_LARGE,
-                  0);
+                  size);
   CURL_EASY_SETOPT(curl_put,
                   CURLOPT_READFUNCTION,
                   &sendContentCallback);
   CURL_EASY_SETOPT(curl_put,
                   CURLOPT_READDATA,
-                  httpSession);
+                  put);
   if (ret != CURLE_OK) {
     curl_easy_cleanup(curl_put);
     return NULL;
@@ -923,90 +937,63 @@
  */
 static int httpSend(TSession * tsession,
                    const void * msg,
-                   const unsigned int size,
+                   unsigned int size,
                    int important) {
   HTTPSession * httpSession = tsession->internal;
+  struct HTTPPutData * putData;
   CURL * curl_put;
   CURLMcode mret;
+  MESSAGE_HEADER * hdr;
 
+  /* FIXME: check if we have a GET pending for
+     this peer, and if so, use that! */
+
   if (size >= MAX_BUFFER_SIZE)
     return SYSERR;
   if (size == 0) {
     GE_BREAK(NULL, 0);
     return SYSERR;
   }
+  putData = MALLOC(sizeof(struct HTTPPutData));
+  putData->msg = MALLOC(size + sizeof(MESSAGE_HEADER));
+  hdr = (MESSAGE_HEADER*) putData->msg;
+  hdr->size = htons(size + sizeof(MESSAGE_HEADER));
+  hdr->type = htons(0);
+  memcpy(&putData->msg[sizeof(MESSAGE_HEADER)],
+        msg,
+        size);
+  putData->size = size + sizeof(MESSAGE_HEADER);
   MUTEX_LOCK(httpSession->lock);
-  if (httpSession->cs.client.put == NULL) {
-    /* first data to send, add PUT to multi set! */
-    curl_put = create_curl_put(httpSession);
-    if (curl_put == NULL) {
-      MUTEX_UNLOCK(httpSession->lock);
-      return SYSERR;
-    }
-    httpSession->cs.client.put = curl_put;
-    mret = curl_multi_add_handle(curl_multi, curl_put);
-    if (mret != CURLM_OK) {
-      GE_LOG(coreAPI->ectx,
-            GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
-            _("%s failed at %s:%d: `%s'\n"),
-            "curl_multi_add_handle",
-            __FILE__,
-            __LINE__,
-            curl_multi_strerror(mret));
-      curl_easy_cleanup(curl_put);
-      httpSession->cs.client.put = NULL;
-      MUTEX_UNLOCK(httpSession->lock);
-      return SYSERR;
-    }
+  curl_put = create_curl_put(httpSession,
+                            putData,
+                            size + sizeof(MESSAGE_HEADER)); 
+  MUTEX_UNLOCK(httpSession->lock);
+  putData->curl_put = curl_put;
+  if (curl_put == NULL) {
+    FREE(putData->msg);
+    FREE(putData);
+    return SYSERR;
   }
-
-  if ( (httpSession->wsize > HTTP_BUF_SIZE) &&
-       (important == NO) ) {
-    if (stats != NULL)
-      stats->change(stat_bytesDropped,
-                   size);
-    MUTEX_UNLOCK(httpSession->lock);
-    return NO;
+  MUTEX_LOCK(httplock);
+  putData->next = putHead;
+  putHead = putData;
+  mret = curl_multi_add_handle(curl_multi, curl_put);
+  if (mret != CURLM_OK) {
+    GE_LOG(coreAPI->ectx,
+          GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+          _("%s failed at %s:%d: `%s'\n"),
+          "curl_multi_add_handle",
+          __FILE__,
+          __LINE__,
+          curl_multi_strerror(mret));
+    putHead = putData->next;
+    curl_easy_cleanup(curl_put);
+    FREE(putData->msg);
+    FREE(putData);
+    MUTEX_UNLOCK(httplock);
+    return SYSERR;
   }
-  if (httpSession->wsize >= httpSession->wpos + size) {
-    if (httpSession->woff + size <= httpSession->wsize) {
-      memcpy(&httpSession->wbuff[httpSession->woff],
-            msg,
-            size);
-      httpSession->woff += size;
-      httpSession->wpos += size;
-    } else {
-      memmove(httpSession->wbuff,
-             &httpSession->wbuff[httpSession->woff - httpSession->wpos],
-             httpSession->wpos);
-      memcpy(&httpSession->wbuff[httpSession->wpos],
-            msg,
-            size);
-      httpSession->woff = httpSession->wpos + size;
-      httpSession->wpos += size;
-    }
-  } else {
-    if ( (httpSession->wpos + size > HTTP_BUF_SIZE) &&
-        (important == NO) ) {
-      if (stats != NULL)
-       stats->change(stat_bytesDropped,
-                     size);
-      MUTEX_UNLOCK(httpSession->lock);
-      return NO;
-    }
-    GROW(httpSession->wbuff,
-        httpSession->wsize,
-        httpSession->wpos + size);
-    memmove(httpSession->wbuff,
-           &httpSession->wbuff[httpSession->woff - httpSession->wpos],
-           httpSession->wpos);
-    memcpy(&httpSession->wbuff[httpSession->wpos],
-          msg,
-          size);
-    httpSession->woff = httpSession->wpos + size;
-    httpSession->wpos += size;
-  }
-  MUTEX_UNLOCK(httpSession->lock);
+  MUTEX_UNLOCK(httplock);
   return OK;
 }
 
@@ -1040,11 +1027,9 @@
             curl_multi_strerror(mret));
       break;
     }
-    /* use timeout of 1s in case that SELECT is not interrupted by
-       signal (just to increase portability a bit) -- better a 1s
-       delay in the reaction than hanging... */
-    tv.tv_sec = 1;
-    tv.tv_usec = 0;
+    /* CURL requires a regular timeout... */
+    tv.tv_sec = 0;
+    tv.tv_usec = 1000;
     SELECT(max + 1,
           &rs,
           &ws,
@@ -1249,6 +1234,10 @@
                            NULL);
   coreAPI->releaseService(stats);
   stats = NULL;
+  if (upnp != NULL) {
+    coreAPI->releaseService(upnp);
+    stats = NULL;
+  }
   FREENONNULL(filteredNetworks_);
   MUTEX_DESTROY(httplock);
   curl_global_cleanup();





reply via email to

[Prev in Thread] Current Thread [Next in Thread]