ClientListenerLoop.cpp
Go to the documentation of this file.
1 #include <rms/ClientListenerLoop.h>
2 #include <rms/base/RmsMessage.h>
3 #include <xsd/cxx/tree/exceptions.hxx>
4 
5 namespace gov {
6 
7 namespace fnal {
8 
9 namespace cd {
10 
11 namespace rms {
12 
13 /**
14  * Create a new ClientListenerLoop with not client listner and no
15  * message filter. The loop will just spin until a client listener
16  * has been set.
17  *
18  * @param inputQueue The input queue to pull messages off of.
19  */
20 ClientListenerLoop::ClientListenerLoop(util::LinkedBlockingQueue <boost::shared_ptr<std::string> > *inputQueue) {
21  _inputQueue = inputQueue;
22  _clientListener = NULL;
23  _messageFilter = NULL;
24 }
25 
26 /**
27  * Destructor for the ClientListenerLoop. This sets the
28  * stop flag to true so that the thread will exit. The
29  * proper way to shutdown the loop is to call the stop()
30  * method and then join() on the thread handle. Otherwise,
31  * its possible the thread could exit while holding the input
32  * queue lock which may cause a crash.
33  */
35  _stopRequested = true;
36 
37  while(_running) {
38  usleep(100000);
39  }
40 }
41 
42 /**
43  * Set the stop flag so that the thread exits. After doing this,
44  * join() on the thread handle to make sure that the thread
45  * has exited.
46  */
48  _stopRequested = true;
49 }
50 
51 /**
52  * This method runs in its own thread. If no client listener has been
53  * set, it will spin until one is set. After that, it will pull messages
54  * off of the queue, deserialize them, and then pass them through a
55  * message filter if one has been set. Lastly, the message is passed
56  * to the listener.
57  */
59  boost::shared_ptr<std::string> messageText;
60 
61  _stopRequested = false;
62  _running = true;
63 
64  while (!_stopRequested) {
65  if (_clientListener) {
66  messageText = _inputQueue->poll(1);
67  }
68  else {
69  usleep(200000);
70  continue;
71  }
72 
73  if (!messageText) {
74  continue;
75  }
76  // 19-Jun-2007, KAB - we need to create a new RmsMessage each time
77  // since XSD deserialization doesn't overwrite existing element
78  // objects.
79  boost::shared_ptr<base::RmsMessage>
80  receivedMessage (new base::RmsMessage);
81 
82  try {
83  receivedMessage->deserialize(*messageText);
84  }
85  catch (const ::xsd::cxx::tree::exception<char> &excpt) {
86  std::cout << "Exception thrown during deserialization ";
87  std::cout << "of RMS message!" << std::endl;
88  std::cout << excpt << std::endl;
89  }
90 
91  if (_messageFilter) {
92  if (_messageFilter->verify(receivedMessage)) {
93  _clientListener->messageReceived(receivedMessage);
94  }
95  }
96  else {
97  _clientListener->messageReceived(receivedMessage);
98  }
99  }
100 
101  _running = false;
102 }
103 
104 /**
105  * Set the message listener that gets called when a message
106  * is received.
107  *
108  * @param listener The listener that will get called when
109  * a message is received.
110  */
112  _clientListener = listener;
113 }
114 
115 /**
116  * Set the message filter that will get called everytime
117  * a message is pulled off the queue to verify that it
118  * is relevant to the listener.
119  *
120  * @param filter The message filter
121  */
123  _messageFilter = filter;
124 }
125 
126 } // end of namespace rms
127 
128 } // end of namespace cd
129 
130 } // end of namespace fnal
131 
132 } // end of namespace gov
util::LinkedBlockingQueue< boost::shared_ptr< std::string > > * _inputQueue
Module that kips a configurable number of events between each that it allows through. Note that this module really skips (N-1) events, it uses a simple modular division as its critera. This module will cut down the data sample to 1/N of its original size.
void setMessageFilter(MessageFilter *filter)
virtual bool verify(boost::shared_ptr< base::RmsMessage > message)
Definition: MessageFilter.h:43
Definition: fnal.py:1
virtual void messageReceived(boost::shared_ptr< base::RmsMessage > theMessage)=0
OStream cout
Definition: OStream.cxx:6
void setMessageListener(RmsMessageListener *listener)
ClientListenerLoop(util::LinkedBlockingQueue< boost::shared_ptr< std::string > > *inputQueue)
c cd(1)