RmsReceiver.h
Go to the documentation of this file.
1 #ifndef _RMS_RECEIVER_H
2 #define _RMS_RECEIVER_H
3 
4 /**
5  * @package gov.fnal.cd.rms
6  *
7  * @brief Classes that allow someone to send and receive
8  * messages utilizing the Responsive Messaging System.
9  *
10  * This namespace includes the Sender and Receiver classes
11  * as well as interfaces that can be implemented to handle
12  * messages straight out of RMS.
13  */
14 
15 #include <NovaDAQUtilities/Runnable.h>
16 #include <rms/base/RmsCloseable.h>
17 #include <rms/base/RmsDestination.h>
18 #include <rms/base/RmsRuntimeException.h>
19 
20 #include <boost/shared_ptr.hpp>
21 #include <boost/thread/thread.hpp>
22 #include <boost/interprocess/smart_ptr/unique_ptr.hpp>
23 
24 #include <string>
25 #include <iostream>
26 
27 namespace gov {
28 
29 namespace fnal {
30 
31 namespace cd {
32 
33 namespace rms {
34 
35 template<typename R> struct RunnableDeleter {
36  void operator()(R *runnable) {
37  assert( runnable != NULL );
38  runnable->stop();
39  delete runnable;
40  }
41 };
42 
43 template<typename T> struct BoostThreadDeleter {
44  void operator()(T * thread) {
45  assert( thread != NULL );
46 
47 // thread->interrupt();
48 
49  if (thread->joinable())
50  thread->join();
51 
52  delete thread;
53  }
54 };
55 
56 
57 template <typename MSG>
59 public:
60  void processMessage(boost::shared_ptr<MSG> message);
61 };
62 
63 template <typename MSG>
64 void RmsDummyListener<MSG>::processMessage(boost::shared_ptr<MSG> message)
65 {
66  // do nothing
67 }
68 
69 
70 template <class CONN, typename MSG, class LSTNR>
72 public:
73  ListenerLoop(boost::shared_ptr<CONN> connection,
74  unsigned int receiverHandle,
75  boost::shared_ptr<LSTNR> listener,
76  std::string targetName);
77  ListenerLoop(boost::shared_ptr<CONN> connection,
78  unsigned int receiverHandle,
79  LSTNR* listener,
80  std::string targetName);
81  void run();
82 
83 private:
84  boost::shared_ptr<CONN> _myConnection;
85  unsigned int _receiverHandle;
86  boost::shared_ptr<LSTNR> _listener;
89 };
90 
91 template <class CONN, typename MSG, class LSTNR>
93 ListenerLoop(boost::shared_ptr<CONN> connection,
94  unsigned int receiverHandle,
95  boost::shared_ptr<LSTNR> listener,
96  std::string targetName)
97 {
98  _myConnection = connection;
99  _receiverHandle = receiverHandle;
100  _listener = listener;
101  _listenerRawPtr = 0;
102  _targetName = targetName;
103 }
104 
105 template <class CONN, typename MSG, class LSTNR>
107 ListenerLoop(boost::shared_ptr<CONN> connection,
108  unsigned int receiverHandle,
109  LSTNR* listener,
110  std::string targetName)
111 {
112  _myConnection = connection;
113  _receiverHandle = receiverHandle;
114  _listenerRawPtr = listener;
115  _targetName = targetName;
116 }
117 
118 template <class CONN, typename MSG, class LSTNR>
120  _stopRequested = false;
121  _running = true;
122 
123  while (! _stopRequested) {
124  try {
125  if (_listener.get() != 0) {
126  boost::shared_ptr<MSG> message(new MSG());
127  //std::cout << "***** Before thread receive " << _targetName
128  // << " " << _receiverHandle << std::endl;
129  if (_myConnection->receiveMessage(_receiverHandle, *message,
130  250000)) {
131  _listener->processMessage(message);
132  }
133  }
134  else if (_listenerRawPtr != 0) {
135  boost::shared_ptr<MSG> message(new MSG());
136  //std::cout << "***** Before thread receive " << _targetName
137  // << " " << _receiverHandle << std::endl;
138  if (_myConnection->receiveMessage(_receiverHandle, *message,
139  250000)) {
140  _listenerRawPtr->processMessage(message);
141  }
142  }
143  else {
144  //interruptable sleep
145  boost::this_thread::sleep(boost::posix_time::microseconds(250000));
146  }
147  }
149  LOG_EXCEPTION("Caught RmsNotConnectedException in ListenerLoop::run(); message=", ex);
150 
151  //interruptable sleep
152  boost::this_thread::sleep(boost::posix_time::microseconds(250000));
153  }
154  catch (gov::fnal::cd::rms::base:: RmsExitingProcessException& ex1) {
155  _myConnection->close(); //does not throw
156 
157  _stopRequested=true;
158 
159  LOG_EXCEPTION("Caught RmsNotConnectedException in ListenerLoop::run(); message=", ex1);
160  }
161  catch ( ... ) {
162  _myConnection->close(); //does not throw
163 
164  _stopRequested=true;
165 
166  LOG_EXCEPTION("Caught RmsNotConnectedException in ListenerLoop::run(); message=","");
167  }
168  }
169 
170  _running = false;
171 
172  #if CETDDS_DEBUG1
173  std::cout << "Exiting an RmsReceiver listener loop." << std::endl;
174  #endif
175 }
176 
177 
178 
179 /**
180  * This class contains functionality for receiving messages of specified
181  * types from a specified destination.
182  *
183  * @author Kurt Biery
184  * @version $Revision: 1.12 $ $Date: 2011/05/12 17:11:14 $
185  */
186 
187 template <class CONN, typename MSG, class LSTNR = RmsDummyListener<MSG> >
188 class RmsReceiver : public base::RmsCloseable {
189 
190 public:
191  RmsReceiver(boost::shared_ptr<CONN> parentConnection,
192  base::RmsDestination dest);
193  ~RmsReceiver();
194 
195  void receiveMessage(MSG& message);
196 
197  bool receiveMessage(MSG& message, unsigned int usecTimeout);
198 
199  void setListener(boost::shared_ptr< LSTNR > listener);
200 
201  void setListener(LSTNR* listener);
202 
203  void close() throw();
204  bool hasBeenClosed() throw(){return _hasBeenClosed;};
205 private:
206  bool isRunsInListenerLoop();
207 
208 private:
209  /**
210  * Handle to the RMS connection
211  */
212  boost::shared_ptr<CONN> _myConnection;
213 
214  /**
215  * RmsDestination that we are listening to
216  */
217  base::RmsDestination _myDestination;
218 
219  unsigned int _receiverHandle;
220 
221  boost::interprocess::unique_ptr< ListenerLoop<CONN, MSG, LSTNR>,
223 
224 
225  boost::interprocess::unique_ptr< boost::thread,
227 
229 };
230 
231 /**
232  * Constructs an RmsReceiver instance.
233  *
234  * @param parentConn Connection object which is used for
235  * receiving messages.
236  * @param dest The destination that we are receiving messages from.
237  * @throws RmsRuntimeException If the destination is invalid.
238  */
239 template <class CONN, typename MSG, class LSTNR>
241 RmsReceiver(boost::shared_ptr<CONN> parentConn, base::RmsDestination dest) :
242  _myConnection(parentConn),
243  _myDestination(dest),
244  _listenerLoop(0),
245  _listenerThread(0),
246  _hasBeenClosed(false)
247 {
248  dest.validate();
249  if (! _myConnection->supportsDestination(dest)) {
251  msg.append("The current connection does not support ");
252  msg.append("the requested destination: ");
253  msg.append(dest.toString());
255  }
256 
257  MSG dummyObj;
258  _receiverHandle = _myConnection->registerReceiver(dummyObj, _myDestination);
259 }
260 
261 /**
262  * Destructs an RmsReceiver instance.
263  */
264 template <class CONN, typename MSG, class LSTNR>
266 {
267  this->close(); //does not throw
268 }
269 
270 /**
271  * Checks if an RmsReceiver instance runs in a ListenerLoop.
272  */
273 template <class CONN, typename MSG, class LSTNR>
275 {
276  return (_listenerLoop.get() != 0 && _listenerThread.get() != 0);
277 }
278 
279 
280 /**
281  * Receives the next message. This function will block until a
282  * message is received.
283  *
284  * @param the message reference to copy the received message into
285  * @throws RmsCommunicationException if there is a problem receiving
286  * the next message.
287  */
288 template <class CONN, typename MSG, class LSTNR>
290 {
291  try
292  {
293  if (_hasBeenClosed)
294  GENERATE_RMS_NOTCONNECTED_EXCEPTION("RMS connection was closed.");
295 
296  _myConnection->receiveMessage(_receiverHandle, message);
297  }
299  LOG_EXCEPTION( "Caught RmsNotConnectedException in RmsReceiver::receiveMessage(); message=", ex );
300 
301  //rethrow if running inside a listener thread
302  if (isRunsInListenerLoop())
303  throw;
304  }
305  catch ( ... ) {
306  this->close(); //does not throw
307 
308  LOG_EXCEPTION( "Caught exception in RmsReceiver::receiveMessage(). Closed RMS connection.","");
309 
310  //rethrow if running inside a listener thread
311  if (isRunsInListenerLoop())
312  throw;
313  }
314 }
315 
316 /**
317  * Receives the next message. This function will return after the
318  * specified timeout if no message has been received in that time.
319  *
320  * @param the message reference to copy the received message into
321  * @param the amount of time (in microseconds) to wait for a message
322  * @returns true if a message was received, false otherwise
323  * @throws RmsCommunicationException if there is a problem receiving
324  * the next message.
325  */
326 template <class CONN, typename MSG, class LSTNR>
328  unsigned int usecTimeout)
329 {
330  try
331  {
332  if (_hasBeenClosed)
333  GENERATE_RMS_NOTCONNECTED_EXCEPTION("RMS connection was closed.");
334 
335  return _myConnection->receiveMessage(_receiverHandle, message,
336  usecTimeout);
337  }
339  LOG_EXCEPTION("Caught RmsNotConnectedException in RmsReceiver::receiveMessage(); message=", ex);
340 
341  //rethrow if running inside a listener thread
342  if (isRunsInListenerLoop())
343  throw;
344  }
345  catch ( ... ) {
346  this->close(); //does not throw
347 
348  LOG_EXCEPTION("Caught exception in RmsReceiver::receiveMessage(). Closing RMS connection.","");
349 
350  //rethrow if running inside a listener thread
351  if (isRunsInListenerLoop())
352  throw;
353  }
354 
355  return false;
356 }
357 
358 /**
359  * Sets the listener for this receiver. *Note* that this method starts
360  * a background thread to receive messages, *and* the presence of a
361  * listener makes the receiving of messages with the
362  * receiveMessage() method rather irrelevant.
363  *
364  * @param the listener to be called when messages arrive
365  */
366 template <class CONN, typename MSG, class LSTNR>
368 setListener(boost::shared_ptr<LSTNR> listener)
369 {
370  // clean up any existing listener
371  if (_listenerLoop.get() != 0) {
372  _listenerLoop->stop();
373  }
374 
375  _listenerThread.reset(); //joins and deletes the listener thread
376 
377  std::string targetProperty =
379 
380  _listenerLoop.reset(
382  listener, targetProperty));
383 
384  _listenerThread.reset(
385  new boost::thread(boost::bind(&novadaq::Runnable::run, _listenerLoop.get())));
386 }
387 
388 /**
389  * Sets the listener for this receiver. *Note* that this method starts
390  * a background thread to receive messages, *and* the presence of a
391  * listener makes the receiving of messages with the
392  * receiveMessage() method rather irrelevant.
393  *
394  * @param the listener to be called when messages arrive
395  */
396 template <class CONN, typename MSG, class LSTNR>
398 setListener(LSTNR* listener)
399 {
400  // clean up any existing listener
401  if (_listenerLoop.get() != 0) {
402  _listenerLoop->stop();
403  }
404 
405  _listenerThread.reset(); //joins and deletes the listener thread
406 
407  std::string targetProperty =
409 
410  _listenerLoop.reset(
412  listener, targetProperty));
413 
414  _listenerThread.reset(new boost::thread(
415  boost::bind(&novadaq::Runnable::run, _listenerLoop.get())));
416 }
417 
418 /**
419  * Closes the RmsReceiver.
420  */
421 template <class CONN, typename MSG, class LSTNR>
423 {
424  if (_hasBeenClosed) {
425  return;
426  }
427 
428  _hasBeenClosed = true;
429 
430  if (_listenerLoop.get() != 0) {
431  _listenerLoop->stop(); //not expected to throw
432  }
433 
434  _listenerThread.reset(); //joins and deletes the listener thread
435  _listenerLoop.reset(); //deletes the listener loop
436 
437  try {
438  _myConnection->unRegisterReceiver(_receiverHandle);
439  } catch (...) {}
440 
441  _myConnection.reset();
442 #if CETDDS_DEBUG1
443  std::cout << "Closed RmsReceiver connection" << std::endl;
444 #endif
445 }
446 
447 }; // end of namespace rms
448 
449 }; // end of namespace cd
450 
451 }; // end of namespace fnal
452 
453 }; // end of namespace gov
454 
455 #endif
456 // kate: indent-mode cstyle; space-indent on; indent-width 0;
boost::interprocess::unique_ptr< boost::thread, BoostThreadDeleter< boost::thread > > _listenerThread
Definition: RmsReceiver.h:226
#define LOG_EXCEPTION(message, what)
void processMessage(boost::shared_ptr< MSG > message)
Definition: RmsReceiver.h:64
RmsReceiver(boost::shared_ptr< CONN > parentConnection, base::RmsDestination dest)
Definition: RmsReceiver.h:241
virtual void run()=0
void receiveMessage(MSG &message)
Definition: RmsReceiver.h:289
void setListener(boost::shared_ptr< LSTNR > listener)
Definition: RmsReceiver.h:368
#define R(x)
Definition: fnal.py:1
boost::shared_ptr< CONN > _myConnection
Definition: RmsReceiver.h:212
base::RmsDestination _myDestination
Definition: RmsReceiver.h:217
#define GENERATE_RMS_RUNTIME_EXCEPTION(msg)
boost::shared_ptr< LSTNR > _listener
Definition: RmsReceiver.h:86
OStream cout
Definition: OStream.cxx:6
static const std::string TARGET_PROPERTY_NAME
#define GENERATE_RMS_NOTCONNECTED_EXCEPTION(msg)
assert(nhit_max >=nhit_nbins)
ListenerLoop(boost::shared_ptr< CONN > connection, unsigned int receiverHandle, boost::shared_ptr< LSTNR > listener, std::string targetName)
Definition: RmsReceiver.h:93
double T
Definition: Xdiff_gwt.C:5
boost::shared_ptr< CONN > _myConnection
Definition: RmsReceiver.h:84
c cd(1)
procfile close()
boost::interprocess::unique_ptr< ListenerLoop< CONN, MSG, LSTNR >, RunnableDeleter< ListenerLoop< CONN, MSG, LSTNR > > > _listenerLoop
Definition: RmsReceiver.h:222
enum BeamMode string