Public Member Functions | Protected Attributes | Private Attributes | List of all members
gov::fnal::cd::rms::ClientListenerLoop Class Reference

#include "/cvmfs/"

Inheritance diagram for gov::fnal::cd::rms::ClientListenerLoop:

Public Member Functions

 ClientListenerLoop (util::LinkedBlockingQueue< boost::shared_ptr< std::string > > *inputQueue)
 ~ClientListenerLoop ()
void run ()
void stop ()
void setMessageListener (RmsMessageListener *listener)
void setMessageFilter (MessageFilter *filter)
virtual bool isRunning () const
virtual bool isDone () const
virtual void watchfulSleep (int microseconds, int numberOfIntervals=10)

Protected Attributes

bool _running
bool _stopRequested
bool _hasCompleted

Private Attributes

util::LinkedBlockingQueue< boost::shared_ptr< std::string > > * _inputQueue

Detailed Description

The ClientListenerLoop decouples receiveing messages from the provider and sending them to any message listeners.

Kurt Biery
Steve Foulkes
2019/09/27 00:07:31

Definition at line 30 of file ClientListenerLoop.h.

Constructor & Destructor Documentation

gov::fnal::cd::rms::ClientListenerLoop::ClientListenerLoop ( util::LinkedBlockingQueue< boost::shared_ptr< std::string > > *  inputQueue)

Create a new ClientListenerLoop with not client listner and no message filter. The loop will just spin until a client listener has been set.

inputQueueThe input queue to pull messages off of.

Definition at line 20 of file ClientListenerLoop.cpp.

References _clientListener, _inputQueue, and _messageFilter.

20  {
21  _inputQueue = inputQueue;
22  _clientListener = NULL;
23  _messageFilter = NULL;
24 }
util::LinkedBlockingQueue< boost::shared_ptr< std::string > > * _inputQueue
gov::fnal::cd::rms::ClientListenerLoop::~ClientListenerLoop ( )

Destructor for the ClientListenerLoop. This sets the stop flag to true so that the thread will exit. The proper way to shutdown the loop is to call the stop() method and then join() on the thread handle. Otherwise, its possible the thread could exit while holding the input queue lock which may cause a crash.

Definition at line 34 of file ClientListenerLoop.cpp.

References novadaq::Runnable::_running, and novadaq::Runnable::_stopRequested.

34  {
35  _stopRequested = true;
37  while(_running) {
38  usleep(100000);
39  }
40 }

Member Function Documentation

virtual bool novadaq::Runnable::isDone ( ) const

Tests if the Runnable object has completed its running.

true if the object is done, false otherwise.

Definition at line 80 of file Runnable.h.

References novadaq::Runnable::_hasCompleted.

Referenced by novadaq::BackgroundProcess::timedWaitForDone().

80  {
81  return _hasCompleted;
82  }
virtual bool novadaq::Runnable::isRunning ( ) const

Tests if the Runnable object is running.

true if the object is running, false otherwise.

Definition at line 71 of file Runnable.h.

References novadaq::Runnable::_running.

71  {
72  return _running;
73  }
void gov::fnal::cd::rms::ClientListenerLoop::run ( )

This method runs in its own thread. If no client listener has been set, it will spin until one is set. After that, it will pull messages off of the queue, deserialize them, and then pass them through a message filter if one has been set. Lastly, the message is passed to the listener.

Implements novadaq::Runnable.

Definition at line 58 of file ClientListenerLoop.cpp.

References _clientListener, _inputQueue, _messageFilter, novadaq::Runnable::_running, novadaq::Runnable::_stopRequested, om::cout, allTimeWatchdog::endl, gov::fnal::cd::rms::RmsMessageListener::messageReceived(), gov::fnal::cd::rms::util::LinkedBlockingQueue< QueueType >::poll(), and gov::fnal::cd::rms::MessageFilter::verify().

58  {
59  boost::shared_ptr<std::string> messageText;
61  _stopRequested = false;
62  _running = true;
64  while (!_stopRequested) {
65  if (_clientListener) {
66  messageText = _inputQueue->poll(1);
67  }
68  else {
69  usleep(200000);
70  continue;
71  }
73  if (!messageText) {
74  continue;
75  }
76  // 19-Jun-2007, KAB - we need to create a new RmsMessage each time
77  // since XSD deserialization doesn't overwrite existing element
78  // objects.
79  boost::shared_ptr<base::RmsMessage>
80  receivedMessage (new base::RmsMessage);
82  try {
83  receivedMessage->deserialize(*messageText);
84  }
85  catch (const ::xsd::cxx::tree::exception<char> &excpt) {
86  std::cout << "Exception thrown during deserialization ";
87  std::cout << "of RMS message!" << std::endl;
88  std::cout << excpt << std::endl;
89  }
91  if (_messageFilter) {
92  if (_messageFilter->verify(receivedMessage)) {
93  _clientListener->messageReceived(receivedMessage);
94  }
95  }
96  else {
97  _clientListener->messageReceived(receivedMessage);
98  }
99  }
101  _running = false;
102 }
util::LinkedBlockingQueue< boost::shared_ptr< std::string > > * _inputQueue
virtual bool verify(boost::shared_ptr< base::RmsMessage > message)
Definition: MessageFilter.h:43
virtual void messageReceived(boost::shared_ptr< base::RmsMessage > theMessage)=0
OStream cout
Definition: OStream.cxx:6
void gov::fnal::cd::rms::ClientListenerLoop::setMessageFilter ( MessageFilter filter)

Set the message filter that will get called everytime a message is pulled off the queue to verify that it is relevant to the listener.

filterThe message filter

Definition at line 122 of file ClientListenerLoop.cpp.

References _messageFilter.

Referenced by gov::fnal::cd::rms::RmsProducer::initialize(), and gov::fnal::cd::rms::RmsConsumer::RmsConsumer().

122  {
124 }
Module that kips a configurable number of events between each that it allows through. Note that this module really skips (N-1) events, it uses a simple modular division as its critera. This module will cut down the data sample to 1/N of its original size.
void gov::fnal::cd::rms::ClientListenerLoop::setMessageListener ( RmsMessageListener listener)

Set the message listener that gets called when a message is received.

listenerThe listener that will get called when a message is received.

Definition at line 111 of file ClientListenerLoop.cpp.

References _clientListener.

Referenced by gov::fnal::cd::rms::RmsConsumer::setMessageListener(), and gov::fnal::cd::rms::RmsProducer::setReplyListener().

111  {
112  _clientListener = listener;
113 }
void gov::fnal::cd::rms::ClientListenerLoop::stop ( )

Set the stop flag so that the thread exits. After doing this, join() on the thread handle to make sure that the thread has exited.

Reimplemented from novadaq::Runnable.

Definition at line 47 of file ClientListenerLoop.cpp.

References novadaq::Runnable::_stopRequested.

Referenced by gov::fnal::cd::rms::RmsProducer::close(), and gov::fnal::cd::rms::RmsConsumer::close().

47  {
48  _stopRequested = true;
49 }
virtual void novadaq::Runnable::watchfulSleep ( int  microseconds,
int  numberOfIntervals = 10 

Sleeps for the specified amount of time, but periodically checks if a stop has been requested, and returns if so.

Definition at line 88 of file Runnable.h.

References novadaq::Runnable::_stopRequested, and compare_h5_caf::idx.

89  {
90  if (numberOfIntervals < 1) {numberOfIntervals = 1;}
91  int intervalTime = microseconds / numberOfIntervals;
92  if (intervalTime < 1) {intervalTime = 1;}
93  for (int idx = 0; idx < numberOfIntervals; ++idx) {
94  if (_stopRequested) {break;}
95  usleep(intervalTime);
96  }
97  }

Member Data Documentation

RmsMessageListener* gov::fnal::cd::rms::ClientListenerLoop::_clientListener

The listener that we will be sending messages to.

Definition at line 50 of file ClientListenerLoop.h.

Referenced by ClientListenerLoop(), run(), and setMessageListener().

bool novadaq::Runnable::_hasCompleted
util::LinkedBlockingQueue<boost::shared_ptr<std::string> >* gov::fnal::cd::rms::ClientListenerLoop::_inputQueue

The queue that we will be pulling messages off of.

Definition at line 45 of file ClientListenerLoop.h.

Referenced by ClientListenerLoop(), and run().

MessageFilter* gov::fnal::cd::rms::ClientListenerLoop::_messageFilter

The filter that we will be running messages through to determine if they are relevant to the listener.

Definition at line 56 of file ClientListenerLoop.h.

Referenced by ClientListenerLoop(), run(), and setMessageFilter().

bool novadaq::Runnable::_running

Tracks whether the Runnable object is running or not (this can be before it has started or after it has finished)

Definition at line 105 of file Runnable.h.

Referenced by novadaq::Runnable::isRunning(), novadaq::BackgroundProcess::run(), run(), NdmcClient::run(), novadaq::Runnable::Runnable(), and ~ClientListenerLoop().

bool novadaq::Runnable::_stopRequested

The documentation for this class was generated from the following files: