gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9679: move http_handler to here. Add new options.


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9679: move http_handler to here. Add new options.
Date: Sat, 01 Nov 2008 08:58:30 -0600
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9679
committer: address@hidden
branch nick: rtmp
timestamp: Sat 2008-11-01 08:58:30 -0600
message:
  move http_handler to here. Add new options.
modified:
  cygnal/cygnal.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-09-09 03:49:35 +0000
+++ b/cygnal/cygnal.cpp 2008-11-01 14:58:30 +0000
@@ -15,8 +15,7 @@
 // You should have received a copy of the GNU General Public License
 // along with this program; if not, write to the Free Software
 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-// 
-
+//
 
 #ifdef HAVE_CONFIG_H
 #include "gnashconfig.h"
@@ -51,12 +50,15 @@
 #include "log.h"
 #include "crc.h"
 #include "rtmp.h"
+#include "rtmp_server.h"
 #include "http.h"
+#include "utility.h"
 #include "limits.h"
 #include "netstats.h"
 #include "statistics.h"
 //#include "stream.h"
 #include "gmemory.h"
+#include "diskstream.h"
 #include "arg_parser.h"
 
 // classes internal to Cygnal
@@ -73,13 +75,20 @@
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/bind.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
 
-using gnash::log_debug;
+//using gnash::log_debug;
 using namespace std;
+using namespace gnash;
 using namespace cygnal;
-using namespace gnash;
 using namespace amf;
 
+// Keep a list of all active network connections
+namespace gnash {
+extern map<int, gnash::Handler *> handlers;
+}
+
 static void usage();
 static void version_and_copyright();
 static void cntrlc_handler(int sig);
@@ -89,9 +98,6 @@
 
 LogFile& dbglogfile = LogFile::getDefaultInstance();
 
-// The rcfile is loaded and parsed here:
-CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
-
 // Toggles very verbose debugging info from the network Network class
 static bool netdebug = false;
 
@@ -113,16 +119,34 @@
 // conflict with apache on the same machine.
 static int port_offset = 0;
 
-// Keep a list of all active network connections
-namespace gnash {
-extern map<int, Handler *> handlers;
-}
+// Toggle the admin thread
+static bool admin = false;
 
 // Admin commands are small
 const int ADMINPKTSIZE = 80;
 
 // end of globals
 
+// The rcfile is loaded and parsed here:
+CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
+
+// This mutex is used to signify when all the threads are done.
+static boost::condition        alldone;
+static boost::mutex    alldone_mutex;
+
+static void
+usage()
+{
+       cout << _("cygnal -- a streaming media server.") << endl
+       << endl
+       << _("Usage: cygnal [options...]") << endl
+       << _("  -h,  --help          Print this help and exit") << endl
+       << _("  -V,  --version       Print version information and exit") << endl
+       << _("  -v,  --verbose       Output verbose debug info") << endl
+       << _("  -p   --port-offset   RTMPT port offset") << endl
+       << endl;
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -140,7 +164,9 @@
         { 'p', "port-offset",   Arg_parser::yes },
         { 'v', "verbose",       Arg_parser::no  },
         { 'd', "dump",          Arg_parser::no  },
-        { 'n', "netdebug",      Arg_parser::no  }
+        { 'n', "netdebug",      Arg_parser::no  },
+        { 't', "testing",       Arg_parser::no  },
+        { 'a', "admin",         Arg_parser::no  }
         };
 
     Arg_parser parser(argc, argv, opts);
@@ -180,6 +206,12 @@
          case 'V':
              version_and_copyright();
              exit(0);
+         case 't':
+             crcfile.setTestingFlag(true);
+             break;
+         case 'a':
+             admin = true;
+             break;
          case 'v':
              dbglogfile.setVerbosity();
              log_debug (_("Verbose output turned on"));
@@ -204,44 +236,48 @@
     act.sa_handler = cntrlc_handler;
     sigaction (SIGINT, &act, NULL);
 
+    boost::mutex::scoped_lock lk(alldone_mutex);
+    
 //     struct thread_params rtmp_data;
 //     struct thread_params ssl_data;
 //     rtmp_data.port = port_offset + 1935;
 //     boost::thread rtmp_port(boost::bind(&rtmp_thread, &rtmp_data));
 
-#if 1
-    // Admin handler
-    Handler::thread_params_t admin_data;
-    admin_data.port = gnash::ADMIN_PORT;
-    boost::thread adminhandler(boost::bind(&admin_handler, &admin_data));
-#endif
-
-#if 1
-    // Incomming connection handler for port 80, HTTP and RTMPT. As port 80 requires
-    // root access, cygnal supports a "port offset" for debugging and development of
-    // the server. Since this port offset changes the constant to test for which protocol,
-    // we pass the info to the start thread so it knows which handler to invoke.
+    // Incomming connection handler for port 80, HTTP and
+    // RTMPT. As port 80 requires root access, cygnal supports a
+    // "port offset" for debugging and development of the
+    // server. Since this port offset changes the constant to test
+    // for which protocol, we pass the info to the start thread so
+    // it knows which handler to invoke. 
     Handler::thread_params_t http_data;
     http_data.port = port_offset + gnash::RTMPT_PORT;
     http_data.netfd = 0;
     http_data.filespec = docroot;
     boost::thread http_thread(boost::bind(&connection_handler, &http_data));
-#endif
     
-#if 1
-    // Incomming connection handler for port 1935, RTMP. As RTMP is not a priviledged port,
-    // we just open it without an offset.
+    // Incomming connection handler for port 1935, RTMP. As RTMP
+    // is not a priviledged port, we just open it without an offset.
     Handler::thread_params_t rtmp_data;
     rtmp_data.port = gnash::RTMP_PORT;
     rtmp_data.netfd = 0;
     rtmp_data.filespec = docroot;
     boost::thread rtmp_thread(boost::bind(&connection_handler, &rtmp_data));
-#endif
+    
+    // Admin handler
+    if (admin) {
+       Handler::thread_params_t admin_data;
+       admin_data.port = gnash::ADMIN_PORT;
+       boost::thread admin_thread(boost::bind(&admin_handler, &admin_data));
+       admin_thread.join();
+    }
 
     // wait for the thread to finish
-//     adminhandler.join();    
 //     http_thread.join();
-    rtmp_thread.join();
+//     rtmp_thread.join();
+
+    // Wait for all the threads to die
+    alldone.wait(lk);
+    
     log_debug (_("Cygnal done..."));
     
     return(0);
@@ -298,20 +334,6 @@
     << endl;
 }
 
-
-static void
-usage()
-{
-       cout << _("cygnal -- a streaming media server.") << endl
-       << endl
-       << _("Usage: cygnal [options...]") << endl
-       << _("  -h,  --help          Print this help and exit") << endl
-       << _("  -V,  --version       Print version information and exit") << endl
-       << _("  -v,  --verbose       Output verbose debug info") << endl
-       << _("  -p   --port-offset   RTMPT port offset") << endl
-       << endl;
-}
-
 // FIXME: this function could be tweaked for better performance
 void
 admin_handler(Handler::thread_params_t *args)
@@ -419,6 +441,9 @@
        net.closeNet();         // this shuts down this socket connection
     }
     net.closeConnection();             // this shuts down the server on this connection
+
+    // All threads should exit now.
+    alldone.notify_all();
 }
     
 void
@@ -428,12 +453,12 @@
     int fd = 0;
 //    list<Handler *> *handlers = reinterpret_cast<list<Handler *> *>(args->handle);
 
-    log_debug("Starting Connection Handler for fd #%d, port %hd", args->netfd, args->port);
     Network net;
     if (netdebug) {
        net.toggleDebug(true);
     }
     fd = net.createServer(args->port);
+    log_debug("Starting Connection Handler for fd #%d, port %hd", fd, args->port);
     // FIXME: this runs forever, we probably want a cleaner way to test for the end of time.
     do {
        Handler *hand = new Handler;
@@ -445,12 +470,189 @@
        args->handle = hand;
        log_debug("Adding handler: %x for fd #%d", (void *)hand, args->netfd);
        handlers[args->netfd] = hand;
-       hand->start(args);
+
+       if (crcfile.getThreadingFlag()) {
+           hand->start(args);
+       } else {
+           log_debug(_("Starting Handlers for port %d, tid %ld"),
+                     args->port, get_thread_id());
+           
+           if (args->port == (port_offset + RTMPT_PORT)) {
+               boost::thread handler(boost::bind(&http_handler, args));
+           }
+           if (args->port == RTMP_PORT) {
+               boost::thread handler(boost::bind(&rtmp_handler, args));
+           }
+       }
        log_debug("Restarting loop for next connection for port %d...", args->port);
     } while(1);
+
+    // All threads should exit now.
+    alldone.notify_all();
+
 } // end of connection_handler
 
-//} // end of cygnal namespace
+extern "C" {
+void
+http_handler(Handler::thread_params_t *args)
+{
+    GNASH_REPORT_FUNCTION;
+//    struct thread_params thread_data;
+    string url, filespec, parameters;
+    string::size_type pos;
+    Handler *hand = reinterpret_cast<Handler *>(args->handle);
+    HTTP www;
+    www.setHandler(hand);
+
+    log_debug(_("Starting HTTP Handler for fd #%d, tid %ld"),
+             args->netfd, get_thread_id());
+    
+    string docroot = args->filespec;
+    
+    log_debug("Starting to wait for data in net for fd #%d", args->netfd);
+
+    // Wait for data, and when we get it, process it.
+    do {
+#ifdef THREADED_IO
+       hand->wait();
+       if (hand->timetodie()) {
+           log_debug("Not waiting no more, no more for more HTTP data for fd #%d...", args->netfd);
+           map<int, Handler *>::iterator hit = handlers.find(args->netfd);
+           if ((*hit).second) {
+               log_debug("Removing handle %x for HTTP on fd #%d",
+                         (void *)hand, args->netfd);
+               handlers.erase(args->netfd);
+               delete (*hit).second;
+           }
+           return;
+       }
+#endif
+       
+#ifdef USE_STATISTICS
+       struct timespec start;
+       clock_gettime (CLOCK_REALTIME, &start);
+#endif
+       
+//     conndata->statistics->setFileType(NetStats::RTMPT);
+//     conndata->statistics->startClock();
+//     args->netfd = www.getFileFd();
+//     www.recvMsg(5);
+       www.recvMsg(args->netfd);
+       
+       if (!www.processGetRequest()) {
+           hand->die();        // tell all the threads for this connection to die
+           hand->notifyin();
+           log_debug("Net HTTP done for fd #%d...", args->netfd);
+//         hand->closeNet(args->netfd);
+           return;
+       }
+       url = docroot;
+       url += www.getURL();
+       pos = url.find("?");
+       filespec = url.substr(0, pos);
+       parameters = url.substr(pos + 1, url.size());
+       // Get the file size for the HTTP header
+       
+       if (www.getFileStats(filespec) == amf::AMF::FILETYPE_ERROR) {
+           www.formatErrorResponse(HTTP::NOT_FOUND);
+       }
+       // Send the reply
+       www.formatGetReply(HTTP::LIFE_IS_GOOD);
+//     cerr << "Size = " << www.getHeader().size() << "        " << www.getHeader() << endl;
+       
+       hand->Network::writeNet(args->netfd, (boost::uint8_t *)www.getHeader().c_str(), www.getHeader().size());
+//     hand->writeNet(args->netfd, www.getHeader(), www.getHeader().size());
+//     strcpy(thread_data.filespec, filespec.c_str());
+//     thread_data.statistics = conndata->statistics;
+       
+       // Keep track of the network statistics
+//     conndata->statistics->stopClock();
+//     log_debug (_("Bytes read: %d"), www.getBytesIn());
+//     log_debug (_("Bytes written: %d"), www.getBytesOut());
+//     st.setBytes(www.getBytesIn() + www.getBytesOut());
+//     conndata->statistics->addStats();
+
+       if (filespec[filespec.size()-1] == '/') {
+           filespec += "index.html";
+       }
+
+//     DiskStream filestream;
+//     filestream.open(filespec);
+#if 0
+       if (url != docroot) {
+           log_debug (_("File to load is: %s"), filespec.c_str());
+           log_debug (_("Parameters are: %s"), parameters.c_str());
+           struct stat st;
+           int filefd;
+           size_t ret;
+#ifdef USE_STATISTICS
+           struct timespec start;
+           clock_gettime (CLOCK_REALTIME, &start);
+#endif
+           if (stat(filespec.c_str(), &st) == 0) {
+               filefd = ::open(filespec.c_str(), O_RDONLY);
+               log_debug (_("File \"%s\" is %lld bytes in size, disk fd #%d"), filespec,
+                          st.st_size, filefd);
+               do {
+                   boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
+                   ret = read(filefd, buf->reference(), buf->size());
+                   if (ret == 0) { // the file is done
+                       break;
+                   }
+                   if (ret != buf->size()) {
+                       buf->resize(ret);
+                       log_debug("Got last data block from disk file, size %d", buf->size());
+                   }
+                   log_debug("Read %d bytes from %s.", ret, filespec);
+#if 0
+                   hand->pushout(buf);
+                   hand->notifyout();
+#else
+                   // Don't bother with the outgoing que
+                   if (ret > 0) {
+                       ret = hand->writeNet(buf);
+                   }
+#endif
+               } while(ret > 0);
+               log_debug("Done transferring %s to net fd #%d",
+                         filespec, args->netfd);
+               ::close(filefd); // close the disk file
+               // See if this is a persistant connection
+//             if (!www.keepAlive()) {
+//                 log_debug("Keep-Alive is off", www.keepAlive());
+// //              hand->closeConnection();
+//             }
+#ifdef USE_STATISTICS
+               struct timespec end;
+               clock_gettime (CLOCK_REALTIME, &end);
+               log_debug("Read %d bytes from \"%s\" in %f seconds",
+                         st.st_size, filespec,
+                         (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec - start.tv_nsec)/1e9)));
+#endif
+           }
+
+//         memset(args->filespec, 0, 256);
+//         memcpy(->filespec, filespec.c_str(), filespec.size());
+//         boost::thread sendthr(boost::bind(&stream_thread, args));
+//         sendthr.join();
+       }
+#endif
+       
+#ifdef USE_STATISTICS
+       struct timespec end;
+       clock_gettime (CLOCK_REALTIME, &end);
+       log_debug("Processing time for GET request was %f seconds",
+                 (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec - start.tv_nsec)/1e9)));
+#endif
+//     conndata->statistics->dump();
+//    }
+    } while(!hand->timetodie());
+    
+    log_debug("httphandler all done now finally...");
+    
+} // end of httphandler
+    
+} // end of extern C
 
 // local Variables:
 // mode: C++


reply via email to

[Prev in Thread] Current Thread [Next in Thread]