DDSConnection.h
Go to the documentation of this file.
1 #ifndef _RMS_DDSCONNECTION_H
2 #define _RMS_DDSCONNECTION_H
3 
4 #include <rms/base/RmsDestination.h>
5 #include <rms/util/UUIDGenerator.h>
6 #include <rms/provider/CETDDS.h>
7 #include <NovaDAQUtilities/TimeUtils.h>
8 
9 #include <boost/shared_ptr.hpp>
10 #include <boost/thread/mutex.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/algorithm/string.hpp>
13 
14 #include <string>
15 #include <map>
16 
17 namespace gov {
18 
19 namespace fnal {
20 
21 namespace cd {
22 
23 namespace rms {
24 
25 namespace provider {
26 
27 /**
28  * This class takes care of setting up and managing connections
29  * to DDS for RMS.
30  *
31  * @author Kurt Biery
32  * @version $Revision: 1.17.14.1 $ $Date: 2019/09/27 00:07:31 $
33  */
34 
36 
37  public:
38 
39  DDSConnection (const std::string applicationName,
40  const int partitionNumber);
41  ~DDSConnection ();
42 
43  bool supportsDestination(const base::RmsDestination& candidateDestination);
44 
46  int getPartitionNumber() const {return _partitionNumber;}
47 
48  void close();
49 
50  /**
51  * Get a new human readable UUID from the UUID generator.
52  *
53  * @return A new UUID.
54  */
55  const std::string getUUID() {
56  return _uuidGenerator->getUUIDHex();
57  }
58 
60  return _myHostName;
61  }
62 
63  template <class T>
64  void dumpMessage(T const& message, bool isIncomingMessage) {
65  std::cout << "++++++++++++++++++++++++++++++++++++++++" << std::endl;
66  std::cout << "Message type = " << typeid(message).name() << std::endl;
67  std::cout << "ID = " << message.header.id << std::endl;
68  std::cout << "Target = " << message.header.target << std::endl;
69  std::cout << "Channel = " << message.header.channel << std::endl;
70  std::cout << "Reply target = " << message.header.replyTarget << std::endl;
71  std::cout << "Correlation ID = " << message.header.correlationId << std::endl;
72  std::cout << "Sent time = " << message.header.sentTime.sec
73  << " " << message.header.sentTime.usec << std::endl;
74  std::cout << "Correlation time = " << message.header.correlationTime.sec
75  << " " << message.header.correlationTime.usec << std::endl;
76  std::cout << "Priority = " << message.header.priority << std::endl;
77  std::cout << "Partition number = " << message.header.partitionNumber << std::endl;
78  std::cout << "Host name = " << message.header.hostName << std::endl;
79  std::cout << "Application name = " << message.header.applicationName << std::endl;
80 
81  if (isIncomingMessage && message.header.correlationTime.sec > 1000000) {
82  struct timeval then;
83  then.tv_sec = message.header.correlationTime.sec;
84  then.tv_usec = message.header.correlationTime.usec;
85  struct timeval now;
86  gettimeofday(&now, 0);
87 
88  double thenTime = novadaq::convertTimevalToDouble(then);
89  double nowTime = novadaq::convertTimevalToDouble(now);
90  double timeDiff = 1000.0 * (nowTime - thenTime);
91 
92  std::cout << "Request/reply message time = " << timeDiff << " msec" << std::endl;
93  }
94 
95  std::cout << "----------------------------------------" << std::endl;
96  }
97 
98  /**
99  * Sends a message of the specified type to the specified destination.
100  *
101  * @param dest The destination to send the message to.
102  * @param message The message to be sent to the destination.
103  */
104  template <class T>
105  void sendMessage(unsigned int senderHandle, T& message) {
106  std::map<unsigned int,
107  boost::shared_ptr<base::RmsCloseable> >::const_iterator writerIter;
108  {
109  boost::mutex::scoped_lock sl(_registrationMutex);
110  writerIter = _writerTable.find(senderHandle);
111  }
112 
113  if (writerIter != _writerTable.end()) {
114 
115  std::string id = this->getUUID();
116  message.header.id = DDS::string_dup(id.c_str());
117 
118  std::map<unsigned int,
119  std::pair<std::string, std::string> >::const_iterator tgtChanIter;
120  tgtChanIter = _targetChannelTable.find(senderHandle);
121  if (tgtChanIter != _targetChannelTable.end()) {
122  std::string target = tgtChanIter->second.first;
123  std::string channel = tgtChanIter->second.second;
124  message.header.target = DDS::string_dup(target.c_str());
125  message.header.channel = DDS::string_dup(channel.c_str());
126  }
127 
128  std::string replyTarget = this->getApplicationName();
129  message.header.replyTarget = DDS::string_dup(replyTarget.c_str());
130  message.header.applicationName = DDS::string_dup(replyTarget.c_str());
131 
132  struct timeval now;
133  gettimeofday(&now, 0);
134  message.header.sentTime.sec = now.tv_sec;
135  message.header.sentTime.usec = now.tv_usec;
136 
137  message.header.partitionNumber = this->getPartitionNumber();
138 
139  message.header.hostName = DDS::string_dup(_myHostName.c_str());
140 
141  if ((_debugFlag & 0x01) != 0) {
142  dumpMessage(message, false);
143  }
144 
145  base::RmsCloseable *obj = writerIter->second.get();
146  DDSTopicWriterClass<T> *myWriter =
147  dynamic_cast< DDSTopicWriterClass<T>* >(obj);
148  myWriter->write(message);
149  }
150  }
151 
152  /**
153  * Receives a message of the specified type from the specified destination.
154  *
155  * @param dest The destination to send the message to.
156  * @param message The message received from the destination.
157  */
158  template <class T>
159  void receiveMessage(unsigned int receiverHandle, T& message) {
160  std::map<unsigned int,
161  boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
162  {
163  boost::mutex::scoped_lock sl(_registrationMutex);
164  readerIter = _readerTable.find(receiverHandle);
165  }
166 
167  if (readerIter != _readerTable.end()) {
168  base::RmsCloseable *obj = readerIter->second.get();
169  DDSTopicReaderClass<T> *myReader =
170  dynamic_cast< DDSTopicReaderClass<T>* >(obj);
171  myReader->take_wait(message);
172  }
173 
174  if ((_debugFlag & 0x01) != 0) {
175  dumpMessage(message, true);
176  }
177  }
178 
179  /**
180  * Waits for a message of the specified type to arrive from the
181  * specified destination, but returns after the specified timeout
182  * if no message has arrived.
183  *
184  * @param dest The destination to send the message to.
185  * @param message The message received from the destination.
186  * @param usecTimeout Amount of time to wait for a message (microseconds).
187  * @return true if a message was received, false if not.
188  */
189  template <class T>
190  bool receiveMessage(unsigned int receiverHandle, T& message,
191  unsigned int usecTimeout) {
192  std::map<unsigned int,
193  boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
194  {
195  boost::mutex::scoped_lock sl(_registrationMutex);
196  readerIter = _readerTable.find(receiverHandle);
197  }
198 
199  if (readerIter != _readerTable.end()) {
200  base::RmsCloseable *obj = readerIter->second.get();
201  DDSTopicReaderClass<T> *myReader =
202  dynamic_cast< DDSTopicReaderClass<T>* >(obj);
203  DDS::ReturnCode_t status =
204  myReader->take_timed_wait(message, usecTimeout);
205 
206  if ((_debugFlag & 0x01) != 0) {
207  if (status == DDS::RETCODE_OK) {
208  dumpMessage(message, true);
209  }
210  }
211  if ((_debugFlag & 0x02) != 0) {
212  if (status == DDS::RETCODE_TIMEOUT) {
213  std::cout << "++++++++++++++++++++++++++++++++++++++++" << std::endl;
214  std::cout << "Timeout waiting for " << typeid(message).name()
215  << " message." << std::endl;
216  std::cout << "Waited " << usecTimeout << " microseconds." << std::endl;
217  }
218  else if (status != DDS::RETCODE_OK) {
219  std::cout << "++++++++++++++++++++++++++++++++++++++++" << std::endl;
220  std::cout << "Received error code " << status
221  << " from take_timed_wait call."<< std::endl;
222  }
223  }
224 
225  return (status == DDS::RETCODE_OK);
226  }
227  return false;
228  }
229 
230  template <class T>
231  unsigned int registerSender(T const& sampleMessage,
232  base::RmsDestination const& dest) {
233  boost::mutex::scoped_lock sl(_registrationMutex);
234 
235  std::string targetProperty =
237  std::string channelProperty =
239  std::string topicName = targetProperty + "_" + channelProperty;
240  boost::replace_all(topicName, "-", "_dash_");
241  std::string partitionName = "Partition" +
242  boost::lexical_cast<std::string>(_partitionNumber);
243 
244  boost::shared_ptr<base::RmsCloseable>
245  myWriter(new DDSTopicWriterClass<T>(topicName.c_str(),
246  partitionName.c_str()));
247  ++_writerCounter;
248  _writerTable[_writerCounter] = myWriter;
250  make_pair(targetProperty, channelProperty);
251  return _writerCounter;
252  }
253 
254  void unRegisterSender(unsigned int senderHandle) {
255  boost::mutex::scoped_lock sl(_registrationMutex);
256 
257  std::map<unsigned int,
258  boost::shared_ptr<base::RmsCloseable> >::const_iterator writerIter;
259  writerIter = _writerTable.find(senderHandle);
260 
261  if (writerIter != _writerTable.end()) {
262  _writerTable.erase(senderHandle);
263  }
264 
265  std::map<unsigned int,
266  std::pair<std::string, std::string> >::const_iterator tgtChanIter;
267  tgtChanIter = _targetChannelTable.find(senderHandle);
268 
269  if (tgtChanIter != _targetChannelTable.end()) {
270  _targetChannelTable.erase(senderHandle);
271  }
272  }
273 
274  template <class T>
275  unsigned int registerReceiver(T const& sampleMessage,
276  base::RmsDestination const& dest) {
277  boost::mutex::scoped_lock sl(_registrationMutex);
278 
279  std::string targetProperty =
281  std::string channelProperty =
283  std::string topicName = targetProperty + "_" + channelProperty;
284  boost::replace_all(topicName, "-", "_dash_");
285  std::string partitionName = "Partition" +
286  boost::lexical_cast<std::string>(_partitionNumber);
287 
288  boost::shared_ptr<base::RmsCloseable>
289  myReader(new DDSTopicReaderClass<T>(topicName.c_str(),
290  partitionName.c_str()));
291  ++_readerCounter;
292  _readerTable[_readerCounter] = myReader;
293  return _readerCounter;
294  }
295 
296  void unRegisterReceiver(unsigned int receiverHandle) {
297  boost::mutex::scoped_lock sl(_registrationMutex);
298 
299  std::map<unsigned int,
300  boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
301  readerIter = _readerTable.find(receiverHandle);
302 
303  if (readerIter != _readerTable.end()) {
304  _readerTable.erase(receiverHandle);
305  }
306  }
307 
308  protected:
309 
310  private:
311 
315 
316  std::map<unsigned int, boost::shared_ptr<base::RmsCloseable> > _writerTable;
317  std::map<unsigned int, std::pair<std::string, std::string> >
319  unsigned int _writerCounter;
320 
321  std::map<unsigned int, boost::shared_ptr<base::RmsCloseable> > _readerTable;
322  unsigned int _readerCounter;
323 
325 
327 
329 
330  static int _instanceCount;
332 };
333 
334 } // end of namespace provider
335 
336 } // end of namespace rms
337 
338 } // end of namespace cd
339 
340 } // end of namespace fnal
341 
342 } // end of namespace gov
343 
344 #endif
unsigned int registerReceiver(T const &sampleMessage, base::RmsDestination const &dest)
const XML_Char * name
Definition: expat.h:151
void unRegisterReceiver(unsigned int receiverHandle)
int status
Definition: fabricate.py:1613
const XML_Char * target
Definition: expat.h:268
std::pair< Spectrum *, CheatDecomp * > make_pair(SpectrumLoaderBase &loader_data, SpectrumLoaderBase &loader_mc, HistAxis *axis, Cut *cut, const SystShifts &shift, const Var &wei)
Definition: DataMCLoad.C:336
void dumpMessage(T const &message, bool isIncomingMessage)
Definition: DDSConnection.h:64
DDSConnection(const std::string applicationName, const int partitionNumber)
void receiveMessage(unsigned int receiverHandle, T &message)
bool replace_all(std::string &in, std::string const &from, std::string const &to)
Replace all occurrences of from in string with to.
bool supportsDestination(const base::RmsDestination &candidateDestination)
static const std::string CHANNEL_PROPERTY_NAME
Definition: fnal.py:1
bool receiveMessage(unsigned int receiverHandle, T &message, unsigned int usecTimeout)
double convertTimevalToDouble(struct timeval const &inputUnixTime)
Definition: TimeUtils.cpp:10
OStream cout
Definition: OStream.cxx:6
void unRegisterSender(unsigned int senderHandle)
void sendMessage(unsigned int senderHandle, T &message)
::xsd::cxx::tree::string< char, simple_type > string
Definition: Database.h:154
static const std::string TARGET_PROPERTY_NAME
unsigned int registerSender(T const &sampleMessage, base::RmsDestination const &dest)
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
double T
Definition: Xdiff_gwt.C:5
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _writerTable
int take_timed_wait(TT_ &mm, unsigned int usec)
Definition: CETDDS.h:589
c cd(1)
std::map< unsigned int, std::pair< std::string, std::string > > _targetChannelTable