[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9843: don't use the dispatch_handler
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9843: don't use the dispatch_handler when in single threaded mode. |
Date: |
Wed, 17 Dec 2008 10:35:48 -0700 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9843
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2008-12-17 10:35:48 -0700
message:
don't use the dispatch_handler when in single threaded mode.
change who owns the network handle for multi-threaded mode.
modified:
cygnal/cygnal.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-12-15 03:48:31 +0000
+++ b/cygnal/cygnal.cpp 2008-12-17 17:35:48 +0000
@@ -69,6 +69,8 @@
#include "buffer.h"
#include "handler.h"
#include "cache.h"
+#include "gettext.h"
+#include "cygnal.h"
#ifdef ENABLE_NLS
#include <locale.h>
@@ -82,6 +84,7 @@
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
+#include <boost/thread/tss.hpp>
//using gnash::log_debug;
using namespace std;
@@ -125,6 +128,9 @@
// Admin commands are small
const int ADMINPKTSIZE = 80;
+// These keep track of the number of active threads.
+ThreadCounter tids;
+
// end of globals
static LogFile& dbglogfile = LogFile::getDefaultInstance();
@@ -138,6 +144,9 @@
static boost::condition alldone;
static boost::mutex alldone_mutex;
+static boost::condition noclients;
+static boost::mutex noclients_mutex;
+
static void
usage()
{
@@ -264,7 +273,7 @@
// struct thread_params ssl_data;
// rtmp_data.port = port_offset + 1935;
// boost::thread rtmp_port(boost::bind(&rtmp_thread, &rtmp_data));
- // Admin handler
+ // Admin handler
if (admin) {
Network::thread_params_t admin_data;
admin_data.port = gnash::ADMIN_PORT;
@@ -479,6 +488,8 @@
Network net;
bool done = false;
static int tid = 0;
+ map<int, Network *> networks;
+
if (netdebug) {
net.toggleDebug(true);
}
@@ -499,10 +510,6 @@
log_debug("This system is configured for %d file descriptors to be watched
by each thread.", nfds);
- // cap the number of threads
- int cpu = 0;
- cpu = (cpu % ncpus);
-
// Get the next thread ID to hand off handling this file
// descriptor to. If the limit for threads per cpu hasn't been
// set or is set to 0, assume one thread per processor by
@@ -514,33 +521,29 @@
} else {
spawn_limit = ncpus * nfds;
}
+ log_debug("Spawn limit is: %d", spawn_limit);
- // Rotate in a range of 0 to the limit.
- tid = (tid + 1) % (spawn_limit + 1);
- log_debug("thread ID %d for fd #%d", tid, fd);
-
// Handler *hand = new Handler;
args->handler = &net;
- if (crcfile.getThreadingFlag() == true) {
- boost::bind(dispatch_handler, args);
- log_debug("Multi-threaded mode for server on fd #%d", fd);
-// log_debug("Starting handler: %x for fd #%d", (void *)hand,
args->netfd);
- boost::thread handler(boost::bind(&dispatch_handler, args));
- }
- // FIXME: this runs forever, we probably want a cleaner way to
+ // FIXME: this may run forever, we probably want a cleaner way to
// test for the end of time.
do {
net.setPort(args->port);
if (netdebug) {
net.toggleDebug(true);
}
+
+ // Rotate in a range of 0 to the limit.
+ tid = (tid + 1) % (spawn_limit + 1);
+ log_debug("thread ID %d for fd #%d", tid, fd);
+
// Wait for a connection to this tcp/ip from a client. If set
// to true, this will block until a request comes in. If set
// to single threaded mode, this will only allow one client to
// connect at a time. This is to make it easier to debug
- // things when you have a heavily threadd application.
+ // things when you have a heavily threaded application.
args->netfd = net.newConnection(true, fd);
if (args->netfd <= 0) {
log_debug("No new network connections");
@@ -552,36 +555,52 @@
struct pollfd fds;
fds.fd = args->netfd;
fds.events = POLLIN | POLLRDHUP;
- if (args->port == (port_offset + RTMPT_PORT)) {
-// Handler::thread_params_t *targs = new Handler::thread_params_t;
-// targs->netfd = args->netfd;
-// targs->handler = args->handler;
- HTTP *http = new HTTP;
- http->setFileFd(args->netfd);
- args->handler = http;
- boost::bind(http_handler, args);
- http->addPollFD(fds, http_handler);
-// hand->notify();
- }
-// if (args->port == RTMP_PORT) {
+ if (crcfile.getThreadingFlag() == true) {
+ // Each dispatch thread gets it's own argument data and
+ // network connection data.
+ log_debug("Multi-threaded mode for server on fd #%d", fd);
+ Network::thread_params_t *targs = new Network::thread_params_t;
+ Network *tnet = 0;
+ targs->netfd = args->netfd;
+ // If we haven't spawned up to our max allowed, start a
+ // new dispatch thread to handle data.
+ if (networks[tid] == 0) {
+ log_debug("Starting new dispatch thread for tid #%d", tid);
+ tids.increment();
+ tnet = new Network;
+ tnet->setFileFd(args->netfd);
+ targs->handler = tnet;
+ } else {
+ log_debug("Not starting new HTTP thread, spawned already for
tid #%d", tid);
+ tnet = networks[tid];
+ }
+ if (args->port == (port_offset + RTMPT_PORT)) {
+ boost::bind(http_handler, args);
+ tnet->addPollFD(fds, http_handler);
+ } else if (args->port == RTMP_PORT) {
+// tnet->addPollFD(fds, rtmp_handler);
+ log_unimpl("Not ready for RTMP data yet.");
+ }
+ if (networks[tid] == 0) {
+ networks[tid] = tnet;
+ boost::thread handler(boost::bind(&dispatch_handler, targs));
+ }
+ } else {
+ // When in single threaded mode, just call the protocol
+ // handler directly. As this is primarily only used when
+ // debugging Cygnal itself, we don't want the extra
+ // overhead of the distpatch_handler.
+ log_debug("Single threaded mode for fd #%d", args->netfd);
+ if (args->port == (port_offset + RTMPT_PORT)) {
+ http_handler(args);
+ } else if (args->port == RTMP_PORT) {
// hand->addPollFD(fds, rtmp_handler);
-// }
- if (crcfile.getThreadingFlag() == false) { // single threaded
- log_debug("Single threaded mode for fd #%d", args->netfd);
- dispatch_handler(args);
-#if 0
- if (args->port == (port_offset + RTMPT_PORT)) {
- boost::thread handler(boost::bind(&http_handler, args));
- }
- if (args->port == (port_offset + RTMP_PORT)) {
- boost::thread handler(boost::bind(&rtmp_handler, args));
- }
-#endif
- } else {
- // hand->wait();
- // handler->join();
- log_debug("Debug mode, waiting for thread to complete");
+ log_unimpl("Not ready for RTMP data yet.");
+ }
}
+
+ log_debug("Number of active Threads is %d", tids.num_of_tids());
+
// net.closeNet(args->netfd); // this shuts down this socket
connection
log_debug("Restarting loop for next connection for port %d...",
args->port);
} while(!done);
@@ -600,8 +619,9 @@
Network *net = reinterpret_cast<Network *>(args->handler);
// Network net;
int timeout = 5000;
+ bool done = false;
-// while(!hand->timetodie()) {
+ do {
int limit = net->getPollFDSize();
net->setTimeout(timeout);
cerr << "LIMIT is: " << limit << endl;
@@ -615,25 +635,33 @@
cerr << "Hits: " << hits->size() << endl;
cerr << "Pollfds: " << net->getPollFDSize() << endl;
for (it = hits->begin(); it != hits->end(); it++) {
+ // We got an error, which isn't always a crises, as some
are normal
+ // if the client disconnects while we're talking to it.
if ((it->revents & POLLRDHUP) || (it->revents & POLLNVAL))
{
log_debug("Revents has a POLLRDHUP or POLLNVAL set to
%d for fd #%d",
it->revents, it->fd);
-// hand->erasePollFD(it->fd);
-// net.closeNet(it->fd);
+ net->erasePollFD(it->fd);
+ net->closeNet(it->fd);
// continue;
break;
} else {
- log_debug("Got something on fd #%d, 0x%x", it->fd,
it->revents);
- // Call the protocol handler for this network connection
- net->getEntry(it->fd)(args);
+ // We got some data, so process it
+ log_debug("Got something on fd #%d, 0x%x", it->fd,
it->revents);
+ // Call the protocol handler for this network connection
+ bool ret = net->getEntry(it->fd)(args);
+ log_debug("Handler returned %s", (ret) ? "true" :
"false");
+ if (ret) {
+ net->closeNet(it->fd);
+ net->erasePollFD(it->fd);
+ }
}
// if (!crcfile.getThreadingFlag()) {
// hand->die();
// }
- if (it->fd <= net->getPollFDSize()) {
- net->closeNet(it->fd);
- net->erasePollFD(it->fd);
- }
+// if (it->fd <= net->getPollFDSize()) {
+// net->closeNet(it->fd);
+// net->erasePollFD(it->fd);
+// }
}
} catch (std::exception& e) {
log_error("Network connection was dropped: %s", e.what());
@@ -649,15 +677,15 @@
} else {
log_debug("nothing to wait for...");
if (crcfile.getThreadingFlag()) {
-// hand->wait();
- log_debug("Got new network file descriptor to watch");
- } else {
- return;
+ done = true;
}
-// }
- }
+ }
+ } while (!done);
+
+ tids.decrement();
+
} // end of dispatch_handler
-
+
// local Variables:
// mode: C++
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9843: don't use the dispatch_handler when in single threaded mode.,
rob <=