gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9701: add support for an array of po


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9701: add support for an array of pollfds, need for poll().
Date: Wed, 05 Nov 2008 19:31:49 -0700
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9701
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2008-11-05 19:31:49 -0700
message:
  add support for an array of pollfds, need for poll().
modified:
  libnet/handler.cpp
  libnet/handler.h
=== modified file 'libnet/handler.cpp'
--- a/libnet/handler.cpp        2008-11-01 15:02:59 +0000
+++ b/libnet/handler.cpp        2008-11-06 02:31:49 +0000
@@ -59,7 +59,7 @@
 Handler::~Handler()
 {
 //    GNASH_REPORT_FUNCTION;
-    closeConnection();
+//    closeConnection();
     _die = true;
     notifyout();
     notifyin();
@@ -160,6 +160,26 @@
     }    
 }
 
+void
+Handler::addPollFD(struct pollfd &fd)
+{
+    boost::mutex::scoped_lock lock(_poll_mutex);
+    return _pollfds.push_back(fd);
+}
+
+struct pollfd
+&Handler::getPollFD(int index)
+{
+    boost::mutex::scoped_lock lock(_poll_mutex);
+    return _pollfds[index];
+}
+struct pollfd *
+Handler::getPollFDPtr()
+{
+    boost::mutex::scoped_lock lock(_poll_mutex);
+    return &_pollfds[0];
+};
+
 // Dump internal data.
 void
 Handler::dump()
@@ -169,6 +189,7 @@
     _outgoing.dump();    
 }
 
+#if 0
 size_t
 Handler::readPacket(int fd)
 {
@@ -205,6 +226,7 @@
     }
     return ret;
 }
+#endif
 
 // start the two thread handlers for the queues
 bool
@@ -219,8 +241,28 @@
     log_debug(_("Starting Handlers for port %d, tid %ld"),
              args->port, get_thread_id());
 
+    struct pollfd *fds;
+    int nfds = 1;
+    Network net;
+    boost::shared_ptr<vector<int> > hits = net.waitForNetData(nfds, fds);
+    vector<int>::const_iterator it;
+#if 0
+    for (it = _pollfds.begin(); it != _pollfds.end(); it++) {
+//     Buffer buf;
+//     net.readNet(*it, buf.reference(), buf.size());
+       args->netfd = *it;
+       if (crcfile.getThreadingFlag()) {
+           if (args->port == port_offset + gnash::RTMPT_PORT) {
+               boost::thread handler(boost::bind(&http_handler, args));
+           }
+       } else {
+           callback[*it](args);
+       }
+    }    
+#endif
+    
 //     boost::thread outport(boost::bind(&netout_handler, args));
-    boost::thread inport(boost::bind(&netin_handler, args));
+//    boost::thread inport(boost::bind(&netin_handler, args));
 
 #if 0
     if (args->port == 4080) {  // FIXME: hack alert!
@@ -231,15 +273,6 @@
     }
 #endif
 
-// We don't want to wait for the threads to complete, we
-// want to return to the main program so it can spawn another
-// thread for the next incoming connection.    
-//     inport.join();    
-//     outport.join();
-//     handler.join();
-//     if (_die) {
-//     log_debug("Handler done...");
-//     }
     return true;
 }
 
@@ -249,13 +282,14 @@
 {
     GNASH_REPORT_FUNCTION;
 
-    Handler *hand = reinterpret_cast<Handler *>(args->handle);
+    Network *net = reinterpret_cast<Network *>(args->handler);
+    size_t ret;
 
     log_debug("Starting to wait for data in net for fd #%d", args->netfd);
     
     do {
        boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
-       size_t ret = hand->readNet(args->netfd, buf->reference(), buf->size(), 
1);
+//     ret = hand->readNet(args->netfd, buf->reference(), buf->size(), 1);
 
 //     cerr << (char *)buf->reference() << endl;
        // the read timed out as there was no data, but the socket is still 
open.
@@ -275,18 +309,19 @@
 //         if (ret < NETBUFSIZE) {
 //             buf->resize(ret);
 //         }
-           hand->push(buf);
-           hand->notify();
+//         hand->push(buf);
+//         hand->notify();
        } else {
            log_debug("no more data for fd #%d, exiting...", args->netfd);
-           hand->die();
+//         hand->die();
            break;
        }
-    } while (!hand->timetodie());
+//    } while (!hand->timetodie());
+    } while (ret > 0);
     // We're done. Notify the other threads the socket is closed, and tell 
them to die.
     log_debug("Net In handler done for fd #%d...", args->netfd);
-    hand->notify();
-    hand->closeNet(args->netfd);
+//    hand->notify();
+//    hand->closeNet(args->netfd);
 //    hand->dump();
 }
 

=== modified file 'libnet/handler.h'
--- a/libnet/handler.h  2008-11-01 15:02:59 +0000
+++ b/libnet/handler.h  2008-11-06 02:31:49 +0000
@@ -19,10 +19,24 @@
 #ifndef __HANDLER_H__
 #define __HANDLER_H__ 1
 
+#ifdef HAVE_CONFIG_H
+#include "gnashconfig.h"
+#endif
+
 #include <boost/cstdint.hpp>
+#include <boost/thread/mutex.hpp>
 //#include <boost/thread/condition.hpp>
 #include <string>
 #include <deque>
+#include <map>
+
+#ifdef HAVE_POLL
+# include <sys/poll.h>
+#else 
+# ifdef HAVE_EPOLL
+#  include <sys/epoll.h>
+# endif
+#endif
 
 #include "log.h"
 #include "network.h"
@@ -35,9 +49,11 @@
 namespace gnash
 {
 
-class Handler : public gnash::Network
+
+class Handler
 {
 public:
+    
      DSOEXPORT Handler();
     ~Handler();
 
@@ -53,10 +69,12 @@
     typedef struct {
        int netfd;
        int port;
-       void *handle;
+       void *handler;
        std::string filespec;
     } thread_params_t ;
     
+    typedef void entry_t (thread_params_t *);
+
     // Specify which queue should be used
     typedef enum { INCOMING, OUTGOING } fifo_e;
     
@@ -124,11 +142,12 @@
     void waitin() { _incoming.wait(); };
     void waitout() { _outgoing.wait(); };
 
-    size_t readPacket(int fd);
+//    size_t readPacket(int fd);
     
     // start the two thread handlers for the queues
     bool DSOEXPORT start(thread_params_t *args);
 
+#if 0
     /// \brief Write a Buffer the network connection.
     ///
     /// @param fd The file descriptor to write the data too.
@@ -146,7 +165,8 @@
     /// @return The number of bytes sent
     int  DSOEXPORT writeNet(boost::shared_ptr<amf::Buffer> &buf)
        { return Network::writeNet(buf->reference(), buf->size()); };
-    
+#endif
+
     // Dump internal data.
     void dump();
     
@@ -156,12 +176,24 @@
 #endif
     void die() { _die = true; _outgoing.notify(); };
     bool timetodie() { return _die; };
+
+    void addPollFD(struct pollfd &fd);
+    struct pollfd &getPollFD(int index);
+    struct pollfd *getPollFDPtr();
     
 private:
-    bool _die;
-    int _netfd;
-    CQue _incoming;
-    CQue _outgoing;
+    bool       _die;
+    int                _netfd;
+    CQue       _incoming;
+    CQue       _outgoing;
+    /// \var Handler::_handlers
+    ///                Keep a list of all active network connections
+    std::map<int, entry_t *> _handlers;
+#ifdef HAVE_POLL
+    // This is the mutex that controls access to the que.
+    std::vector<struct pollfd> _pollfds;
+    boost::mutex       _poll_mutex;
+#endif
 };
 
 // This is the thread for all incoming network connections, which


reply via email to

[Prev in Thread] Current Thread [Next in Thread]