[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9916: refactor heavily to work with
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9916: refactor heavily to work with the new network engine and threading scheme. |
Date: |
Sat, 27 Dec 2008 18:48:41 -0700 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9916
committer: address@hidden
branch nick: rtmp
timestamp: Sat 2008-12-27 18:48:41 -0700
message:
refactor heavily to work with the new network engine and threading scheme.
modified:
libnet/rtmp_server.cpp
libnet/rtmp_server.h
=== modified file 'libnet/rtmp_server.cpp'
--- a/libnet/rtmp_server.cpp 2008-12-20 17:11:55 +0000
+++ b/libnet/rtmp_server.cpp 2008-12-28 01:48:41 +0000
@@ -52,13 +52,14 @@
extern map<int, Handler *> handlers;
RTMPServer::RTMPServer()
+ : _filesize(0)
{
// GNASH_REPORT_FUNCTION;
// _inbytes = 0;
// _outbytes = 0;
-// _body = new unsigned char(RTMP_BODY_SIZE+1);
-// memset(_body, 0, RTMP_BODY_SIZE+1);
+// _body = new unsigned char(RTMP_HANDSHAKE_SIZE+1);
+// memset(_body, 0, RTMP_HANDSHAKE_SIZE+1);
}
RTMPServer::~RTMPServer()
@@ -68,36 +69,35 @@
// delete _body;
}
+#if 0
// The handshake is a byte with the value of 0x3, followed by 1536
// bytes of gibberish which we need to store for later.
bool
-RTMPServer::handShakeWait()
+RTMPServer::processClientHandShake(int fd, amf::Buffer &buf)
{
GNASH_REPORT_FUNCTION;
-// char buffer[RTMP_BODY_SIZE+16];
-// memset(buffer, 0, RTMP_BODY_SIZE+16);
- boost::shared_ptr<amf::Buffer> buf = _handler->pop();
-
- if (buf == 0) {
- log_debug("Que empty, net connection dropped for fd #%d", getFileFd());
+ if (buf.reference() == 0) {
+ log_debug("no data in buffer, net connection dropped for fd #%d", fd);
return false;
- }
+ }
- if (*buf->reference() == RTMP_HANDSHAKE) {
+ cerr << buf.hexify(false) << endl;
+
+ if (*buf.reference() == RTMP_HANDSHAKE) {
log_debug (_("Handshake request is correct"));
} else {
log_error (_("Handshake request isn't correct"));
return false;
}
-// if (buf->size() >= RTMP_BODY_SIZE) {
+// if (buf->size() >= RTMP_HANDSHAKE_SIZE) {
// secret = _handler->merge(buf->reference());
// }
- if (buf->size() >= static_cast<size_t>(RTMP_BODY_SIZE)) {
- _handshake = new amf::Buffer(RTMP_BODY_SIZE);
- _handshake->copy(buf->reference() + 1, RTMP_BODY_SIZE);
+ if (buf.size() >= static_cast<size_t>(RTMP_HANDSHAKE_SIZE)) {
+ _handshake = new amf::Buffer(RTMP_HANDSHAKE_SIZE);
+ _handshake->copy(buf.reference() + 1, RTMP_HANDSHAKE_SIZE);
log_debug (_("Handshake Data matched"));
// return true;
} else {
@@ -107,93 +107,90 @@
return true;
}
+#endif
// The response is the gibberish sent back twice, preceeded by a byte
// with the value of 0x3.
bool
-RTMPServer::handShakeResponse()
+RTMPServer::handShakeResponse(int fd, amf::Buffer &handshake)
{
GNASH_REPORT_FUNCTION;
- boost::shared_ptr<amf::Buffer> buf1(new amf::Buffer(RTMP_BODY_SIZE + 1));
- *buf1 = RTMP_HANDSHAKE;
- *buf1 += _handshake;
-// _handler->pushout(buf1); FIXME:
-
- boost::shared_ptr<amf::Buffer> buf2(new amf::Buffer(RTMP_BODY_SIZE));
- buf2->copy(_handshake->begin(), RTMP_BODY_SIZE);
-// _handler->pushout(buf2); FIXME:
-
-// std::copy(_handshake->begin(), _handshake->end(), (buf1->begin() + 1));
-// boost::shared_ptr<amf::Buffer> buf = new amf::Buffer(RTMP_BODY_SIZE +
1);
-// std::copy(_handshake->begin(), _handshake->end(), buf->begin() + 1 +
RTMP_BODY_SIZE);
-// _handler->notifyout();
-
- log_debug("Sent RTMP Handshake response");
+ boost::uint8_t byte;
+ byte = RTMP_HANDSHAKE;
+
+ int ret1 = writeNet(fd, &byte, 1);
+ int ret2 = writeNet(fd, handshake);
+ int ret3 = writeNet(fd, handshake);
+
+ if ((ret2 == handshake.allocated()) && (ret3 == handshake.allocated())) {
+ log_debug("Sent RTMP Handshake response");
+ } else {
+ log_error("Couldn't sent RTMP Handshake response!");
+ }
return true;
}
-bool
-RTMPServer::serverFinish()
+boost::shared_ptr<amf::Buffer>
+RTMPServer::serverFinish(int fd, amf::Buffer &handshake1, amf::Buffer
&handshake2)
{
GNASH_REPORT_FUNCTION;
-
- boost::shared_ptr<amf::Buffer> buf = _handler->pop();
- boost::shared_ptr<amf::Buffer> obj = buf;
-
- if (buf == 0) {
- log_debug("Que empty, net connection dropped for fd #%d", getFileFd());
- return false;
- }
-
- // The first data packet is often buried in with the end of the handshake.
- // So after the handshake block, we strip that part off, and just pass on
- // the remainder for processing.
- if (buf->size() >= static_cast<size_t>(RTMP_BODY_SIZE)) {
- size_t size = buf->size() - RTMP_BODY_SIZE;
- obj.reset(new amf::Buffer[size]);
- obj->copy(buf->begin()+RTMP_BODY_SIZE, size);
- } else {
- _handler->wait();
- obj = _handler->pop();
- }
-
- int diff = std::memcmp(buf->begin(), _handshake->begin(), RTMP_BODY_SIZE);
+ boost::shared_ptr<amf::Buffer> buf;
+
+ if ((handshake1.reference() == 0) || (handshake2.reference() == 0)) {
+ log_debug("Que empty, net connection dropped for fd #%d", fd);
+ return buf;
+ }
+
+ int diff = std::memcmp(handshake1.begin(), handshake2.begin(),
RTMP_HANDSHAKE_SIZE);
if (diff == 0) {
log_debug (_("Handshake Finish Data matched"));
} else {
log_error (_("Handshake Finish Data didn't match by %d bytes"), diff);
-// return false;
+ }
+
+ // Copy the extra data from the end of the handshake to the new buffer.
Normally we
+ // try to avoiud copying anything around, but as this is only used once
for each connection,
+ // there isn't a real performance hit from it.
+ if (handshake2.allocated() >= static_cast<size_t>(RTMP_HANDSHAKE_SIZE)) {
+ log_debug("Got extra data in handshake, %d bytes for fd #%d",
+ handshake2.allocated() - RTMP_HANDSHAKE_SIZE, fd);
+ buf.reset(new Buffer(handshake2.allocated() - RTMP_HANDSHAKE_SIZE));
+ buf->copy(handshake2.reference() + RTMP_HANDSHAKE_SIZE,
handshake2.allocated() - RTMP_HANDSHAKE_SIZE);
}
- packetRead(obj);
-
- return true;
+// packetRead(*buf);
+ return buf;
}
bool
-RTMPServer::packetSend(boost::shared_ptr<amf::Buffer> /* buf */)
+RTMPServer::packetSend(amf::Buffer &/* buf */)
{
GNASH_REPORT_FUNCTION;
return false;
}
+// This overrides using same method from the base RTMP class.
bool
-RTMPServer::packetRead(boost::shared_ptr<amf::Buffer> buf)
+RTMPServer::packetRead(amf::Buffer &buf)
{
GNASH_REPORT_FUNCTION;
- unsigned int amf_index, headersize;
- boost::uint8_t *ptr = buf->reference();
+ boost::uint8_t amf_index, headersize;
+ boost::uint8_t *ptr = buf.reference();
AMF amf;
- if (buf->reference() == 0) {
+ if (ptr == 0) {
return false;
}
-
- amf_index = *buf->reference() & RTMP_INDEX_MASK;
- headersize = headerSize(*buf->reference());
+
+ cerr << "FIXME3: " << buf.hexify(true) << endl;
+
+// ptr += 1; // skip past the header byte
+
+ amf_index = *ptr & RTMP_INDEX_MASK;
+ headersize = headerSize(*ptr);
log_debug (_("The Header size is: %d"), headersize);
log_debug (_("The AMF index is: 0x%x"), amf_index);
@@ -262,7 +259,7 @@
// buf->dump();
if (buf->size() < actual_size) {
log_debug("FIXME: MERGING");
- buf = _handler->merge(buf);
+ buf = _que->merge(buf);
}
while ((ptr - buf->begin()) < static_cast<int>(actual_size)) {
boost::shared_ptr<amf::Element> el(new amf::Element);
@@ -286,7 +283,7 @@
break;
case PING:
{
- rtmp_ping_t *ping = decodePing(ptr);
+ boost::shared_ptr<rtmp_ping_t> ping = decodePing(ptr);
switch (ping->type) {
case PING_CLEAR:
break;
@@ -331,11 +328,12 @@
log_error (_("ERROR: Unidentified RTMP message content type 0x%x"),
_header.type);
break;
};
-
+
+#if 0
boost::shared_ptr<amf::Element> url = getProperty("tcUrl");
boost::shared_ptr<amf::Element> file = getProperty("swfUrl");
boost::shared_ptr<amf::Element> app = getProperty("app");
-
+
if (file) {
log_debug("SWF file %s", file->to_string());
}
@@ -345,6 +343,7 @@
if (app) {
log_debug("is file name is %s", app->to_string());
}
+#endif
return true;
}
@@ -502,73 +501,53 @@
}
// This is the thread for all incoming RTMP connections
-void
+bool
rtmp_handler(Network::thread_params_t *args)
{
GNASH_REPORT_FUNCTION;
- Handler *hand = reinterpret_cast<Handler *>(args->handler);
- RTMPServer rtmp;
-
- rtmp.setHandler(hand);
+// Handler *hand = reinterpret_cast<Handler *>(args->handler);
+ RTMPServer *rtmp = new RTMPServer;
string docroot = args->filespec;
-
+ string url, filespec;
+ url = docroot;
+ bool done = false;
+
log_debug(_("Starting RTMP Handler for fd #%d, tid %ld"),
args->netfd, get_thread_id());
- while (!hand->timetodie()) {
- log_debug(_("Waiting for RTMP request on fd #%d..."), args->netfd);
- hand->wait();
- // This thread is the last to wake up when the browser
- // closes the network connection. When browsers do this
- // varies, elinks and lynx are very forgiving to a more
- // flexible HTTP protocol, which Firefox/Mozilla & Opera
- // are much pickier, and will hang or fail to load if
- // you aren't careful.
- if (hand->timetodie()) {
- log_debug("Not waiting no more, no more for RTMP data for fd
#%d...", args->netfd);
- map<int, Handler *>::iterator hit = handlers.find(args->netfd);
- if ((*hit).second) {
- log_debug("Removing handle %x for RTMP on fd #%d", (void
*)hand, args->netfd);
- handlers.erase(args->netfd);
- }
-
- return;
- }
#ifdef USE_STATISTICS
- struct timespec start;
- clock_gettime (CLOCK_REALTIME, &start);
+ struct timespec start;
+ clock_gettime (CLOCK_REALTIME, &start);
#endif
- if (!rtmp.handShakeWait()) {
- // hand->clearout(); // remove all data from the outgoing que
- hand->die(); // tell all the threads for this connection to
die
- hand->notifyin();
- log_debug("Net RTMP done for fd #%d...", args->netfd);
-// hand->closeNet(args->netfd);
- return;
- }
- string url, filespec;
- url = docroot;
-
- rtmp.handShakeResponse();
-
- hand->wait();
- // This thread is the last to wake up when the browser
- // closes the network connection. When browsers do this
- // varies, elinks and lynx are very forgiving to a more
- // flexible HTTP protocol, which Firefox/Mozilla & Opera
- // are much pickier, and will hang or fail to load if
- // you aren't careful.
- if (hand->timetodie()) {
- log_debug("Not waiting no more, no more for RTMP data for fd
#%d...", args->netfd);
- map<int, Handler *>::iterator hit = handlers.find(args->netfd);
- if ((*hit).second) {
- log_debug("Removing handle %x for RTMP on fd #%d", (void
*)hand, args->netfd);
- handlers.erase(args->netfd);
- }
-
- return;
- }
- rtmp.serverFinish();
+
+ // Adjust the timeout
+ rtmp->setTimeout(5);
+
+ // Read the handshake bytes sent by the client when requesting
+ // a connection.
+ boost::shared_ptr<amf::Buffer> handshake1 = rtmp->recvMsg(args->netfd);
+ // See if we have data in the handshake, we should have 1537 bytes
+ if (handshake1->allocated() == 0) {
+ log_error("failed to read the handshake from the client.");
+ return false;
+ }
+
+ // Send our response to the handshake, which primarily is the bytes
+ // we just recieved.
+ rtmp->handShakeResponse(args->netfd, *handshake1);
+
+ boost::shared_ptr<amf::Buffer> handshake2 = rtmp->recvMsg(args->netfd);
+ // See if we have data in the handshake, we should have 1536 bytes
+ if (handshake2->allocated() == 0) {
+ log_error("failed to read the handshake from the client.");
+ return false;
+ }
+ boost::shared_ptr<amf::Buffer> start = rtmp->serverFinish(args->netfd,
*handshake1, *handshake2);
+
+ start->dump();
+
+ boost::shared_ptr<RTMP::rtmp_head_t> head = rtmp->decodeHeader(*start);
+ rtmp->decodeMsgBody(*start);
// Keep track of the network statistics
// Statistics st;
@@ -580,9 +559,25 @@
// st.addStats();
// proto.resetBytesIn();
// proto.resetBytesOut();
-
+
// st.dump();
- }
+ do {
+ // See if we have any messages waiting
+ boost::shared_ptr<amf::Buffer> buf = rtmp->recvMsg(args->netfd);
+ if (buf->allocated()) {
+ boost::uint8_t *ptr = buf->reference();
+ if (ptr == 0) {
+ log_debug("Que empty, net connection dropped for fd #%d",
args->netfd);
+ return false;
+ }
+ boost::shared_ptr<RTMP::rtmp_head_t> rthead =
rtmp->decodeHeader(ptr);
+ rtmp->decodeMsgBody(*buf);
+ } else {
+ done = true;
+ }
+ } while (!done);
+
+ return false;
}
// A Ping packet has two parameters that ae always specified, and 2 that are
optional.
=== modified file 'libnet/rtmp_server.h'
--- a/libnet/rtmp_server.h 2008-12-15 03:46:09 +0000
+++ b/libnet/rtmp_server.h 2008-12-28 01:48:41 +0000
@@ -28,6 +28,7 @@
#include "handler.h"
#include "network.h"
#include "buffer.h"
+#include "diskstream.h"
namespace gnash
{
@@ -37,11 +38,11 @@
public:
RTMPServer();
~RTMPServer();
- bool handShakeWait();
- bool handShakeResponse();
- bool serverFinish();
- bool packetSend(boost::shared_ptr<amf::Buffer> buf);
- bool packetRead(boost::shared_ptr<amf::Buffer> buf);
+// bool processClientHandShake(int fd, amf::Buffer &buf);
+ bool handShakeResponse(int fd, amf::Buffer &buf);
+ boost::shared_ptr<amf::Buffer> serverFinish(int fd, amf::Buffer
&handshake1, amf::Buffer &handshake2);
+ bool packetSend(amf::Buffer &buf);
+ bool packetRead(amf::Buffer &buf);
// These are handlers for the various types
boost::shared_ptr<amf::Buffer> encodeResult(RTMPMsg::rtmp_status_e status);
@@ -50,10 +51,15 @@
void dump();
private:
+ typedef boost::char_separator<char> Sep;
+ typedef boost::tokenizer<Sep> Tok;
+ DiskStream::filetype_e _filetype;
+ std::string _filespec;
+ boost::uint32_t _filesize;
};
// This is the thread for all incoming RTMP connections
-void rtmp_handler(Network::thread_params_t *args);
+bool rtmp_handler(Network::thread_params_t *args);
} // end of gnash namespace
// end of _RTMP_SERVER_H_
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9916: refactor heavily to work with the new network engine and threading scheme.,
rob <=