gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9974: improved main message processing loop.


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9974: improved main message processing loop.
Date: Wed, 04 Feb 2009 17:43:21 -0700
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9974
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2009-02-04 17:43:21 -0700
message:
  improved main message processing loop.
modified:
  cygnal/rtmp_server.cpp
=== modified file 'cygnal/rtmp_server.cpp'
--- a/cygnal/rtmp_server.cpp    2009-02-04 21:12:35 +0000
+++ b/cygnal/rtmp_server.cpp    2009-02-05 00:43:21 +0000
@@ -366,7 +366,7 @@
 boost::shared_ptr<Buffer>
 RTMPServer::encodeResult(RTMPMsg::rtmp_status_e status)
 {
-    GNASH_REPORT_FUNCTION;
+//    GNASH_REPORT_FUNCTION;
     
 //    Buffer *buf = new Buffer;
 //     boost::uint8_t *ptr = buf->reference();
@@ -715,9 +715,12 @@
     // Adjust the timeout
     rtmp->setTimeout(10);
     
-    boost::shared_ptr<amf::Buffer> pkt;
+    boost::shared_ptr<amf::Buffer>  pkt;
     boost::shared_ptr<amf::Element> tcurl;
     boost::shared_ptr<amf::Element> swfurl;
+    boost::shared_ptr<amf::Buffer> response;
+
+    RTMP::rtmp_headersize_e response_head_size = RTMP::HEADER_12;
     
     // This handler is called everytime there is RTMP data on a socket to 
process the
     // messsage. Unlike HTTP, RTMP always uses persistant network connections, 
so we
@@ -755,6 +758,7 @@
            initialize = false;
        }
 
+#if 0
        // The very first message after the handshake is the Invoke call of
        // NetConnection::connect().
        boost::shared_ptr<RTMP::rtmp_head_t> head = rtmp->decodeHeader(*pkt);
@@ -786,6 +790,7 @@
                }
            }
        }
+#endif
 
        // Send a ping to reset the new stream
        boost::shared_ptr<amf::Buffer> ping_reset = 
rtmp->encodePing(RTMP::PING_RESET, 0);
@@ -795,7 +800,8 @@
        } else {
            log_error("Couldn't send Ping to client!");
        }
-       
+
+#if 0
        // send a response to the NetConnection::connect() request
 //     boost::shared_ptr<amf::Buffer> response = 
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
        boost::shared_ptr<amf::Buffer> response;
@@ -804,7 +810,11 @@
            log_error("No body found in message!");
            return false;
        } else {
-           response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+           if (0) {
+               response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_FAILED);
+           } else {
+               response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+           }
            // The initial packet from the client is always a NetConnection 
object
            // invoking the 'connect' method.
            if (body->getMethodName() == "connect") {
@@ -824,16 +834,18 @@
        } else {
            log_error("Couldn't send NetConnection::connect() response to 
client!");
        }
-       
+#endif
     } else {
        // Read the handshake bytes sent by the client when requesting
        // a connection.
+#if 0
        pkt = rtmp->recvMsg(args->netfd);
        // See if we have data in the handshake, we should have 1537 bytes
        if (pkt->allocated() == 0) {
            log_error("failed to read RTMP data from the client.");
            return false;
        }
+#endif
     }
 
     // See if this is a Red5 style echo test.
@@ -848,7 +860,6 @@
            }
        }
     }
-  
       // Keep track of the network statistics
 //    Statistics st;
 //    st.setFileType(NetStats::RTMP);
@@ -867,23 +878,95 @@
     
     // Adjust the timeout
     rtmp->setTimeout(30);
+//    boost::shared_ptr<amf::Buffer> buf;
     do {
-       boost::shared_ptr<amf::Buffer> buf = rtmp->recvMsg(args->netfd);
-       if (buf) {
-           if (buf->allocated()) {
-               boost::uint8_t *ptr = buf->reference();
-               if (ptr == 0) {
-                   log_debug("Que empty, net connection dropped for fd #%d", 
args->netfd);
-                   return false;
-               }
-               boost::shared_ptr<RTMP::rtmp_head_t> rthead = 
rtmp->decodeHeader(ptr);
-               ptr += rthead->head_size; // skip past the header
+       // If there is no data left from the previous chunk, process that before
+       // reading more data.
+       if (pkt != 0) {
+           log_debug("data left from previous packet");
+       } else {
+           pkt = rtmp->recvMsg(args->netfd);
+       }
+       
+       if (pkt != 0) {
+           boost::uint8_t *tmpptr = 0;
+           if (pkt->allocated()) {
+               boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
+               boost::shared_ptr<RTMP::rtmp_head_t> qhead;
+               cerr << "FIXME1 Que size is: " << que->size() << endl;
+               for (size_t i=0; i<que->size(); i++) {
+                   boost::shared_ptr<amf::Buffer> bufptr = que->at(i)->pop();
+//                     que->at(i)->dump();
+                   if (bufptr) {
+                       bufptr->dump();
+                       qhead = rtmp->decodeHeader(bufptr->reference());
+                       log_debug("Message for channel #%d", qhead->channel);
+//                     tmpptr = bufptr->reference();
+                       tmpptr = bufptr->reference() + qhead->head_size;
+                       if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
+                           boost::shared_ptr<RTMP::rtmp_ping_t> ping = 
rtmp->decodePing(tmpptr);
+                           log_debug("Processed Ping message from client, type 
%d", ping->type);
+                       } else {
+                           body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
+                           if (body) {
+                               body->setChannel(qhead->channel);
+                               // Invoke the NetConnection::connect() method
+                               if (body->getMethodName() == "connect") {
+                                   response_head_size = RTMP::HEADER_12;
+                                   tcurl  = body->findProperty("tcUrl");
+                                   if (tcurl) {
+                                       log_debug("Client request for remote 
file is: %s", tcurl->to_string());
+                                   }
+                                   swfurl = body->findProperty("swfUrl");
+                                   if (swfurl) {
+                                       log_debug("SWF filename making request 
is: %s", swfurl->to_string());
+                                   }
+                                   response = 
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+                                   
+                                   // Send a ping to reset the new stream
+                                   boost::shared_ptr<amf::Buffer> ping_reset = 
rtmp->encodePing(RTMP::PING_RESET, 0);
+                                   if (rtmp->sendMsg(args->netfd, 
RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
+                                                     ping_reset->size(), 
RTMP::PING, RTMPMsg::FROM_SERVER, *ping_reset)) {
+                                       log_debug("Sent Ping to client");
+                                   } else {
+                                       log_error("Couldn't send Ping to 
client!");
+                                   }
+                                   
+                               }
+                               
+                               // Invoke the NetStream::createStream() method
+                               if (body->getMethodName() == "createStream") {
+                                   double streamid  = body->getStreamID();
+                                   log_debug("The streamID from 
NetStream::createStream() is: %d", streamid);
+                                   response_head_size = RTMP::HEADER_8;
+                                   response = 
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+                                   body->dump();
+                               }
+                               if (rtmp->sendMsg(args->netfd, 
body->getChannel(), response_head_size, response->allocated(),
+                                                 RTMP::INVOKE, 
RTMPMsg::FROM_SERVER, *response)) {
+                                   log_error("Sent response to client.");
+                               } else {
+                                   log_error("Couldn't send response to 
client!");
+                               }
+                           }
+                       }
+                   } else {
+                       log_error("Message contains no data!");
+                   }
+               } // end of processing all the messages in the que
+               
+               // we're done processing these packets, so get rid of them
+               pkt.reset();
+
+
+               
+#if 0    
                // This is support for the Red5 'echo_test', which exercises 
encoding and
                // decoding of complex and nested AMF data types. FIXME: this 
should be
                // moved to a CGI type of thing that executes this as a 
separate process,
                // using a socket to pass output back to the client.
                if (echo) {
-                   boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
+                   boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
                    boost::shared_ptr<amf::Buffer> bufptr;
                    if (que->size() > 0) {
                        cerr << "FIXME2 echo Que size is: " << que->size() << 
endl;
@@ -913,14 +996,13 @@
                        log_error("Couldn't send echo test response to 
client!");
                        done = true;
                    }
-               } else {
+               } else {        // end of Red5 echo test support
 //                 buf->dump();
                    // This is a non-Red5 message, which should be the normal 
mode of operating.
-                   boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
+//                 boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
                    if (que->size() > 0) {
                        boost::shared_ptr<amf::Buffer> bufptr;
                        if (que->size() > 0) {
-                           cerr << "FIXME3 Que size is: " << que->size() << 
endl;
                            bufptr = que->at(0)->pop();
                        }
                        if (bufptr) {
@@ -930,8 +1012,14 @@
                                boost::uint8_t *tmpptr = bufptr->reference() + 
qhead->head_size;
                                body = rtmp->decodeMsgBody(tmpptr, 
qhead->bodysize);
                                boost::shared_ptr<amf::Buffer> response;
-                               if (body->getMethodName() == "createStream") {
-                                   response = 
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+                               if (body) {
+                                   if (body->getMethodName() == "connect") {
+                                       response = 
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+                                   } else if (body->getMethodName() == 
"createStream") {
+                                       response = 
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+                                   } else {
+                                       response = 
rtmp->encodeResult(RTMPMsg::NS_FAILED);
+                                   }
                                } else {
                                    response = 
rtmp->encodeResult(RTMPMsg::NS_FAILED);
                                }
@@ -950,6 +1038,9 @@
                        }
                    }
                }
+#endif
+
+               
            } else {
                log_error("Never read any data from fd #%d", args->netfd);
 #if 0


reply via email to

[Prev in Thread] Current Thread [Next in Thread]