EndPathExecutor.h
Go to the documentation of this file.
1 #ifndef art_Framework_Core_EndPathExecutor_h
2 #define art_Framework_Core_EndPathExecutor_h
3 // vim: set sw=2 expandtab :
4 
5 // =========================================================================
6 // Class to handle the execution of the end path. Invoked in all the
7 // right places by the event processor.
8 //
9 // The RangeSetHandlers manage the RangeSets that are to be assigned
10 // to (a) the (Sub)RunAuxiliaries and (b) the (Sub)Run products
11 // produced in the current process. Since all (Sub)Run
12 // products/auxiliaries produced in the current process are written to
13 // all output modules during write(Sub)Run, there is only one relevant
14 // RangeSet for the (Sub)Run at any given time. RangeSets
15 // corresponding to multiple (Sub)Run fragments are aggregated on
16 // input.
17 // =========================================================================
18 
36 #include "cetlib/trim.h"
37 #include "hep_concurrency/RecursiveMutex.h"
38 #include "hep_concurrency/WaitingTask.h"
40 
41 #include <memory>
42 #include <mutex>
43 #include <vector>
44 
45 namespace art {
46  class EventProcessor;
48  friend class EventProcessor;
49 
50  public:
51  // Special Member Functions
54  PathManager& pm,
55  ActionTable const& actions,
56  ActivityRegistry const& areg,
58 
59  EndPathExecutor(EndPathExecutor&&) = delete;
61  EndPathExecutor(EndPathExecutor const&) = delete;
62  EndPathExecutor& operator=(EndPathExecutor const&) = delete;
63 
64  void beginJob();
65  void endJob();
66 
67  // Input File Open/Close.
68  void selectProducts(ProductTables const&);
69  void respondToOpenInputFile(FileBlock const& fb);
73  bool someOutputsOpen() const;
74  void closeAllOutputFiles();
75 
76  void seedRunRangeSet(RangeSetHandler const&);
77  void setRunAuxiliaryRangeSetID(RangeSet const& rs);
78  void writeRun(RunPrincipal& rp);
79 
81  void setSubRunAuxiliaryRangeSetID(RangeSet const& rs);
82  void writeSubRun(SubRunPrincipal& srp);
83 
84  // Process Run/SubRun
86 
87  // Process Event
88  //
89  // Used to make sure only one event is being processed at a time.
90  // The schedules take turns having their events processed on a
91  // first-come first-served basis (FIFO).
92  template <typename T>
93  void push(const T& func);
96 
97  // Output File Switching API
98  //
99  // Called by EventProcessor::closeSomeOutputFiles(), which is called when
100  // output file switching is happening. Note: This is really returns
101  // !outputWorkersToClose_.empty()
102  bool outputsToClose() const;
103  // MT note: This is where we need to get all the schedules
104  // synchronized, and then have all schedules do the file
105  // close, and then the file open, then the schedules can
106  // proceed. A nasty complication is that a great deal of
107  // time can go by between the file close and the file
108  // open because artdaq may pause the run in between, and
109  // wants to have all output files closed while the run is
110  // paused. They probably want the input file closed too.
111  void closeSomeOutputFiles();
112  // Note: This really just returns !outputWorkersToOpen_.empty()
113  bool outputsToOpen() const;
114  void openSomeOutputFiles(FileBlock const& fb);
115  // Note: When we are passed OutputFileStatus::Switching, we must close
116  // the file and call openSomeOutputFiles which changes it back
117  // to OutputFileStatus::Open.
118  // A side effect of switching status is the run/subrun/event writes
119  // are not counted in the overall counting by
120  // RootOutputClosingCriteria. However, they are still counted by the
121  // individual counters.
123  // Note: What this is really used for is to push workers into
124  // the outputWorkersToClose_ data member.
127  // Return whether or not all of the output workers have
128  // reached their maximum limit of work to do.
129  bool allAtLimit() const;
130 
131  private:
132  // Protects runRangeSetHandler_, and subRunRangeSetHandler_.
133  mutable hep::concurrency::RecursiveMutex mutex_{"EndPathExecutor::mutex_"};
134  // Filled by ctor, const after that.
136  std::atomic<ActionTable const*> actionTable_;
137  std::atomic<ActivityRegistry const*> actReg_;
138  std::atomic<PathsInfo*> endPathInfo_;
139  // Dynamic, cause an error if more than one thread processes an event.
140  std::atomic<int> runningWorkerCnt_;
141  // Filled by ctor, const after that.
142  std::atomic<std::vector<OutputWorker*>*> outputWorkers_;
143  // Dynamic, updated by run processing.
144  std::atomic<RangeSetHandler*> runRangeSetHandler_;
145  // Dynamic, updated by subrun processing.
146  std::atomic<PerScheduleContainer<RangeSetHandler*>*> subRunRangeSetHandler_;
147 
148  // Output File Switching
149  std::atomic<OutputFileStatus> fileStatus_;
150  std::atomic<std::set<OutputWorker*>*> outputWorkersToOpen_;
151  // Note: During an output file switch, after the closes happen, the entire
152  // contents of this is moved to outputWorkersToOpen_.
153  // FIXME: The move to outputWorkersToOpen_ is not really necessary, a flag
154  // is all we need, something that says whether we should close or open what
155  // is in the list. Basically EventProcessor uses recordOutputClosureRequests
156  // to populate the list, then uses the list to do closes, then uses the same
157  // list to do opens, then clears the list.
158  std::atomic<std::set<OutputWorker*>*> outputWorkersToClose_;
159  };
160 } // namespace art
161 
162 // Local Variables:
163 // mode: c++
164 // End:
165 
166 #endif /* art_Framework_Core_EndPathExecutor_h */
EndPathExecutor & operator=(EndPathExecutor &&)=delete
std::atomic< std::set< OutputWorker * > * > outputWorkersToClose_
bool outputsToClose() const
bool allAtLimit() const
void seedRunRangeSet(RangeSetHandler const &)
void process_event(EventPrincipal &)
std::atomic< std::vector< OutputWorker * > * > outputWorkers_
void push(const T &func)
void writeSubRun(SubRunPrincipal &srp)
std::atomic< PathsInfo * > endPathInfo_
void respondToOpenInputFile(FileBlock const &fb)
std::atomic< PerScheduleContainer< RangeSetHandler * > * > subRunRangeSetHandler_
void respondToOpenOutputFiles(FileBlock const &fb)
void selectProducts(ProductTables const &)
void recordOutputClosureRequests(Granularity)
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
OutputFileStatus
std::atomic< std::set< OutputWorker * > * > outputWorkersToOpen_
Transition
Definition: Transition.h:7
void setOutputFileStatus(OutputFileStatus)
def callbacks(model_name, group, tensorboard=True)
Definition: regression.py:123
std::atomic< OutputFileStatus > fileStatus_
hep::concurrency::RecursiveMutex mutex_
void seedSubRunRangeSet(RangeSetHandler const &)
bool someOutputsOpen() const
ScheduleContext const sc_
double func(double x, double y)
void writeRun(RunPrincipal &rp)
std::atomic< ActivityRegistry const * > actReg_
bool outputsToOpen() const
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
EndPathExecutor(ScheduleID sid, PathManager &pm, ActionTable const &actions, ActivityRegistry const &areg, UpdateOutputCallbacks &callbacks)
std::atomic< int > runningWorkerCnt_
std::atomic< ActionTable const * > actionTable_
void respondToCloseInputFile(FileBlock const &fb)
std::atomic< RangeSetHandler * > runRangeSetHandler_
void writeEvent(EventPrincipal &)
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
void process(Transition, Principal &)
double T
Definition: Xdiff_gwt.C:5
void incrementInputFileNumber()