EpicsMessenger.cpp
Go to the documentation of this file.
1 #include <rms/provider/EpicsMessenger.h>
2 
3 namespace gov {
4 
5 namespace fnal {
6 
7 namespace cd {
8 
9 namespace rms {
10 
11 namespace provider {
12 
13 /**
14  * Callback function for handling notifications from EPICS. Calls the
15  * monitorChanged() method on the messenger object that is interited
16  * in the PV that changed.
17  *
18  * @param eventHandlerArgs Parameters from EPICS
19  */
20 static void
21 EpicsMessengerCallback(struct event_handler_args eventHandlerArgs) {
22  EpicsMessenger *messengerObject;
23  char *messengerData;
24 
25  messengerObject = (EpicsMessenger *)eventHandlerArgs.usr;
26  messengerData = (char *)eventHandlerArgs.dbr;
27 
28  messengerObject->monitorChanged(messengerData);
29 }
30 
31 /**
32  * Constructs an instance of this class for the specified
33  * EPICS process variable (which is expected to a waveform record).
34  *
35  * @param pvName The name of the PV to connect to
36  * @param caContext The EPICS context
37  * @param maxSize The maximum size of messages that should be posted to the PV
38  * @param uuidGenerator A UUID Generator to add UUIDs to messages
39  */
41  struct ca_client_context *caContext,
42  int maxSize,
43  util::UUIDGenerator *uuidGenerator) {
44  int result;
45 
46  _pvName = pvName;
47  _caContext = caContext;
48  _maxMessageSize = maxSize;
49  _uuidGenerator = uuidGenerator;
50 
51  ca_attach_context(_caContext);
52  ca_create_channel(_pvName.c_str(), NULL, NULL, 10, &_pvChannel);
53  result = ca_pend_io(_PEND_OP_TIMEOUT);
55 
56  if (result != ECA_NORMAL) {
57  std::cout << "Failed to connect to " << pvName;
58  std::cout << " in " << _PEND_OP_TIMEOUT;
59  std::cout << " seconds..." << std::endl;
60  }
61 
62  _firstListener = true;
63 }
64 
65 /**
66  * Destructor, just call close() which frees any allocated
67  * memory and clears out open channels.
68  */
70  close();
71 }
72 
73 /**
74  * Get the name of the PV that this class connects to.
75  *
76  * @return The name of the PV that this class connects to.
77  */
79  return _pvName;
80 }
81 
82 /**
83  * Post a message to the PV. This method will split up
84  * the message so that it can be reassembled by each
85  * process that receives it.
86  *
87  * @param messageText The message to send
88  */
89 void EpicsMessenger::sendMessage(const std::string& messageText) {
90  std::vector <MessageFragment *> fragments;
91  MessageSplitter splitter;
92  char *fragment;
93  const char *uuid;
94  int size;
95 
96  uuid = _uuidGenerator->getUUID();
97 
98  fragments = splitter.splitMessage(messageText, _maxMessageSize,
99  uuid);
100 
101  delete[] uuid;
102 
103  ca_attach_context(_caContext);
104  for (unsigned int i = 0; i < fragments.size(); i++) {
105  fragments[i]->getBytes(&fragment, &size);
106  ca_array_put(DBR_CHAR, size, _pvChannel, fragment);
107  ca_pend_io(_PEND_OP_TIMEOUT);
108  delete [] fragment;
109  delete fragments[i];
110  }
112 }
113 
114 /**
115  * Adds the specified listener to this messenger.
116  *
117  * @param listener The ProviderListener that will be
118  * called when updates to this PV occur.
119  */
121  if (_firstListener) {
122  ca_attach_context(_caContext);
123  ca_create_subscription(DBR_CHAR, 0, _pvChannel, DBE_VALUE,
124  EpicsMessengerCallback, (void *)this, &_pvSubscription);
125  ca_pend_io(_PEND_OP_TIMEOUT);
127  _firstListener = false;
128  }
129 
130  for (unsigned int i = 0; i < _listenerList.size(); i++) {
131  if (listener == _listenerList[i]) {
132  return;
133  }
134  }
135 
136  _listenerList.push_back(listener);
137 }
138 
139 /**
140  * Removes the specified listener from this messenger.
141  *
142  * @param listener The listener to remove
143  */
145  std::vector <ProviderListener *>::iterator i;
146 
147  i = _listenerList.begin();
148 
149  while (i != _listenerList.end()) {
150  if (*i == listener) {
151  _listenerList.erase(i);
152  return;
153  }
154  i++;
155  }
156 }
157 
158 /**
159  * Processes changes to the PV value. This method is called by the
160  * static callback function when the PV value changes.
161  *
162  * @param data The new value of the PV
163  */
165  std::vector <MessageAssembler *>::iterator i;
166  MessageFragment *fragment = new MessageFragment(data);
167 
168  i = _assemblerList.begin();
169 
170  while (i != _assemblerList.end()) {
171  if ((*i)->addFragment(fragment)) {
172  if ((*i)->hasCompleteMessage()) {
173  std::string messageText = (*i)->getMessage();
174  _assemblerList.erase(i);
175  notifyListeners(messageText);
176  delete *i;
177  }
178 
179  return;
180  }
181  i++;
182  }
183 
184  // Didn't find an assmbler for the message. Make a new one..
185  MessageAssembler *newAssembler = new MessageAssembler(fragment);
186 
187  if (newAssembler->hasCompleteMessage()) {
188  std::string messageText = newAssembler->getMessage();
189  notifyListeners(messageText);
190 
191  delete newAssembler;
192 
193  return;
194  }
195 
196  _assemblerList.push_back(newAssembler);
197 }
198 
199 /**
200  * Go through all the listeners that have expressed
201  * interest in this PV and post the message to them.
202  *
203  * @param messageText A message to send to all the listeners.
204  */
206  boost::shared_ptr<std::string> message(new std::string(messageText));
207 
208  for (unsigned int i = 0; i < _listenerList.size(); i++) {
209  _listenerList[i]->textReceived(message);
210  }
211 }
212 
213 /**
214  * Close down all the EPICS connections and free any
215  * memory that has been allocated. This includes all
216  * assemblers, the channel access subscription and
217  * the channel itself.
218  */
220  std::vector <MessageAssembler *>::iterator assemblerIterator;
221 
222  ca_attach_context(_caContext);
223 
224  ca_flush_io();
225  ca_pend_io(_PEND_OP_TIMEOUT);
226 
227  if (!_firstListener) {
228  ca_clear_subscription(_pvSubscription);
229  }
230 
231  ca_clear_channel(_pvChannel);
233 
234  assemblerIterator = _assemblerList.begin();
235 
236  while (assemblerIterator != _assemblerList.end()) {
237  delete *assemblerIterator;
238  assemblerIterator++;
239  }
240 }
241 
242 }; // end of namespace provider
243 
244 }; // end of namespace rms
245 
246 }; // end of namespace cd
247 
248 }; // end of namespace fnal
249 
250 }; // end of namespace gov
void epicsShareAPI ca_detach_context()
EpicsMessenger(std::string pvName, struct ca_client_context *caContext, int maxSize, util::UUIDGenerator *uuidGenerator)
std::vector< ProviderListener * > _listenerList
std::vector< MessageAssembler * > _assemblerList
const XML_Char const XML_Char * data
Definition: expat.h:268
void removeListener(ProviderListener *listener)
void addListener(ProviderListener *listener)
static std::vector< MessageFragment * > splitMessage(std::string messageText, int maxFragmentSize, const char *uuid)
Definition: fnal.py:1
void sendMessage(const std::string &messageText)
OStream cout
Definition: OStream.cxx:6
static void EpicsMessengerCallback(struct event_handler_args eventHandlerArgs)
void notifyListeners(std::string messageText)
c cd(1)
enum BeamMode string