1 #ifndef _RMS_DDSCONNECTION_H 2 #define _RMS_DDSCONNECTION_H 4 #include <rms/base/RmsDestination.h> 5 #include <rms/util/UUIDGenerator.h> 6 #include <rms/provider/CETDDS.h> 7 #include <NovaDAQUtilities/TimeUtils.h> 9 #include <boost/shared_ptr.hpp> 10 #include <boost/thread/mutex.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/algorithm/string.hpp> 40 const int partitionNumber);
72 std::cout <<
"Sent time = " << message.header.sentTime.sec
73 <<
" " << message.header.sentTime.usec <<
std::endl;
74 std::cout <<
"Correlation time = " << message.header.correlationTime.sec
75 <<
" " << message.header.correlationTime.usec <<
std::endl;
81 if (isIncomingMessage && message.header.correlationTime.sec > 1000000) {
83 then.tv_sec = message.header.correlationTime.sec;
84 then.tv_usec = message.header.correlationTime.usec;
86 gettimeofday(&now, 0);
90 double timeDiff = 1000.0 * (nowTime - thenTime);
106 std::map<
unsigned int,
107 boost::shared_ptr<base::RmsCloseable> >::const_iterator writerIter;
116 message.header.id = DDS::string_dup(
id.c_str());
118 std::map<
unsigned int,
119 std::pair<std::string, std::string> >::const_iterator tgtChanIter;
124 message.header.target = DDS::string_dup(target.c_str());
125 message.header.channel = DDS::string_dup(channel.c_str());
129 message.header.replyTarget = DDS::string_dup(replyTarget.c_str());
130 message.header.applicationName = DDS::string_dup(replyTarget.c_str());
133 gettimeofday(&now, 0);
134 message.header.sentTime.sec = now.tv_sec;
135 message.header.sentTime.usec = now.tv_usec;
139 message.header.hostName = DDS::string_dup(
_myHostName.c_str());
145 base::RmsCloseable *obj = writerIter->second.get();
148 myWriter->
write(message);
160 std::map<
unsigned int,
161 boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
168 base::RmsCloseable *obj = readerIter->second.get();
191 unsigned int usecTimeout) {
192 std::map<
unsigned int,
193 boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
200 base::RmsCloseable *obj = readerIter->second.get();
203 DDS::ReturnCode_t
status =
207 if (status == DDS::RETCODE_OK) {
212 if (status == DDS::RETCODE_TIMEOUT) {
218 else if (status != DDS::RETCODE_OK) {
220 std::cout <<
"Received error code " << status
221 <<
" from take_timed_wait call."<<
std::endl;
225 return (status == DDS::RETCODE_OK);
232 base::RmsDestination
const&
dest) {
239 std::string topicName = targetProperty +
"_" + channelProperty;
244 boost::shared_ptr<base::RmsCloseable>
246 partitionName.c_str()));
250 make_pair(targetProperty, channelProperty);
257 std::map<
unsigned int,
258 boost::shared_ptr<base::RmsCloseable> >::const_iterator writerIter;
265 std::map<
unsigned int,
266 std::pair<std::string, std::string> >::const_iterator tgtChanIter;
276 base::RmsDestination
const&
dest) {
283 std::string topicName = targetProperty +
"_" + channelProperty;
288 boost::shared_ptr<base::RmsCloseable>
290 partitionName.c_str()));
299 std::map<
unsigned int,
300 boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
316 std::map<unsigned int, boost::shared_ptr<base::RmsCloseable> >
_writerTable;
317 std::map<unsigned int, std::pair<std::string, std::string> >
321 std::map<unsigned int, boost::shared_ptr<base::RmsCloseable> >
_readerTable;
unsigned int registerReceiver(T const &sampleMessage, base::RmsDestination const &dest)
void unRegisterReceiver(unsigned int receiverHandle)
util::UUIDGenerator * _uuidGenerator
static int _instanceCount
const std::string getUUID()
std::pair< Spectrum *, CheatDecomp * > make_pair(SpectrumLoaderBase &loader_data, SpectrumLoaderBase &loader_mc, HistAxis *axis, Cut *cut, const SystShifts &shift, const Var &wei)
std::string _applicationName
void dumpMessage(T const &message, bool isIncomingMessage)
DDSConnection(const std::string applicationName, const int partitionNumber)
void receiveMessage(unsigned int receiverHandle, T &message)
bool replace_all(std::string &in, std::string const &from, std::string const &to)
Replace all occurrences of from in string with to.
bool supportsDestination(const base::RmsDestination &candidateDestination)
static const std::string CHANNEL_PROPERTY_NAME
unsigned int _readerCounter
std::string getHostName()
bool receiveMessage(unsigned int receiverHandle, T &message, unsigned int usecTimeout)
unsigned int _writerCounter
int getPartitionNumber() const
double convertTimevalToDouble(struct timeval const &inputUnixTime)
void unRegisterSender(unsigned int senderHandle)
void sendMessage(unsigned int senderHandle, T &message)
const std::string getUUIDHex()
static const std::string TARGET_PROPERTY_NAME
unsigned int registerSender(T const &sampleMessage, base::RmsDestination const &dest)
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
std::string getApplicationName() const
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _writerTable
static boost::mutex _staticMutex
int take_timed_wait(TT_ &mm, unsigned int usec)
std::map< unsigned int, std::pair< std::string, std::string > > _targetChannelTable
boost::mutex _registrationMutex