Public Member Functions | Private Attributes | Static Private Attributes | List of all members
gov::fnal::cd::rms::provider::DDSConnection Class Reference

#include "/cvmfs/nova-development.opensciencegrid.org/novasoft/releases/N21-01-23/rms/cxx/include/provider/DDSConnection.h"

Public Member Functions

 DDSConnection (const std::string applicationName, const int partitionNumber)
 
 ~DDSConnection ()
 
bool supportsDestination (const base::RmsDestination &candidateDestination)
 
std::string getApplicationName () const
 
int getPartitionNumber () const
 
void close ()
 
const std::string getUUID ()
 
std::string getHostName ()
 
template<class T >
void dumpMessage (T const &message, bool isIncomingMessage)
 
template<class T >
void sendMessage (unsigned int senderHandle, T &message)
 
template<class T >
void receiveMessage (unsigned int receiverHandle, T &message)
 
template<class T >
bool receiveMessage (unsigned int receiverHandle, T &message, unsigned int usecTimeout)
 
template<class T >
unsigned int registerSender (T const &sampleMessage, base::RmsDestination const &dest)
 
void unRegisterSender (unsigned int senderHandle)
 
template<class T >
unsigned int registerReceiver (T const &sampleMessage, base::RmsDestination const &dest)
 
void unRegisterReceiver (unsigned int receiverHandle)
 

Private Attributes

util::UUIDGenerator * _uuidGenerator
 
std::string _applicationName
 
int _partitionNumber
 
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _writerTable
 
std::map< unsigned int, std::pair< std::string, std::string > > _targetChannelTable
 
unsigned int _writerCounter
 
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
 
unsigned int _readerCounter
 
std::string _myHostName
 
boost::mutex _registrationMutex
 
int _debugFlag
 

Static Private Attributes

static int _instanceCount = 0
 
static boost::mutex _staticMutex
 

Detailed Description

This class takes care of setting up and managing connections to DDS for RMS.

Author
Kurt Biery
Version
Revision 1.17.14.1
Date
2019/09/27 00:07:31

Definition at line 35 of file DDSConnection.h.

Constructor & Destructor Documentation

gov::fnal::cd::rms::provider::DDSConnection::DDSConnection ( const std::string  applicationName,
const int  partitionNumber 
)

Creates a new DDSConnection given the application name and partition.

Parameters
applicationName  The name of the application generating/receiving messages.
partitionNumber  The partition of the application that is generating/receiving messages.

Definition at line 27 of file DDSConnection.cpp.

References _applicationName, _debugFlag, _instanceCount, _myHostName, _partitionNumber, _staticMutex, _uuidGenerator, om::cerr, gov::fnal::cd::rms::provider::DPSingleton::connect_participant(), allTimeWatchdog::endl, cet::getenv(), gov::fnal::cd::rms::provider::DPSingleton::Instance(), datagram_client::sl, and string.

28  {
29  _applicationName = applicationName;
30  _partitionNumber = partitionNumber;
31 
32  char hostname[80];
33  memset(hostname, 0, 80);
34  if (gethostname(hostname, 79) == 0) {
35  _uuidGenerator = new util::UUIDGenerator(hostname);
36  std::string tmpString(hostname);
37  _myHostName = tmpString;
38  }
39  else {
40  _uuidGenerator = new util::UUIDGenerator("C++DDSConnection");
41  _myHostName = "Unknown";
42  }
43 
44  _debugFlag = 0;
45  char *debugEnvVar = getenv("RMS_DEBUG");
46  if (debugEnvVar != NULL) {
47  std::string debugString(debugEnvVar);
48  try {
49  _debugFlag = boost::lexical_cast<int>(debugString);
50  }
51  catch (boost::bad_lexical_cast &excpt) {
52  std::cerr << "*** ERROR: Invalid RMS_DEBUG value: \""
53  << debugString << "\"." << std::endl;
54  }
55  }
56 
57  boost::mutex::scoped_lock sl(_staticMutex);
59  if (_instanceCount == 1) {
60  try {
62  }
63  catch (...) {
64  std::cerr << "Unable to connect DomainParticipant in "
65  << "DDSConnection constructor." << std::endl;
66  }
67  }
68 }
static DPSingleton & Instance()
Definition: CETDDS.h:142
OStream cerr
Definition: OStream.cxx:7
std::string getenv(std::string const &name)
enum BeamMode string
gov::fnal::cd::rms::provider::DDSConnection::~DDSConnection ( )

Member Function Documentation

void gov::fnal::cd::rms::provider::DDSConnection::close ( )

Closes the connection. After calling this method, the connection should no longer be used since it will no longer be connected to the underlying provider system.

Definition at line 98 of file DDSConnection.cpp.

References _uuidGenerator.

Referenced by getPartitionNumber(), and ~DDSConnection().

98  {
99  if (_uuidGenerator) {
100  delete _uuidGenerator;
101  _uuidGenerator = NULL;
102  }
103 }
template<class T >
void gov::fnal::cd::rms::provider::DDSConnection::dumpMessage ( T const &  message,
bool  isIncomingMessage 
)
inline

Definition at line 64 of file DDSConnection.h.

References novadaq::convertTimevalToDouble(), om::cout, allTimeWatchdog::endl, and datagram_client::message.

Referenced by receiveMessage(), and sendMessage().

64  {
65  std::cout << "++++++++++++++++++++++++++++++++++++++++" << std::endl;
66  std::cout << "Message type = " << typeid(message).name() << std::endl;
67  std::cout << "ID = " << message.header.id << std::endl;
68  std::cout << "Target = " << message.header.target << std::endl;
69  std::cout << "Channel = " << message.header.channel << std::endl;
70  std::cout << "Reply target = " << message.header.replyTarget << std::endl;
71  std::cout << "Correlation ID = " << message.header.correlationId << std::endl;
72  std::cout << "Sent time = " << message.header.sentTime.sec
73  << " " << message.header.sentTime.usec << std::endl;
74  std::cout << "Correlation time = " << message.header.correlationTime.sec
75  << " " << message.header.correlationTime.usec << std::endl;
76  std::cout << "Priority = " << message.header.priority << std::endl;
77  std::cout << "Partition number = " << message.header.partitionNumber << std::endl;
78  std::cout << "Host name = " << message.header.hostName << std::endl;
79  std::cout << "Application name = " << message.header.applicationName << std::endl;
80 
81  if (isIncomingMessage && message.header.correlationTime.sec > 1000000) {
82  struct timeval then;
83  then.tv_sec = message.header.correlationTime.sec;
84  then.tv_usec = message.header.correlationTime.usec;
85  struct timeval now;
86  gettimeofday(&now, 0);
87 
88  double thenTime = novadaq::convertTimevalToDouble(then);
89  double nowTime = novadaq::convertTimevalToDouble(now);
90  double timeDiff = 1000.0 * (nowTime - thenTime);
91 
92  std::cout << "Request/reply message time = " << timeDiff << " msec" << std::endl;
93  }
94 
95  std::cout << "----------------------------------------" << std::endl;
96  }
const XML_Char * name
Definition: expat.h:151
double convertTimevalToDouble(struct timeval const &inputUnixTime)
Definition: TimeUtils.cpp:10
OStream cout
Definition: OStream.cxx:6
std::string gov::fnal::cd::rms::provider::DDSConnection::getApplicationName ( ) const
inline

Definition at line 45 of file DDSConnection.h.

References _applicationName.

Referenced by sendMessage().

std::string gov::fnal::cd::rms::provider::DDSConnection::getHostName ( )
inline

Definition at line 59 of file DDSConnection.h.

References _myHostName.

59  {
60  return _myHostName;
61  }
int gov::fnal::cd::rms::provider::DDSConnection::getPartitionNumber ( ) const
inline

Definition at line 46 of file DDSConnection.h.

References _partitionNumber, and close().

Referenced by sendMessage().

const std::string gov::fnal::cd::rms::provider::DDSConnection::getUUID ( )
inline

Get a new human readable UUID from the UUID generator.

Returns
A new UUID.

Definition at line 55 of file DDSConnection.h.

References _uuidGenerator, and gov::fnal::cd::rms::util::UUIDGenerator::getUUIDHex().

Referenced by sendMessage().

55  {
56  return _uuidGenerator->getUUIDHex();
57  }
template<class T >
void gov::fnal::cd::rms::provider::DDSConnection::receiveMessage ( unsigned int  receiverHandle,
T &  message
)
inline

Receives a message of the specified type from the specified destination.

Parameters
receiverHandle  The handle of the registered receiver to take the message from, as returned by registerReceiver().
message  The message received from the destination.

Definition at line 159 of file DDSConnection.h.

References _debugFlag, _readerTable, _registrationMutex, dumpMessage(), makeTrainCVSamples::int, datagram_client::sl, and gov::fnal::cd::rms::provider::DDSTopicReaderClass< TT_ >::take_wait().

159  {
160  std::map<unsigned int,
161  boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
162  {
163  boost::mutex::scoped_lock sl(_registrationMutex);
164  readerIter = _readerTable.find(receiverHandle);
165  }
166 
167  if (readerIter != _readerTable.end()) {
168  base::RmsCloseable *obj = readerIter->second.get();
169  DDSTopicReaderClass<T> *myReader =
170  dynamic_cast< DDSTopicReaderClass<T>* >(obj);
171  myReader->take_wait(message);
172  }
173 
174  if ((_debugFlag & 0x01) != 0) {
175  dumpMessage(message, true);
176  }
177  }
void dumpMessage(T const &message, bool isIncomingMessage)
Definition: DDSConnection.h:64
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
template<class T >
bool gov::fnal::cd::rms::provider::DDSConnection::receiveMessage ( unsigned int  receiverHandle,
T &  message,
unsigned int  usecTimeout 
)
inline

Waits for a message of the specified type to arrive from the specified destination, but returns after the specified timeout if no message has arrived.

Parameters
receiverHandle  The handle of the registered receiver to take the message from, as returned by registerReceiver().
message  The message received from the destination.
usecTimeout  Amount of time to wait for a message (microseconds).
Returns
true if a message was received, false if not.

Definition at line 190 of file DDSConnection.h.

References _debugFlag, _readerTable, _registrationMutex, om::cout, dumpMessage(), allTimeWatchdog::endl, makeTrainCVSamples::int, datagram_client::message, datagram_client::sl, fabricate::status, and gov::fnal::cd::rms::provider::DDSTopicReaderClass< TT_ >::take_timed_wait().

191  {
192  std::map<unsigned int,
193  boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
194  {
195  boost::mutex::scoped_lock sl(_registrationMutex);
196  readerIter = _readerTable.find(receiverHandle);
197  }
198 
199  if (readerIter != _readerTable.end()) {
200  base::RmsCloseable *obj = readerIter->second.get();
201  DDSTopicReaderClass<T> *myReader =
202  dynamic_cast< DDSTopicReaderClass<T>* >(obj);
203  DDS::ReturnCode_t status =
204  myReader->take_timed_wait(message, usecTimeout);
205 
206  if ((_debugFlag & 0x01) != 0) {
207  if (status == DDS::RETCODE_OK) {
208  dumpMessage(message, true);
209  }
210  }
211  if ((_debugFlag & 0x02) != 0) {
212  if (status == DDS::RETCODE_TIMEOUT) {
213  std::cout << "++++++++++++++++++++++++++++++++++++++++" << std::endl;
214  std::cout << "Timeout waiting for " << typeid(message).name()
215  << " message." << std::endl;
216  std::cout << "Waited " << usecTimeout << " microseconds." << std::endl;
217  }
218  else if (status != DDS::RETCODE_OK) {
219  std::cout << "++++++++++++++++++++++++++++++++++++++++" << std::endl;
220  std::cout << "Received error code " << status
221  << " from take_timed_wait call."<< std::endl;
222  }
223  }
224 
225  return (status == DDS::RETCODE_OK);
226  }
227  return false;
228  }
const XML_Char * name
Definition: expat.h:151
int status
Definition: fabricate.py:1613
void dumpMessage(T const &message, bool isIncomingMessage)
Definition: DDSConnection.h:64
OStream cout
Definition: OStream.cxx:6
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
template<class T >
unsigned int gov::fnal::cd::rms::provider::DDSConnection::registerReceiver ( T const &  sampleMessage,
base::RmsDestination const &  dest 
)
inline

Definition at line 275 of file DDSConnection.h.

References _partitionNumber, _readerCounter, _readerTable, _registrationMutex, gov::fnal::cd::rms::base::RmsDestination::CHANNEL_PROPERTY_NAME, cet::replace_all(), datagram_client::sl, string, and gov::fnal::cd::rms::base::RmsDestination::TARGET_PROPERTY_NAME.

276  {
277  boost::mutex::scoped_lock sl(_registrationMutex);
278 
279  std::string targetProperty =
281  std::string channelProperty =
283  std::string topicName = targetProperty + "_" + channelProperty;
284  boost::replace_all(topicName, "-", "_dash_");
285  std::string partitionName = "Partition" +
286  boost::lexical_cast<std::string>(_partitionNumber);
287 
288  boost::shared_ptr<base::RmsCloseable>
289  myReader(new DDSTopicReaderClass<T>(topicName.c_str(),
290  partitionName.c_str()));
291  ++_readerCounter;
292  _readerTable[_readerCounter] = myReader;
293  return _readerCounter;
294  }
bool replace_all(std::string &in, std::string const &from, std::string const &to)
Replace all occurrences of from in string with to.
static const std::string CHANNEL_PROPERTY_NAME
static const std::string TARGET_PROPERTY_NAME
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
enum BeamMode string
template<class T >
unsigned int gov::fnal::cd::rms::provider::DDSConnection::registerSender ( T const &  sampleMessage,
base::RmsDestination const &  dest 
)
inline

Definition at line 231 of file DDSConnection.h.

References _partitionNumber, _registrationMutex, _targetChannelTable, _writerCounter, _writerTable, gov::fnal::cd::rms::base::RmsDestination::CHANNEL_PROPERTY_NAME, make_pair(), cet::replace_all(), datagram_client::sl, string, and gov::fnal::cd::rms::base::RmsDestination::TARGET_PROPERTY_NAME.

232  {
233  boost::mutex::scoped_lock sl(_registrationMutex);
234 
235  std::string targetProperty =
237  std::string channelProperty =
239  std::string topicName = targetProperty + "_" + channelProperty;
240  boost::replace_all(topicName, "-", "_dash_");
241  std::string partitionName = "Partition" +
242  boost::lexical_cast<std::string>(_partitionNumber);
243 
244  boost::shared_ptr<base::RmsCloseable>
245  myWriter(new DDSTopicWriterClass<T>(topicName.c_str(),
246  partitionName.c_str()));
247  ++_writerCounter;
248  _writerTable[_writerCounter] = myWriter;
250  make_pair(targetProperty, channelProperty);
251  return _writerCounter;
252  }
std::pair< Spectrum *, CheatDecomp * > make_pair(SpectrumLoaderBase &loader_data, SpectrumLoaderBase &loader_mc, HistAxis *axis, Cut *cut, const SystShifts &shift, const Var &wei)
Definition: DataMCLoad.C:336
bool replace_all(std::string &in, std::string const &from, std::string const &to)
Replace all occurrences of from in string with to.
static const std::string CHANNEL_PROPERTY_NAME
static const std::string TARGET_PROPERTY_NAME
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _writerTable
std::map< unsigned int, std::pair< std::string, std::string > > _targetChannelTable
enum BeamMode string
template<class T >
void gov::fnal::cd::rms::provider::DDSConnection::sendMessage ( unsigned int  senderHandle,
T &  message
)
inline

Sends a message of the specified type to the specified destination.

Parameters
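senderHandle  The handle of the registered sender to write the message with, as returned by registerSender().
message  The message to be sent to the destination. Its header fields (id, target, channel, replyTarget, applicationName, sentTime, partitionNumber, hostName) are filled in before it is written.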

Definition at line 105 of file DDSConnection.h.

References _debugFlag, _myHostName, _registrationMutex, _targetChannelTable, _writerTable, dumpMessage(), getApplicationName(), getPartitionNumber(), getUUID(), makeTrainCVSamples::int, datagram_client::sl, string, and gov::fnal::cd::rms::provider::DDSTopicWriterClass< TT_ >::write().

105  {
106  std::map<unsigned int,
107  boost::shared_ptr<base::RmsCloseable> >::const_iterator writerIter;
108  {
109  boost::mutex::scoped_lock sl(_registrationMutex);
110  writerIter = _writerTable.find(senderHandle);
111  }
112 
113  if (writerIter != _writerTable.end()) {
114 
115  std::string id = this->getUUID();
116  message.header.id = DDS::string_dup(id.c_str());
117 
118  std::map<unsigned int,
119  std::pair<std::string, std::string> >::const_iterator tgtChanIter;
120  tgtChanIter = _targetChannelTable.find(senderHandle);
121  if (tgtChanIter != _targetChannelTable.end()) {
122  std::string target = tgtChanIter->second.first;
123  std::string channel = tgtChanIter->second.second;
124  message.header.target = DDS::string_dup(target.c_str());
125  message.header.channel = DDS::string_dup(channel.c_str());
126  }
127 
128  std::string replyTarget = this->getApplicationName();
129  message.header.replyTarget = DDS::string_dup(replyTarget.c_str());
130  message.header.applicationName = DDS::string_dup(replyTarget.c_str());
131 
132  struct timeval now;
133  gettimeofday(&now, 0);
134  message.header.sentTime.sec = now.tv_sec;
135  message.header.sentTime.usec = now.tv_usec;
136 
137  message.header.partitionNumber = this->getPartitionNumber();
138 
139  message.header.hostName = DDS::string_dup(_myHostName.c_str());
140 
141  if ((_debugFlag & 0x01) != 0) {
142  dumpMessage(message, false);
143  }
144 
145  base::RmsCloseable *obj = writerIter->second.get();
146  DDSTopicWriterClass<T> *myWriter =
147  dynamic_cast< DDSTopicWriterClass<T>* >(obj);
148  myWriter->write(message);
149  }
150  }
const XML_Char * target
Definition: expat.h:268
void dumpMessage(T const &message, bool isIncomingMessage)
Definition: DDSConnection.h:64
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _writerTable
std::map< unsigned int, std::pair< std::string, std::string > > _targetChannelTable
enum BeamMode string
bool gov::fnal::cd::rms::provider::DDSConnection::supportsDestination ( const base::RmsDestination &  dest)

Determine if the given destination can be reached with the DDS provider.

Parameters
dest  The destination to check to see if messages can be sent to it.
Returns
True if the destination can be reached, false otherwise.

Definition at line 89 of file DDSConnection.cpp.

89  {
90  return true;
91 }
void gov::fnal::cd::rms::provider::DDSConnection::unRegisterReceiver ( unsigned int  receiverHandle)
inline

Definition at line 296 of file DDSConnection.h.

References _readerTable, _registrationMutex, makeTrainCVSamples::int, and datagram_client::sl.

296  {
297  boost::mutex::scoped_lock sl(_registrationMutex);
298 
299  std::map<unsigned int,
300  boost::shared_ptr<base::RmsCloseable> >::const_iterator readerIter;
301  readerIter = _readerTable.find(receiverHandle);
302 
303  if (readerIter != _readerTable.end()) {
304  _readerTable.erase(receiverHandle);
305  }
306  }
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _readerTable
void gov::fnal::cd::rms::provider::DDSConnection::unRegisterSender ( unsigned int  senderHandle)
inline

Definition at line 254 of file DDSConnection.h.

References _registrationMutex, _targetChannelTable, _writerTable, makeTrainCVSamples::int, and datagram_client::sl.

254  {
255  boost::mutex::scoped_lock sl(_registrationMutex);
256 
257  std::map<unsigned int,
258  boost::shared_ptr<base::RmsCloseable> >::const_iterator writerIter;
259  writerIter = _writerTable.find(senderHandle);
260 
261  if (writerIter != _writerTable.end()) {
262  _writerTable.erase(senderHandle);
263  }
264 
265  std::map<unsigned int,
266  std::pair<std::string, std::string> >::const_iterator tgtChanIter;
267  tgtChanIter = _targetChannelTable.find(senderHandle);
268 
269  if (tgtChanIter != _targetChannelTable.end()) {
270  _targetChannelTable.erase(senderHandle);
271  }
272  }
std::map< unsigned int, boost::shared_ptr< base::RmsCloseable > > _writerTable
std::map< unsigned int, std::pair< std::string, std::string > > _targetChannelTable

Member Data Documentation

std::string gov::fnal::cd::rms::provider::DDSConnection::_applicationName
private

Definition at line 313 of file DDSConnection.h.

Referenced by DDSConnection(), and getApplicationName().

int gov::fnal::cd::rms::provider::DDSConnection::_debugFlag
private

Definition at line 328 of file DDSConnection.h.

Referenced by DDSConnection(), receiveMessage(), and sendMessage().

int gov::fnal::cd::rms::provider::DDSConnection::_instanceCount = 0
static private

Definition at line 330 of file DDSConnection.h.

Referenced by DDSConnection(), and ~DDSConnection().

std::string gov::fnal::cd::rms::provider::DDSConnection::_myHostName
private

Definition at line 324 of file DDSConnection.h.

Referenced by DDSConnection(), getHostName(), and sendMessage().

int gov::fnal::cd::rms::provider::DDSConnection::_partitionNumber
private
unsigned int gov::fnal::cd::rms::provider::DDSConnection::_readerCounter
private

Definition at line 322 of file DDSConnection.h.

Referenced by registerReceiver().

std::map<unsigned int, boost::shared_ptr<base::RmsCloseable> > gov::fnal::cd::rms::provider::DDSConnection::_readerTable
private

Definition at line 321 of file DDSConnection.h.

Referenced by receiveMessage(), registerReceiver(), and unRegisterReceiver().

boost::mutex gov::fnal::cd::rms::provider::DDSConnection::_registrationMutex
mutable private
boost::mutex gov::fnal::cd::rms::provider::DDSConnection::_staticMutex
static private

Definition at line 331 of file DDSConnection.h.

Referenced by DDSConnection(), and ~DDSConnection().

std::map<unsigned int, std::pair<std::string, std::string> > gov::fnal::cd::rms::provider::DDSConnection::_targetChannelTable
private

Definition at line 318 of file DDSConnection.h.

Referenced by registerSender(), sendMessage(), and unRegisterSender().

util::UUIDGenerator* gov::fnal::cd::rms::provider::DDSConnection::_uuidGenerator
private

Definition at line 312 of file DDSConnection.h.

Referenced by close(), DDSConnection(), and getUUID().

unsigned int gov::fnal::cd::rms::provider::DDSConnection::_writerCounter
private

Definition at line 319 of file DDSConnection.h.

Referenced by registerSender().

std::map<unsigned int, boost::shared_ptr<base::RmsCloseable> > gov::fnal::cd::rms::provider::DDSConnection::_writerTable
private

Definition at line 316 of file DDSConnection.h.

Referenced by registerSender(), sendMessage(), and unRegisterSender().


The documentation for this class was generated from the following files: