EndPathExecutor.h
Go to the documentation of this file.
1 #ifndef art_Framework_Core_EndPathExecutor_h
2 #define art_Framework_Core_EndPathExecutor_h
3 // ======================================================================
4 // EndPathExecutor
5 //
6 // Class to handle the execution of the end path. Invoked in all the
7 // right places by the event processor.
8 //
9 // The RangeSetHandlers manage the RangeSets that are to be assigned
10 // to (a) the (Sub)RunAuxiliaries and (b) the (Sub)Run products
11 // produced in the current process. Since all (Sub)Run
12 // products/auxiliaries produced in the current process are written to
13 // all output modules during write(Sub)Run, there is only one relevant
14 // RangeSet for the (Sub)Run at any given time. RangeSets
15 // corresponding to multiple (Sub)Run fragments are aggregated on
16 // input.
17 // ======================================================================
30 #include "cetlib/trim.h"
32 
33 #include <memory>
34 #include <vector>
35 
36 namespace art {
37  class EndPathExecutor;
38  class MasterProductRegistry;
39 }
40 
42 public:
44  ActionTable& actions,
45  ActivityRegistry& areg,
47  bool const parentageEnabled,
48  bool const rangesEnabled);
49 
50  template <typename T>
51  void process(typename T::MyPrincipal& principal);
52 
53  void beginJob();
54  void endJob();
55 
57  void writeSubRun(SubRunPrincipal& srp);
58  void writeRun(RunPrincipal& rp);
59 
60  void seedRunRangeSet(std::unique_ptr<RangeSetHandler>);
61  void seedSubRunRangeSet(std::unique_ptr<RangeSetHandler>);
62 
65 
66  void closeAllOutputFiles();
68 
69  void closeSomeOutputFiles();
70  void openSomeOutputFiles(FileBlock const& fb);
72 
73  void respondToOpenInputFile(FileBlock const& fb);
74  void respondToCloseInputFile(FileBlock const& fb);
75  void respondToOpenOutputFiles(FileBlock const& fb);
76  void respondToCloseOutputFiles(FileBlock const& fb);
77 
78  // Allow output files to close that need to
80  bool outputsToOpen() const;
81  bool outputsToClose() const;
82  bool someOutputsOpen() const;
84 
85  // Return whether a module has reached its maximum count.
86  bool terminate() const;
87 
88  // Call selectProducts() on all OutputModules.
89  void selectProducts(ProductList const&);
90 
91 private:
92  using OutputWorkers = std::vector<OutputWorker*>;
93  using OutputWorkerSet = std::set<OutputWorker*>;
94 
95  void resetAll();
96 
97  template <typename T>
98  void runEndPaths(typename T::MyPrincipal&);
99 
100  template <class F>
102  template <class F>
104 
111  std::vector<unsigned char> workersEnabled_;
112  std::vector<unsigned char> outputWorkersEnabled_;
114  std::unique_ptr<RangeSetHandler> runRangeSetHandler_{nullptr};
115  std::unique_ptr<RangeSetHandler> subRunRangeSetHandler_{nullptr};
116  bool const parentageEnabled_{true};
117  bool const rangesEnabled_{true};
118 };
119 
120 template <typename T>
121 void
122 art::EndPathExecutor::process(typename T::MyPrincipal& ep)
123 {
124  this->resetAll();
125 
126  if (T::level == Level::Event) {
128  }
129  try {
130  if (!endPathInfo_.pathPtrs().empty()) {
131  endPathInfo_.pathPtrs().front()->process<T>(ep);
132  }
133  }
134  catch (cet::exception& ex) {
135  actions::ActionCodes const action{T::level == Level::Event ?
136  act_table_->find(ex.root_cause()) :
138  switch (action) {
140  mf::LogWarning(ex.category())
141  << "exception being ignored for current event:\n"
142  << cet::trim_right_copy(ex.what(), " \n");
143  break;
144  }
145  default: {
146  throw art::Exception(errors::EventProcessorFailure, "EndPathExecutor:")
147  << "an exception occurred during current event processing\n"
148  << ex;
149  }
150  }
151  }
152  catch (...) {
153  mf::LogError("PassingThrough")
154  << "an exception occurred during current event processing\n";
155  throw;
156  }
158 }
159 
160 template <class F>
161 void
163 {
164  size_t index{0};
165  for (auto const& val : endPathInfo_.workers()) {
166  if (workersEnabled_[index++]) {
167  fcn(val.second.get());
168  }
169  }
170 }
171 
172 template <class F>
173 void
175 {
176  size_t index{0};
177  for (auto ow : outputWorkers_) {
178  if (outputWorkersEnabled_[index++]) {
179  fcn(ow);
180  }
181  }
182 }
183 
184 // Local Variables:
185 // mode: c++
186 // End:
187 
188 #endif /* art_Framework_Core_EndPathExecutor_h */
std::vector< unsigned char > workersEnabled_
void runEndPaths(typename T::MyPrincipal &)
#define F(x, y, z)
EndPathExecutor(PathManager &pm, ActionTable &actions, ActivityRegistry &areg, MasterProductRegistry &mpr, bool const parentageEnabled, bool const rangesEnabled)
bool outputsToClose() const
void writeEvent(EventPrincipal &ep)
ActionTable * act_table_
bool terminate() const
actions::ActionCodes find(std::string const &category) const
TF1 * fcn
Definition: warp_gsimple.C:35
void doForAllEnabledWorkers_(F f)
std::map< BranchKey, BranchDescription > ProductList
Definition: ProductList.h:15
::xsd::cxx::tree::exception< char > exception
Definition: Database.h:225
void writeSubRun(SubRunPrincipal &srp)
void addPass()
Definition: PathsInfo.h:81
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void respondToOpenInputFile(FileBlock const &fb)
void respondToOpenOutputFiles(FileBlock const &fb)
void setAuxiliaryRangeSetID(SubRunPrincipal &srp)
void seedSubRunRangeSet(std::unique_ptr< RangeSetHandler >)
std::string trim_right_copy(std::string source, std::string const &t=" ")
Definition: trim.h:54
void process(typename T::MyPrincipal &principal)
void recordOutputClosureRequests(Granularity)
void openAllOutputFiles(FileBlock &fb)
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
OutputFileStatus
WorkerMap const & workers() const
Definition: PathsInfo.h:87
void setOutputFileStatus(OutputFileStatus)
bool someOutputsOpen() const
std::set< OutputWorker * > OutputWorkerSet
OutputWorkers outputWorkers_
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
void writeRun(RunPrincipal &rp)
bool outputsToOpen() const
OutputWorkerSet outputWorkersToOpen_
void addEvent()
Definition: PathsInfo.h:75
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
std::vector< OutputWorker * > OutputWorkers
void selectProducts(ProductList const &)
void seedRunRangeSet(std::unique_ptr< RangeSetHandler >)
void respondToCloseInputFile(FileBlock const &fb)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
ActivityRegistry & actReg_
std::vector< unsigned char > outputWorkersEnabled_
void doForAllEnabledOutputWorkers_(F f)
double T
Definition: Xdiff_gwt.C:5
OutputFileStatus fileStatus_
bool const parentageEnabled_
void incrementInputFileNumber()
OutputWorkerSet outputWorkersToClose_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
PathPtrs const & pathPtrs() const
Definition: PathsInfo.h:93