ConcurrentQueue_t.cpp
Go to the documentation of this file.
1 #include "cppunit/extensions/HelperMacros.h"
2 
3 #include "NovaDAQUtilities/ConcurrentQueue.h"
4 #include "boost/thread.hpp"
5 #include "boost/shared_ptr.hpp"
6 #include "boost/bind.hpp"
7 
8 #include <math.h>
9 #include <vector>
10 
12 
// Callable helper that carries a shared queue pointer, a delay value and a
// countdown counter for use by the tests in this file.
13 class FillQueue
14 {
15 public:
 // Stores the queue pointer and the delay; the counter starts at nEntries+1.
16  FillQueue(boost::shared_ptr<queue_t>& p,
17  unsigned int delay,
18  unsigned int nEntries):
19  _sharedQueue(p),
20  _delay(delay),
21  _counter(nEntries+1)
22  { }
23 
 // Function-call operator; only declared inside this class body.
24  void operator()();
25 
 // Second fill entry point; only declared inside this class body.
26  void waiting_fill();
27 
28 private:
 // Copy of the shared pointer handed to the constructor.
29  boost::shared_ptr<queue_t> _sharedQueue;
 // Delay value handed to the constructor.
30  unsigned int _delay;
 // Countdown counter, initialised to nEntries+1.
31  unsigned int _counter;
32 };
33 
34 
36 {
37  while(--_counter)
38  {
39  sleep(_delay);
40  _sharedQueue->enq_nowait(_counter);
41  }
42 }
43 
45 {
46  while(--_counter) _sharedQueue->enq_wait(_counter);
47 }
48 
50 {
51 public:
52  DrainQueue(boost::shared_ptr<queue_t>& p, unsigned int delay) :
53  _sharedQueue(p),
54  _delay(delay),
55  _counter(0)
56  { }
57 
58  void operator()();
59  unsigned int count() const;
60 
61 private:
62  boost::shared_ptr<queue_t> _sharedQueue;
63  unsigned int _delay;
64  unsigned int _counter;
65 };
66 
68 {
70  while(true)
71  {
72  sleep(_delay);
73  if (_sharedQueue->deq_nowait(val)) {
74  ++_counter;
75  }
76  else {
77  return;
78  }
79  }
80 }
81 
// Accessor: returns the current value of the _counter member.
82 unsigned int DrainQueue::count() const
83 {
84  return _counter;
85 }
86 
87 
88 /*
89  DrainTimedQueue is used for testing the timed-wait version of the
90  dequeue functionality.
91  */
92 
94 {
95 public:
96  DrainTimedQueue(boost::shared_ptr<queue_t>& p, unsigned int delay) :
97  _sharedQueue(p),
98  _delay(delay),
99  _counter(0)
100  { }
101 
102  void operator()();
103  unsigned int count() const;
104 
105 private:
106  boost::shared_ptr<queue_t> _sharedQueue;
107  unsigned int _delay;
108  unsigned int _counter;
109 };
110 
112 {
114  while(true)
115  {
116  sleep(_delay);
117  if (_sharedQueue->deq_nowait(val)) ++_counter;
118  else return;
119  }
120 }
121 
// Accessor: returns the current value of the _counter member.
122 unsigned int DrainTimedQueue::count() const
123 {
124  return _counter;
125 }
126 
// CppUnit test fixture for the concurrent queue. The CPPUNIT_TEST entries
// below list the nine test methods that make up the suite; each one is
// declared in the public section and defined outside the class body.
127 class testConcurrentQueue : public CppUnit::TestFixture
128 {
129  CPPUNIT_TEST_SUITE(testConcurrentQueue);
130  CPPUNIT_TEST(default_q_is_empty);
131  CPPUNIT_TEST(queue_is_fifo);
132  CPPUNIT_TEST(enq_and_deq);
133  CPPUNIT_TEST(many_fillers);
134  CPPUNIT_TEST(enq_timing);
135  CPPUNIT_TEST(change_capacity);
136  CPPUNIT_TEST(failiffull);
137  CPPUNIT_TEST(keepnewest);
138  CPPUNIT_TEST(rejectnewest);
139 
140  CPPUNIT_TEST_SUITE_END();
141 
142 public:
 // Fixture hooks declared for the CppUnit framework.
143  void setUp();
144  void tearDown();
145 
 // One declaration per test case listed in the suite above.
146  void default_q_is_empty();
147  void queue_is_fifo();
148  void enq_and_deq();
149  void many_fillers();
150  void enq_timing();
151  void change_capacity();
152  void failiffull();
153  void keepnewest();
154  void rejectnewest();
155 
156 private:
157  // No data members yet.
158 };
159 
160 void
162 {
163 }
164 
165 void
167 {
168 }
169 
170 void
172 {
173  std::cerr << "\nConcurrentQueue_t::default_q_is_empty\n";
175  CPPUNIT_ASSERT(q.empty());
176  CPPUNIT_ASSERT(!q.full());
177 }
178 
179 void
181 {
182  std::cerr << "\nConcurrentQueue_t::queue_is_fifo\n";
184  q.enq_nowait(1);
185  q.enq_nowait(2);
186  q.enq_nowait(3);
187  int value(0);
188  CPPUNIT_ASSERT(q.deq_nowait(value));
189  CPPUNIT_ASSERT(value == 1);
190  CPPUNIT_ASSERT(q.deq_nowait(value));
191  CPPUNIT_ASSERT(value == 2);
192  CPPUNIT_ASSERT(q.deq_nowait(value));
193  CPPUNIT_ASSERT(value == 3);
194  CPPUNIT_ASSERT(q.empty());
195  CPPUNIT_ASSERT(!q.full());
196 }
197 
198 void
200 {
201  std::cerr << "\nConcurrentQueue_t::enq_and_deq\n";
202  boost::shared_ptr<queue_t> q(new queue_t);
203  unsigned int delay = 0;
204  unsigned int num_items = 10000;
205  boost::thread producer(FillQueue(q, delay, num_items));
206  sleep(1); // give the producer a chance to start
207  boost::thread consumer(DrainQueue(q, delay));
208  producer.join();
209  // gross hack: give the consumer a chance to finish
210  for (unsigned int idx = 0; idx < 10; ++idx) {
211  sleep(1);
212  if (q->size() == 0) {break;}
213  }
214  CPPUNIT_ASSERT(q->size() == 0);
215 }
216 
217 void
219 {
220  std::cerr << "\nConcurrentQueue_t::many_fillers\n";
221  size_t num_fillers(3);
222  unsigned int num_items(10);
223  boost::shared_ptr<queue_t> q(new queue_t(num_items*num_fillers+1));
224 
225  boost::thread_group producers;
226  typedef std::vector<FillQueue> fillers_t;
227  fillers_t fillers(num_fillers, FillQueue(q, 0, num_items));
228  for (fillers_t::iterator
229  i = fillers.begin(),
230  e = fillers.end();
231  i != e;
232  ++i)
233  {
234  using boost::bind;
235  using boost::thread;
236  producers.add_thread(new thread(bind(&FillQueue::waiting_fill,
237  &*i)));
238  }
239  producers.join_all();
240  CPPUNIT_ASSERT(q->size() == num_items * num_fillers);
241 }
242 
243 void
245 {
246  std::cerr << "\nConcurrentQueue_t::enq_timing "
247  << "(this may take up to 30 seconds)\n";
248  queue_t q(1);
249 
250  // Queue is initially empty, so the first call should succeed.
251  CPPUNIT_ASSERT(q.enq_nowait(1));
252  CPPUNIT_ASSERT(q.size() == 1);
253  CPPUNIT_ASSERT(q.capacity() == 1);
254  CPPUNIT_ASSERT(q.full());
255 
256  // The queue is now full. The next enq should fail.
257  //edm::CPUTimer t;
258  //t.start();
259  CPPUNIT_ASSERT(!q.enq_nowait(1));
260  //t.stop();
261  // We somewhat arbitrarily choose 10 milliseconds as "immediately
262  // enough".
263  //CPPUNIT_ASSERT(t.realTime() < 0.01);
264 
265  // Now test the timeout version, with a range of timeouts.
266  for (unsigned long wait_time = 0; wait_time < 3; ++wait_time)
267  {
268  //t.reset();
269  CPPUNIT_ASSERT(q.size() == 1);
270  //t.start();
271  CPPUNIT_ASSERT(!q.enq_timed_wait(1, wait_time));
272  //t.stop();
273  // We somewhat arbitrarily choose 10 milliseconds as "good enough
274  // resolution".
275  //CPPUNIT_ASSERT(fabs(t.realTime()-wait_time) < 0.01);
276  }
277 
278  // Now test the version that waits indefinitely. We fill the queue,
279  // start a draining thread that delays before each deq, and then
280  // make sure we do eventually return from the call to enq_wait.
281  boost::shared_ptr<queue_t> qptr(new queue_t(1));
282  CPPUNIT_ASSERT(qptr->capacity() == 1);
283  CPPUNIT_ASSERT(qptr->enq_nowait(1));
284  CPPUNIT_ASSERT(qptr->size() == 1);
285 
286  unsigned long delay = 2;
287  boost::thread consumer(DrainQueue(qptr,delay));
288 
289  qptr->enq_wait(delay);
290  consumer.join();
291  CPPUNIT_ASSERT(qptr->empty());
292 }
293 
294 void
296 {
297  std::cerr << "\nConcurrentQueue_t::change_capacity\n";
298  queue_t q(1);
299  CPPUNIT_ASSERT(q.enq_nowait(1));
300  CPPUNIT_ASSERT(!q.enq_nowait(1));
301  CPPUNIT_ASSERT(!q.set_capacity(2)); // did not reset
302  CPPUNIT_ASSERT(!q.enq_nowait(3)); // ... so this fails.
303 
304  q.clear();
305  CPPUNIT_ASSERT(q.set_capacity(2));
306  CPPUNIT_ASSERT(q.enq_nowait(1));
307  CPPUNIT_ASSERT(q.enq_nowait(2));
308  CPPUNIT_ASSERT(!q.enq_nowait(3));
309  CPPUNIT_ASSERT(q.size() == 2);
310  CPPUNIT_ASSERT(q.capacity() == 2);
311 }
312 
313 void
315 {
316  std::cerr << "\nConcurrentQueue_t::failiffull\n";
318  CPPUNIT_ASSERT(q.enq_nowait(1));
319  CPPUNIT_ASSERT(!q.enq_nowait(2));
320  CPPUNIT_ASSERT(q.size() == 1);
321  int value;
322  CPPUNIT_ASSERT(q.deq_nowait(value));
323  CPPUNIT_ASSERT(value==1);
324 }
325 
326 void
328 {
329  std::cerr << "\nConcurrentQueue_t::keepnewest\n";
331  q.enq_nowait(1);
332  q.enq_nowait(2);
333  CPPUNIT_ASSERT(q.size() == 1);
334  int value;
335  CPPUNIT_ASSERT(q.deq_nowait(value));
336  CPPUNIT_ASSERT(value == 2);
337 }
338 
339 void
341 {
342  std::cerr << "\nConcurrentQueue_t::rejectnewest\n";
344  q.enq_nowait(1);
345  q.enq_nowait(2);
346  CPPUNIT_ASSERT(q.size() == 1);
347  int value;
348  CPPUNIT_ASSERT(q.deq_nowait(value));
349  CPPUNIT_ASSERT(value == 1);
350 }
351 
353 CPPUNIT_REGISTRY_ADD_TO_DEFAULT("NovaDAQUtilities");
354 
355 /// emacs configuration
356 /// Local Variables: -
357 /// mode: c++ -
358 /// c-basic-offset: 2 -
359 /// indent-tabs-mode: nil -
360 /// End: -
unsigned int _counter
size_type size() const
DrainQueue(boost::shared_ptr< queue_t > &p, unsigned int delay)
boost::shared_ptr< queue_t > _sharedQueue
unsigned int _delay
bool deq_nowait(value_type &item)
const char * p
Definition: xmltok.h:285
boost::shared_ptr< queue_t > _sharedQueue
bool set_capacity(size_type n)
size_type capacity() const
OStream cerr
Definition: OStream.cxx:7
novadaq::ConcurrentQueue< int > queue_t
FillQueue(boost::shared_ptr< queue_t > &p, unsigned int delay, unsigned int nEntries)
unsigned int _delay
const XML_Char int const XML_Char * value
Definition: expat.h:331
EnqPolicy::return_type enq_nowait(value_type const &item)
boost::shared_ptr< queue_t > _sharedQueue
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(testConcurrentQueue,"NovaDAQUtilities")
bool enq_timed_wait(value_type const &p, unsigned long wait_sec)
unsigned int count() const
CPPUNIT_REGISTRY_ADD_TO_DEFAULT("NovaDAQUtilities")
unsigned int _counter
unsigned int count() const
Float_t e
Definition: plot.C:35
DrainTimedQueue(boost::shared_ptr< queue_t > &p, unsigned int delay)