// Copyright (C) 2001,2002 Federico Montesino Pouzols // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. // // As a special exception to the GNU General Public License, permission is // granted for additional uses of the text contained in its release // of ccRTP. // // The exception is that, if you link the ccRTP library with other // files to produce an executable, this does not by itself cause the // resulting executable to be covered by the GNU General Public License. // Your use of that executable is in no way restricted on account of // linking the ccRTP library code into it. // // This exception does not however invalidate any other reasons why // the executable file might be covered by the GNU General Public License. // // This exception applies only to the code released under the // name ccRTP. If you copy code from other releases into a copy of // ccRTP, as the General Public License permits, the exception does // not apply to the code that you add in this way. To avoid misleading // anyone as to the status of such modified files, you must delete // this exception notice from them. // // If you write modifications of your own for ccRTP, it is your choice // whether to permit this exception to apply to your modifications. // If you do not wish that, delete this exception notice. 
#include "private.h"
// NOTE(review): the target of this include was lost when the file was
// extracted (only a bare "#include" survived). It is presumably the header
// declaring the outgoing queue classes -- confirm the exact path.
#include <ccrtp/oqueue.h>

#ifdef	CCXX_NAMESPACES
namespace ost {
#endif

/// Default upper bound, in octets, for the payload of each outgoing segment.
const size_t OutgoingDataQueueBase::defaultMaxSendSegmentSize = 65536;

/**
 * Initializes the maximum send segment size with the class default.
 */
OutgoingDataQueueBase::OutgoingDataQueueBase()
{
	// segment data in packets of no more than 65536 octets.
	setMaxSendSegmentSize(getDefaultMaxSendSegmentSize());
}

/**
 * Builds an empty destination list.
 */
DestinationListHandler::DestinationListHandler() :
	destinationCounter(0), firstDestination(NULL), lastDestination(NULL),
	destinationLock()
{
}

/**
 * Destroys every node still linked in the destination list. The list is
 * write-locked while it is torn down.
 */
DestinationListHandler::~DestinationListHandler()
{
	TransportAddress* tmp;
	writeLockDestinationList();
	while ( firstDestination ) {
		// advance the head before destroying the old head node.
		tmp = getFirstDestination();
		firstDestination = firstDestination->getNext();
#ifdef	CCXX_EXCEPTIONS
		try {
#endif
			delete tmp;
#ifdef	CCXX_EXCEPTIONS
		} catch (...) {
			// a destructor must not let exceptions escape.
		}
#endif
	}
	unlockDestinationList();
}

/**
 * Appends a new destination at the tail of the destination list.
 *
 * @param ia network address of the destination.
 * @param data transport port for data.
 * @param control transport port for control.
 * @return always true.
 */
bool
DestinationListHandler::addDestinationToList(const InetAddress& ia,
					     tpport_t data,
					     tpport_t control)
{
	TransportAddress* addr = new TransportAddress(ia,data,control);
	writeLockDestinationList();
	if ( firstDestination == NULL ) {
		firstDestination = lastDestination = addr;
	} else {
		lastDestination->setNext(addr);
		lastDestination = addr;
	}
	destinationCounter++;
	unlockDestinationList();
	return true;
}

/**
 * Removes from the destination list every entry whose network address,
 * data port and control port all match the given ones.
 *
 * @param ia network address to look for.
 * @param dataPort data transport port to look for.
 * @param controlPort control transport port to look for.
 * @return true if at least one entry was removed, false otherwise.
 */
bool
DestinationListHandler::removeDestinationFromList(const InetAddress& ia,
						  tpport_t dataPort,
						  tpport_t controlPort)
{
	bool result = false;
	TransportAddress* prev = NULL;
	writeLockDestinationList();
	TransportAddress* ta = getFirstDestination();
	while ( NULL != ta ) {
		// fetch the successor first: ta may be destroyed below.
		TransportAddress* next = ta->getNext();
		if ( ia == ta->getNetworkAddress() &&
		     dataPort == ta->getDataTransportPort() &&
		     controlPort == ta->getControlTransportPort() ) {
			// matches. -> unlink it and destroy it.
			result = true;
			if ( prev )
				prev->setNext(next);
			else
				firstDestination = next; // removed the head
			if ( lastDestination == ta )
				lastDestination = prev;	 // removed the tail
			destinationCounter--;
			delete ta;
		} else {
			prev = ta;
		}
		ta = next;
	}
	unlockDestinationList();
	return result;
}

/// Schedule at 8 ms.
const microtimeout_t OutgoingDataQueue::defaultSchedulingTimeout = 8000;
/// Packets unsent will expire after 40 ms.
const microtimeout_t OutgoingDataQueue::defaultExpireTimeout = 40000; OutgoingDataQueue::OutgoingDataQueue(): OutgoingDataQueueBase(), DestinationListHandler(), sendLock(), sendFirst(NULL), sendLast(NULL) { setInitialTimestamp(random32()); setSchedulingTimeout(getDefaultSchedulingTimeout()); setExpireTimeout(getDefaultExpireTimeout()); sendInfo.packetCount = 0; sendInfo.octetCount = 0; sendInfo.sendSeq = random16(); // random initial sequence number sendInfo.sendCC = 0; // initially, 0 CSRC identifiers follow the fixed heade sendInfo.marked = false; sendInfo.complete = true; // the local source is the first contributing source sendInfo.sendSources[0] = getLocalSSRC(); // this will be an accumulator for the successive cycles of timestamp sendInfo.overflowTime.tv_sec = getInitialTime().tv_sec; sendInfo.overflowTime.tv_usec = getInitialTime().tv_usec; } void OutgoingDataQueue::purgeOutgoingQueue() { OutgoingRTPPktLink* sendnext; // flush the sending queue (delete outgoing packets // unsent so far) sendLock.enterMutex(); while ( sendFirst ) { sendnext = sendFirst->getNext(); delete sendFirst; sendFirst = sendnext; } sendLock.signal(true); sendLock.leaveMutex(); } bool OutgoingDataQueue::addDestination(const InetHostAddress& ia, tpport_t dataPort, tpport_t controlPort) { if ( 0 == controlPort ) controlPort = dataPort + 1; bool result = addDestinationToList(ia,dataPort,controlPort); if ( result && isSingleDestination() ) { setDataPeer(ia,dataPort); setControlPeer(ia,controlPort); } return result; } bool OutgoingDataQueue::addDestination(const InetMcastAddress& ia, tpport_t dataPort, tpport_t controlPort) { if ( 0 == controlPort ) controlPort = dataPort + 1; bool result = addDestinationToList(ia,dataPort,controlPort); if ( result && isSingleDestination() ) { setDataPeer(ia,dataPort); setControlPeer(ia,controlPort); } return result; } bool OutgoingDataQueue::forgetDestination(const InetHostAddress& ia, tpport_t dataPort, tpport_t controlPort) { if ( 0 == controlPort ) 
controlPort = dataPort + 1; return DestinationListHandler:: removeDestinationFromList(ia,dataPort,controlPort); } bool OutgoingDataQueue::forgetDestination(const InetMcastAddress& ia, tpport_t dataPort, tpport_t controlPort) { if ( 0 == controlPort ) controlPort = dataPort + 1; return DestinationListHandler:: removeDestinationFromList(ia,dataPort,controlPort); } bool OutgoingDataQueue::isSending(void) const { if(sendFirst) return true; return false; } microtimeout_t OutgoingDataQueue::getSchedulingTimeout(void) { struct timeval send, now; uint32 rate; uint32 rem; for(;;) { // if there is no packet to send, use the default scheduling // timeout if( !sendFirst ) return schedulingTimeout; uint32 stamp = sendFirst->getPacket()->getTimestamp(); stamp -= getInitialTimestamp(); rate = getCurrentRTPClockRate(); // now we want to get in send _when_ the // packet is scheduled to be sent. // translate timestamp to timeval send.tv_sec = stamp / rate; rem = stamp % rate; send.tv_usec = (1000ul*rem) / (rate/1000ul); // 10^6 * rem/rate // add timevals. Overflow holds the inital time // plus the time accumulated through successive // overflows of timestamp. See below. timeradd(&send,&(sendInfo.overflowTime),&send); gettimeofday(&now, NULL); // Problem: when timestamp overflows, time goes back. // We MUST ensure that _send_ is not too lower than // _now_, otherwise, we MUST keep how many time was // lost because of overflow. We assume that _send_ // 5000 seconds lower than now suggests timestamp // overflow. (Remember than the 32 bits of the // timestamp field are 47722 seconds under a sampling // clock of 90000 hz.) This is not a perfect // solution. Disorderedly timestamped packets coming // after an overflowed one will be wrongly // corrected. Nevertheless, this may only corrupt a // handful of those packets every more than 13 hours // (if timestamp started from 0). 
if ( now.tv_sec - send.tv_sec > 5000){ timeval overflow; overflow.tv_sec =(~static_cast(0)) / rate; overflow.tv_usec = (~static_cast(0)) % rate * 1000000ul / rate; do { timeradd(&send,&overflow,&send); timeradd(&(sendInfo.overflowTime),&overflow, &(sendInfo.overflowTime)); } while ( now.tv_sec - send.tv_sec > 5000 ); } // This tries to solve the aforementioned problem // about disordered packets coming after an overflowed // one. Now we apply the reverse idea. if ( send.tv_sec - now.tv_sec > 20000 ){ timeval overflow; overflow.tv_sec = (~static_cast(0)) / rate; overflow.tv_usec = (~static_cast(0)) % rate * 1000000ul / rate; timersub(&send,&overflow,&send); } // A: This sets a maximum timeout of 1 hour. if ( send.tv_sec - now.tv_sec > 3600 ){ return 3600000000ul; } int32 diff = ((send.tv_sec - now.tv_sec) * 1000000ul) + send.tv_usec - now.tv_usec; // B: wait diff usecs more before sending if ( diff >= 0 ){ return static_cast(diff); } // C: the packet must be sent right now if ( (diff < 0) && static_cast(-diff) <= getExpireTimeout() ){ return 0; } // D: the packet has expired -> delete it. sendLock.enterMutex(); OutgoingRTPPktLink* packet = sendFirst; sendFirst = sendFirst->getNext(); onExpireSend(*(packet->getPacket())); // new virtual to notify delete packet; if ( sendFirst ) sendFirst->setPrev(NULL); else sendLast = NULL; sendLock.leaveMutex(); } I( false ); return 0; } void OutgoingDataQueue::putData(uint32 stamp, const unsigned char *data, size_t datalen) { if ( !data || !datalen ) return; size_t step = 0, offset = 0; while ( offset < datalen ) { // remainder and step take care of segmentation // according to getMaxSendSegmentSize() size_t remainder = datalen - offset; step = ( remainder > getMaxSendSegmentSize() ) ? 
getMaxSendSegmentSize() : remainder; OutgoingRTPPkt* packet; if ( sendInfo.sendCC ) packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step); else packet = new OutgoingRTPPkt(data + offset,step); packet->setPayloadType(getCurrentPayloadType()); packet->setSeqNum(sendInfo.sendSeq++); packet->setTimestamp(stamp + getInitialTimestamp()); packet->setSSRCNetwork(getLocalSSRCNetwork()); if ( (0 == offset) && getMark() ) { packet->setMarker(true); setMark(false); } else { packet->setMarker(false); } // insert the packet into the "tail" of the sending queue sendLock.enterMutex(); OutgoingRTPPktLink *link = new OutgoingRTPPktLink(packet,sendLast,NULL); if (sendLast) sendLast->setNext(link); else sendFirst = link; sendLast = link; sendLock.signal(true); sendLock.leaveMutex(); offset += step; } } bool OutgoingDataQueue::waitData(microtimeout_t to) const { bool result = true; sendLock.enterMutex(); if (!sendFirst) { sendLock.wait(to, true); if (!sendFirst) result = false; } sendLock.leaveMutex(); return result; } size_t OutgoingDataQueue::dispatchDataPacket(void) { sendLock.enterMutex(); OutgoingRTPPktLink* packetLink = sendFirst; if ( !packetLink ){ sendLock.leaveMutex(); return 0; } OutgoingRTPPkt* packet = packetLink->getPacket(); uint32 rtn = packet->getPayloadSize(); lockDestinationList(); if ( isSingleDestination() ) { sendData(packet->getRawPacket(), packet->getRawPacketSize()); } else { // when no destination has been added, NULL == dest. TransportAddress* dest = getFirstDestination(); while ( dest ) { setDataPeer(dest->getNetworkAddress(), dest->getDataTransportPort()); sendData(packet->getRawPacket(), packet->getRawPacketSize()); dest = dest->getNext(); } } unlockDestinationList(); // unlink the sent packet from the queue and destroy it. Also // record the sending. 
sendFirst = sendFirst->getNext(); if ( sendFirst ) { sendFirst->setPrev(NULL); } else { sendLast = NULL; } // for general accounting and RTCP SR statistics sendInfo.packetCount++; sendInfo.octetCount += packet->getPayloadSize(); delete packetLink; sendLock.leaveMutex(); return rtn; } size_t OutgoingDataQueue::setPartial(uint32 stamp, unsigned char *data, size_t offset, size_t max) { sendLock.enterMutex(); OutgoingRTPPktLink* packetLink = sendFirst; while ( packetLink ) { uint32 pstamp = packetLink->getPacket()->getTimestamp(); if ( pstamp > stamp ) packetLink = NULL; if ( pstamp >= stamp ) break; packetLink = packetLink->getNext(); } if ( !packetLink ) { sendLock.leaveMutex(); return 0; } OutgoingRTPPkt* packet = packetLink->getPacket(); if ( offset >= packet->getPayloadSize() ) return 0; if ( max > packet->getPayloadSize() - offset ) max = packet->getPayloadSize() - offset; memcpy((unsigned char*)(packet->getPayload()) + offset, data, max); sendLock.leaveMutex(); return max; } #ifdef CCXX_NAMESPACES }; #endif /** EMACS ** * Local variables: * mode: c++ * c-basic-offset: 8 * End: */