1 #ifndef _RMS_RECEIVER_H 2 #define _RMS_RECEIVER_H 15 #include <NovaDAQUtilities/Runnable.h> 16 #include <rms/base/RmsCloseable.h> 17 #include <rms/base/RmsDestination.h> 18 #include <rms/base/RmsRuntimeException.h> 20 #include <boost/shared_ptr.hpp> 21 #include <boost/thread/thread.hpp> 22 #include <boost/interprocess/smart_ptr/unique_ptr.hpp> 37 assert( runnable != NULL );
49 if (thread->joinable())
57 template <
typename MSG>
60 void processMessage(boost::shared_ptr<MSG>
message);
63 template <
typename MSG>
70 template <
class CONN,
typename MSG,
class LSTNR>
74 unsigned int receiverHandle,
75 boost::shared_ptr<LSTNR> listener,
78 unsigned int receiverHandle,
91 template <
class CONN,
typename MSG,
class LSTNR>
94 unsigned int receiverHandle,
95 boost::shared_ptr<LSTNR> listener,
98 _myConnection = connection;
99 _receiverHandle = receiverHandle;
100 _listener = listener;
102 _targetName = targetName;
105 template <
class CONN,
typename MSG,
class LSTNR>
108 unsigned int receiverHandle,
112 _myConnection = connection;
113 _receiverHandle = receiverHandle;
114 _listenerRawPtr = listener;
115 _targetName = targetName;
118 template <
class CONN,
typename MSG,
class LSTNR>
120 _stopRequested =
false;
123 while (! _stopRequested) {
125 if (_listener.get() != 0) {
126 boost::shared_ptr<MSG>
message(
new MSG());
129 if (_myConnection->receiveMessage(_receiverHandle, *message,
131 _listener->processMessage(message);
134 else if (_listenerRawPtr != 0) {
135 boost::shared_ptr<MSG>
message(
new MSG());
138 if (_myConnection->receiveMessage(_receiverHandle, *message,
140 _listenerRawPtr->processMessage(message);
145 boost::this_thread::sleep(boost::posix_time::microseconds(250000));
149 LOG_EXCEPTION(
"Caught RmsNotConnectedException in ListenerLoop::run(); message=", ex);
152 boost::this_thread::sleep(boost::posix_time::microseconds(250000));
154 catch (gov::fnal::cd::rms::base:: RmsExitingProcessException& ex1) {
155 _myConnection->close();
159 LOG_EXCEPTION(
"Caught RmsNotConnectedException in ListenerLoop::run(); message=", ex1);
162 _myConnection->close();
166 LOG_EXCEPTION(
"Caught RmsNotConnectedException in ListenerLoop::run(); message=",
"");
187 template <
class CONN,
typename MSG,
class LSTNR = RmsDummyListener<MSG> >
191 RmsReceiver(boost::shared_ptr<CONN> parentConnection,
192 base::RmsDestination
dest);
195 void receiveMessage(MSG&
message);
197 bool receiveMessage(MSG& message,
unsigned int usecTimeout);
199 void setListener(boost::shared_ptr< LSTNR > listener);
201 void setListener(LSTNR* listener);
203 void close()
throw();
206 bool isRunsInListenerLoop();
221 boost::interprocess::unique_ptr< ListenerLoop<CONN, MSG, LSTNR>,
225 boost::interprocess::unique_ptr< boost::thread,
239 template <
class CONN,
typename MSG,
class LSTNR>
242 _myConnection(parentConn),
243 _myDestination(dest),
246 _hasBeenClosed(false)
251 msg.append(
"The current connection does not support ");
252 msg.append(
"the requested destination: ");
253 msg.append(dest.toString());
264 template <
class CONN,
typename MSG,
class LSTNR>
273 template <
class CONN,
typename MSG,
class LSTNR>
288 template <
class CONN,
typename MSG,
class LSTNR>
299 LOG_EXCEPTION(
"Caught RmsNotConnectedException in RmsReceiver::receiveMessage(); message=", ex );
308 LOG_EXCEPTION(
"Caught exception in RmsReceiver::receiveMessage(). Closed RMS connection.",
"");
326 template <
class CONN,
typename MSG,
class LSTNR>
328 unsigned int usecTimeout)
339 LOG_EXCEPTION(
"Caught RmsNotConnectedException in RmsReceiver::receiveMessage(); message=", ex);
348 LOG_EXCEPTION(
"Caught exception in RmsReceiver::receiveMessage(). Closing RMS connection.",
"");
366 template <
class CONN,
typename MSG,
class LSTNR>
382 listener, targetProperty));
396 template <
class CONN,
typename MSG,
class LSTNR>
412 listener, targetProperty));
421 template <
class CONN,
typename MSG,
class LSTNR>
boost::interprocess::unique_ptr< boost::thread, BoostThreadDeleter< boost::thread > > _listenerThread
#define LOG_EXCEPTION(message, what)
void processMessage(boost::shared_ptr< MSG > message)
RmsReceiver(boost::shared_ptr< CONN > parentConnection, base::RmsDestination dest)
void receiveMessage(MSG &message)
void operator()(T *thread)
void setListener(boost::shared_ptr< LSTNR > listener)
void operator()(R *runnable)
bool isRunsInListenerLoop()
boost::shared_ptr< CONN > _myConnection
base::RmsDestination _myDestination
#define GENERATE_RMS_RUNTIME_EXCEPTION(msg)
boost::shared_ptr< LSTNR > _listener
static const std::string TARGET_PROPERTY_NAME
#define GENERATE_RMS_NOTCONNECTED_EXCEPTION(msg)
assert(nhit_max >=nhit_nbins)
ListenerLoop(boost::shared_ptr< CONN > connection, unsigned int receiverHandle, boost::shared_ptr< LSTNR > listener, std::string targetName)
unsigned int _receiverHandle
boost::shared_ptr< CONN > _myConnection
boost::interprocess::unique_ptr< ListenerLoop< CONN, MSG, LSTNR >, RunnableDeleter< ListenerLoop< CONN, MSG, LSTNR > > > _listenerLoop
unsigned int _receiverHandle