ccRTP 2.1.2
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
pool.cpp
Go to the documentation of this file.
1 // Copyright (C) 2000-2015 Federico Montesino Pouzols <fedemp@altern.org>
2 //
3 // This program is free software; you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation; either version 2 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 // As a special exception, you may use this file as part of a free software
18 // library without restriction. Specifically, if other files instantiate
19 // templates or use macros or inline functions from this file, or you compile
20 // this file and link it with other files to produce an executable, this
21 // file does not by itself cause the resulting executable to be covered by
22 // the GNU General Public License. This exception does not however
23 // invalidate any other reasons why the executable file might be covered by
24 // the GNU General Public License.
25 //
26 // This exception applies only to the code released under the name GNU
27 // ccRTP. If you copy code from other releases into a copy of GNU
28 // ccRTP, as the General Public License permits, the exception does
29 // not apply to the code that you add in this way. To avoid misleading
30 // anyone as to the status of such modified files, you must delete
31 // this exception notice from them.
32 //
33 // If you write modifications of your own for GNU ccRTP, it is your choice
34 // whether to permit this exception to apply to your modifications.
35 // If you do not wish that, delete this exception notice.
36 //
37 
38 #include "private.h"
39 #include <ccrtp/pool.h>
40 
41 #include <algorithm>
42 
43 NAMESPACE_COMMONCPP
44 using std::list;
45 
// Body of the RTPSessionPool() constructor. The signature line (listing line
// 46, per the cross-reference index at the end of this page) is not present
// in this listing.
47 {
48 #ifndef _MSWINDOWS_
// No socket registered yet.
49  highestSocket = 0;
// setPoolTimeout(sec, usec): 0 seconds, 3000 microseconds.
50  setPoolTimeout(0,3000);
// Start with an empty receive descriptor set.
51  FD_ZERO(&recvSocketSet);
52 #endif
// With _MSWINDOWS_ defined none of the statements above are compiled.
53 }
54 
// addSession(RTPSessionBase& session): registers a session with the pool.
// The signature line (listing line 56) is missing from this listing; the name
// and parameter are taken from the cross-reference index at the end of the
// page.
//
// Returns true when a new SessionListElement wrapping &session was appended
// to sessionList, false when find_if already finds a matching element.
// With _MSWINDOWS_ defined the function always returns false.
55 bool
57 {
58 #ifndef _MSWINDOWS_
59  bool result = false;
// Write lock: sessionList may be modified below.
60  poolLock.writeLock();
61  // insert in list.
// Predicate built from &session; presumably compares each element's wrapped
// session pointer with it -- confirm against PredEquals in pool.h.
62  PredEquals predEquals(&session);
63  if ( sessionList.end() == std::find_if(sessionList.begin(),sessionList.end(),predEquals) ) {
64  result = true;
// The element is heap-allocated here; the purge loop in run() deletes
// elements once they have been marked as cleared.
// NOTE(review): if new or push_back throws, the unlock() below is not reached.
65  sessionList.push_back(new SessionListElement(&session));
66  } else {
67  result = false;
68  }
69  poolLock.unlock();
70  return result;
71 #else
72  return false;
73 #endif
74 }
75 
// removeSession(RTPSessionBase& session): marks a session's list element as
// cleared. The signature line (listing line 77) is missing from this listing;
// the name and parameter are taken from the cross-reference index at the end
// of the page.
//
// The element is NOT erased from sessionList here: only clear() is called on
// it. The purge loop at the end of run() erases and deletes cleared elements.
//
// Returns true when a matching element was found, false otherwise.
// With _MSWINDOWS_ defined the function always returns false.
76 bool
78 {
79 #ifndef _MSWINDOWS_
80  bool result = false;
81  poolLock.writeLock();
82  // remove from list.
83  PredEquals predEquals(&session);
84  PoolIterator i;
// The search result is assigned to i inside the condition so the matching
// element can be used in the branch below.
85  if ( sessionList.end() != (i = find_if(sessionList.begin(),sessionList.end(),predEquals)) ) {
86  (*i)->clear();
87  result = true;
88  } else {
89  result = false;
90  }
91  poolLock.unlock();
92  return result;
93 #else
94  return false;
95 #endif
96 }
97 
// getPoolLength() const: number of elements currently held in sessionList.
// The signature line (listing line 99) is missing from this listing; the name
// is taken from the cross-reference index at the end of the page.
//
// The value is sessionList.size(), read under the read lock. Because
// removeSession() only marks elements as cleared, the count still includes
// elements that run() has not yet purged.
// With _MSWINDOWS_ defined the function always returns 0.
98 size_t
100 {
101 #ifndef _MSWINDOWS_
102  size_t result;
103  poolLock.readLock();
104  result = sessionList.size();
105  poolLock.unlock();
106  return result;
107 #else
108  return 0;
109 #endif
110 }
111 
112 void
114 {
115 #ifndef _MSWINDOWS_
116  SOCKET so;
117  microtimeout_t packetTimeout(0);
118  while ( isActive() ) {
119  poolLock.readLock();
120  // Make a copy of the list so that add and remove does
121  // not affect the list during this loop iteration
122  list<SessionListElement*> sessions(sessionList);
123  poolLock.unlock();
124 
125  PoolIterator i = sessions.begin();
126  while ( i != sessions.end() ) {
127  poolLock.readLock();
128  if (!(*i)->isCleared()) {
129  RTPSessionBase* session((*i)->get());
130  controlReceptionService(*session);
131  controlTransmissionService(*session);
132  }
133  poolLock.unlock();
134  i++;
135  }
136  timeval timeout = getPoolTimeout();
137 
138  // Reinitializa fd set
139  FD_ZERO(&recvSocketSet);
140  poolLock.readLock();
141  highestSocket = 0;
142  for (PoolIterator j = sessions.begin(); j != sessions.end (); j++) {
143  if (!(*j)->isCleared()) {
144  RTPSessionBase* session((*j)->get());
145  SOCKET s = getDataRecvSocket(*session);
146  FD_SET(s,&recvSocketSet);
147  if ( s > highestSocket + 1 )
148  highestSocket = s + 1;
149  }
150  }
151  poolLock.unlock();
152 
153 
154  int n = select(highestSocket,&recvSocketSet,NULL,NULL,
155  &timeout);
156 
157  i = sessions.begin();
158  while ( (i != sessions.end()) ) {
159  poolLock.readLock();
160  if (!(*i)->isCleared()) {
161  RTPSessionBase* session((*i)->get());
162  so = getDataRecvSocket(*session);
163  if ( FD_ISSET(so,&recvSocketSet) && (n-- > 0) ) {
164  takeInDataPacket(*session);
165  }
166 
167  // schedule by timestamp, as in
168  // SingleThreadRTPSession (by Joergen
169  // Terner)
170  if (packetTimeout < 1000) {
171  packetTimeout = getSchedulingTimeout(*session);
172  }
173  microtimeout_t maxWait =
175  // make sure the scheduling timeout is
176  // <= the check interval for RTCP
177  // packets
178  packetTimeout = (packetTimeout > maxWait)? maxWait : packetTimeout;
179  if ( packetTimeout < 1000 ) { // !(packetTimeout/1000)
180  dispatchDataPacket(*session);
181  //timerTick();
182  } else {
183  packetTimeout = 0;
184  }
185  }
186  poolLock.unlock();
187  i++;
188  }
189 
190  // Purge elements for removed sessions.
191  poolLock.writeLock();
192  i = sessionList.begin();
193  while (i != sessionList.end()) {
194  if ((*i)->isCleared()) {
195  SessionListElement* element(*i);
196  i = sessionList.erase(i);
197  delete element;
198  }
199  else {
200  ++i;
201  }
202  }
203  poolLock.unlock();
204 
205  //GF we added that to allow the kernel scheduler to
206  // give other tasks some time as if we have lots of
207  // active sessions the thread cann take all the CPU if we
208  // don't pause at all. We haven't found the best way to
209  // do that yet.
210  // usleep (10);
211  yield();
212  }
213 #endif // ndef WIN32
214 }
215 
216 #if defined(_MSC_VER) && _MSC_VER >= 1300
218 const InetHostAddress& ia, tpport_t dataPort, tpport_t controlPort, int pri,
219 uint32 memberssize, RTPApplication& app) :
221 (ia,dataPort,controlPort,memberssize,app)
222 {}
223 
225 const InetMcastAddress& ia, tpport_t dataPort, tpport_t controlPort, int pri,
226 uint32 memberssize, RTPApplication& app, uint32 iface) :
228 (ia,dataPort,controlPort,memberssize,app,iface)
229 {}
230 
// Body of a function whose signature line (listing line 231) is missing from
// this listing. NOTE(review): presumably the IPv4 counterpart of the
// SingleThreadRTPSessionIPV6<...>::startRunning() defined further down, whose
// body is identical -- confirm against the real source.
232 {
// The stack is enabled first, then the thread is started.
233  enableStack();
234  Thread::start();
235 }
236 
238 {
240 }
241 
243 {}
244 
// Body of a service loop whose signature line (listing line 245) is missing
// from this listing. NOTE(review): presumably the IPv4 counterpart of the
// SingleThreadRTPSessionIPV6<...>::run(void) defined further down, whose body
// is identical -- confirm against the real source.
//
// The loop runs while ServiceQueue::isActive() holds; afterwards a BYE is
// dispatched and the thread exits.
246 {
// microtimeout_t is expressed in microseconds, so 1000 below is 1 ms.
247  microtimeout_t timeout = 0;
248  while ( ServiceQueue::isActive() ) {
// A new scheduling timeout is only fetched once the current one is below 1000.
249  if ( timeout < 1000 ){ // !(timeout/1000)
250  timeout = getSchedulingTimeout();
251  }
252  controlReceptionService();
253  controlTransmissionService();
254  microtimeout_t maxWait =
255  timeval2microtimeout(getRTCPCheckInterval());
256  // make sure the scheduling timeout is
257  // <= the check interval for RTCP
258  // packets
259  timeout = (timeout > maxWait)? maxWait : timeout;
260  if ( timeout < 1000 ) { // !(timeout/1000)
261  dispatchDataPacket();
262  timerTick();
263  } else {
// timeout/1000 is passed to isPendingData(); presumably a millisecond value
// -- confirm against its declaration.
264  if ( isPendingData(timeout/1000) ) {
265  takeInDataPacket();
266  }
// Resetting to 0 forces a fresh getSchedulingTimeout() on the next pass.
267  timeout = 0;
268  }
269  }
270  dispatchBYE("GNU ccRTP stack finishing.");
271  Thread::exit();
272 }
273 
274 
275 #ifdef CCXX_IPV6
276 
277 SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>(
278 const IPV6Host& ia, tpport_t dataPort, tpport_t controlPort, int pri,
279 uint32 memberssize, RTPApplication& app) :
280 Thread(pri), TRTPSessionBaseIPV6<RTPDataChannel,RTCPChannel,ServiceQueue>
281 (ia,dataPort,controlPort,memberssize,app)
282 {}
283 
284 SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>(
285 const IPV6Multicast& ia, tpport_t dataPort, tpport_t controlPort, int pri,
286 uint32 memberssize, RTPApplication& app, uint32 iface) :
287 Thread(pri), TRTPSessionBaseIPV6<RTPDataChannel,RTCPChannel,ServiceQueue>
288 (ia,dataPort,controlPort,memberssize,app,iface)
289 {}
290 
291 void SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::startRunning()
292 {
293  enableStack();
294  Thread::start();
295 }
296 
297 bool SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::isPendingData(microtimeout_t timeout)
298 {
299  return TRTPSessionBaseIPV6<RTPDataChannel,RTCPChannel,ServiceQueue>::isPendingData(timeout);
300 }
301 
302 void SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::timerTick(void)
303 {}
304 
305 void SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::run(void)
306 {
307  microtimeout_t timeout = 0;
308  while ( ServiceQueue::isActive() ) {
309  if ( timeout < 1000 ){ // !(timeout/1000)
310  timeout = getSchedulingTimeout();
311  }
312  controlReceptionService();
313  controlTransmissionService();
314  microtimeout_t maxWait =
315  timeval2microtimeout(getRTCPCheckInterval());
316  // make sure the scheduling timeout is
317  // <= the check interval for RTCP
318  // packets
319  timeout = (timeout > maxWait)? maxWait : timeout;
320  if ( timeout < 1000 ) { // !(timeout/1000)
321  dispatchDataPacket();
322  timerTick();
323  } else {
324  if ( isPendingData(timeout/1000) ) {
325  takeInDataPacket();
326  }
327  timeout = 0;
328  }
329  }
330  dispatchBYE("GNU ccRTP stack finishing.");
331  Thread::exit();
332 }
333 
334 
335 #endif
336 
337 
338 #endif
339 
340 END_NAMESPACE
341 
std::list< SessionListElement * >::iterator PoolIterator
Definition: pool.h:190
std::list< SessionListElement * > sessionList
Definition: pool.h:189
size_t dispatchDataPacket(RTPSessionBase &s)
Definition: pool.h:67
void controlTransmissionService(RTPSessionBase &s)
Definition: pool.h:75
An RTP application, holding identifying RTCP SDES item values.
Definition: sources.h:364
Pools of RTP sessions.
DualRTPChannel< RTPBaseUDPIPv4Socket > DualRTPUDPIPv4Channel
Definition: channel.h:444
virtual void timerTick(void)
Definition: rtp.h:543
uint32 microtimeout_t
Time interval expressed in microseconds.
Definition: base.h:67
void run()
Runnable method for the thread.
Definition: pool.cpp:113
Generic RTP protocol stack for exchange of realtime data.
microtimeout_t timeval2microtimeout(const timeval &t)
Convert a time interval, expressed as a timeval value into a microseconds counter.
Definition: base.h:90
SOCKET getDataRecvSocket(RTPSessionBase &s) const
Definition: pool.h:78
RTPSessionPool()
Definition: pool.cpp:46
Declaration of ccRTP internal stuff.
virtual bool isPendingData(microtimeout_t timeout)
Definition: rtp.h:546
size_t takeInDataPacket(RTPSessionBase &s)
Definition: pool.h:63
std equality for SessionListElement objects.
Definition: pool.h:128
SOCKET highestSocket
Definition: pool.h:196
fd_set recvSocketSet
Definition: pool.h:195
timeval getRTCPCheckInterval(RTPSessionBase &s)
Definition: pool.h:59
void startRunning()
Activate stack and start service thread.
Definition: rtp.h:508
bool isPendingData(microtimeout_t timeout)
Definition: rtp.h:199
virtual void run(void)
Single runnable method for this RTP stack; schedules outgoing and incoming RTP data and RTCP packets...
Definition: rtp.h:553
Class for tracking session status.
Definition: pool.h:92
bool isActive()
Definition: pool.h:173
bool removeSession(RTPSessionBase &session)
Definition: pool.cpp:77
microtimeout_t getSchedulingTimeout(RTPSessionBase &s)
Definition: pool.h:56
void controlReceptionService(RTPSessionBase &s)
Definition: pool.h:71
This template class adds the threading aspect to the RTPSessionBase template in one of the many possi...
Definition: rtp.h:418
ThreadLock poolLock
Definition: pool.h:192
timeval getPoolTimeout()
Definition: pool.h:180
bool addSession(RTPSessionBase &session)
Definition: pool.cpp:56
size_t getPoolLength() const
Definition: pool.cpp:99
void setPoolTimeout(int sec, int usec)
Definition: pool.h:183
This class, an RTP/RTCP queue, adds audio/video profile (AVP) specific methods to the generic RTCP se...
Definition: cqueue.h:708