1 #include <rms/RmsProducer.h> 2 #include <rms/base/RmsRuntimeException.h> 23 base::RmsDestination
dest) :
24 _providerConnection(providerConnection),
26 _listenerLoop(&_inputQueue),
45 base::RmsDestination
dest,
int replyTimeout) :
67 std::vector <base::RmsDestination>
dest) :
89 std::vector <base::RmsDestination>
dest,
int replyTimeout) :
128 base::RmsDestination replyDestination;
130 boost::posix_time::ptime
now(boost::posix_time::microsec_clock::universal_time());
132 message.setSentTime();
133 message.setId(messageUUID);
139 message.setTimeToLive(5000);
142 boost::posix_time::ptime>(messageUUID, now));
149 replyDestination.validate();
150 message.setReplyDestination(replyDestination);
164 boost::shared_ptr<base::RmsMessage> receivedReply;
165 boost::shared_ptr<std::string> messageText;
169 receivedReply.reset(
new base::RmsMessage);
170 receivedReply->deserialize(*messageText);
171 }
while (!
verify(receivedReply));
173 return receivedReply;
190 boost::shared_ptr<base::RmsMessage> receivedReply;
191 boost::shared_ptr<std::string> messageText;
197 receivedReply.reset();
198 return receivedReply;
201 receivedReply.reset(
new base::RmsMessage);
202 receivedReply->deserialize(*messageText);
203 }
while (!
verify(receivedReply));
205 return receivedReply;
218 boost::shared_ptr<base::RmsMessage> receivedReply;
219 boost::shared_ptr<std::string> messageText;
225 receivedReply.reset();
226 return receivedReply;
229 receivedReply.reset(
new base::RmsMessage);
230 receivedReply->deserialize(*messageText);
231 }
while (!
verify(receivedReply));
233 return receivedReply;
277 boost::posix_time::ptime
now(boost::posix_time::microsec_clock::universal_time());
278 boost::posix_time::ptime deadline = now - boost::posix_time::seconds(
_replyTimeout);
279 std::map<std::string, boost::posix_time::ptime>::iterator replyIterator;
292 if (replyIterator->second < deadline) {
302 replyIterator =
_replyIdTable.find(message->getCorrelationId());
328 msg.append(
"The current connection does not support ");
329 msg.append(
"the requested destination: ");
337 for (std::set <std::string>::iterator channelIterator =
_replyChannels.begin();
341 replyDestination.validate();
QueueType poll(long timeout)
static const int SUCCESS_CODE
void setReplyListener(RmsMessageListener *listener)
ClientListenerLoop _listenerLoop
void sendMessage(base::RmsMessage message)
std::vector< base::RmsDestination > _myDestinations
provider::BufferedProviderListener * _providerListener
void setMessageFilter(MessageFilter *filter)
std::set< std::string > _replyChannels
RmsProducer(boost::shared_ptr< provider::RmsConnection > providerConnection, base::RmsDestination dest)
boost::thread _listenerThread
boost::shared_ptr< base::RmsMessage > receiveReplyNoWait()
static const std::string CHANNEL_PROPERTY_NAME
std::map< std::string, boost::posix_time::ptime > _replyIdTable
#define GENERATE_RMS_RUNTIME_EXCEPTION(msg)
void sendString(std::string messageText)
boost::shared_ptr< provider::RmsConnection > _providerConnection
void setMessageListener(RmsMessageListener *listener)
util::LinkedBlockingQueue< boost::shared_ptr< std::string > > _inputQueue
boost::shared_ptr< base::RmsMessage > receiveReply()
bool verify(boost::shared_ptr< base::RmsMessage > message)