RmsProducer.cpp
Go to the documentation of this file.
1 #include <rms/RmsProducer.h>
2 #include <rms/base/RmsRuntimeException.h>
3 
4 namespace gov {
5 
6 namespace fnal {
7 
8 namespace cd {
9 
10 namespace rms {
11 
12 /**
13  * Create a new producer given a connection and a destination.
14  *
15  * This constructor will not modify the reply timeout, which is set at
16  * 5 minutes by default.
17  *
18  * @param providerConnection Connection object, which is used for receiving
19  * and sending messages.
20  * @param dest The destination that we are sending messages to.
21  */
22 RmsProducer::RmsProducer(boost::shared_ptr<provider::RmsConnection> providerConnection,
23  base::RmsDestination dest) :
24  _providerConnection(providerConnection),
25  _replyTimeout(300),
26  _listenerLoop(&_inputQueue),
27  _listenerThread(boost::bind(&novadaq::Runnable::run, &_listenerLoop)) {
28  _myDestinations.push_back(dest);
29 
30  initialize();
31 }
32 
33 /**
34  * Create a new producer given a connection and a destination.
35  *
36  * This constructor will modify the reply timeout.
37  *
38  * @param providerConnection Connection object, which is used for receiving
39  * and sending messages.
40  * @param dest The destination that we are sending messages to.
41  * @param replyTimeout The amount of time in seconds to wait for
42  * a reply to a message.
43  */
44 RmsProducer::RmsProducer(boost::shared_ptr<provider::RmsConnection> providerConnection,
45  base::RmsDestination dest, int replyTimeout) :
46  _providerConnection(providerConnection),
48  _listenerThread(boost::bind(&novadaq::Runnable::run, &_listenerLoop)) {
49  _replyTimeout = replyTimeout;
50 
51  _myDestinations.push_back(dest);
52 
53  initialize();
54 }
55 
56 /**
57  * Create a new producer given a connection and an array of destinations.
58  *
59  * This constructor will not modify the reply timeout, which is set at
60  * 5 minutes by default.
61  *
62  * @param providerConnection Connection object, which is used for receiving
63  * and sending messages.
64  * @param dest The destinations that we are sending messages to.
65  */
66 RmsProducer::RmsProducer(boost::shared_ptr<provider::RmsConnection> providerConnection,
67  std::vector <base::RmsDestination> dest) :
68  _providerConnection(providerConnection),
69  _replyTimeout(300),
71  _listenerThread(boost::bind(&novadaq::Runnable::run, &_listenerLoop)) {
73 
74  initialize();
75 }
76 
77 /**
78  * Create a new producer given a connection and an array of destinations.
79  *
80  * This constructor will modify the reply timeout.
81  *
82  * @param providerConnection Connection object, which is used for receiving
83  * and sending messages.
84  * @param dest The destinations that we are sending messages to.
85  * @param replyTimeout The amount of time in seconds to wait for
86  * a reply to a message.
87  */
88 RmsProducer::RmsProducer(boost::shared_ptr<provider::RmsConnection> providerConnection,
89  std::vector <base::RmsDestination> dest, int replyTimeout) :
90  _providerConnection(providerConnection),
92  _listenerThread(boost::bind(&novadaq::Runnable::run, &_listenerLoop)) {
93  _replyTimeout = replyTimeout;
94 
96 
97  initialize();
98 }
99 
100 /**
101  * Destructor, have close() do all the cleanup.
102  */
104  close();
105 }
106 
107 /**
108  * Send a string to the destinations that are associated with
109  * this producer. No record of this is kept, so it won't be
110  * possible to reply to this string.
111  *
112  * @param messageText The string to send out.
113  */
115  for (unsigned int i = 0; i < _myDestinations.size(); i++) {
116  _providerConnection->sendString(_myDestinations[i], messageText);
117  }
118 }
119 
120 /**
121  * Send a message. This is responsible for setting the replyDestination
122  * of the message, the sentTime and the destination. This will also
123  * add the message's ID to the reply list.
124  *
125  * @param message The message to be sent.
126  */
127 void RmsProducer::sendMessage(base::RmsMessage message) {
128  base::RmsDestination replyDestination;
129  const std::string messageUUID = _providerConnection->getUUID();
130  boost::posix_time::ptime now(boost::posix_time::microsec_clock::universal_time());
131 
132  message.setSentTime();
133  message.setId(messageUUID);
134 
135  replyDestination = _providerConnection->getMessageSource();
136 
137  // 21-Jun-2007, KAB - provide a default time to live (where should we get
138  // this from officially?)
139  message.setTimeToLive(5000);
140 
141  _replyIdTable.insert(std::pair<std::string,
142  boost::posix_time::ptime>(messageUUID, now));
143 
144  for (unsigned int i = 0; i < _myDestinations.size(); i++) {
145  message.setDestination(_myDestinations[i]);
146 
147  replyDestination.setProperty(base::RmsDestination::CHANNEL_PROPERTY_NAME,
149  replyDestination.validate();
150  message.setReplyDestination(replyDestination);
151 
152  _providerConnection->sendMessage(_myDestinations[i], message);
153  }
154 }
155 
156 /**
157  * Pull replies off the incoming message queue. This function will
158  * block until a message is received.
159  *
160  * @return An RmsMessage on success. This will create an RmsStatus
161  * object and insert it into the RmsMessage if an error occurs.
162  */
163 boost::shared_ptr<base::RmsMessage> RmsProducer::receiveReply() {
164  boost::shared_ptr<base::RmsMessage> receivedReply;
165  boost::shared_ptr<std::string> messageText;
166 
167  do {
168  messageText = _inputQueue.take();
169  receivedReply.reset(new base::RmsMessage);
170  receivedReply->deserialize(*messageText);
171  } while (!verify(receivedReply));
172 
173  return receivedReply;
174 }
175 
176 /**
177  * Pull replies off the incoming message queue. This function will
178  * block until a message is received, or the time expires; whichever
179  * happens first.
180  *
181  * @param timeout The maximum amount of time in seconds to wait if
182  * the message queue is empty before giving up and returning.
183  *
184  * @return An RmsMessage on success. This will return null if we fail
185  * to pull a message off the queue. If there is some sort of error in
186  * pulling a message off the queue or in decoding the message, an RmsMessage
187  * will be returned with an RmsStatus object inside it.
188  */
189 boost::shared_ptr<base::RmsMessage> RmsProducer::receiveReply(long timeout) {
190  boost::shared_ptr<base::RmsMessage> receivedReply;
191  boost::shared_ptr<std::string> messageText;
192 
193  do {
194  messageText = _inputQueue.poll(timeout);
195 
196  if (!messageText) {
197  receivedReply.reset();
198  return receivedReply;
199  }
200 
201  receivedReply.reset(new base::RmsMessage);
202  receivedReply->deserialize(*messageText);
203  } while (!verify(receivedReply));
204 
205  return receivedReply;
206 }
207 
208 /**
209  * Pull replies off the incoming message queue. This function will
210  * not block.
211  *
212  * @return An RmsMessage on success. This will return null if we timeout
213  * while trying to pull a message off the queue. If there is some sort of
214  * error in pulling a message off the queue or in decoding the message, an
215  * RmsMessage will be returned with an RmsStatus object inside it.
216  */
217 boost::shared_ptr<base::RmsMessage> RmsProducer::receiveReplyNoWait() {
218  boost::shared_ptr<base::RmsMessage> receivedReply;
219  boost::shared_ptr<std::string> messageText;
220 
221  do {
222  messageText = _inputQueue.poll();
223 
224  if (!messageText) {
225  receivedReply.reset();
226  return receivedReply;
227  }
228 
229  receivedReply.reset(new base::RmsMessage);
230  receivedReply->deserialize(*messageText);
231  } while (!verify(receivedReply));
232 
233  return receivedReply;
234 }
235 
236 /**
237  * Register a callback with the producer. This will start a
238  * thread that pulls replies off the input queue and sends them
239  * to the listener.
240  *
241  * @param listener The listener that will receive messages
242  *
243  * @return NONE
244  */
247 }
248 
249 /**
250  * Stop and delete the listener loop, as well as the
251  * provider listener and input queue.
252  */
255  _listenerThread.join();
256 
257  if (_providerListener) {
258  delete _providerListener;
259  _providerListener = NULL;
260  }
261 
262  return;
263 }
264 
265 /**
266  * Verify that a reply received is one that the producer
267  * is actually interested in. We maintain a list of all
268  * the message IDs that are sent out, and compare replies
269  * received to this list.
270  *
271  * @param message The message that we are checking.
272  *
273  * @return True if the message is supposed to come to
274  * this producer, false otherwise.
275  */
276 bool RmsProducer::verify(boost::shared_ptr<base::RmsMessage> message) {
277  boost::posix_time::ptime now(boost::posix_time::microsec_clock::universal_time());
278  boost::posix_time::ptime deadline = now - boost::posix_time::seconds(_replyTimeout);
279  std::map<std::string, boost::posix_time::ptime>::iterator replyIterator;
280 
281  if(message->getMessageStatus().getCode() != base::RmsStatus::SUCCESS_CODE) {
282  return true;
283  }
284 
285  /**
286  * Look through the list of IDs so we can nuke any that have
287  * expired.
288  */
289  replyIterator = _replyIdTable.begin();
290 
291  while (replyIterator != _replyIdTable.end()) {
292  if (replyIterator->second < deadline) {
293  _replyIdTable.erase(replyIterator);
294  }
295  replyIterator++;
296  }
297 
298  /**
299  * Try to locate the ID of the message that was passed into
300  * this method.
301  */
302  replyIterator = _replyIdTable.find(message->getCorrelationId());
303 
304  if (replyIterator != _replyIdTable.end()) {
305  _replyIdTable.erase(message->getCorrelationId());
306  return true;
307  }
308 
309  return false;
310 }
311 
312 /**
313  * Called by the constructor, this will verify that our provider
314  * can talk to the supplied destination, as well as creating the
315  * input queue, replyId list, the provider listener. This will
316  * also determine the reply destination.
317  */
319  unsigned int i;
320  base::RmsDestination replyDestination(_providerConnection->getMessageSource());
321 
322  for (i = 0; i < _myDestinations.size(); i++) {
324 
325  _myDestinations[i].validate();
326  if (!_providerConnection->supportsDestination(_myDestinations[i])) {
328  msg.append("The current connection does not support ");
329  msg.append("the requested destination: ");
330  msg.append(_myDestinations[i].toString());
332  }
333  }
334 
336 
337  for (std::set <std::string>::iterator channelIterator = _replyChannels.begin();
338  channelIterator != _replyChannels.end(); channelIterator++) {
339  replyDestination.setProperty(base::RmsDestination::CHANNEL_PROPERTY_NAME,
340  *channelIterator);
341  replyDestination.validate();
342 
344  _providerConnection->addListener(replyDestination, _providerListener);
345  }
346 }
347 
348 } // end of namespace rms
349 
350 } // end of namespace cd
351 
352 } // end of namespace fnal
353 
354 } // end of namespace gov
void setReplyListener(RmsMessageListener *listener)
ClientListenerLoop _listenerLoop
Definition: RmsProducer.h:90
void sendMessage(base::RmsMessage message)
std::vector< base::RmsDestination > _myDestinations
Definition: RmsProducer.h:78
provider::BufferedProviderListener * _providerListener
Definition: RmsProducer.h:106
void setMessageFilter(MessageFilter *filter)
std::set< std::string > _replyChannels
Definition: RmsProducer.h:84
RmsProducer(boost::shared_ptr< provider::RmsConnection > providerConnection, base::RmsDestination dest)
Definition: RmsProducer.cpp:22
boost::shared_ptr< base::RmsMessage > receiveReplyNoWait()
static const std::string CHANNEL_PROPERTY_NAME
Definition: fnal.py:1
std::map< std::string, boost::posix_time::ptime > _replyIdTable
Definition: RmsProducer.h:113
#define GENERATE_RMS_RUNTIME_EXCEPTION(msg)
void sendString(std::string messageText)
Definition: run.py:1
boost::shared_ptr< provider::RmsConnection > _providerConnection
Definition: RmsProducer.h:68
void setMessageListener(RmsMessageListener *listener)
util::LinkedBlockingQueue< boost::shared_ptr< std::string > > _inputQueue
Definition: RmsProducer.h:96
boost::shared_ptr< base::RmsMessage > receiveReply()
bool verify(boost::shared_ptr< base::RmsMessage > message)
c cd(1)
enum BeamMode string