ccRTP 2.1.2
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
incqueue.cpp
Go to the documentation of this file.
1 // Copyright (C) 2001-2015 Federico Montesino Pouzols <fedemp@altern.org>
2 //
3 // This program is free software; you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation; either version 2 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 // As a special exception, you may use this file as part of a free software
18 // library without restriction. Specifically, if other files instantiate
19 // templates or use macros or inline functions from this file, or you compile
20 // this file and link it with other files to produce an executable, this
21 // file does not by itself cause the resulting executable to be covered by
22 // the GNU General Public License. This exception does not however
23 // invalidate any other reasons why the executable file might be covered by
24 // the GNU General Public License.
25 //
26 // This exception applies only to the code released under the name GNU
27 // ccRTP. If you copy code from other releases into a copy of GNU
28 // ccRTP, as the General Public License permits, the exception does
29 // not apply to the code that you add in this way. To avoid misleading
30 // anyone as to the status of such modified files, you must delete
31 // this exception notice from them.
32 //
33 // If you write modifications of your own for GNU ccRTP, it is your choice
34 // whether to permit this exception to apply to your modifications.
35 // If you do not wish that, delete this exception notice.
36 //
37 
38 #include "private.h"
39 #include <ccrtp/iqueue.h>
40 
41 NAMESPACE_COMMONCPP
42 
44 
46 ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp):
47 networkAddress(na), dataTransportPort(dtp),
48 controlTransportPort(ctp), next(NULL)
49 {
    // Constructor: the initialiser list stores the network address and the
    // data/control transport ports and leaves the list link (next) NULL.
    // The body stamps lastPacketTime with the current time of day.
50  SysTime::gettimeofday(&lastPacketTime,NULL);
51 }
52 
54 ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp)
55 {
    // Follows the `next` links from `result` until an entry matches both
    // the network address `na` and the data transport port `dtp`.
    // Returns that entry, or NULL when the chain ends without a match.
    // NOTE(review): the line that declares and initialises `result` is
    // not part of this listing.
    //
    // The "result &&" guard ends the walk at the end of the chain; without
    // it a lookup that finds no match dereferences a NULL pointer. This
    // mirrors the guard used by searchControlConflict().
57  while ( result && (result->networkAddress != na ||
58  result->dataTransportPort != dtp) )
59  result = result->next;
60  return result;
61 }
62 
64 ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp)
65 {
    // Follows the `next` links from `result` until an entry matches both
    // the network address `na` and the control transport port `ctp`.
    // Returns that entry, or NULL when the chain ends without a match.
    // NOTE(review): the line that declares and initialises `result` is
    // not part of this listing.
67  while ( result &&
68  (result->networkAddress != na ||
69  result->controlTransportPort != ctp) )
70  result = result->next;
71  return result;
72 }
73 
// Allocates a new ConflictingTransportAddress for (na, dtp, ctp) and links
// it into the conflict list.
// NOTE(review): two lines are not part of this listing: the one in front of
// the `new` expression (presumably the declaration of `nc`) and the body of
// the `else` branch that handles an empty list.
74 void
75 ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp)
76 {
78  new ConflictingTransportAddress(na,dtp,ctp);
79 
    // Non-empty list: append after the current tail and advance the tail.
80  if ( lastConflict ) {
81  lastConflict->setNext(nc);
82  lastConflict = nc;
83  } else {
85  }
86 }
87 
93 
96 {
    // Constructor body: start with an empty reception queue and set the
    // source expiration period to its initial value.
    // NOTE(review): the signature, the initialiser list and the statements
    // between the two visible ones are not part of this listing.
97  recvFirst = recvLast = NULL;
98  sourceExpirationPeriod = 5; // 5 RTCP report intervals
102 }
103 
// Empties the global reception queue while holding the write lock. For each
// link it clears the first/last pointers of `s`, then deletes the packet
// and the link.
// NOTE(review): the signature and the line that declares `s` are not part
// of this listing; `s` is presumably the source link of the packet being
// removed.
// NOTE(review): no visible statement resets recvLast once the queue has
// been emptied; confirm that it is not left dangling.
104 void
106 {
107  IncomingRTPPktLink* recvnext;
108  // flush the reception queue (incoming packets not yet
109  // retrieved)
110  recvLock.writeLock();
111  while( recvFirst )
112  {
     // remember the successor before the current link is deleted
113  recvnext = recvFirst->getNext();
114 
115  // nullify source specific packet list
117  s->setFirst(NULL);
118  s->setLast(NULL);
119 
120  delete recvFirst->getPacket();
121  delete recvFirst;
122  recvFirst = recvnext;
123  }
124  recvLock.unlock();
125 }
126 
// Draws random 32-bit values until one is found that isRegistered() does
// not report as taken, giving up after MAXTRIES attempts.
// NOTE(review): the signature is not part of this listing.
// NOTE(review): in the visible code `newssrc` is never stored or returned,
// so the value that was found has no effect here; confirm whether assigning
// it as the local SSRC is missing.
127 void
129 {
130  const uint32 MAXTRIES = 20;
131  uint32 newssrc;
132  uint16 tries = 0;
133  do {
134  newssrc = random32();
135  tries++;
136  } while ( (tries < MAXTRIES) && isRegistered(newssrc) );
137 
     // Reached when the attempt counter hit the limit; nothing is done yet.
138  if ( MAXTRIES == tries ) {
139  // TODO we are in real trouble.
140  }
141 }
142 
// Reports whether packets are waiting in the reception queue.
// With a NULL `src` the global queue is tested; otherwise the result is
// true only if isMine(*src) holds and the queue of that source has a first
// packet. The test runs under the read lock.
// NOTE(review): the signature is not part of this listing.
143 bool
145 {
146  bool w;
147  recvLock.readLock();
148  if ( NULL == src )
149  w = ( NULL != recvFirst);
150  else
151  w = isMine(*src) && ( NULL != getLink(*src)->getFirst() );
152 
153  recvLock.unlock();
154  return w;
155 }
156 
// Returns the timestamp of the first waiting packet: the head of the global
// queue when `src` is NULL, otherwise the head of the queue of `src`
// (provided isMine(*src) holds). Returns 0 when there is no such packet.
// The lookup runs under the read lock.
// NOTE(review): the signature is not part of this listing.
157 uint32
159 {
160  recvLock.readLock();
161 
162  // get the first packet
163  IncomingRTPPktLink* packetLink;
164  if ( NULL == src )
165  packetLink = recvFirst;
166  else
167  packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL;
168 
169  // get the timestamp of the first packet
170  uint32 ts;
171  if ( packetLink )
172  ts = packetLink->getTimestamp();
173  else
174  ts = 0l;
175 
176  recvLock.unlock();
177  return ts;
178 }
179 
// Reads one incoming data packet, validates it, optionally runs SRTP
// unprotection, resolves the synchronization source it belongs to and, if
// the collision/loop and reception checks pass, links it into the
// reception queues.
//
// Returns 0 when the read fails, the packet is larger than
// getMaxRecvPacketSize(), the header is invalid, SRTP processing is
// rejected by onSRTPPacketError(), or onRTPPacketRecv() refuses the packet.
// Otherwise returns the number of bytes read, including the case where the
// packet is discarded by the final collision/reception checks.
//
// NOTE(review): the signature and two statements in the body are not part
// of this listing.
180 size_t
182 {
183  InetHostAddress network_address;
184  tpport_t transport_port;
185 
     // Allocate a buffer of the size announced for the next packet and read
     // into it; the sender's address and port are filled in by recvData().
186  uint32 nextSize = (uint32)getNextDataPacketSize();
187  unsigned char* buffer = new unsigned char[nextSize];
188  int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port);
189  if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){
190  delete [] buffer;
191  return 0;
192  }
193 
194  // get time of arrival
195  struct timeval recvtime;
196  SysTime::gettimeofday(&recvtime,NULL);
197 
198  // Special handling of padding to take care of encrypted content.
199  // In case of SRTP the padding length field is also encrypted, thus
200  // it gives a wrong length. Check and clear padding bit before
201  // creating the RTPPacket. Will be set and re-computed after a possible
202  // SRTP decryption.
     // NOTE(review): the first byte is read even when rtn is 0; confirm
     // that a zero-length read cannot reach this point.
203  uint8 padSet = (*buffer & 0x20);
204  if (padSet) {
205  *buffer = *buffer & ~0x20; // clear padding bit
206  }
207  // build a packet. It will link itself to its source
     // NOTE(review): `buffer` is not freed directly after this point, so
     // the packet object presumably takes ownership of it; confirm.
208  IncomingRTPPkt* packet =
209  new IncomingRTPPkt(buffer,rtn);
210 
211  // Generic header validity check.
212  if ( !packet->isHeaderValid() ) {
213  delete packet;
214  return 0;
215  }
216 
     // Look for a crypto context registered for this SSRC. If there is
     // none, fall back to the context registered under SSRC 0 and derive a
     // context for this SSRC from it.
217  CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC());
218  if (pcc == NULL) {
219  pcc = getInQueueCryptoContext(0);
220  if (pcc != NULL) {
221  pcc = pcc->newCryptoContextForSSRC(packet->getSSRC(), 0, 0L);
222  if (pcc != NULL) {
223  pcc->deriveSrtpKeys(0);
225  }
226  }
227  }
     // With a context available, unprotect the packet. On error the
     // onSRTPPacketError() hook decides whether processing continues.
228  if (pcc != NULL) {
229  int32 ret = packet->unprotect(pcc);
230  if (ret < 0) {
231  if (!onSRTPPacketError(*packet, ret)) {
232  delete packet;
233  return 0;
234  }
235  }
236  }
     // Recompute the payload length if the padding bit was cleared above.
237  if (padSet) {
238  packet->reComputePayLength(true);
239  }
240  // virtual for profile-specific validation and processing.
241  if ( !onRTPPacketRecv(*packet) ) {
242  delete packet;
243  return 0;
244  }
245 
     // Find the source entry for this SSRC; source_created reports whether
     // the lookup had to create it.
246  bool source_created;
247  SyncSourceLink* sourceLink =
248  getSourceBySSRC(packet->getSSRC(),source_created);
249  SyncSource* s = sourceLink->getSource();
250  if ( source_created ) {
251  // Set data transport address.
252  setDataTransportPort(*s,transport_port);
253  // Network address is assumed to be the same as the control one
254  setNetworkAddress(*s,network_address);
255  sourceLink->initStats();
256  // Keep first sequence number for stats
257  sourceLink->setBaseSeqNum(packet->getSeqNum());
258  // First packet arrival time.
259  sourceLink->setInitialDataTime(recvtime);
261  if ( sourceLink->getHello() )
262  onNewSyncSource(*s);
263  } else if ( 0 == s->getDataTransportPort() ) {
264  // Test if RTCP packets had been received but this is the
265  // first data packet from this source.
266  setDataTransportPort(*s,transport_port);
267  }
268 
269  // Before inserting in the queue,
270  // 1) check for collisions and loops. If the packet cannot be
271  // assigned to a source, it will be rejected.
272  // 2) check the source is a sufficiently well known source
273  // TODO: also check CSRC identifiers.
274  if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created,
275  network_address,transport_port) &&
276  recordReception(*sourceLink,*packet,recvtime) ) {
277  // now the packet link is linked in the queues
     // The fourth constructor argument is the packet timestamp relative to
     // the initial data timestamp recorded for the source.
278  IncomingRTPPktLink* packetLink =
279  new IncomingRTPPktLink(packet,
280  sourceLink,
281  recvtime,
282  packet->getTimestamp() -
283  sourceLink->getInitialDataTimestamp(),
284  NULL,NULL,NULL,NULL);
285  insertRecvPacket(packetLink);
286  } else {
287  // must be discarded due to collision or loop or
288  // invalid source
289  delete packet;
290  }
291 
292  // ccRTP keeps packets from the new source, but avoids
293  // flip-flopping. This allows losing less packets and for
294  // mobile telephony applications or other apps that may change
295  // the source transport address during the session.
296  return rtn;
297 }
298 
300 bool is_new, InetAddress& network_address, tpport_t transport_port)
301 {
    // Collision and loop handling for an incoming data packet.
    // Returns true when the packet may be kept. Returns false when it
    // repeats a previously recorded conflict of a remote source, or when
    // it matches an already known conflict of the local SSRC.
    // NOTE(review): the start of the signature and several statements in
    // the "New collision" branch are not part of this listing.
302  bool result = true;
303 
304  // Test if the source is new and it is not the local one.
305  if ( is_new &&
306  sourceLink.getSource()->getID() != getLocalSSRC() )
307  return result;
308 
309  SyncSource *s = sourceLink.getSource();
310 
     // A transport address that differs from the one recorded for the
     // source signals a collision or a loop.
311  if ( s->getDataTransportPort() != transport_port ||
312  s->getNetworkAddress() != network_address ) {
313  // SSRC collision or a loop has happened
314  if ( s->getID() != getLocalSSRC() ) {
315  // TODO: Optional error counter.
316 
317  // Note this differs from the default in the RFC.
318  // Discard packet only when the collision is
319  // repeating (to avoid flip-flopping)
320  if ( sourceLink.getPrevConflict() &&
321  (
322  (network_address ==
323  sourceLink.getPrevConflict()->networkAddress)
324  &&
325  (transport_port ==
326  sourceLink.getPrevConflict()->dataTransportPort)
327  ) ) {
328  // discard packet and do not flip-flop
329  result = false;
330  } else {
331  // Record who has collided so that in
332  // the future we can know if the
333  // collision repeats.
334  sourceLink.setPrevConflict(network_address,
335  transport_port,0);
336  // Change sync source transport address
337  setDataTransportPort(*s,transport_port);
338  setNetworkAddress(*s,network_address);
339  }
340 
341  } else {
342  // Collision or loop of own packets.
343  ConflictingTransportAddress* conflicting =
344  searchDataConflict(network_address,
345  transport_port);
346  if ( conflicting ) {
347  // Optional error counter.
348  updateConflict(*conflicting);
349  result = false;
350  } else {
351  // New collision
     // Visible steps: send a BYE, pick a new local SSRC, then record the
     // new transport address on the source and reset its statistics.
355  dispatchBYE("SSRC collision detected when receiving data packet.");
356  renewLocalSSRC();
357  setNetworkAddress(*s,network_address);
358  setDataTransportPort(*s,transport_port);
360  sourceLink.initStats();
362  }
363  }
364  }
365  return result;
366 }
367 
368 bool
370 {
371  SyncSourceLink *srcLink = packetLink->getSourceLink();
372  unsigned short seq = packetLink->getPacket()->getSeqNum();
373  recvLock.writeLock();
374  IncomingRTPPktLink* plink = srcLink->getLast();
375  if ( plink && (seq < plink->getPacket()->getSeqNum()) ) {
376  // a disordered packet, so look for its place
377  while ( plink && (seq < plink->getPacket()->getSeqNum()) ){
378  // the packet is a duplicate
379  if ( seq == plink->getPacket()->getSeqNum() ) {
380  recvLock.unlock();
381  VDL(("Duplicated disordered packet: seqnum %d, SSRC:",
382  seq,srcLink->getSource()->getID()));
383  delete packetLink->getPacket();
384  delete packetLink;
385  return false;
386  }
387  plink = plink->getSrcPrev();
388  }
389  if ( !plink ) {
390  // we have scanned the whole (and non empty)
391  // list, so this must be the older (first)
392  // packet from this source.
393 
394  // insert into the source specific queue
395  IncomingRTPPktLink* srcFirst = srcLink->getFirst();
396  srcFirst->setSrcPrev(packetLink);
397  packetLink->setSrcNext(srcFirst);
398  // insert into the global queue
399  IncomingRTPPktLink* prevFirst = srcFirst->getPrev();
400  if ( prevFirst ){
401  prevFirst->setNext(packetLink);
402  packetLink->setPrev(prevFirst);
403  }
404  srcFirst->setPrev(packetLink);
405  packetLink->setNext(srcFirst);
406  srcLink->setFirst(packetLink);
407  } else {
408  // (we are in the middle of the source list)
409  // insert into the source specific queue
410  plink->getSrcNext()->setSrcPrev(packetLink);
411  packetLink->setSrcNext(plink->getSrcNext());
412  // -- insert into the global queue, with the
413  // minimum priority compared to packets from
414  // other sources
415  plink->getSrcNext()->getPrev()->setNext(packetLink);
416  packetLink->setPrev(plink->getSrcNext()->getPrev());
417  plink->getSrcNext()->setPrev(packetLink);
418  packetLink->setNext(plink->getSrcNext());
419  // ------
420  plink->setSrcNext(packetLink);
421  packetLink->setSrcPrev(plink);
422  // insert into the global queue (giving
423  // priority compared to packets from other sources)
424  //list->getNext->setPrev(packetLink);
425  //packetLink->setNext(list->getNext);
426  //list->setNext(packet);
427  //packet->setPrev(list);
428  }
429  } else {
430  // An ordered packet
431  if ( !plink ) {
432  // the only packet in the source specific queue
433  srcLink->setLast(packetLink);
434  srcLink->setFirst(packetLink);
435  // the last packet in the global queue
436  if ( recvLast ) {
437  recvLast->setNext(packetLink);
438  packetLink->setPrev(recvLast);
439  }
440  recvLast = packetLink;
441  if ( !recvFirst )
442  recvFirst = packetLink;
443  } else {
444  // there are already more packets from this source.
445  // this ignores duplicate packets
446  if ( plink && (seq == plink->getPacket()->getSeqNum()) ) {
447  VDL(("Duplicated packet: seqnum %d, SSRC:",
448  seq,srcLink->getSource->getID()));
449  recvLock.unlock();
450  delete packetLink->getPacket();
451  delete packetLink;
452  return false;
453  }
454  // the last packet in the source specific queue
455  srcLink->getLast()->setSrcNext(packetLink);
456  packetLink->setSrcPrev(srcLink->getLast());
457  srcLink->setLast(packetLink);
458  // the last packet in the global queue
459  recvLast->setNext(packetLink);
460  packetLink->setPrev(recvLast);
461  recvLast = packetLink;
462  }
463  }
464  // account the insertion of this packet into the queue
465  srcLink->recordInsertion(*packetLink);
466  recvLock.unlock();
467  // packet successfully inserted
468  return true;
469 }
470 
471 const AppDataUnit*
472 IncomingDataQueue::getData(uint32 stamp, const SyncSource* src)
473 {
474  IncomingRTPPktLink* pl;
475 // unsigned count = 0;
476  AppDataUnit* result;
477 
478  if ( NULL != (pl = getWaiting(stamp,src)) ) {
479  IncomingRTPPkt* packet = pl->getPacket();
480 // size_t len = packet->getPayloadSize();
481 
482  SyncSource &src = *(pl->getSourceLink()->getSource());
483  result = new AppDataUnit(*packet,src);
484 
485  // delete the packet link, but not the packet
486  delete pl;
487 // count += len;
488  } else {
489  result = NULL;
490  }
491  return result;
492 }
493 
494 // FIX: try to merge and organize
496 IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src)
497 {
498  if ( src && !isMine(*src) )
499  return NULL;
500 
501  IncomingRTPPktLink *result;
502  recvLock.writeLock();
503  if ( src != NULL ) {
504  // process source specific queries:
505  // we will modify the queue of this source
506  SyncSourceLink* srcm = getLink(*src);
507 
508  // first, delete all older packets. The while loop
509  // down here counts how many older packets are there;
510  // then the for loop deletes them and advances l till
511  // the first non older packet.
512  int nold = 0;
513  IncomingRTPPktLink* l = srcm->getFirst();
514  if ( !l ) {
515  result = NULL;
516  recvLock.unlock();
517  return result;
518  }
519  while ( l && ((l->getTimestamp() < timestamp) ||
520  end2EndDelayed(*l))) {
521  nold++;
522  l = l->getSrcNext();
523  }
524  // to know whether the global queue gets empty
525  bool nonempty = false;
526  for ( int i = 0; i < nold; i++) {
527  l = srcm->getFirst();
528  srcm->setFirst(srcm->getFirst()->getSrcNext());;
529  // unlink from the global queue
530  nonempty = false;
531  if ( l->getPrev() ){
532  nonempty = true;
533  l->getPrev()->setNext(l->getNext());
534  } if ( l->getNext() ) {
535  nonempty = true;
536  l->getNext()->setPrev(l->getPrev());
537  }
538  // now, delete it
539  onExpireRecv(*(l->getPacket()));// notify packet discard
540  delete l->getPacket();
541  delete l;
542  }
543  // return the packet, if found
544  if ( !srcm->getFirst() ) {
545  // threre are no more packets from this source
546  srcm->setLast(NULL);
547  if ( !nonempty )
548  recvFirst = recvLast = NULL;
549  result = NULL;
550  } else if ( srcm->getFirst()->getTimestamp() > timestamp ) {
551  // threre are only newer packets from this source
552  srcm->getFirst()->setSrcPrev(NULL);
553  result = NULL;
554  } else {
555  // (src->getFirst()->getTimestamp() == stamp) is true
556  result = srcm->getFirst();
557  // unlink the selected packet from the global queue
558  if ( result->getPrev() )
559  result->getPrev()->setNext(result->getNext());
560  else
561  recvFirst = result->getNext();
562  if ( result->getNext() )
563  result->getNext()->setPrev(result->getPrev());
564  else
565  recvLast = result->getPrev();
566  // unlink the selected packet from the source queue
567  srcm->setFirst(result->getSrcNext());
568  if ( srcm->getFirst() )
569  srcm->getFirst()->setPrev(NULL);
570  else
571  srcm->setLast(NULL);
572  }
573  } else {
574  // process source unspecific queries
575  int nold = 0;
577  while ( l && (l->getTimestamp() < timestamp ||
578  end2EndDelayed(*l) ) ){
579  nold++;
580  l = l->getNext();
581  }
582  for (int i = 0; i < nold; i++) {
585  // unlink the packet from the queue of its source
586  SyncSourceLink* src = l->getSourceLink();
587  src->setFirst(l->getSrcNext());
588  if ( l->getSrcNext() )
589  l->getSrcNext()->setSrcPrev(NULL);
590  else
591  src->setLast(NULL);
592  // now, delete it
593  onExpireRecv(*(l->getPacket()));// notify packet discard
594  delete l->getPacket();
595  delete l;
596  }
597 
598  // return the packet, if found
599  if ( !recvFirst ) {
600  // there are no more packets in the queue
601  recvLast = NULL;
602  result = NULL;
603  } else if ( recvFirst->getTimestamp() > timestamp ) {
604  // there are only newer packets in the queue
605  if (l)
606  l->setPrev(NULL);
607  result = NULL;
608  } else {
609  // (recvFirst->getTimestamp() == stamp) is true
610  result = recvFirst;
611  // unlink the selected packet from the global queue
613  if ( recvFirst )
614  recvFirst->setPrev(NULL);
615  else
616  recvLast = NULL;
617  // unlink the selected packet from the queue
618  // of its source
619  SyncSourceLink* src = result->getSourceLink();
620  src->setFirst(result->getSrcNext());
621  if ( src->getFirst() )
622  src->getFirst()->setSrcPrev(NULL);
623  else
624  src->setLast(NULL);
625  }
626  }
627  recvLock.unlock();
628  return result;
629 }
630 
// Logs the reception of an RTP packet from a source: validates the source
// and the sequence number, and for an accepted packet updates the counters,
// the last packet time and the interarrival jitter estimate.
// Returns true when the packet is considered valid, false when it must be
// discarded.
// NOTE(review): the first line of the signature and the continuation of the
// `arrival` initialiser are not part of this listing.
631 bool
633 const IncomingRTPPkt& pkt, const timeval recvtime)
634 {
635  bool result = true;
636 
637  // Source validation.
638  SyncSource* src = srcLink.getSource();
639  if ( !(srcLink.isValid()) ) {
640  // source is not yet valid.
641  if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) {
642  // packet in sequence.
643  srcLink.decProbation();
644  if ( srcLink.isValid() ) {
645  // source has become valid.
646  // TODO: avoid this the first time.
647  srcLink.initSequence(pkt.getSeqNum());
648  } else {
649  result = false;
650  }
651  } else {
652  // packet not in sequence.
     // restart the probation count
653  srcLink.probation = getMinValidPacketSequence() - 1;
654  result = false;
655  }
656  srcLink.setMaxSeqNum(pkt.getSeqNum());
657  } else {
658  // source was already valid.
     // Distance from the highest sequence number seen so far; the uint16
     // result wraps modulo 65536.
659  uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum();
660  if ( step < getMaxPacketDropout() ) {
661  // Ordered, with not too high step.
662  if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) {
663  // sequence number wrapped.
664  srcLink.incSeqNumAccum();
665  }
666  srcLink.setMaxSeqNum(pkt.getSeqNum());
667  } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) {
668  // too high step of the sequence number.
     // A packet that matches the sequence number recorded as "bad" by the
     // previous jump restarts the sequence at that number.
669  if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) {
670  srcLink.initSequence(pkt.getSeqNum());
671  } else {
672  srcLink.setBadSeqNum((pkt.getSeqNum() + 1) &
673  (SEQNUMMOD - 1) );
674  //This additional check avoids that
675  //the very first packet from a source
676  //be discarded.
677  if ( 0 < srcLink.getObservedPacketCount() ) {
678  result = false;
679  } else {
680  srcLink.setMaxSeqNum(pkt.getSeqNum());
681  }
682  }
683  } else {
684  // duplicate or reordered packet
685  }
686  }
687 
688  if ( result ) {
689  // the packet is considered valid.
690  srcLink.incObservedPacketCount();
691  srcLink.incObservedOctetCount(pkt.getPayloadSize());
692  srcLink.lastPacketTime = recvtime;
693  if ( srcLink.getObservedPacketCount() == 1 ) {
694  // ooops, it's the first packet from this source
695  setSender(*src,true);
696  srcLink.setInitialDataTimestamp(pkt.getTimestamp());
697  }
698  // we record the last time a packet from this source
699  // was received, this has statistical interest and is
700  // needed to time out old senders that are not sending
701  // any longer.
702 
703  // compute the interarrival jitter estimation.
     // tarrival = last packet time - initial data time of the source
704  timeval tarrival;
705  timeval lastT = srcLink.getLastPacketTime();
706  timeval initial = srcLink.getInitialDataTime();
707  timersub(&lastT,&initial,&tarrival);
708  uint32 arrival = timeval2microtimeout(tarrival)
710  uint32 transitTime = arrival - pkt.getTimestamp();
711  int32 delta = transitTime -
712  srcLink.getLastPacketTransitTime();
713  srcLink.setLastPacketTransitTime(transitTime);
714  if ( delta < 0 )
715  delta = -delta;
     // jitter += (|delta| - jitter) / 16
716  srcLink.setJitter( srcLink.getJitter() +
717  (1.0f / 16.0f) *
718  (static_cast<float>(delta) -
719  srcLink.getJitter()));
720  }
721  return result;
722 }
723 
// Intentionally empty body: nothing is recorded here.
// NOTE(review): the signature is not part of this listing.
724 void
726 {}
727 
// Registers `cc` in the list of input crypto contexts, under cryptoMutex.
// A context already stored for the same SSRC is removed from the list and
// deleted first, so the list holds at most one context per SSRC.
// `cc` is dereferenced unconditionally and so must not be NULL.
// NOTE(review): the signature is not part of this listing.
728 void
730 {
731  std::list<CryptoContext *>::iterator i;
732 
733  MutexLock lock(cryptoMutex);
734  // check if a CryptoContext for a SSRC already exists. If yes
735  // remove it from list before inserting the new one.
736  for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) {
737  if( (*i)->getSsrc() == cc->getSsrc() ) {
738  CryptoContext* tmp = *i;
739  cryptoContexts.erase(i);
     // Registering the context that is already stored must not destroy
     // it: it is pushed back below and would be left dangling.
740  if ( tmp != cc ) delete tmp;
741  break;
742  }
743  }
744  cryptoContexts.push_back(cc);
745 }
746 
// Removes input crypto contexts, under cryptoMutex.
// With a NULL `cc` every stored context is erased and deleted. Otherwise
// the first stored context whose SSRC equals that of `cc` is erased and
// deleted; `cc` itself is only used for its SSRC.
// NOTE(review): the signature is not part of this listing.
747 void
749 {
750  std::list<CryptoContext *>::iterator i;
751 
752  MutexLock lock(cryptoMutex);
753  if (cc == NULL) { // Remove any incoming crypto contexts
     // erase() returns the iterator to the next element, so the loop
     // header has no increment
754  for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
755  CryptoContext* tmp = *i;
756  i = cryptoContexts.erase(i);
757  delete tmp;
758  }
759  }
760  else {
761  for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
762  if( (*i)->getSsrc() == cc->getSsrc() ) {
763  CryptoContext* tmp = *i;
764  cryptoContexts.erase(i);
765  delete tmp;
766  return;
767  }
768  }
769  }
770 }
771 
774 {
     // Searches the stored input crypto contexts, under cryptoMutex, and
     // returns the first one whose SSRC equals `ssrc`, or NULL if none
     // matches.
     // NOTE(review): the signature is not part of this listing.
775  std::list<CryptoContext *>::iterator i;
776 
777  MutexLock lock(cryptoMutex);
778  for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
779  if( (*i)->getSsrc() == ssrc) {
780  return (*i);
781  }
782  }
783  return NULL;
784 }
785 
786 END_NAMESPACE
787 
virtual bool onRTPPacketRecv(IncomingRTPPkt &)
A virtual function to support parsing of arriving packets to determine if they should be kept in the ...
Definition: iqueue.h:1206
ConflictingTransportAddress * next
Definition: iqueue.h:255
static const uint16 defaultMaxPacketDropout
Definition: iqueue.h:1283
IncomingDataQueue::IncomingRTPPktLink * getWaiting(uint32 timestamp, const SyncSource *src=NULL)
This is used to fetch a packet in the receive queue and to expire packets older than the current time...
Definition: incqueue.cpp:496
static const uint32 SEQNUMMOD
Definition: iqueue.h:855
ConflictingTransportAddress * searchDataConflict(InetAddress na, tpport_t dtp)
Definition: incqueue.cpp:54
virtual size_t dispatchBYE(const std::string &)
A plugin point for posting of BYE messages.
Definition: queuebase.h:228
tpport_t getDataTransportPort() const
Definition: sources.h:271
Mutex cryptoMutex
Definition: iqueue.h:1289
Synchronization source in an RTP session.
Definition: sources.h:192
ThreadLock recvLock
Definition: iqueue.h:1277
Interface (envelope) to data received over RTP packets.
Definition: queuebase.h:68
The implementation for a SRTP cryptographic context.
Definition: CryptoContext.h:82
void setSender(SyncSource &source, bool active)
Definition: iqueue.h:150
void renewLocalSSRC()
Definition: incqueue.cpp:128
uint32 random32()
Definition: queue.cpp:492
void reComputePayLength(bool padding)
Re-compute payload length.
Definition: rtppkt.cpp:178
uint16 maxPacketMisorder
Definition: iqueue.h:1285
RTP packets received from other participants.
Definition: rtppkt.h:704
void recordExtraction(const IncomingRTPPkt &pkt)
Log extraction of a packet from this source from the scheduled reception queue.
Definition: incqueue.cpp:725
static const size_t defaultMembersSize
Definition: iqueue.h:1287
virtual bool end2EndDelayed(IncomingRTPPktLink &)
Definition: iqueue.h:1238
bool isRegistered(uint32 ssrc)
Returns whether there is already a synchronization source with "ssrc" SSRC identifier.
Definition: members.cpp:205
microtimeout_t timeval2microtimeout(const timeval &t)
Convert a time interval, expressed as a timeval value into a microseconds counter.
Definition: base.h:90
uint16 maxPacketDropout
Definition: iqueue.h:1286
virtual size_t recvData(unsigned char *buffer, size_t length, InetHostAddress &host, tpport_t &port)=0
This function performs the physical I/O for reading a packet from the source.
bool checkSSRCInIncomingRTPPkt(SyncSourceLink &sourceLink, bool is_new, InetAddress &na, tpport_t tp)
Apply collision and loop detection and correction algorithm when receiving RTP data packets...
Definition: incqueue.cpp:299
virtual void onNewSyncSource(const SyncSource &)
Virtual called when a new synchronization source has joined the session.
Definition: iqueue.h:1185
uint8 minValidPacketSequence
Definition: iqueue.h:1284
size_t getMaxRecvPacketSize() const
Definition: queuebase.h:302
uint8 getMinValidPacketSequence() const
Get the minimum number of consecutive packets that must be received from a source before accepting it...
Definition: iqueue.h:1010
uint32 getSSRC() const
Get synchronization source numeric identifier.
Definition: rtppkt.h:740
Declaration of ccRTP internal stuff.
uint16 getMaxPacketMisorder() const
Definition: iqueue.h:1022
int32 unprotect(CryptoContext *pcc)
Unprotect a received packet.
Definition: rtppkt.cpp:294
static const uint16 defaultMaxPacketMisorder
Definition: iqueue.h:1282
#define VDL(e)
Definition: private.h:110
uint16 getDefaultMaxPacketMisorder() const
Definition: iqueue.h:1018
uint32 getTimestamp() const
Definition: rtppkt.h:149
bool isHeaderValid()
Get validity of this packet.
Definition: rtppkt.h:730
uint8 sourceExpirationPeriod
Definition: iqueue.h:1288
void setNext(ConflictingTransportAddress *nc)
Definition: iqueue.h:240
IncomingDataQueue(uint32 size)
Definition: incqueue.cpp:94
static const size_t defaultMaxRecvPacketSize
Definition: queuebase.h:328
Generic RTP input queues.
void setNetworkAddress(SyncSource &source, InetAddress addr)
Definition: iqueue.h:162
CryptoContext * newCryptoContextForSSRC(uint32 ssrc, int roc, int64 keyDerivRate)
Derive a new Crypto Context for use with a new SSRC.
bool isWaiting(const SyncSource *src=NULL) const
Determine if packets are waiting in the reception queue.
Definition: incqueue.cpp:144
bool isMine(const SyncSource &source) const
Get whether a synchronization source is recorded in this membership controller.
Definition: iqueue.h:342
bool recordReception(SyncSourceLink &srcLink, const IncomingRTPPkt &pkt, const timeval recvtime)
Log reception of a new RTP packet from this source.
Definition: incqueue.cpp:632
static const size_t defaultMembersHashSize
Definition: iqueue.h:854
uint16 getSeqNum() const
Definition: rtppkt.h:142
CryptoContext * getInQueueCryptoContext(uint32 ssrc)
Get an input queue CryptoContext identified by SSRC.
Definition: incqueue.cpp:773
void setDataTransportPort(SyncSource &source, tpport_t p)
Definition: iqueue.h:154
bool insertRecvPacket(IncomingRTPPktLink *packetLink)
Insert a just received packet in the queue (both general and source specific queues).
Definition: incqueue.cpp:369
const AppDataUnit * getData(uint32 stamp, const SyncSource *src=NULL)
Retrieve data from a specific timestamped packet if such a packet is currently available in the recei...
Definition: incqueue.cpp:472
const InetAddress & getNetworkAddress() const
Definition: sources.h:277
IncomingRTPPktLink * recvFirst
Definition: iqueue.h:1279
tpport_t getControlTransportPort() const
Definition: sources.h:274
SyncSourceLink * getSourceBySSRC(uint32 ssrc, bool &created)
Get the description of a source by its ssrc identifier.
Definition: members.cpp:226
virtual size_t takeInDataPacket()
This function is used by the service thread to process the next incoming packet and place it in the r...
Definition: incqueue.cpp:181
void purgeIncomingQueue()
Definition: incqueue.cpp:105
static const uint8 defaultMinValidPacketSequence
Definition: iqueue.h:1281
uint32 getPayloadSize() const
Definition: rtppkt.h:128
uint32 getLocalSSRC() const
Definition: queuebase.h:184
uint16 getMaxPacketDropout() const
Definition: iqueue.h:1039
uint16 getDefaultMaxPacketDropout() const
Definition: iqueue.h:1035
void deriveSrtpKeys(uint64 index)
Perform key derivation according to SRTP specification.
uint32 getSsrc() const
Get the SSRC of this SRTP Cryptographic context.
Controls the group membership in the current session.
Definition: iqueue.h:298
IncomingRTPPktLink * recvLast
Definition: iqueue.h:1279
ConflictingTransportAddress * lastConflict
Definition: iqueue.h:285
uint32 getCurrentRTPClockRate() const
Get the clock rate in RTP clock units (for instance, 8000 units per second for PCMU, or 90000 units per second for MP2T).
Definition: queuebase.h:195
ConflictingTransportAddress(InetAddress na, tpport_t dtp, tpport_t ctp)
Definition: incqueue.cpp:46
uint32 getFirstTimestamp(const SyncSource *src=NULL) const
Get timestamp of first packet waiting in the queue.
Definition: incqueue.cpp:158
virtual void onExpireRecv(IncomingRTPPkt &)
A hook to filter packets in the receive queue that are being expired.
Definition: iqueue.h:1217
uint32 getID() const
Definition: sources.h:257
void addConflict(const InetAddress &na, tpport_t dtp, tpport_t ctp)
Definition: incqueue.cpp:75
SyncSourceLink * getLink(const SyncSource &source) const
Definition: iqueue.h:336
virtual bool onSRTPPacketError(IncomingRTPPkt &pkt, int32 errorCode)
A hook that gets called if the decoding of an incoming SRTP was erroneous.
Definition: iqueue.h:1234
void updateConflict(ConflictingTransportAddress &ca)
Definition: iqueue.h:273
uint8 getDefaultMinValidPacketSequence() const
Definition: iqueue.h:1002
void removeInQueueCryptoContext(CryptoContext *cc)
Remove input queue CryptoContext.
Definition: incqueue.cpp:748
virtual size_t getNextDataPacketSize() const =0
std::list< CryptoContext * > cryptoContexts
Definition: iqueue.h:1290
ConflictingTransportAddress * firstConflict
Definition: iqueue.h:285
ConflictingTransportAddress * searchControlConflict(InetAddress na, tpport_t ctp)
Definition: incqueue.cpp:64
void setInQueueCryptoContext(CryptoContext *cc)
Set input queue CryptoContext.
Definition: incqueue.cpp:729
void setControlTransportPort(SyncSource &source, tpport_t p)
Definition: iqueue.h:158