12 #include "ccpp_dds_dcps.h" 13 #include "rms/base/RmsCloseable.h" 14 #include "rms/provider/CheckStatus.h" 15 #include "rms/base/RmsRuntimeException.h" 16 #include "rms/provider/ProcessSignalHandler.h" 18 #include <NovaDAQUtilities/EnvVarCache.h> 21 #include <boost/algorithm/string/replace.hpp> 22 #include <boost/interprocess/smart_ptr/unique_ptr.hpp> 24 #include "boost/bind.hpp" 27 #include <dds/domain.hpp> 28 #include <dds/topic.hpp> 29 #include <dds/reader.hpp> 30 #include <dds/traits.hpp> 33 #define CETDDS_DEBUG 0 34 #define CETDDS_EXTRA_DEBUG 0 35 #define CETDDS_DEBUG1 0 37 #define declare_dds_message(TOPIC) REGISTER_TOPIC_TRAITS(TOPIC) 79 :
virtual public base::RmsCloseable,
public base::Notifiable
89 bool hasBeenClosed()
throw();
93 void waitForNetworkServices();
96 boost::shared_ptr< dds::DataWriter< TT_ > >
writer;
104 :
virtual public base::RmsCloseable,
public base::Notifiable
112 int take_nowait( TT_ &
mm );
113 int take_timed_wait( TT_ & mm,
unsigned int usec );
114 int take_wait( TT_ & mm );
115 void close()
throw();
116 bool hasBeenClosed()
throw();
120 int take_timed_wait( TT_ & topicdata, dds::Duration_t
const&
duration );
122 TT_ & topicdata, dds::ReturnCode_t &
status);
128 boost::shared_ptr< dds::DataReader<TT_> >
reader;
129 boost::interprocess::unique_ptr< dds::WaitSet, Deleter<dds::WaitSet> >
wait_set;
147 void disconnect_participant();
148 void connect_participant();
149 void reconnect_participant();
152 void addToCleanupList(base::Notifiable* notifiable);
153 void removeFromCleanupList(base::Notifiable* notifiable);
160 dds::DomainParticipant& get_participant();
164 boost::interprocess::unique_ptr<dds::DomainParticipant,Deleter<dds::DomainParticipant> >
dp;
168 signalHandler.release();
182 template <
class TT_ >
186 std::cout << topic_name <<
"\n" <<
"hello\n";
192 template <
class TT_ >
196 std::cout << topic_name <<
"\n" <<
"hello\n";
202 template <
class TT_ >
208 DPSingleton::Instance().removeFromCleanupList(
this);
211 std::cerr <<
"Exception calling close() in in DDSTopicWriterClass." 217 template <
class TT_ >
221 std::cout << topic_name <<
"\n" <<
"hello\n";
227 template <
class TT_ >
231 std::cout << topic_name <<
"\n" <<
"hello\n";
237 template <
class TT_ >
243 DPSingleton::Instance().removeFromCleanupList(
this);
246 std::cerr <<
"Exception calling close() in in DDSTopicReaderClass." 252 template <
class TT_ >
256 _startupDone =
false;
257 _hasBeenClosed =
false;
260 dds::DomainParticipant dp(DPSingleton::Instance().get_participant());
265 char *
name =topicTS.get_type_name();
267 DDS::string_free(name);
270 const bool isContentFilteredTopicsEnabled =
string2bool(
273 std::string extended_topic_name( isContentFilteredTopicsEnabled ?
278 dds::TopicQos topicQoS(dp);
280 dp->get_default_topic_qos(topicQoS);
282 const bool printTopicName =
284 if (printTopicName) {
285 std::cout <<
"Writer topic = " << extended_topic_name
286 <<
", partition = " << partition_name <<
std::endl;
288 dds::Topic<TT_> topic(dp, extended_topic_name, type_name, topicQoS );
291 dds::Publisher publisher (dp, partition_name);
294 dds::DataWriterQos writerQoS(dp, topicQoS );
295 writerQoS.set_auto_dispose(
true);
297 writerQoS.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
300 writer.reset(
new dds::DataWriter<TT_>(publisher,topic, writerQoS ) );
303 gettimeofday( &now, NULL );
304 _startTime = ( double ) now.tv_sec + (((
double ) now.tv_usec ) / 1000000.0 );
306 DPSingleton::Instance().addToCleanupList(
this );
317 template <
class TT_ >
321 _hasBeenClosed =
false;
324 dds::DomainParticipant dp(DPSingleton::Instance().get_participant());
328 char *
name =topicTS.get_type_name();
330 DDS::string_free(name);
333 const bool isContentFilteredTopicsEnabled =
string2bool(
336 std::string extended_topic_name( isContentFilteredTopicsEnabled ?
340 dds::TopicQos topicQoS(dp);
341 dp->get_default_topic_qos(topicQoS);
343 const bool printTopicName =
345 if (printTopicName) {
346 std::cout <<
"Reader topic = " << extended_topic_name
347 <<
", partition = " << partition_name <<
std::endl;
350 dds::Topic<TT_> topic(dp, extended_topic_name, type_name, topicQoS);
353 dds::Subscriber subscriber(dp, partition_name);
356 dds::DataReaderQos readerQoS(dp, topicQoS );
358 if ( isContentFilteredTopicsEnabled ) {
359 dds_topic_filter_t content_filter;
365 topic,content_filter.first,content_filter.second);
367 reader.reset(
new dds::DataReader<TT_>( subscriber, content_filtered_topic, readerQoS ));
370 reader.reset(
new dds::DataReader<TT_>( subscriber, topic, readerQoS ));
373 dds::ReadCondition read_condition = reader->create_readcondition(
375 DDS::ANY_SAMPLE_STATE,DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE );
382 wait_set.reset(
new dds::WaitSet);
383 wait_set->attach(read_condition);
386 DPSingleton::Instance().addToCleanupList(
this );
394 template <
class TT_ >
399 DDS::SampleLostStatus sampleLostStatus;
400 dds::ReturnCode_t
status =reader->get_sample_lost_status(sampleLostStatus);
401 if (status !=DDS::RETCODE_OK)
404 std::cout <<
"DDSTopiReaderClass get_sample_lost_status returned error_code = " << status <<
std::endl;
408 if (sampleLostStatus.total_count > 0)
409 std::cout <<
"DDSTopiReaderClass lost data samples total_count=" << sampleLostStatus.total_count
410 <<
", total_count_change=" << sampleLostStatus.total_count_change <<
std::endl;
413 template <
class TT_ >
416 TT_ & topicdata,dds::ReturnCode_t &
status)
421 statusConditionHandler(reader);
423 const unsigned int seqSize=1;
426 dds::SampleInfoSeq infoSeq(seqSize);
428 status = reader->take( topicSeq, infoSeq,1,
429 DDS::ANY_SAMPLE_STATE,DDS::ANY_VIEW_STATE,DDS::ANY_INSTANCE_STATE );
433 if (status==DDS::RETCODE_NO_DATA)
436 #if CETDDS_EXTRA_DEBUG 440 if ( infoSeq[0].valid_data ) {
441 topicdata = topicSeq[0];
445 printf(
"handle invalid data\n" );
447 status = DDS::RETCODE_NO_DATA;
452 template <
class TT_ >
464 getEnvVarAsDouble(
"RMS_WRITER_STARTUP_DELAY", 0.020 );
467 gettimeofday( &now, NULL );
468 double currentTime = ( double ) now.tv_sec +
469 (((
double ) now.tv_usec ) / 1000000.0 );
470 double neededDelay = maxDelay - ( currentTime - _startTime );
476 if ( neededDelay > 0 ) {
477 usleep((
int )( 1000000 * neededDelay ) );
483 template <
class TT_ >
495 else if(this->hasBeenNotified())
500 if ( ! _startupDone )
501 waitForNetworkServices();
504 dds::ReturnCode_t
status = writer->write( topicdata );
506 if(this->hasBeenNotified())
522 template <
class TT_ >
534 else if(this->hasBeenNotified())
545 dds::ReturnCode_t
status=DDS::RETCODE_NO_DATA;
546 dds::ReturnCode_t wait_status=DDS::RETCODE_ERROR;
552 while ( wait_status != DDS::RETCODE_TIMEOUT && status == DDS::RETCODE_NO_DATA && !_hasBeenClosed)
554 wait_status = wait_set->dispatch(duration);
556 if (wait_status==DDS::RETCODE_OK)
557 receiveMessage(reader, topicdata, status);
559 if(this->hasBeenNotified())
565 catch (dds::InterruptedException& ex)
568 std::cerr <<
"Caught InterruptedException while calling take_timed_wait() in DDSTopicReaderClass; message=" 574 if ( wait_status == DDS::RETCODE_TIMEOUT )
575 status = DDS::RETCODE_TIMEOUT;
576 else if (wait_status==DDS::RETCODE_OK && status!=DDS::RETCODE_OK)
577 status = DDS::RETCODE_NO_DATA;
579 return ((
int )status );
588 template <
class TT_ >
591 dds::Duration_t timeout={ usec / 1000000, ( usec - ( usec / 1000000 ) * 1000000 ) * 1000};
592 return take_timed_wait( topicdata,timeout );
595 template <
class TT_ >
598 dds::Duration_t timeout=DDS::DURATION_INFINITE;
599 return take_timed_wait( topicdata,timeout );
602 template <
class TT_ >
605 dds::Duration_t timeout=DDS::DURATION_ZERO;
606 return take_timed_wait( topicdata,timeout );
611 template <
class TT_ >
614 _hasBeenClosed =
true;
623 template <
class TT_ >
626 _hasBeenClosed =
true;
635 template <
class TT_ >
638 return _hasBeenClosed;
641 template <
class TT_ >
644 return _hasBeenClosed;
boost::interprocess::unique_ptr< dds::WaitSet, Deleter< dds::WaitSet > > wait_set
boost::shared_ptr< dds::DataWriter< TT_ > > writer
static DPSingleton & Instance()
static EnvVarCache & getInstance()
dds_topic_filter_t & create_topic_filter_for(dds_topic_filter_t &filter, std::string const &type_name, std::string const &topic_name)
std::pair< std::string, std::vector< std::string > > dds_topic_filter_t
Module that skips a configurable number of events between each one that it allows through. Note that this module really skips (N-1) events; it uses a simple modular division as its criterion. This module will cut down the data sample to 1/N of its original size.
::xsd::cxx::tree::duration< char, simple_type > duration
std::string unique_topic_name_for(std::string const &extended_topic_name)
std::shared_ptr< T > shared_ptr
boost::interprocess::unique_ptr< dds::DomainParticipant, Deleter< dds::DomainParticipant > > dp
printf("%d Experimental points found\n", nlines)
std::string create_extended_topic_name_for(std::string const &type_name, std::string const &topic_name)
volatile bool _hasBeenClosed
static constexpr Double_t mm
boost::interprocess::unique_ptr< ProcessSignalHandler, ProcessSignalHandlerDeleter > ProcessSignalHandler_t
bool string2bool(std::string const &bool_string)
void dumpReceivedSamples(dds::SampleInfoSeq const &infoSeq, const unsigned int length, const int status)
std::string create_content_filtered_topic_name_for(std::string const &type_name, std::string const &topic_name)
boost::shared_ptr< dds::DataReader< TT_ > > reader
#define GENERATE_RMS_NOTCONNECTED_EXCEPTION(msg)
assert(nhit_max >=nhit_nbins)
#define GENERATE_RMS_EXITINGPROCESS_EXCEPTION(msg)
void checkStatus(DDS::ReturnCode_t status, const char *info)
ProcessSignalHandler_t signalHandler
volatile bool _hasBeenClosed