Worker.h
Go to the documentation of this file.
1 #ifndef art_Framework_Principal_Worker_h
2 #define art_Framework_Principal_Worker_h
3 
4 // ======================================================================
5 // Worker: this is a basic scheduling unit - an abstract base class for
6 // something that is really a producer or filter.
7 //
8 // A worker will not actually call through to the module unless it is
9 // in a Ready state. After a module is actually run, the state will
10 // not be Ready. The Ready state can only be reestablished by doing a
11 // reset().
12 //
13 // Pre/post module signals are posted only in the Ready state.
14 //
15 // Execution statistics are kept here.
16 //
17 // If a module has thrown an exception during execution, that
18 // exception will be rethrown if the worker is entered again and the
19 // state is not Ready. In other words, execution results (status) are
20 // cached and reused until the worker is reset().
21 // ======================================================================
22 
31 #include "cetlib/exempt_ptr.h"
32 #include "cetlib_except/exception.h"
33 #include "fhiclcpp/ParameterSet.h"
35 
36 #include <cassert>
37 #include <iosfwd>
38 #include <memory>
39 #include <utility>
40 
41 // ----------------------------------------------------------------------
42 
// Forward declarations of the art types that the Worker interface below
// takes by reference (or, for ActivityRegistry, dereferences via actReg_).
43 namespace art {
44  class ActivityRegistry;
45  class EventPrincipal;
46  class FileBlock;
47  class RunPrincipal;
48  class SubRunPrincipal;
49 }
50 
// Abstract base class for a single scheduled module. It caches the outcome
// of the last module invocation in state_ (values used by doWork: Ready,
// Pass, Fail, Exception, Working) and keeps execution counters in counts_.
//
// NOTE(review): the line-number gutter of this listing skips lines 53, 68,
// 85, 88, 90, 116, 157-158, 160-161 and 164 inside this class, so the text
// on those lines is not visible here. Where a missing name is mentioned
// below it is taken from the cross-reference index at the end of the
// listing and should be confirmed against the real header.
51 class art::Worker {
52 public:
54 
    // Builds a worker from a module description and the worker parameters.
    // The constructor body is not defined in this header.
55  Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
56  virtual ~Worker() noexcept = default;
57 
    // Main entry point: runs the module for the given principal, or returns
    // or rethrows the cached result. Defined at the bottom of this header.
58  template <typename T>
59  bool doWork(typename T::MyPrincipal&, CurrentProcessingContext const* cpc);
    // Job-level and file-level hooks; defined outside this header.
    // Presumably each forwards to the matching impl* virtual - TODO confirm.
60  void beginJob();
61  void endJob();
62  void respondToOpenInputFile(FileBlock const& fb);
63  void respondToCloseInputFile(FileBlock const& fb);
64  void respondToOpenOutputFiles(FileBlock const& fb);
65  void respondToCloseOutputFiles(FileBlock const& fb);
66 
    // Puts the worker back into the Ready state so that the next doWork call
    // runs the module again instead of reusing the cached result.
    // Line 68 (the function name) is missing; the index names it reset().
67  void
69  {
70  state_ = Ready;
71  }
72 
    // Read-only access to the module description, by reference.
73  ModuleDescription const&
74  description() const
75  {
76  return md_;
77  }
    // Same description as above, exposed as a pointer to the member.
78  ModuleDescription const*
79  descPtr() const
80  {
81  return &md_;
82  }
83  /// The signals are required to live longer than the last call to
84  /// 'doWork'. This was done to improve performance based on profiling.
    // NOTE(review): the declaration these two lines document (line 85) is
    // missing; presumably setActivityRegistry(cet::exempt_ptr<
    // ActivityRegistry>) from the index - confirm.
86 
    // Lines 88 and 90 are missing; the index names this clearCounters().
    // Its body is not visible here.
87  void
89  {
91  }
92 
    // Counter accessors: each returns the tally kept in counts_ for one
    // statistics tag.
93  std::size_t
94  timesRun() const
95  {
96  return counts_.times<stats::Run>();
97  }
98  std::size_t
99  timesVisited() const
100  {
101  return counts_.times<stats::Visited>();
102  }
103  std::size_t
104  timesPassed() const
105  {
106  return counts_.times<stats::Passed>();
107  }
108  std::size_t
109  timesFailed() const
110  {
111  return counts_.times<stats::Failed>();
112  }
    // Body (line 116) is missing; presumably returns the
    // stats::ExceptionThrown tally that doWork increments - TODO confirm.
113  std::size_t
114  timesExcept() const
115  {
117  }
    // Current cached execution state.
118  State
119  state() const
120  {
121  return state_;
122  }
123 
    // Supplied by the concrete worker type.
124  virtual bool modifiesEvent() const = 0;
125 
    // Module label taken from the stored module description.
126  std::string const&
127  label() const
128  {
129  return md_.moduleLabel();
130  }
131 
132 protected:
    // Hooks implemented by concrete workers. The invoke() helpers further
    // down in this header call implDoBegin, implDoEnd and implDoProcess;
    // implDoProcess additionally receives the worker's counters.
133  virtual std::string workerType() const = 0;
134  virtual bool implDoProcess(EventPrincipal&,
135  CurrentProcessingContext const* cpc,
136  CountingStatistics&) = 0;
137  virtual bool implDoBegin(RunPrincipal& rp,
138  CurrentProcessingContext const* cpc) = 0;
139  virtual bool implDoEnd(RunPrincipal& rp,
140  CurrentProcessingContext const* cpc) = 0;
141  virtual bool implDoBegin(SubRunPrincipal& srp,
142  CurrentProcessingContext const* cpc) = 0;
143  virtual bool implDoEnd(SubRunPrincipal& srp,
144  CurrentProcessingContext const* cpc) = 0;
145  virtual void implBeginJob() = 0;
146  virtual void implEndJob() = 0;
147 
148 private:
    // Dispatch helper selected by branch action type; the code that follows
    // the class provides the invoke() bodies.
149  template <BranchActionType>
150  struct ImplDoWork;
151 
    // File-level hooks implemented by concrete workers.
152  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
153  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
154  virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
155  virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
156 
    // NOTE(review): data-member lines 157-158, 160-161 and 164 are missing.
    // The index lists counts_ (157), state_ (158), md_ (160), actions_ (161)
    // and actReg_ (164); all five are used by the code in this header.
159 
    // Exception saved by doWork so that a repeat call can rethrow it.
162  std::shared_ptr<art::Exception> cached_exception_{
163  nullptr}; // if state is 'exception'
165 };
166 
// Declaration of the exception-context helper, plus three explicit
// specializations that each provide a static invoke() forwarding to one of
// the Worker impl* virtuals.
//
// NOTE(review): gutter lines 170, 176, 186 and 196 are missing. Line 170 is
// the start of the exceptionContext declaration; lines 176/186/196 are
// presumably the 'struct Worker::ImplDoWork<...>' specialization headers -
// which BranchActionType each one names cannot be seen here.
167 namespace art {
168  namespace detail {
169  template <typename T>
171  T const& ip,
172  cet::exception& ex);
173  }
174 
    // Forwards to Worker::implDoBegin for the given principal.
175  template <>
177  template <typename PRINCIPAL>
178  static bool
179  invoke(Worker* const w, PRINCIPAL& p, CurrentProcessingContext const* cpc)
180  {
181  return w->implDoBegin(p, cpc);
182  }
183  };
184 
    // Forwards to Worker::implDoEnd for the given principal.
185  template <>
187  template <typename PRINCIPAL>
188  static bool
189  invoke(Worker* const w, PRINCIPAL& p, CurrentProcessingContext const* cpc)
190  {
191  return w->implDoEnd(p, cpc);
192  }
193  };
194 
    // Forwards to Worker::implDoProcess and also hands over the worker's
    // counts_ so the implementation can update the statistics.
195  template <>
197  template <typename PRINCIPAL>
198  static bool
199  invoke(Worker* const w, PRINCIPAL& p, CurrentProcessingContext const* cpc)
200  {
201  return w->implDoProcess(p, cpc, w->counts_);
202  }
203  };
204 }
205 
// Appends "<moduleName>/<moduleLabel> <ip.id()>\n" to iEx and returns the
// same exception object so that callers can keep streaming into it.
//
// NOTE(review): signature lines 207-208 are missing from this listing; the
// cross-reference index at the end gives the function as
// art::detail::exceptionContext(ModuleDescription const&, T const&,
// cet::exception&) - confirm against the real header.
206 template <typename T>
209  T const& ip,
210  cet::exception& iEx)
211 {
212  iEx << iMD.moduleName() << "/" << iMD.moduleLabel() << " " << ip.id() << "\n";
213  return iEx;
214 }
215 
// Runs the module for principal p unless a cached result applies.
//
// Every call increments the Visited counter. If state_ is Pass or Fail the
// cached boolean is returned without running anything; if state_ is
// Exception the cached exception is rethrown. Otherwise the pre-module
// signal, the module call and the post-module signal run inside the try
// block, and state_ is left at Pass, Fail or Exception for later calls
// (until something sets it back to Ready).
//
// cpc may be null: it is null-checked before isEndPath() is consulted.
//
// NOTE(review): the line-number gutter skips lines 221, 250, 259, 276, 278,
// 283-284, 288, 316, 321, 331, 342, 353, 377 and 379 in this function, so
// several statements are not visible here. Comments below only describe
// what the visible lines show; guesses about missing lines are marked.
216 template <typename T>
217 bool
218 art::Worker::doWork(typename T::MyPrincipal& p,
219  CurrentProcessingContext const* cpc)
220 {
    // Line 221 (missing) presumably introduces 'counts' - TODO confirm.
222  counts.template increment<stats::Visited>();
223 
    // Reuse the cached outcome when the worker is not Ready.
224  switch (state_) {
225  case Ready:
226  break;
227  case Pass:
228  return true;
229  case Fail:
230  return false;
231  case Exception: {
232  // Rethrow the cached exception again. It seems impossible to
233  // get here a second time unless a cet::exception has been
234  // thrown previously.
235  mf::LogWarning("repeat")
236  << "A module has been invoked a second time even though"
237  " it caught an exception during the previous invocation."
238  "\nThis may be an indication of a configuration problem.\n";
239  throw *cached_exception_;
240  }
241  case Working:
242  break; // See below.
243  }
244 
245  bool rc{false};
246  try {
247  if (state_ == Working) {
248  // Not part of the switch statement above because we want the
249  // exception to be caught by our handling mechanism.
    // Line 250 (missing) presumably starts the throw expression that the
    // two message lines below are streamed into.
251  << "A Module has been invoked while it is still being executed.\n"
252  << "Product dependencies have invoked a module execution cycle.\n";
253  }
254 
255  assert(actReg_.get() != nullptr);
    // Mark the worker as busy first, so that a re-entrant call reaches the
    // state_ == Working check above.
256  state_ = Working;
257 
258  T::preModuleSignal(*actReg_, md_);
    // Line 259 (missing) presumably performs the module call and sets rc.
260  T::postModuleSignal(*actReg_, md_);
261 
262  state_ = Pass;
263 
    // Only event-level processing can end in Fail; a false rc at any other
    // level leaves state_ at Pass.
264  if (T::level == Level::Event && !rc)
265  state_ = Fail;
266  }
267  catch (cet::exception& e) {
268 
269  // NOTE: the warning printed as a result of ignoring or failing a
270  // module will only be printed during the full true processing
271  // pass of this module.
272 
273  // Get the action corresponding to this exception. However, if
274  // processing something other than an event (e.g. run, subRun)
275  // always rethrow.
    // Lines 276 and 278 (missing) hold the rest of this conditional
    // expression; the visible part looks the action up by root cause.
277  actions_.find(e.root_cause()) :
279 
280  // If we are processing an endPath, treat SkipEvent or FailPath as
281  // FailModule, so any subsequent OutputModules are still run.
282  if (cpc && cpc->isEndPath()) {
285  }
286 
287  switch (action) {
    // Line 288 (missing) is the case label for this first branch -
    // presumably actions::IgnoreCompletely, going by the log category.
289  rc = true;
290  counts.template increment<stats::Passed>();
291  state_ = Pass;
292  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
293  << e.what();
294  break;
295  }
296  case actions::FailModule: {
    // NOTE(review): rc is true here while the cached state becomes Fail,
    // so this call returns true but a repeat call returns false through
    // the switch at the top - confirm this is intended.
297  rc = true;
298  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
299  << e.what();
300  counts.template increment<stats::Failed>();
301  state_ = Fail;
302  break;
303  }
304  default: {
305  // We should not need to include the event/run/module names in
306  // the exception because the error logger will pick this up
307  // automatically. I'm leaving it in until this is verified.
308 
309  // here we simply add a small amount of data to the exception to
310  // add some context, we could have rethrown it as something else
311  // and embedded with this exception as an argument to the
312  // constructor.
313  counts.template increment<stats::ExceptionThrown>();
314  state_ = Exception;
315  e << "cet::exception going through module ";
    // Cache a copy for repeat calls: copied directly when e already is an
    // art::Exception, otherwise built by the (partly missing) else
    // branch. The original exception object is then rethrown.
317  if (auto edmEx = dynamic_cast<art::Exception*>(&e)) {
318  cached_exception_ = std::make_shared<art::Exception>(*edmEx);
319  } else {
320  cached_exception_ = std::make_shared<art::Exception>(
322  }
323  throw;
324  }
325  }
326  }
    // The remaining handlers all count the failure, set state_ to
    // Exception, build an art::Exception describing what was caught, cache
    // it and throw a copy of the cached object.
327  catch (std::bad_alloc const& bda) {
328  counts.template increment<stats::ExceptionThrown>();
329  state_ = Exception;
330  cached_exception_ = std::make_shared<art::Exception>(errors::BadAlloc);
332  << "A std::bad_alloc exception occurred during a call to the module ";
333  detail::exceptionContext(md_, p, *cached_exception_)
334  << "The job has probably exhausted the virtual memory available to the "
335  "process.\n";
336  throw *cached_exception_;
337  }
338  catch (std::exception const& e) {
339  counts.template increment<stats::ExceptionThrown>();
340  state_ = Exception;
341  cached_exception_ = std::make_shared<art::Exception>(errors::StdException);
343  << "A std::exception occurred during a call to the module ";
344  detail::exceptionContext(md_, p, *cached_exception_)
345  << "and cannot be repropagated.\n"
346  << "Previous information:\n"
347  << e.what();
348  throw *cached_exception_;
349  }
350  catch (std::string const& s) {
351  counts.template increment<stats::ExceptionThrown>();
352  state_ = Exception;
354  std::make_shared<art::Exception>(errors::BadExceptionType, "std::string");
355  *cached_exception_ << "A std::string thrown as an exception occurred "
356  "during a call to the module ";
357  detail::exceptionContext(md_, p, *cached_exception_)
358  << "and cannot be repropagated.\n"
359  << "Previous information:\n string = " << s;
360  throw *cached_exception_;
361  }
362  catch (char const* c) {
363  counts.template increment<stats::ExceptionThrown>();
364  state_ = Exception;
365  cached_exception_ = std::make_shared<art::Exception>(
366  errors::BadExceptionType, "const char *");
367  *cached_exception_ << "A const char* thrown as an exception occurred "
368  "during a call to the module ";
369  detail::exceptionContext(md_, p, *cached_exception_)
370  << "and cannot be repropagated.\n"
371  << "Previous information:\n const char* = " << c << "\n";
372  throw *cached_exception_;
373  }
    // Catch-all for anything that is none of the types handled above.
374  catch (...) {
375  counts.template increment<stats::ExceptionThrown>();
376  state_ = Exception;
378  std::make_shared<art::Exception>(errors::Unknown, "repeated");
380  << "An unknown occurred during a previous call to the module ";
381  detail::exceptionContext(md_, p, *cached_exception_)
382  << "and cannot be repropagated.\n";
383  throw *cached_exception_;
384  }
385 
    // Reached only when no exception escaped: either the try block
    // completed or one of the two handled-action branches set rc.
386  return rc;
387 }
388 #endif /* art_Framework_Principal_Worker_h */
389 
390 // Local Variables:
391 // mode: c++
392 // End:
void setActivityRegistry(cet::exempt_ptr< ActivityRegistry > areg)
virtual ~Worker() noexcept=default
void endJob()
virtual bool implDoProcess(EventPrincipal &, CurrentProcessingContext const *cpc, CountingStatistics &)=0
void respondToOpenOutputFiles(FileBlock const &fb)
std::size_t timesVisited() const
Definition: Worker.h:99
void respondToCloseOutputFiles(FileBlock const &fb)
actions::ActionCodes find(std::string const &category) const
const char * p
Definition: xmltok.h:285
CountingStatistics counts_
Definition: Worker.h:157
std::size_t times() const
void beginJob()
::xsd::cxx::tree::exception< char > exception
Definition: Database.h:225
TString ip
Definition: loadincs.C:5
void reset()
Definition: Worker.h:68
ActionTable const & actions_
Definition: Worker.h:161
cet::exception & exceptionContext(ModuleDescription const &md, T const &ip, cet::exception &ex)
Definition: Worker.h:208
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
virtual void implBeginJob()=0
virtual void implRespondToCloseOutputFiles(FileBlock const &fb)=0
const XML_Char * s
Definition: expat.h:262
std::shared_ptr< art::Exception > cached_exception_
Definition: Worker.h:162
cet::exempt_ptr< ActivityRegistry > actReg_
Definition: Worker.h:164
virtual std::string workerType() const =0
std::string const & moduleName() const
ModuleDescription const & description() const
Definition: Worker.h:74
ModuleDescription md_
Definition: Worker.h:160
ModuleDescription const * descPtr() const
Definition: Worker.h:79
State state_
Definition: Worker.h:158
void respondToOpenInputFile(FileBlock const &fb)
std::string const & moduleLabel() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
State state() const
Definition: Worker.h:119
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
static bool invoke(Worker *const w, PRINCIPAL &p, CurrentProcessingContext const *cpc)
Definition: Worker.h:189
void clearCounters()
Definition: Worker.h:88
Worker(ModuleDescription const &iMD, WorkerParams const &iWP)
bool doWork(typename T::MyPrincipal &, CurrentProcessingContext const *cpc)
Definition: Worker.h:218
static bool invoke(Worker *const w, PRINCIPAL &p, CurrentProcessingContext const *cpc)
Definition: Worker.h:179
std::size_t timesExcept() const
Definition: Worker.h:114
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
std::size_t timesRun() const
Definition: Worker.h:94
virtual bool modifiesEvent() const =0
assert(nhit_max >=nhit_nbins)
virtual void implEndJob()=0
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
void respondToCloseInputFile(FileBlock const &fb)
double T
Definition: Xdiff_gwt.C:5
virtual bool implDoBegin(RunPrincipal &rp, CurrentProcessingContext const *cpc)=0
std::size_t timesPassed() const
Definition: Worker.h:104
Float_t e
Definition: plot.C:35
std::size_t timesFailed() const
Definition: Worker.h:109
std::string const & label() const
Definition: Worker.h:127
Float_t w
Definition: plot.C:20
virtual void implRespondToOpenOutputFiles(FileBlock const &fb)=0
static bool invoke(Worker *const w, PRINCIPAL &p, CurrentProcessingContext const *cpc)
Definition: Worker.h:199
virtual bool implDoEnd(RunPrincipal &rp, CurrentProcessingContext const *cpc)=0
enum BeamMode string