RmsConsumer.cpp
Go to the documentation of this file.
1 #include <rms/RmsConsumer.h>
2 #include <rms/base/RmsDestination.h>
3 #include <rms/base/RmsMessageBody.h>
4 #include <rms/base/RmsRuntimeException.h>
5 
6 namespace gov {
7 
8 namespace fnal {
9 
10 namespace cd {
11 
12 namespace rms {
13 
14 /**
15  * Create a new consumer given a provider connection and a destination.
16  *
 * The initializer list stores the provider connection, passes the address
 * of _inputQueue to _listenerLoop, and constructs _listenerThread from a
 * binding of novadaq::Runnable::run to _listenerLoop. The body validates
 * the destination, asks the connection whether it supports it, records it
 * in _myDestinations and records one entry in _myVerifyDests.
 *
 * NOTE(review): the embedded numbering of this listing skips lines 28, 32,
 * 42, 44, 55, 56 and 58, so some statements are not visible here
 * (presumably the declaration of msg, the exception raised for an
 * unsupported destination, the two property lookups and the trailing
 * setup code) - restore them from the original source before building.
 *
 * NOTE(review): the listing footer places _listenerLoop at RmsConsumer.h:87
 * and _inputQueue at RmsConsumer.h:98, so _listenerLoop appears to be
 * initialized before _inputQueue; that is only safe if the loop's
 * constructor merely stores the pointer - confirm.
 *
17  * @param providerConnection A reference to a RMS provider
18  * @param dest The destination where we are consuming messages
19  * from.
20  */
21 RmsConsumer::RmsConsumer(boost::shared_ptr<provider::RmsConnection> providerConnection,
22  base::RmsDestination dest) :
23  _providerConnection(providerConnection),
24  _listenerLoop(&_inputQueue),
25  _listenerThread(boost::bind(&novadaq::Runnable::run, &_listenerLoop)) {
26  dest.validate();
27  if (! _providerConnection->supportsDestination(dest)) {
 // Build the text describing the unsupported destination.
29  msg.append("The current connection does not support ");
30  msg.append("the requested destination: ");
31  msg.append(dest.toString());
33  }
34  _myDestinations.push_back(dest);
35 
36  // 02-Nov-2007, KAB - keep track of "verification destinations"
37  // explicitly. These will be used in the verify() method,
38  // and they differ slightly depending on whether the destination
39  // has the "source" property set. If they do, then we'll compare
40  // to the reply destination in messages.
41  std::string targetProperty =
43  std::string sourceProperty =
 // Empty target with a non-empty source: store a copy of the destination
 // whose target property is set to the source property value.
45  if (targetProperty == "" && sourceProperty != "") {
46  base::RmsDestination mappedDestination(_myDestinations[0]);
47  mappedDestination.setProperty(base::RmsDestination::TARGET_PROPERTY_NAME,
48  sourceProperty);
49  _myVerifyDests.push_back(mappedDestination);
50  }
51  else {
 // Otherwise the destination itself is the verification destination.
52  _myVerifyDests.push_back(_myDestinations[0]);
53  }
54 
57 
59 }
60 
61 /**
62  * Create a new consumer given a provider connection and a
63  * vector of destinations.
64  *
 * Every destination is validated and checked against the connection
 * first; afterwards one verification destination is recorded in
 * _myVerifyDests for each entry of _myDestinations.
 *
 * NOTE(review): the embedded numbering of this listing skips several
 * lines (72, 78, 82, 85, 87, 90, 98, 100, 112). No visible statement
 * fills _myDestinations or initializes _listenerLoop in this overload;
 * presumably those are on the skipped lines - restore them from the
 * original source before building.
 *
65  * @param providerConnection A reference to a RMS provider
66  * @param dest The destinations where we are consuming messages
67  * from.
68  */
69 RmsConsumer::RmsConsumer(boost::shared_ptr<provider::RmsConnection> providerConnection,
70  std::vector <base::RmsDestination> dest) :
71  _providerConnection(providerConnection),
73  _listenerThread(boost::bind(&novadaq::Runnable::run, &_listenerLoop)) {
74 
 // Pass 1: validate each requested destination and check provider support.
75  for (unsigned int i = 0; i < dest.size(); i++) {
76  dest[i].validate();
77  if (! _providerConnection->supportsDestination(dest[i])) {
79  msg.append("The current connection does not support ");
80  msg.append("the requested destination: ");
81  msg.append(dest[i].toString());
83  }
84  }
86 
88 
 // Pass 2: derive one verification destination per stored destination.
89  for (unsigned int i = 0; i < _myDestinations.size(); i++) {
91 
92  // 02-Nov-2007, KAB - keep track of "verification destinations"
93  // explicitly. These will be used in the verify() method,
94  // and they differ slightly depending on whether the destination
95  // has the "source" property set. If they do, then we'll compare
96  // to the reply destination in messages.
97  std::string targetProperty =
99  std::string sourceProperty =
 // Empty target with a non-empty source: store a copy whose target
 // property is set to the source property value.
101  if (targetProperty == "" && sourceProperty != "") {
102  base::RmsDestination mappedDestination(_myDestinations[i]);
103  mappedDestination.setProperty(base::RmsDestination::TARGET_PROPERTY_NAME,
104  sourceProperty);
105  _myVerifyDests.push_back(mappedDestination);
106  }
107  else {
108  _myVerifyDests.push_back(_myDestinations[i]);
109  }
110  }
111 
113 }
114 
115 /**
116  * Destructor, have close() do all the cleanup.
 *
 * NOTE(review): listing line 118 (the destructor's signature line) is
 * missing from this extract; only the call to close() and the closing
 * brace are visible.
117  */
119  close();
120 }
121 
122 /**
123  * Receive a string from the provider. This will block
124  * indefinitely.
125  *
126  * @return A string from the provider
127  */
128 boost::shared_ptr<std::string> RmsConsumer::receiveString() {
129  return _inputQueue.take();
130 }
131 
132 /**
133  * Receive a string from the provider. This will wait at most
134  * timeout seconds.
135  *
136  * @param timeout The maximum amount of time in seconds to wait
137  * before returning.
138  *
139  * @return A string from the provider if one is available before
140  * the timeout expires. This will return an empty shared_ptr
141  * otherwise.
142  */
143 boost::shared_ptr<std::string> RmsConsumer::receiveString(long timeout) {
144  return _inputQueue.poll(timeout);
145 }
146 
147 /**
148  * Receive a string from the provider. This will not block.
149  *
150  * @return A string from the provider if one is available.
151  * This will return an empty shared_ptr otherwise.
152  */
153 boost::shared_ptr<std::string> RmsConsumer::receiveStringNoWait() {
154  return _inputQueue.poll();
155 }
156 
157 /**
158  * Receive a message from the provider. This will block
159  * indefinitely.
160  *
161  * @return A message from the provider
162  */
163 boost::shared_ptr<base::RmsMessage> RmsConsumer::receiveMessage() {
164  boost::shared_ptr<base::RmsMessage> receivedMessage;
165  boost::shared_ptr<std::string> messageText;
166 
167  do {
168  messageText = _inputQueue.take();
169  receivedMessage.reset(new base::RmsMessage);
170  receivedMessage->deserialize(*messageText);
171  } while (!verify(receivedMessage));
172 
173  return receivedMessage;
174 }
175 
176 /**
177  * Receive a message from the provider. This will wait at most
178  * timeout seconds.
179  *
180  * @param timeout The maximum amount of time in seconds to wait
181  * before returning.
182  *
183  * @return A message from the provider if one is available before
184  * the timeout expires. This will return an empty shared_ptr
185  * otherwise.
186  */
187 boost::shared_ptr<base::RmsMessage> RmsConsumer::receiveMessage(long timeout) {
188  boost::shared_ptr<base::RmsMessage> receivedMessage;
189  boost::shared_ptr<std::string> messageText;
190 
191  do {
192  messageText = _inputQueue.poll(timeout);
193 
194  if (!messageText) {
195  receivedMessage.reset();
196  return receivedMessage;
197  }
198 
199  receivedMessage.reset(new base::RmsMessage);
200  receivedMessage->deserialize(*messageText);
201  } while (!verify(receivedMessage));
202 
203  return receivedMessage;
204 }
205 
206 /**
207  * Receive a message from the provider. This will not block.
208  *
209  * @return A message from the provider if one is available.
210  * This will return an empty shared_ptr otherwise.
211  */
212 boost::shared_ptr<base::RmsMessage> RmsConsumer::receiveMessageNoWait() {
213  boost::shared_ptr<base::RmsMessage> receivedMessage;
214  boost::shared_ptr<std::string> messageText;
215 
216  do {
217  messageText = _inputQueue.poll();
218 
219  if (!messageText) {
220  receivedMessage.reset();
221  return receivedMessage;
222  }
223 
224  receivedMessage.reset(new base::RmsMessage);
225  receivedMessage->deserialize(*messageText);
226  } while (!verify(receivedMessage));
227 
228  return receivedMessage;
229 }
230 
231 /**
232  * Set the listener that gets called whenever a message
233  * is received.
234  *
235  * @param listener The new listener
 *
 * NOTE(review): listing lines 237-238 (the signature and body of this
 * function) are missing from this extract; only the closing brace is
 * visible. Restore them from the original source.
236  */
239 }
240 
241 /**
242  * Send a reply to a message. The message should have been
243  * created by calling the createReply() method on the original message.
244  *
245  * @param message The reply message
246  */
247 void RmsConsumer::sendReply(boost::shared_ptr<base::RmsMessage> message) {
248  base::RmsDestination replyDestination;
249 
250  replyDestination = _providerConnection->getMessageSource();
251  replyDestination.setProperty(base::RmsDestination::CHANNEL_PROPERTY_NAME,
252  message->getDestination().getProperty(base::RmsDestination::CHANNEL_PROPERTY_NAME));
253  replyDestination.validate();
254  message->setReplyDestination(replyDestination);
255 
256  message->setSentTime();
257  message->setId(_providerConnection->getUUID());
258 
259  message->getDestination().validate();
260  _providerConnection->sendMessage(message->getDestination(), *message);
261 }
262 
263 /**
264  * Close the consumer and free any resources that it may
265  * have allocated.
 *
 * The visible part waits for _listenerThread to finish and then deletes
 * _providerListener, resetting the pointer to NULL so the delete is not
 * repeated on a later call.
 *
 * NOTE(review): listing lines 267-268 (the signature and possibly a first
 * statement, e.g. whatever makes the listener loop stop) are missing from
 * this extract - restore them from the original source.
266  */
 // Block until the listener thread has terminated.
269  _listenerThread.join();
270 
 // Release the provider listener exactly once.
271  if (_providerListener) {
272  delete _providerListener;
273  _providerListener = NULL;
274  }
275 
276  return;
277 }
278 
279 /**
280  * Verify that a message is relevant to this consumer. Basically,
281  * we just match the destination of the message to the destination
282  * we are listening to.
 *
 * Loopback messages are answered via handleLoopback() and are never
 * delivered to the caller.
 *
 * Delivering messages whose status code is not success regardless of
 * their destination is intended but NOT implemented yet (see the TODO
 * in the body).
 *
 * NOTE(review): listing line 311 (the right-hand side of the
 * sourceProperty assignment) is missing from this extract - restore it
 * from the original source.
285  *
286  * @param message The message to check
287  *
288  * @return True if the message should be delivered to this consumer,
289  * false otherwise.
290  */
291 bool RmsConsumer::verify(boost::shared_ptr<base::RmsMessage> message) {
292 
293  // TODO - check if the message status is not success and return "true"
294  // if something went wrong so application code learns about the problem.
295 
296  std::string messageType =
297  message->getDestination().getProperty(base::RmsDestination::MESSAGE_TYPE_PROPERTY_NAME);
298 
 // Loopback messages are handled internally and filtered out.
299  if (messageType == base::RmsDestination::LOOPBACK_MESSAGE_TYPE) {
300  handleLoopback(message);
301  return false;
302  }
303 
 // Accept the message as soon as one verification destination matches.
304  for (unsigned int i = 0; i < _myVerifyDests.size(); i++) {
305  // 02-Nov-2007, KAB - if the consumer destination is a
306  // source destination, then compare to the message reply destination
307  // rather than the message destination. (In that case,
308  // we don't care where the message is going *to*, we care where
309  // it came *from*).
310  std::string sourceProperty =
312  if (sourceProperty != "") {
313  if (message->getReplyDestination().matches(_myVerifyDests[i])) {
314  return true;
315  }
316  }
317  else {
318  if (message->getDestination().matches(_myVerifyDests[i])) {
319  return true;
320  }
321  }
322  }
323 
 // No verification destination matched.
324  return false;
325 }
326 
327 /**
328  * Handle a loopback message that is received in this consumer.
329  *
330  * @param loopbackMessage The loopback message that has been
331  * received.
332  */
333 void RmsConsumer::handleLoopback(boost::shared_ptr<base::RmsMessage> loopbackMessage) {
334  base::RmsMessageBody loopbackBody;
335 
336  loopbackBody.setContent(loopbackMessage->getBody().getContent());
337  boost::shared_ptr<base::RmsMessage> loopbackReply =
338  loopbackMessage->createReply();
339  loopbackReply->setBody(loopbackBody);
340 
341  sendReply(loopbackReply);
342 }
343 
344 } // end of namespace rms
345 
346 } // end of namespace cd
347 
348 } // end of namespace fnal
349 
350 } // end of namespace gov
std::vector< base::RmsDestination > _myVerifyDests
Definition: RmsConsumer.h:108
provider::BufferedProviderListener * _providerListener
Definition: RmsConsumer.h:92
ClientListenerLoop _listenerLoop
Definition: RmsConsumer.h:87
boost::shared_ptr< base::RmsMessage > receiveMessage()
boost::shared_ptr< std::string > receiveStringNoWait()
void sendReply(boost::shared_ptr< base::RmsMessage > message)
boost::shared_ptr< std::string > receiveString()
static const std::string SOURCE_PROPERTY_NAME
util::LinkedBlockingQueue< boost::shared_ptr< std::string > > _inputQueue
Definition: RmsConsumer.h:98
void setMessageListener(RmsMessageListener *listener)
void setMessageFilter(MessageFilter *filter)
boost::shared_ptr< provider::RmsConnection > _providerConnection
Definition: RmsConsumer.h:76
static const std::string CHANNEL_PROPERTY_NAME
bool verify(boost::shared_ptr< base::RmsMessage > message)
Definition: fnal.py:1
#define GENERATE_RMS_RUNTIME_EXCEPTION(msg)
Definition: run.py:1
std::vector< base::RmsDestination > _myDestinations
Definition: RmsConsumer.h:81
void setMessageListener(RmsMessageListener *listener)
void handleLoopback(boost::shared_ptr< base::RmsMessage > loopbackMessage)
static const std::string TARGET_PROPERTY_NAME
static const std::string LOOPBACK_MESSAGE_TYPE
boost::shared_ptr< base::RmsMessage > receiveMessageNoWait()
RmsConsumer(boost::shared_ptr< provider::RmsConnection > providerConnection, base::RmsDestination dest)
Definition: RmsConsumer.cpp:21
c cd(1)
static const std::string MESSAGE_TYPE_PROPERTY_NAME
enum BeamMode string