gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9768: handle persistant and non pers


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9768: handle persistant and non persistant network connections based on the Keep-Alive fields.
Date: Fri, 21 Nov 2008 22:12:52 -0700
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9768
committer: address@hidden
branch nick: rtmp
timestamp: Fri 2008-11-21 22:12:52 -0700
message:
  handle persistant and non persistant network connections based on the 
Keep-Alive fields.
modified:
  cygnal/cygnal.cpp
  libnet/http.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-11-21 16:41:31 +0000
+++ b/cygnal/cygnal.cpp 2008-11-22 05:12:52 +0000
@@ -175,7 +175,7 @@
         { 't', "testing",       Arg_parser::no  },
         { 'a', "admin",         Arg_parser::no  },
         { 'r', "root",          Arg_parser::yes },
-        { 'c', "threads",       Arg_parser::no }
+        { 'm', "multithreaded", Arg_parser::no }
         };
 
     Arg_parser parser(argc, argv, opts);
@@ -234,7 +234,7 @@
          case 'r':
              docroot = parser.argument(i).c_str();
              break;
-         case 'c':
+         case 'm':
              crcfile.setThreadingFlag(true);
              break;
          case 'n':
@@ -497,7 +497,50 @@
     // Start a server on this tcp/ip port.
     fd = net.createServer(args->port);
     log_debug("Starting Connection Handler for fd #%d, port %hd", fd, 
args->port);
-    // FIXME: this runs forever, we probably want a cleaner way to test for 
the end of time.
+
+    // Get the number of cpus in this system. For multicore
+    // systems we'll get better load balancing if we keep all the
+    // cpus busy. So a pool of threrads is started for each cpu,
+    // the default being just one. Each thread is reponsible for
+    // handling part of the total active file descriptors.
+#ifdef HAVE_SYSCONF
+    long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
+    log_debug("This system has %d cpus.", ncpus);
+#endif 
+    size_t nfds = crcfile.getFDThread();
+    
+    log_debug("This system is configured for %d file descriptors to be watched 
by each thread.", nfds);
+    
+    // cap the number of threads
+    int cpu = 0;
+    cpu = (cpu % ncpus);
+    
+    // Get the next thread ID to hand off handling this file
+    // descriptor to. If the limit for threads per cpu hasn't been
+    // set or is set to 0, assume one thread per processor by
+    // default. There won't even be threads for each cpu if
+    // threading has been disabled in the cygnal config file.
+    int spawn_limit = 0;
+    if (nfds == 0) {
+       spawn_limit = ncpus;
+    } else {
+       spawn_limit = ncpus * nfds;
+    }
+
+    // Rotate in a range of 0 to the limit.
+    tid = (tid + 1) % (spawn_limit + 1);
+    log_debug("thread ID %d for fd #%d", tid, args->netfd);
+       
+    Handler *hand = new Handler;
+    args->handler = hand;
+    if (crcfile.getThreadingFlag()) {
+      log_debug("Multi-threaded mode for server on fd #%d", fd);
+//      log_debug("Starting handler: %x for fd #%d", (void *)hand, 
args->netfd);
+      boost::thread handler(boost::bind(&dispatch_handler, args));
+    }
+    
+    // FIXME: this runs forever, we probably want a cleaner way to
+    // test for the end of time.
     do {
        net.setPort(args->port);
        if (netdebug) {
@@ -511,39 +554,6 @@
        args->netfd = net.newConnection(true, fd);
        log_debug("New network connection for fd #%d", args->netfd);
     
-       // Get the number of cpus in this system. For multicore
-       // systems we'll get better load balancing if we keep all the
-       // cpus busy. So a pool of threrads is started for each cpu,
-       // the default being just one. Each thread is reponsible for
-       // handling part of the total active file descriptors.
-#ifdef HAVE_SYSCONF
-       long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
-       log_debug("This system has %d cpus.", ncpus);
-#endif 
-       size_t nfds = crcfile.getFDThread();
-       
-       log_debug("This system is configured for %d file descriptors to be 
watched by each thread.", nfds);
-
-       // cap the number of threads
-       int cpu = (cpu % ncpus);
-
-       // Get the next thread ID to hand off handling this file
-       // descriptor to. If the limit for threads per cpu hasn't been
-       // set or is set to 0, assume one thread per processor by
-       // default. There won't even be threads for each cpu if
-       // threading has been disabled in the cygnal config file.
-       int spawn_limit = 0;
-       if (nfds == 0) {
-           spawn_limit = ncpus;
-       } else {
-           spawn_limit = ncpus * nfds;
-       }
-
-       // Rotate in a range of 0 to the limit.
-       tid = (tid + 1) % (spawn_limit + 1);
-       log_debug("thread ID %d for fd #%d", tid, args->netfd);
-       
-       Handler *hand = new Handler;
        struct pollfd fds;
        fds.fd = args->netfd;
        fds.events = POLLIN |POLLRDHUP;
@@ -554,28 +564,24 @@
 //         hand->addPollFD(fds, rtmp_handler);
 //     }
        // if supporting multiple threads
-       args->handler = hand;
-       if (crcfile.getThreadingFlag()) {
-           log_debug("Multi-threaded mode for fd #%d", args->netfd);
-           log_debug("Starting handler: %x for fd #%d", (void *)hand, 
args->netfd);
+       if (!crcfile.getThreadingFlag()) {
+         log_debug("Single threaded mode for fd #%d", args->netfd);
+         dispatch_handler(args);
 #if 0
-           if (args->port == (port_offset + RTMPT_PORT)) {
-               boost::thread handler(boost::bind(&http_handler, args));
-           }
-           if (args->port == (port_offset + RTMP_PORT)) {
-               boost::thread handler(boost::bind(&rtmp_handler, args));
-           }
-#endif
-           boost::thread handler(boost::bind(&dispatch_handler, args));
+         if (args->port == (port_offset + RTMPT_PORT)) {
+           boost::thread handler(boost::bind(&http_handler, args));
+         }
+         if (args->port == (port_offset + RTMP_PORT)) {
+           boost::thread handler(boost::bind(&rtmp_handler, args));
+         }
        } else {                // single threaded
-           log_debug("Single threaded mode for fd #%d", args->netfd);
-           dispatch_handler(args);
+#endif
        }
 //     net.closeNet(args->netfd);              // this shuts down this socket 
connection
        log_debug("Restarting loop for next connection for port %d...", 
args->port);
     } while(!done);
     
-    // All threads should exit now.
+    // All threads should wake up now.
     alldone.notify_all();
 
 } // end of connection_handler
@@ -592,6 +598,7 @@
     while(!hand->timetodie()) {    
        int limit = hand->getPollFDSize();
        net.setTimeout(timeout);
+       cerr << "LIMIT is: " << limit << endl;
        if (limit > 0) {
            struct pollfd *fds = hand->getPollFDPtr();
            boost::shared_ptr< vector<struct pollfd> > hits;
@@ -607,10 +614,14 @@
                                  it->revents, it->fd);
                        hand->erasePollFD(it);
                        net.closeNet(it->fd);
+                       continue;
                    }
                    log_debug("Got something on fd #%d, 0x%x", it->fd, 
it->revents);
                    hand->getEntry(it->fd)(args);
-                   hand->erasePollFD(it);
+//                 if (!crcfile.getThreadingFlag()) {
+//                     hand->die();
+//                 }
+                   hand->erasePollFD(it->fd);
                    net.closeNet(it->fd);
                }
            } catch (std::exception& e) {
@@ -625,8 +636,10 @@
                }
            }
         } else {
-           log_debug("nothing to wait for.");
-           hand->die();
+           log_debug("nothing to wait for...");
+           sleep(1);
+           return;
+           //hand->wait();
         }
     }
 } // end of dispatch_handler

=== modified file 'libnet/http.cpp'
--- a/libnet/http.cpp   2008-11-22 00:28:11 +0000
+++ b/libnet/http.cpp   2008-11-22 05:12:52 +0000
@@ -1501,11 +1501,11 @@
 void
 http_handler(Handler::thread_params_t *args)
 {
-    GNASH_REPORT_FUNCTION;
+//    GNASH_REPORT_FUNCTION;
 //    struct thread_params thread_data;
     string url, filespec, parameters;
     string::size_type pos;
-    Network *net = reinterpret_cast<Network *>(args->handler);
+    Handler *hand = reinterpret_cast<Handler *>(args->handler);
     HTTP www;
     bool done = false;
 //    www.setHandler(net);
@@ -1519,30 +1519,12 @@
 
     // Wait for data, and when we get it, process it.
     do {
-#ifdef THREADED_IO
-       hand->wait();
-       if (hand->timetodie()) {
-           log_debug("Not waiting no more, no more for more HTTP data for fd 
#%d...", args->netfd);
-           map<int, Handler *>::iterator hit = handlers.find(args->netfd);
-           if ((*hit).second) {
-               log_debug("Removing handle %x for HTTP on fd #%d",
-                         (void *)hand, args->netfd);
-               handlers.erase(args->netfd);
-               delete (*hit).second;
-           }
-           return;
-       }
-#endif
        
 #ifdef USE_STATISTICS
        struct timespec start;
        clock_gettime (CLOCK_REALTIME, &start);
 #endif
        
-//     conndata->statistics->setFileType(NetStats::RTMPT);
-//     conndata->statistics->startClock();
-//     args->netfd = www.getFileFd();
-//     www.recvMsg(5);
        www.recvMsg(args->netfd);
        
        if (!www.processGetRequest()) {
@@ -1655,11 +1637,12 @@
        }
        log_debug("http_handler all done transferring requested file...");
 //     cache.dump();
-       done = true;
+//     done = true;
 
+       // Unless the Keep-Alive flag is set, this isn't a persisant network
+       // connection.
        if (!www.keepAlive()) {
            log_debug("Keep-Alive is off", www.keepAlive());
-//         www.closeConnection();
            done = true;
        }
 #if 0
@@ -1679,7 +1662,7 @@
                // See if this is a persistant connection
 //             if (!www.keepAlive()) {
 //                 log_debug("Keep-Alive is off", www.keepAlive());
-               hand->closeConnection();
+               hand->closeNet();
 //             }
            }
        }
@@ -1698,6 +1681,10 @@
 //    } while(!hand->timetodie());
     } while(done != true);
     
+//     www.closeNet(args->netfd);
+//     hand->erasePollFD(args->netfd);
+    hand->notify();
+    
     log_debug("http_handler all done now finally...");
     
 } // end of httphandler


reply via email to

[Prev in Thread] Current Thread [Next in Thread]