Worker.h
Go to the documentation of this file.
1 #ifndef art_Framework_Principal_Worker_h
2 #define art_Framework_Principal_Worker_h
3 // vim: set sw=2 expandtab :
4 
5 // ======================================================================
6 // Worker: this is a basic scheduling unit - an abstract base class to
7 // something that is really a producer or filter.
8 //
9 // A worker will not actually call through to the module unless it is
10 // in a Ready state. After a module is actually run, the state will
11 // not be Ready. The Ready state can only be reestablished by doing a
12 // reset().
13 //
14 // Pre/post module signals are posted only in the Ready state.
15 //
16 // Execution statistics are kept here.
17 //
18 // If a module has thrown an exception during execution, that
19 // exception will be rethrown if the worker is entered again and the
20 // state is not Ready. In other words, execution results (status) are
21 // cached and reused until the worker is reset().
22 // ======================================================================
23 
36 #include "cetlib_except/exception.h"
37 #include "fhiclcpp/ParameterSet.h"
38 #include "hep_concurrency/SerialTaskQueueChain.h"
39 #include "hep_concurrency/WaitingTask.h"
40 #include "hep_concurrency/WaitingTaskList.h"
42 
43 #include <atomic>
44 #include <cassert>
45 #include <exception>
46 #include <iosfwd>
47 #include <memory>
48 #include <utility>
49 #include <vector>
50 
51 namespace art {
52 
53  class ActivityRegistry;
54  class ModuleContext;
55  class FileBlock;
56  class RunPrincipal;
57  class SubRunPrincipal;
58  class EventPrincipal;
59 
60  class Worker {
61 
62  friend class RunWorkerFunctor;
63 
64  public: // TYPES
65  enum State : int {
66  Ready = 0,
67  Pass = 1,
68  Fail = 2,
69  Working = 3,
71  };
72 
73  public: // MEMBER FUNCTIONS -- Special Member Functions
74  virtual ~Worker() noexcept;
76  Worker(Worker const&) = delete;
77  Worker(Worker&) = delete;
78 
79  public: // MEMBER FUNCTIONS -- API exposed to EventProcessor, Schedule,
80  // and EndPathExecutor
81  void beginJob();
82  void endJob();
83  void respondToOpenInputFile(FileBlock const& fb);
84  void respondToCloseInputFile(FileBlock const& fb);
85  void respondToOpenOutputFiles(FileBlock const& fb);
86  void respondToCloseOutputFiles(FileBlock const& fb);
87  bool doWork(Transition, Principal&, ModuleContext const&);
88 
89  void doWork_event(hep::concurrency::WaitingTask* workerInPathDoneTask,
91  ModuleContext const&);
92 
93  // This is used to do trigger results insertion,
94  // and to run workers on the end path.
95  void doWork_event(EventPrincipal&, ModuleContext const&);
96 
98  scheduleID() const
99  {
100  return scheduleID_;
101  }
102  ModuleDescription const& description() const;
103  ModuleDescription const* descPtr() const;
104  std::string const& label() const;
105 
106  // Used only by WorkerInPath.
107  bool returnCode() const;
108 
109  hep::concurrency::SerialTaskQueueChain* serialTaskQueueChain() const;
110 
111  // Used by EventProcessor
112  // Used by Schedule
113  // Used by EndPathExecutor
114  void reset();
115 
116  // Used only by writeSummary
117  std::size_t timesVisited() const;
118  std::size_t timesRun() const;
119  std::size_t timesPassed() const;
120  std::size_t timesFailed() const;
121  std::size_t timesExcept() const;
122 
123  public: // Tasking Structure
124  void runWorker(EventPrincipal&, ModuleContext const&);
125 
126  protected: // MEMBER FUNCTIONS -- API implementation classes must provide
127  virtual std::string workerType() const = 0;
128  virtual hep::concurrency::SerialTaskQueueChain* implSerialTaskQueueChain()
129  const = 0;
130  virtual void implBeginJob() = 0;
131  virtual void implEndJob() = 0;
132  virtual bool implDoBegin(RunPrincipal& rp, ModuleContext const& mc) = 0;
133  virtual bool implDoEnd(RunPrincipal& rp, ModuleContext const& mc) = 0;
134  virtual bool implDoBegin(SubRunPrincipal& srp, ModuleContext const& mc) = 0;
135  virtual bool implDoEnd(SubRunPrincipal& srp, ModuleContext const& mc) = 0;
136  virtual bool implDoProcess(EventPrincipal&, ModuleContext const&) = 0;
137 
138  private: // MEMBER FUNCTIONS -- API implementation classes must use
139  // to provide their API to us
140  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
141  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
142  virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
143  virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
144 
145  private: // MEMBER DATA
147  std::atomic<ModuleDescription const*> md_;
148  std::atomic<ActionTable const*> actions_;
149  std::atomic<ActivityRegistry const*> actReg_;
150  std::atomic<int> state_;
151 
152  // if state is 'exception'
153  // Note: threading: There is no accessor for this data,
154  // the only way it is ever used is from the doWork*
155  // functions. Right now event processing only sets it,
156  // but run and subrun processing reads it. It is not
157  // clear that event processing needs this anymore,
158  // and if we go to multiple runs and subruns in flight,
159  // they may not need it anymore as well. For now, leave
160  // this, is not thread safe.
161  std::atomic<std::exception_ptr*> cached_exception_;
162 
163  std::atomic<bool> workStarted_;
164  std::atomic<bool> returnCode_;
165 
166  // Holds the waiting workerInPathDone tasks. Note: For shared
167  // modules the workers are shared. For replicated modules each
168  // schedule has its own private worker copies (the whole reason
169  // schedules exist!).
170  std::atomic<hep::concurrency::WaitingTaskList*> waitingTasks_;
171 
172  protected: // MEMBER DATA -- counts
173  std::atomic<std::size_t> counts_visited_;
174  std::atomic<std::size_t> counts_run_;
175  std::atomic<std::size_t> counts_passed_;
176  std::atomic<std::size_t> counts_failed_;
177  std::atomic<std::size_t> counts_thrown_;
178  };
179 
180 } // namespace art
181 
182 #endif /* art_Framework_Principal_Worker_h */
183 
184 // Local Variables:
185 // mode: c++
186 // End:
void endJob()
void respondToOpenOutputFiles(FileBlock const &fb)
void doWork_event(hep::concurrency::WaitingTask *workerInPathDoneTask, EventPrincipal &, ModuleContext const &)
std::size_t timesVisited() const
void respondToCloseOutputFiles(FileBlock const &fb)
std::atomic< ModuleDescription const * > md_
Definition: Worker.h:147
bool doWork(Transition, Principal &, ModuleContext const &)
std::atomic< ActionTable const * > actions_
Definition: Worker.h:148
virtual ~Worker() noexcept
void beginJob()
std::atomic< ActivityRegistry const * > actReg_
Definition: Worker.h:149
virtual hep::concurrency::SerialTaskQueueChain * implSerialTaskQueueChain() const =0
void reset()
std::atomic< hep::concurrency::WaitingTaskList * > waitingTasks_
Definition: Worker.h:170
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:173
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
std::atomic< std::exception_ptr * > cached_exception_
Definition: Worker.h:161
std::atomic< int > state_
Definition: Worker.h:150
virtual void implBeginJob()=0
virtual void implRespondToCloseOutputFiles(FileBlock const &fb)=0
hep::concurrency::SerialTaskQueueChain * serialTaskQueueChain() const
virtual bool implDoEnd(RunPrincipal &rp, ModuleContext const &mc)=0
Transition
Definition: Transition.h:7
virtual std::string workerType() const =0
ScheduleID scheduleID() const
Definition: Worker.h:98
ModuleDescription const & description() const
void runWorker(EventPrincipal &, ModuleContext const &)
virtual bool implDoProcess(EventPrincipal &, ModuleContext const &)=0
ModuleDescription const * descPtr() const
ScheduleID const scheduleID_
Definition: Worker.h:146
void respondToOpenInputFile(FileBlock const &fb)
std::atomic< bool > returnCode_
Definition: Worker.h:164
std::atomic< std::size_t > counts_run_
Definition: Worker.h:174
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:177
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:175
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:176
std::size_t timesExcept() const
std::size_t timesRun() const
virtual void implEndJob()=0
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
void respondToCloseInputFile(FileBlock const &fb)
friend class RunWorkerFunctor
Definition: Worker.h:62
virtual bool implDoBegin(RunPrincipal &rp, ModuleContext const &mc)=0
std::size_t timesPassed() const
std::size_t timesFailed() const
std::string const & label() const
bool returnCode() const
virtual void implRespondToOpenOutputFiles(FileBlock const &fb)=0
std::atomic< bool > workStarted_
Definition: Worker.h:163
enum BeamMode string