Schedule.h
Go to the documentation of this file.
1 #ifndef art_Framework_Core_Schedule_h
2 #define art_Framework_Core_Schedule_h
3 // vim: set sw=2 expandtab :
4 
5 // ======================================================================
6 // Schedule
7 //
8 // A schedule is a sequence of trigger paths. After construction, events
9 // can be fed to the object and passed through all the modules in the
10 // schedule. All accounting about processing of events by modules and
11 // paths is contained here or in object held by containment.
12 //
13 // The trigger results producer is generated and managed here. This
14 // class also manages calls to endjob and beginjob.
15 //
16 // A TriggerResults object will always be inserted into the event for
17 // any schedule. The producer of the TriggerResults EDProduct is always
18 // the last module in the trigger path. The TriggerResultInserter is
19 // given a fixed label of "TriggerResults".
20 //
21 // Processing of an event happens by pushing the event through the
22 // Paths. The scheduler performs the reset() on each of the workers
23 // independent of the Path objects.
24 // ======================================================================
25 
42 #include "cetlib/trim.h"
43 #include "fhiclcpp/ParameterSet.h"
44 #include "hep_concurrency/WaitingTask.h"
46 
47 #include <atomic>
48 #include <functional>
49 #include <map>
50 #include <memory>
51 #include <set>
52 #include <string>
53 #include <utility>
54 #include <vector>
55 
56 namespace art {
57  class ActivityRegistry;
58  class UpdateOutputCallbacks;
59  class Schedule {
60  public: // Special Member Functions
61  ~Schedule() noexcept;
63  PathManager&,
64  std::string const& processName,
65  fhicl::ParameterSet const& proc_pset,
66  fhicl::ParameterSet const& trig_pset,
69  ActionTable const&,
70  ActivityRegistry const&);
71 
72  Schedule(Schedule const&) = delete;
73  Schedule(Schedule&&) = delete;
74  Schedule& operator=(Schedule const&) = delete;
75  Schedule& operator=(Schedule&&) = delete;
76 
77  public: // API presented to EventProcessor
79  void process_event(hep::concurrency::WaitingTask* endPathTask,
80  tbb::task* eventLoopTask,
82  void beginJob();
83  void endJob();
84  void respondToOpenInputFile(FileBlock const&);
88 
89  // Tasking Structure
90  void pathsDoneTask(hep::concurrency::WaitingTask* endPathTask,
91  tbb::task* eventLoopTask,
93  std::exception_ptr const*);
94 
95  // Implementation details.
96  void process_event_pathsDone(hep::concurrency::WaitingTask* endPathTask,
97  tbb::task* eventLoopTask,
99 
100  private:
101  bool skipNonReplicated_(Worker const&);
102 
103  // const after ctor.
105  std::atomic<ActionTable const*> actionTable_;
106  std::atomic<ActivityRegistry const*> actReg_;
107  std::atomic<PathsInfo*> triggerPathsInfo_;
108  std::atomic<Worker*> results_inserter_;
109 
110  // Dynamic: cause an error if more than one thread processes an
111  // event.
112  std::atomic<int> runningWorkerCnt_;
113  };
114 } // namespace art
115 
116 // Local Variables:
117 // mode: c++
118 // End:
119 
120 #endif /* art_Framework_Core_Schedule_h */
void respondToCloseOutputFiles(FileBlock const &)
void respondToCloseInputFile(FileBlock const &)
~Schedule() noexcept
std::atomic< PathsInfo * > triggerPathsInfo_
Definition: Schedule.h:107
void process(Transition, Principal &)
std::vector< BranchDescription > ProductDescriptions
ScheduleContext const sc_
Definition: Schedule.h:104
void pathsDoneTask(hep::concurrency::WaitingTask *endPathTask, tbb::task *eventLoopTask, EventPrincipal &, std::exception_ptr const *)
Transition
Definition: Transition.h:7
std::atomic< Worker * > results_inserter_
Definition: Schedule.h:108
void respondToOpenOutputFiles(FileBlock const &)
std::atomic< int > runningWorkerCnt_
Definition: Schedule.h:112
std::atomic< ActionTable const * > actionTable_
Definition: Schedule.h:105
std::atomic< ActivityRegistry const * > actReg_
Definition: Schedule.h:106
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
void process_event_pathsDone(hep::concurrency::WaitingTask *endPathTask, tbb::task *eventLoopTask, EventPrincipal &)
void respondToOpenInputFile(FileBlock const &)
bool skipNonReplicated_(Worker const &)
Schedule(ScheduleID, PathManager &, std::string const &processName, fhicl::ParameterSet const &proc_pset, fhicl::ParameterSet const &trig_pset, UpdateOutputCallbacks &, ProductDescriptions &, ActionTable const &, ActivityRegistry const &)
void process_event(hep::concurrency::WaitingTask *endPathTask, tbb::task *eventLoopTask, EventPrincipal &)
enum BeamMode string
Schedule & operator=(Schedule const &)=delete