[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r10072: refactored heavily. Added err
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r10072: refactored heavily. Added error checking of responses since we can now easily decode them. |
Date: |
Sat, 28 Mar 2009 12:34:29 -0600 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 10072
committer: address@hidden
branch nick: rtmp
timestamp: Sat 2009-03-28 12:34:29 -0600
message:
refactored heavily. Added error checking of responses since we can now easily
decode them.
modified:
utilities/rtmpget.cpp
=== modified file 'utilities/rtmpget.cpp'
--- a/utilities/rtmpget.cpp 2009-03-27 23:28:09 +0000
+++ b/utilities/rtmpget.cpp 2009-03-28 18:34:29 +0000
@@ -257,6 +257,7 @@
string::size_type end = path.rfind('/');
if (end != string::npos) {
filename = path.substr(end + 1);
+ path = path.substr(0, end);
}
}
@@ -266,9 +267,9 @@
tcUrl += ":" + portstr;
}
if (!query.empty()) {
- tcUrl += "/" + query;
+ tcUrl += query;
} else {
- tcUrl += "/" + path;
+ tcUrl += path;
}
}
@@ -304,10 +305,10 @@
}
if (swfUrl.empty()) {
- swfUrl = "mediaplayer.swf";
+ swfUrl = "file://rtmpget.swf";
}
if (pageUrl.empty()) {
- pageUrl = "http://gnashdev.org";
+ pageUrl = ".http://gnashdev.org";
}
log_debug("URL is %s", url);
@@ -322,30 +323,26 @@
log_debug("swfUrl is %s", swfUrl);
log_debug("pageUrl is %s", pageUrl);
+ // Create a tcp/ipc connection to the server. This doesn't require any
RTMP.
client.toggleDebug(netdebug);
if (client.createClient(hostname, port) == false) {
log_error("Can't connect to RTMP server %s", hostname);
exit(-1);
}
-
- if ( ! client.handShakeRequest() )
- {
+
+ // Read the handshake from the server
+ if (!client.handShakeRequest()) {
log_error("RTMP handshake request failed");
exit(EXIT_FAILURE);
}
- if ( ! client.clientFinish() )
- {
- log_error("RTMP handshake completion failed");
- exit(EXIT_FAILURE);
- }
-
// Make a buffer to hold the handshake data.
Buffer buf(1537);
// RTMP::rtmp_head_t *rthead = 0;
// int ret = 0;
log_debug("Sending NetConnection Connect message,");
boost::shared_ptr<amf::Buffer> buf2 = client.encodeConnect(app.c_str(),
swfUrl.c_str(), tcUrl.c_str(), 615, 124, 1, pageUrl.c_str());
+// rtmpget -vv -n rtmp://localhost/oflaDemo/DarkKnight.flv
// boost::shared_ptr<amf::Buffer> buf2 =
client.encodeConnect("video/2006/sekt/gate06/tablan_valentin",
"mediaplayer.swf",
"rtmp://velblod.videolectures.net/video/2006/sekt/gate06/tablan_valentin", 615,
124, 1, "http://gnashdev.org");
// boost::shared_ptr<amf::Buffer> buf2 = client.encodeConnect("oflaDemo",
"http://192.168.1.70/software/gnash/tests/ofla_demo.swf",
"rtmp://localhost/oflaDemo/stream", 615, 124, 1,
"http://192.168.1.70/software/gnash/tests/index.html");
//buf2->resize(buf2->size() - 6); // FIXME: encodeConnect returns the
wrong size for the buffer!
@@ -353,53 +350,104 @@
buf2->allocated(), RTMP::INVOKE,
RTMPMsg::FROM_CLIENT);
head2->resize(head2->size() + buf2->size() + 1);
+ // FIXME: ugly hack! Should be a single byte header. Do this in
Element::encode() instead!
+ head2->append(buf2->reference(), 128);
+ boost::uint8_t c = 0xc3;
+ *head2 += c;
+ head2->append(buf2->reference() + 128, buf2->allocated()-128);
+
+ // Finish the handshake process, which has to have the
NetConnection::connect() as part
+ // of the buffer, or Red5 refuses to answer.
if (!client.clientFinish(*head2)) {
log_error("RTMP handshake completion failed");
}
+ // give the server time to process our NetConnection::connect() request
+ sleep(1);
+
+ // Read the responses back from the server. This is usually a series of
system
+ // messages on channel 2, and the response message on channel 3 from our
request.
boost::shared_ptr<amf::Buffer> response = client.recvMsg();
if (!response) {
log_error("Got no response from the RTMP server");
}
+
+ // when doing remoting calls I don't see this problem with an empty packet
from Red5,
+ // but when I do streaming, it's always there, so we need to remove it.
+ boost::uint8_t *pktstart = response->reference();
+ if (*pktstart == 0xff) {
+ log_debug("Got empty packet in buffer.");
+ pktstart++;
+ }
+
+ // The response packet contains multiple messages for multiple channels,
so we
+ // we have to split the Buffer into seperate messages on a chunksize
boundary.
boost::shared_ptr<RTMP::rtmp_head_t> rthead;
- boost::shared_ptr<RTMP::queues_t> que = client.split(*response);
+ boost::shared_ptr<RTMP::queues_t> que = client.split(pktstart,
response->allocated()-1);
+ // If we got no responses, something obviously went wrong.
if (!que->size()) {
log_error("No response from INVOKE of NetConnection connect");
exit(-1);
}
-
- while (que->size()) {
- boost::shared_ptr<amf::Buffer> ptr = que->front()->pop();
- log_debug("%s: There are %d messages in the RTMP input queue",
__PRETTY_FUNCTION__, que->size());
- if (ptr) { // If there is legit data
- rthead = client.decodeHeader(ptr->reference());
- RTMPMsg *msg = client.decodeMsgBody(ptr->reference() +
rthead->head_size, rthead->bodysize);
- msg->dump();
- if (msg->getMethodName() == "_error") {
- log_error("Got an error: %s", msg->getMethodName());
- msg->at(0)->dump();
- }
- if (msg->getMethodName() == "_result") {
- log_debug("Got a result: %s", msg->getMethodName());
- if (msg->getElements().size() > 0) {
- msg->at(0)->dump();
- }
- }
-// que.front()->pop_front();
- ptr.reset();
- break;
+
+ // There is a queue of queues used to hold all the messages. The first
queue
+ // is indexed by the channel number, the second queue is all the messages
that
+ // have arrived for that channel.
+ while (que->size()) { // see if there are any messages at all
+ // Get the CQue for the first channel
+ CQue *channel_q = que->front();
+ que->pop_front(); // remove this Cque from the top level que
+
+ while (channel_q->size()) {
+ // Get the first message in the channel queue
+ boost::shared_ptr<amf::Buffer> ptr = channel_q->pop();
+// channel_q->pop_front(); // remove this Buffer from the Cque
+// ptr->dump();
+
+ log_debug("%s: There are %d messages in the RTMP input queue, %d",
+ __PRETTY_FUNCTION__, que->size(), que->front()->size());
+ if (ptr) { // If there is legit data
+ rthead = client.decodeHeader(ptr->reference());
+ if (!rthead) {
+ log_error("Couldn't decode RTMP message header");
+ continue;
+ }
+
+ RTMPMsg *msg = client.decodeMsgBody(ptr->reference() +
rthead->head_size, rthead->bodysize);
+ if (msg) {
+// msg->dump();
+ if (msg->getStatus() == RTMPMsg::NC_CONNECT_SUCCESS) {
+ if (msg->getMethodName() == "_result") {
+ log_debug("Sent NetConnection Connect message
sucessfully");
+#if 0
+ log_debug("Got a result: %s", msg->getMethodName());
+ if (msg->getElements().size() > 0) {
+ msg->at(0)->dump();
+ }
+#endif
+ }
+ }
+ if (msg->getStatus() == RTMPMsg::NC_CONNECT_FAILED) {
+ if (msg->getMethodName() == "_error") {
+ log_error("Couldn't send NetConnection Connect
message,");
+#if 0
+ log_error("Got an error: %s", msg->getMethodName());
+ if (msg->getElements().size() > 0) {
+ msg->at(0)->dump();
+ }
+#endif
+ }
+ }
+ } else {
+ log_error("Couldn't decode RTMP message Body");
+ continue;
+ }
+ }
}
}
-// if (msg1->getStatus() == RTMPMsg::NC_CONNECT_SUCCESS) {
-// log_debug("Sent NetConnection Connect message sucessfully");
-// } else {
-// log_error("Couldn't send NetConnection Connect message,");
-// //exit(-1);
-// }
-
- // make the createStream for ID 3 encoded object
+ // make the createStream
log_debug("Sending NetStream::createStream message,");
BufferSharedPtr buf3 = client.encodeStream(0x2);
// buf3->dump();
@@ -413,30 +461,189 @@
// exit(-1);
// }
-#if 0
- log_debug("Sent NetStream::createStream message successfully:");
msg2->dump();
- std::vector<ElementSharedPtr> hell = msg2->getElements();
- if (hell.size() > 0) {
- streamID = hell[0]->to_number();
- log_debug("Stream ID returned from createStream is: %d", streamID);
- } else {
- if (msg2->getMethodName() == "close") {
- log_debug("Got close packet!!! Exiting...");
- exit(0);
- }
- log_error("Got no properties from NetStream::createStream invocation,
arbitrarily taking 0 as streamID");
- streamID = 0.0;
- }
-
+ // Read the responses back from the server. This is usually a series of
system
+ // messages on channel 2, and the response message on channel 3 from our
request.
+ response.reset();
+ response = client.recvMsg();
+ if (!response) {
+ log_error("Got no response from the RTMP server");
+ }
+
+ // when doing remoting calls I don't see this problem with an empty packet
from Red5,
+ // but when I do streaming, it's always there, so we need to remove it.
+ pktstart = response->reference();
+ if (*pktstart == 0xff) {
+ log_debug("Got empty packet in buffer.");
+ pktstart++;
+ }
+
+ // The response packet contains multiple messages for multiple channels,
so we
+ // we have to split the Buffer into seperate messages on a chunksize
boundary.
+ rthead.reset();
+ que.reset();
+ que = client.split(pktstart, response->allocated()-1);
+
+ // If we got no responses, something obviously went wrong.
+ if (!que->size()) {
+ log_error("No response from INVOKE of NetConnection connect");
+ exit(-1);
+ }
+
+ // There is a queue of queues used to hold all the messages. The first
queue
+ // is indexed by the channel number, the second queue is all the messages
that
+ // have arrived for that channel.
+ while (que->size()) { // see if there are any messages at all
+ // Get the CQue for the first channel
+ CQue *channel_q = que->front();
+ que->pop_front(); // remove this Cque from the top level que
+
+ while (channel_q->size()) {
+ // Get the first message in the channel queue
+ boost::shared_ptr<amf::Buffer> ptr = channel_q->pop();
+// channel_q->pop_front(); // remove this Buffer from the Cque
+// ptr->dump();
+
+ log_debug("%s: There are %d messages in the RTMP input queue, %d",
+ __PRETTY_FUNCTION__, que->size(), que->front()->size());
+ if (ptr) { // If there is legit data
+ rthead = client.decodeHeader(ptr->reference());
+ if (!rthead) {
+ log_error("Couldn't decode RTMP message header");
+ continue;
+ }
+
+ RTMPMsg *msg = client.decodeMsgBody(ptr->reference() +
rthead->head_size, rthead->bodysize);
+ if (msg) {
+// msg->dump();
+ if (msg->getMethodName() == "_result") {
+ log_debug("Sent NetConnection createStream message
sucessfully");
+ if (msg->at(1)->getType() == amf::Element::NUMBER_AMF0)
{
+ streamID = msg->at(1)->to_number();
+ }
+ log_debug("Stream ID returned from createStream is:
%g", streamID);
+#if 0
+ if (msg->getElements().size() > 0) {
+ msg->at(1)->dump();
+ }
+#endif
+ }
+ if (msg->getMethodName() == "_error") {
+ log_error("Couldn't send NetConnection createStream
message,");
+#if 0
+ log_error("Got an error: %s", msg->getMethodName());
+ if (msg->getElements().size() > 0) {
+ msg->at(0)->dump();
+ }
+#endif
+ }
+ } else {
+ log_error("Couldn't decode RTMP message Body");
+ continue;
+ }
+ }
+ }
+ }
+
+#if 1
+ boost::shared_ptr<amf::Buffer> blob(new Buffer("08 00 00 02 00 00 22 14 01
00 00 00 02 00 04 70 6c 61 79 00 00 00 00 00 00 00 00 00 05 02 00 0e 44 61 72
6b 4b 6e 69 67 68 74 2e 66 6c 76 82 00 00 00 00 03 00 00 00 01 00 00 27 10"));
+ blob->dump();
+// client.writeNet(*blob);
+
+ // We need the streamID for all folowing operations on this stream
int id = int(streamID);
// make the NetStream::play() operations for ID 2 encoded object
-// log_debug("Sending NetStream play message,");
- BufferSharedPtr buf4 = client.encodeStreamOp(0, RTMP::STREAM_PLAY, false,
filename.c_str());
+ log_debug("Sending NetStream play message,");
+ BufferSharedPtr buf4 = client.encodeStreamOp(id, RTMP::STREAM_PLAY, false,
filename.c_str());
// BufferSharedPtr buf4 = client.encodeStreamOp(0, RTMP::STREAM_PLAY,
false, "gate06_tablan_bcueu_01");
// log_debug("TRACE: buf4: %s", hexify(buf4->reference(), buf4->size(),
true));
- total_size = buf4->size();
- RTMPMsg *msg3 = client.sendRecvMsg(0x8, RTMP::HEADER_12, total_size,
RTMP::INVOKE, RTMPMsg::FROM_CLIENT, buf4);
+ buf4->dump();
+ client.sendMsg(0x8, RTMP::HEADER_12, buf4->allocated(), RTMP::INVOKE,
RTMPMsg::FROM_CLIENT, *buf4);
+
+ // Read the responses back from the server. This is usually a series of
system
+ // messages on channel 2, and the response message on channel 3 from our
request.
+ response.reset();
+ response = client.recvMsg();
+ if (!response) {
+ log_error("Got no response from the RTMP server");
+ }
+
+ // when doing remoting calls I don't see this problem with an empty packet
from Red5,
+ // but when I do streaming, it's always there, so we need to remove it.
+ pktstart = response->reference();
+ if (*pktstart == 0xff) {
+ log_debug("Got empty packet in buffer.");
+ pktstart++;
+ }
+
+ // The response packet contains multiple messages for multiple channels,
so we
+ // we have to split the Buffer into seperate messages on a chunksize
boundary.
+ rthead.reset();
+ que.reset();
+ que = client.split(pktstart, response->allocated()-1);
+
+ // If we got no responses, something obviously went wrong.
+ if (!que->size()) {
+ log_error("No response from INVOKE of NetConnection connect");
+ exit(-1);
+ }
+
+ // There is a queue of queues used to hold all the messages. The first
queue
+ // is indexed by the channel number, the second queue is all the messages
that
+ // have arrived for that channel.
+ while (que->size()) { // see if there are any messages at all
+ // Get the CQue for the first channel
+ CQue *channel_q = que->front();
+ que->pop_front(); // remove this Cque from the top level que
+
+ while (channel_q->size()) {
+ // Get the first message in the channel queue
+ boost::shared_ptr<amf::Buffer> ptr = channel_q->pop();
+// channel_q->pop_front(); // remove this Buffer from the Cque
+ ptr->dump();
+
+ log_debug("%s: There are %d messages in the RTMP input queue, %d",
+ __PRETTY_FUNCTION__, que->size(), que->front()->size());
+ if (ptr) { // If there is legit data
+ rthead = client.decodeHeader(ptr->reference());
+ if (!rthead) {
+ log_error("Couldn't decode RTMP message header");
+ continue;
+ }
+
+ RTMPMsg *msg = client.decodeMsgBody(ptr->reference() +
rthead->head_size, rthead->bodysize);
+ if (msg) {
+// msg->dump();
+ if (msg->getMethodName() == "onStatus") {
+ log_error("Got a status message: %s",
msg->getMethodName());
+// msg->at(0)->dump();
+ boost::shared_ptr<amf::Element> level =
msg->findProperty("level");
+ if ((level->getType() == amf::Element::STRING_AMF0) &&
(strcmp(level->to_string(), "error") == 0)) {
+ boost::shared_ptr<amf::Element> description =
msg->findProperty("description");
+ if (description) {
+ log_debug("Got an error from NetStream::play()!
%s", description->to_string());
+ }
+ }
+ }
+ if (msg->getMethodName() == "_error") {
+ log_error("Got an error message: %s",
msg->getMethodName());
+// msg->at(0)->dump();
+ }
+ if (msg->getMethodName() == "_result") {
+ log_debug("Got a result message: %s",
msg->getMethodName());
+ if (msg->getElements().size() > 0) {
+// msg->at(0)->dump();
+ }
+ }
+ } else {
+ log_error("Couldn't decode RTMP message Body");
+ continue;
+ }
+ }
+ }
+ }
+
+#if 0
if (msg3) {
msg3->dump();
if (msg3->getStatus() == RTMPMsg::NS_PLAY_START) {
@@ -459,14 +666,11 @@
log_error("Never got any messages!");
exit(-1);
}
-
-#if 0
deque<CQue *>::iterator it;
for (it = que->begin(); it != que->end(); it++) {
CQue *q = *(it);
q->dump();
}
-#endif
while (que->size()) {
cerr << "QUE SIZE: " << que->size() << endl;
BufferSharedPtr ptr = que->front()->pop();
@@ -485,6 +689,7 @@
}
} while(loop--);
#endif
+#endif
// std::vector<amf::Element *> hell = msg2->getElements();
// std::vector<amf::Element *> props = hell[0]->getProperties();
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r10072: refactored heavily. Added error checking of responses since we can now easily decode them.,
rob <=