gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r10067: Read multiple RTMP messages.


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r10067: Read multiple RTMP messages. Move the processing to net_handler() from call().
Date: Fri, 27 Mar 2009 11:56:15 -0600
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 10067
committer: address@hidden
branch nick: rtmp
timestamp: Fri 2009-03-27 11:56:15 -0600
message:
  Read multiple RTMP messages. Move the processing to net_handler() from call().
modified:
  libcore/asobj/NetConnection_as.cpp
=== modified file 'libcore/asobj/NetConnection_as.cpp'
--- a/libcore/asobj/NetConnection_as.cpp        2009-03-27 15:23:24 +0000
+++ b/libcore/asobj/NetConnection_as.cpp        2009-03-27 17:56:15 +0000
@@ -472,6 +472,7 @@
 
     VM& vm = asCallback->getVM();
     tdata->st = &vm.getStringTable();
+    tdata->nas = this;
 //     tdata->vm = vm;
     
     // Send the request via HTTP
@@ -486,15 +487,20 @@
        request += buf;
        _http_client->writeNet(request);
        tdata->network = reinterpret_cast<Network *>(_http_client.get());
+       tdata->network->setProtocol(url.protocol());
     }
 
     // Send the request via RTMP
     if (url.protocol() == "rtmp") {
        tdata->network = reinterpret_cast<Network *>(_rtmp_client.get());
+       tdata->network->setProtocol(url.protocol());
        boost::shared_ptr<amf::Element> el = args[2].to_element();
 //     el->dump();
        boost::shared_ptr<amf::Buffer> request = 
_rtmp_client->encodeEchoRequest(app, 2.0, *el);
        _rtmp_client->sendMsg(0x3, RTMP::HEADER_12, request->allocated(), 
RTMP::INVOKE, RTMPMsg::FROM_CLIENT, *request);
+
+       
+#if 0
        boost::shared_ptr<amf::Buffer> response = _rtmp_client->recvMsg();
        response->dump();
        boost::shared_ptr<RTMP::rtmp_head_t> rthead;
@@ -523,19 +529,17 @@
                }
            }
        }
+#endif
+
+       
     }
 
-    //    this->test();
-
     // Start a thread to wait for the response
 #if 0
     boost::thread process_thread(boost::bind(&net_handler, tdata.get()));
 #else
     net_handler(tdata.get());
 #endif
-    
-//    _currentConnection.reset(new HTTPRemotingHandler(*this, url));
-
 }
 
 std::auto_ptr<IOChannel>
@@ -759,125 +763,165 @@
 //    boost::mutex::scoped_lock lock(call_mutex);
 
     args->network->setTimeout(50);
-    
-    // Suck all the data waiting for us in the network
-    boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
-    do {
-       size_t ret = args->network->readNet(buf->reference() + 
buf->allocated(), 
-                                        buf->size(), 60);
-       // The timeout expired
-       if (ret == 0) {
-           log_debug("no data yet for fd #%d, continuing...",
-                     args->network->getFileFd());
-           result = false;
-           done = true;
-       }
-       // Something happened to the network connection
-       if ((ret == static_cast<size_t>(string::npos)) || (ret == 
static_cast<size_t>(-1))) {
-           log_debug("socket for fd #%d was closed...",
-                     args->network->getFileFd());
-           return false;
-       }
-       // We got data.
-       if (ret > 0) {
-           // If we got less data than we tried to read, then we got the
-           // whole packet most likely.
-           if (ret < buf->size()) {
-               done = true;
-               result = true;
-           }
-           if (ret == buf->size()) {
-               // become larger by another default block size.
-               buf->resize(buf->size() + amf::NETBUFSIZE);
-               log_debug("Got a full packet, making the buffer larger to %d",
-                         buf->size());
-               result = true;
-           }
-           // manually set the seek pointer in the buffer, as we read
-           // the data into the raw memory allocated to the buffer. We
-           // only want to do this if we got data of course.
-           buf->setSeekPointer(buf->end() + ret);
-       } else {
-           log_debug("no more data for fd #%d, exiting...", 
-                     args->network->getFileFd());
-           done = true;
-       }
-    } while(done != true);
-    
-    // Now process the data
-    if (result) {
-       HTTP *http = reinterpret_cast<HTTP *>(args->network);;
-       amf::AMF amf;
-       boost::uint8_t *data = http->processHeaderFields(*buf);
+    if (args->network->getProtocol() == "rtmp") {
+#if 1
+       do {
+           RTMPClient *client = reinterpret_cast<RTMPClient *>(args->network);
+           boost::shared_ptr<amf::Buffer> response = client->recvMsg();
+           response->dump();
+           boost::shared_ptr<RTMP::rtmp_head_t> rthead;
+           boost::shared_ptr<RTMP::queues_t> que = client->split(*response);
+           
+           log_debug("%s: There are %d messages in the RTMP input queue", 
__PRETTY_FUNCTION__, que->size());
+           while (que->size()) {
+               boost::shared_ptr<amf::Buffer> ptr = que->front()->pop();
+               log_debug("%s: There are %d messages in the RTMP input queue", 
__PRETTY_FUNCTION__, que->size());
+               if (ptr) {              // If there is legit data
+                   rthead = client->decodeHeader(ptr->reference());
+                   RTMPMsg *msg = client->decodeMsgBody(ptr->reference() + 
rthead->head_size, rthead->bodysize);
+                   msg->dump();
+                   if (msg->getMethodName() == "_error") {
+                       log_error("Got an error: %s", msg->getMethodName());
+                       msg->at(0)->dump();
+                       args->nas->notifyStatus(NetConnection_as::CALL_FAILED);
+                   }
+                   if (msg->getMethodName() == "_result") {
+                       log_debug("Got a result: %s", msg->getMethodName());
+                       if (msg->getElements().size() > 0) {
+                           msg->at(0)->dump();
+                           as_value tmp(*msg->at(0));
+//                     string_table::key methodKey = 
tdata->st->find(methodName);
+                           string_table::key methodKey = 
args->st->find("onResult");
+                           args->callback->callMethod(methodKey, tmp);
+                       }
+                   }
+                   ptr.reset();
+                   done = true;
+                   break;
+               }
+           }
+       } while (!done);
+#endif
+    } else if (args->network->getProtocol() == "http") {
+       // Suck all the data waiting for us in the network
+       boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
+       do {
+           size_t ret = args->network->readNet(buf->reference() + 
buf->allocated(), 
+                                               buf->size(), 60);
+           // The timeout expired
+           if (ret == 0) {
+               log_debug("no data yet for fd #%d, continuing...",
+                         args->network->getFileFd());
+               result = false;
+               done = true;
+           }
+           // Something happened to the network connection
+           if ((ret == static_cast<size_t>(string::npos)) || (ret == 
static_cast<size_t>(-1))) {
+               log_debug("socket for fd #%d was closed...",
+                         args->network->getFileFd());
+               return false;
+           }
+           // We got data.
+           if (ret > 0) {
+               // If we got less data than we tried to read, then we got the
+               // whole packet most likely.
+               if (ret < buf->size()) {
+                   done = true;
+                   result = true;
+               }
+               if (ret == buf->size()) {
+                   // become larger by another default block size.
+                   buf->resize(buf->size() + amf::NETBUFSIZE);
+                   log_debug("Got a full packet, making the buffer larger to 
%d",
+                             buf->size());
+                   result = true;
+               }
+               // manually set the seek pointer in the buffer, as we read
+               // the data into the raw memory allocated to the buffer. We
+               // only want to do this if we got data of course.
+               buf->setSeekPointer(buf->end() + ret);
+           } else {
+               log_debug("no more data for fd #%d, exiting...", 
+                         args->network->getFileFd());
+               done = true;
+           }
+       } while(done != true);
+       
+       // Now process the data
+       if (result) {
+           HTTP *http = reinterpret_cast<HTTP *>(args->network);;
+           amf::AMF amf;
+           boost::uint8_t *data = http->processHeaderFields(*buf);
 //             http->dump();
-       size_t length = http->getContentLength();
-       if (http->getField("transfer-encoding") == "chunked") {
-           chunked = true;
-       }
-       // Make sure we have a sane length. If Chunked, then we don't have
-       // a length field, so we use the size of the data that
-       boost::shared_ptr<amf::Buffer> chunk;
-       if (length == 0) {
-           if (chunked) {
-               size_t count = http->recvChunked(data, (buf->end() - data));
-               log_debug("Got %d chunked data messages", count);
-           } else {
-               done = true;
-               result = false;
-           }
-       }
-       
+           size_t length = http->getContentLength();
+           if (http->getField("transfer-encoding") == "chunked") {
+               chunked = true;
+           }
+           // Make sure we have a sane length. If Chunked, then we don't have
+           // a length field, so we use the size of the data that
+           boost::shared_ptr<amf::Buffer> chunk;
+           if (length == 0) {
+               if (chunked) {
+                   size_t count = http->recvChunked(data, (buf->end() - data));
+                   log_debug("Got %d chunked data messages", count);
+               } else {
+                   done = true;
+                   result = false;
+               }
+           }
+           
 //     for (size_t i=0; i<http->sizeChunks(); i++) {
-       log_debug("Cookie is: \"%s\"", http->getField("cookie"));
-       log_debug("Content type is: \"%s\"", http->getField("content-type"));
-       if (http->getField("content-type").find("application/x-amf") != 
string::npos) {
-           if (chunked) {
-               chunk = http->mergeChunks();
-           } else {
-               chunk.reset(new amf::Buffer(buf->end() - data));
-               chunk->copy(data,(buf->end() - data));
-           }
-           
+           log_debug("Cookie is: \"%s\"", http->getField("cookie"));
+           log_debug("Content type is: \"%s\"", 
http->getField("content-type"));
+           if (http->getField("content-type").find("application/x-amf") != 
string::npos) {
+               if (chunked) {
+                   chunk = http->mergeChunks();
+               } else {
+                   chunk.reset(new amf::Buffer(buf->end() - data));
+                   chunk->copy(data,(buf->end() - data));
+               }
+               
 //         chunk = http->popChunk();
 //         chunk->dump();
-           amf::AMF_msg amsg;
-           boost::shared_ptr<amf::AMF_msg::context_header_t> head =
-               amsg.parseAMFPacket(chunk->reference(), chunk->allocated());
+               amf::AMF_msg amsg;
+               boost::shared_ptr<amf::AMF_msg::context_header_t> head =
+                   amsg.parseAMFPacket(chunk->reference(), chunk->allocated());
 //         amsg.dump();
-           log_debug("%d messages in AMF packet", amsg.messageCount());
-           for (size_t i=0; i<amsg.messageCount(); i++) {
+               log_debug("%d messages in AMF packet", amsg.messageCount());
+               for (size_t i=0; i<amsg.messageCount(); i++) {
 //             amsg.getMessage(i)->data->dump();
-               boost::shared_ptr<amf::Element> el = amsg.getMessage(i)->data;
-               as_value tmp(*el);
+                   boost::shared_ptr<amf::Element> el = 
amsg.getMessage(i)->data;
+                   as_value tmp(*el);
 //             NetConnection_as *obj = (NetConnection_as *)args->network;
-               log_debug("Calling NetConnection %s(%s)",
-                         amsg.getMessage(i)->header.target, tmp);
-               // The method name looks something like this: /17/onResult
-               // the first field is a sequence number so each response can
-               // be matched to the request that made it. We only want the
-               // name part, so we can call the method.
-               string::size_type pos = 
amsg.getMessage(i)->header.target.find('/', 1);
-               string methodName;
-               if (pos != string::npos) {
-                   methodName = 
amsg.getMessage(i)->header.target.substr(pos+1,  
amsg.getMessage(i)->header.target.size());
-               }
+                   log_debug("Calling NetConnection %s(%s)",
+                             amsg.getMessage(i)->header.target, tmp);
+                   // The method name looks something like this: /17/onResult
+                   // the first field is a sequence number so each response can
+                   // be matched to the request that made it. We only want the
+                   // name part, so we can call the method.
+                   string::size_type pos = 
amsg.getMessage(i)->header.target.find('/', 1);
+                   string methodName;
+                   if (pos != string::npos) {
+                       methodName = 
amsg.getMessage(i)->header.target.substr(pos+1,  
amsg.getMessage(i)->header.target.size());
+                   }
 //             VM& vm = args->callback->getVM();
 //             string_table& st = vm.getStringTable();
-               string_table::key methodKey;
+                   string_table::key methodKey;
 //             boost::mutex::scoped_lock lock(_nc_mutex);
-               methodKey = args->st->find(methodName);
-               args->callback->callMethod(methodKey, tmp);
-           }
-       } else {        // not AMF data
-           if ((http->getField("content-type").find("application/xml") != 
string::npos)
-               || (http->getField("content-type").find("text/html") != 
string::npos)) {
-               log_debug("Textual Data is: %s", reinterpret_cast<char 
*>(data));
-           } else {
-               log_debug("Binary Data is: %s", hexify(data, length, true));
+                   methodKey = args->st->find(methodName);
+                   args->callback->callMethod(methodKey, tmp);
+               }
+           } else {    // not AMF data
+               if ((http->getField("content-type").find("application/xml") != 
string::npos)
+                   || (http->getField("content-type").find("text/html") != 
string::npos)) {
+                   log_debug("Textual Data is: %s", reinterpret_cast<char 
*>(data));
+               } else {
+                   log_debug("Binary Data is: %s", hexify(data, length, true));
+               }
            }
        }
     }
-
+    
     log_debug("net_handler all done...");
 
     return result;


reply via email to

[Prev in Thread] Current Thread [Next in Thread]