RootOutput_module.cc
Go to the documentation of this file.
28 #include "fhiclcpp/ParameterSet.h"
29 #include "fhiclcpp/types/Atom.h"
32 #include "fhiclcpp/types/Table.h"
34 
35 #include <iomanip>
36 #include <memory>
37 #include <sstream>
38 #include <string>
39 #include <utility>
40 
41 using std::string;
42 
namespace {
  // Special output path. When the configured file pattern equals this
  // value, fileNameAtOpen() and fileNameAtClose() hand it back unchanged
  // instead of generating a temporary name or renaming a file.
  const std::string dev_null("/dev/null");
}
46 
// Forward declarations so the class definition below can name these types.
namespace art {
  class RootOutput;     // defined in this file
  class RootOutputFile; // owned by RootOutput through std::unique_ptr
}
51 
// art::RootOutput: output module that writes events, subruns, runs and
// results through a RootOutputFile, which it owns via rootOutputFile_.
//
// NOTE(review): this text is a Doxygen source listing. The leading numbers
// are original line numbers and many are skipped (e.g. 59, 61, 63, 65, 67,
// 69-70, 81, 84, 87, 97, 105, 112, 157-158, 162, 167, 169-172, 174-178,
// 182, 186-187, 191, 196-197, 200-201), so several declarations below are
// incomplete. Consult the original source before relying on this text.
52 class art::RootOutput final : public OutputModule {
53 public:
// Sentinel default for the tmpDir parameter. Presumably it means "use the
// parent directory of fileName" -- confirm; the constructor line that
// interprets it (orig. 212) is not part of this listing.
54  static constexpr char const* default_tmpDir{"<parent-path-of-filename>"};
55
// FHiCL schema for this module. For each Atom, the value following the
// Name(...) argument is the parameter's default.
56  struct Config {
57
58  using Name = fhicl::Name;
// NOTE(review): orig. 59-65 are only partly present; they presumably declare
// the Atom/OptionalAtom alias templates used below -- confirm.
60  template <typename T>
62  template <typename T>
64
66  Atom<std::string> catalog{Name("catalog"), ""};
68  Atom<bool> dropAllSubRuns{Name("dropAllSubRuns"), false};
// ROOT output tuning; copied into the const *_ members below, which are kept
// for use by RootOutputFile.
71  Atom<int> compressionLevel{Name("compressionLevel"), 7};
72  Atom<int64_t> saveMemoryObjectThreshold{Name("saveMemoryObjectThreshold"),
73  -1l};
74  Atom<int64_t> treeMaxVirtualSize{Name("treeMaxVirtualSize"), -1};
75  Atom<int> splitLevel{Name("splitLevel"), 99};
76  Atom<int> basketSize{Name("basketSize"), 16384};
77  Atom<bool> dropMetaDataForDroppedData{Name("dropMetaDataForDroppedData"),
78  false};
79  Atom<std::string> dropMetaData{Name("dropMetaData"), "NONE"};
80  Atom<bool> writeParameterSets{Name("writeParameterSets"), true};
// NOTE(review): orig. 81 (start of the enableLargeFileCatalogMetadata
// declaration) and orig. 84 (start of the fileProperties declaration) are
// missing from this listing.
82  Name("enableLargeFileCatalogMetadata"),
83  true};
85  Name("fileProperties")};
86
// Config constructor; its signature (orig. 87) is missing. The body takes the
// address of the inherited fileName parameter as a fhicl::detail::
// ParameterBase*; the statement that uses adjustFilename (orig. 97) is also
// missing, so the actual flag change is not visible here.
88  {
89  // Both RootOutput module and OutputModule use the "fileName"
90  // FHiCL parameter. However, whereas in OutputModule the
91  // parameter has a default, for RootOutput the parameter should
92  // not. We therefore have to change the default flag setting
93  // for 'OutputModule::Config::fileName'.
94  using namespace fhicl::detail;
95  ParameterBase* adjustFilename{
96  const_cast<fhicl::Atom<std::string>*>(&omConfig().fileName)};
98  }
99
// Returns a set of keys: "results" plus whatever the missing orig. 105 adds.
// Presumably handed to fhicl::Table so these keys are skipped during
// validation -- confirm.
100  struct KeysToIgnore {
101  std::set<std::string>
102  operator()() const
103  {
104  std::set<std::string> keys{
106  keys.insert("results");
107  return keys;
108  }
109  };
110  };
111
// NOTE(review): orig. 112, presumably the `Parameters` alias used by the
// constructor below, is missing.
113
114  explicit RootOutput(Parameters const&);
115
116  void postSelectProducts() override;
117
118  void beginJob() override;
119  void endJob() override;
120
121  void event(EventPrincipal const&) override;
122
123  void beginSubRun(SubRunPrincipal const&) override;
124  void endSubRun(SubRunPrincipal const&) override;
125
126  void beginRun(RunPrincipal const&) override;
127  void endRun(RunPrincipal const&) override;
128
129 private:
// Path used when opening: dev_null, or a unique name under tmpDir_.
130  std::string fileNameAtOpen() const;
// Final path after closing: dev_null, or the (possibly) renamed file.
131  std::string fileNameAtClose(std::string const& currentFileName);
132  std::string const& lastClosedFileName() const override;
133  void openFile(FileBlock const&) override;
134  void respondToOpenInputFile(FileBlock const&) override;
135  void readResults(ResultsPrincipal const& resp) override;
136  void respondToCloseInputFile(FileBlock const&) override;
137  void incrementInputFileNumber() override;
138  Granularity fileGranularity() const override;
139  void write(EventPrincipal&) override;
140  void writeSubRun(SubRunPrincipal&) override;
141  void writeRun(RunPrincipal&) override;
142  void setSubRunAuxiliaryRangeSetID(RangeSet const&) override;
143  void setRunAuxiliaryRangeSetID(RangeSet const&) override;
144  bool isFileOpen() const override;
145  void setFileStatus(OutputFileStatus) override;
146  bool requestsToCloseFile() const override;
// Not an override: creates the RootOutputFile (called from openFile()).
147  void doOpenFile();
148  void startEndFile() override;
149  void writeFileFormatVersion() override;
150  void writeFileIndex() override;
151  void writeEventHistory() override;
152  void writeProcessConfigurationRegistry() override;
153  void writeProcessHistoryRegistry() override;
154  void writeParameterSetRegistry() override;
155  void writeProductDescriptionRegistry() override;
156  void writeParentageRegistry() override;
// NOTE(review): orig. 157-158 (start of the doWriteFileCatalogMetadata
// declaration) are missing from this listing.
159  FileCatalogMetadata::collection_type const& ssmd) override;
160  void writeProductDependencies() override;
161  void finishEndFile() override;
// NOTE(review): orig. 162 (start of the doRegisterProducts declaration) is
// missing from this listing.
163  ProductDescriptions& producedProducts,
164  ModuleDescription const& md) override;
165
166 private:
// NOTE(review): many member declarations are missing from this listing even
// though the member functions use them: catalog_, dropAllSubRuns_,
// moduleLabel_, enableLargeFileCatalogMetadata_, inputFileCount_, fstats_,
// filePattern_, tmpDir_, lastClosedFileName_, saveMemoryObjectThreshold_,
// dropMetaData_, dropMetaDataForDroppedData_, fastCloningEnabled_,
// writeParameterSets_, fileProperties_, fRenamer_, rpm_ and
// producedResultsProducts_.
168  bool dropAllEvents_{false};
// Null whenever no output file is open; isFileOpen() tests exactly this.
173  std::unique_ptr<RootOutputFile> rootOutputFile_{nullptr};
179
180  // We keep this set of data members for the use of RootOutputFile.
181  int const compressionLevel_;
183  int64_t const treeMaxVirtualSize_;
184  int const splitLevel_;
185  int const basketSize_;
188
189  // We keep this for the use of RootOutputFile and we also use it
190  // during file open to make some choices.
// NOTE(review): the member described above (orig. 191) is missing.
192
193  // Set false only for cases where we are guaranteed never to need
194  // historical ParameterSet information in the downstream file
195  // (e.g. mixing).
// NOTE(review): the member described above (orig. 196-197) is missing.
198
199  // ResultsProducer management.
// NOTE(review): the members for ResultsProducer management (orig. 200-201)
// are missing.
202 };
203 
// RootOutput(Parameters const& config): initializes the const data members
// from the validated FHiCL configuration, then resolves dropAllEvents_ and
// fastCloningEnabled_ and warns when writeParameterSets is false.
// NOTE(review): this listing omits the signature (orig. 204) and orig. lines
// 210, 212, 223, 230-231 and 239-240.
// NOTE(review): members are initialized in declaration order, not in the
// order written here. The tmpDir_/fileProperties_ initializers read
// filePattern_, so filePattern_ must be declared before them. The member
// declarations are missing from this listing -- confirm.
205  : OutputModule{config().omConfig, config.get_PSet()}
206  , catalog_{config().catalog()}
207  , dropAllSubRuns_{config().dropAllSubRuns()}
208  , moduleLabel_{config.get_PSet().get<string>("module_label")}
209  , enableLargeFileCatalogMetadata_{config().enableLargeFileCatalogMetadata()}
211  , filePattern_{config().omConfig().fileName()}
213  config().tmpDir()}
214  , compressionLevel_{config().compressionLevel()}
215  , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
216  , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
217  , splitLevel_{config().splitLevel()}
218  , basketSize_{config().basketSize()}
219  , dropMetaData_{config().dropMetaData()}
220  , dropMetaDataForDroppedData_{config().dropMetaDataForDroppedData()}
221  , writeParameterSets_{config().writeParameterSets()}
// The parenthesized comma expression evaluates its left operand -- a call
// begun on the missing orig. 223, presumably a file-name-pattern check
// taking (has_key(...), filePattern_) -- only for its side effect. The value
// actually used to build fileProperties_ is config().fileProperties().
222  , fileProperties_{(
224  config.get_PSet().has_key(config().fileProperties.name()),
225  filePattern_), // comma operator!
226  config().fileProperties())}
227  , rpm_{config.get_PSet()}
228 {
// fhicl OptionalAtom call: writes into dropAllEvents_ only when the key is
// present and returns whether it was. The combination step (orig. 230-231)
// is missing from this listing.
229  bool const dropAllEventsSet{config().dropAllEvents(dropAllEvents_)};
232
233  // N.B. Any time file switching is enabled at a boundary other than
234  // InputFile, fastCloningEnabled_ ***MUST*** be deactivated. This is
235  // to ensure that the Event tree from the InputFile is not
236  // accidentally cloned to the output file before the output
237  // module has seen the events that are going to be processed.
// Same OptionalAtom pattern for fastCloning; the lines that use
// fastCloningSet (orig. 239-240) are missing from this listing.
238  bool const fastCloningSet{config().fastCloning(fastCloningEnabled_)};
// Disabling ParameterSet output is allowed but is reported loudly, because
// downstream jobs lose parameter-set provenance.
241  if (!writeParameterSets_) {
242  mf::LogWarning("PROVENANCE")
243  << "Output module " << moduleLabel_
244  << " has parameter writeParameterSets set to false.\n"
245  << "Parameter set provenance will not be available in subsequent jobs.\n"
246  << "Check your experiment's policy on this issue to avoid future "
247  "problems\n"
248  << "with analysis reproducibility.\n";
249  }
250 }
251 
// openFile(FileBlock const&): opens a new output file through doOpenFile()
// unless one is already open.
// NOTE(review): the signature (orig. 253) and orig. line 260 are missing from
// this listing.
252 void
254 {
255  // Note: The file block here refers to the currently open input
256  // file, so we can find out about the available products by
257  // looping over the branches of the input file data trees.
258  if (!isFileOpen()) {
259  doOpenFile();
261  }
262 }
263 
264 void
266 {
267  if (isFileOpen()) {
268  rootOutputFile_->selectProducts();
269  }
270 }
271 
// respondToOpenInputFile(FileBlock const& fb): counts the newly opened input
// file. If an output file is open, it decides whether this input's event tree
// may be fast-cloned and calls rootOutputFile_->beginInputFile() with that
// decision.
// NOTE(review): the signature (orig. 273) and orig. line 297 are missing from
// this listing.
272 void
274 {
275  ++inputFileCount_;
276  if (!isFileOpen())
277  return;
278 
// rfb is null when fb is not a RootFileBlock.
279  auto const* rfb = dynamic_cast<RootFileBlock const*>(&fb);
280 
// Fast cloning requires all of the following:
//   - the option is enabled;
//   - fb is a RootFileBlock with a non-null tree;
//   - either remainingEvents() < 0 (presumably "no event limit" -- confirm)
//     or enough events remain to cover the whole input tree.
281  bool fastCloneThisOne = fastCloningEnabled_ && rfb &&
282  (rfb->tree() != nullptr) &&
283  ((remainingEvents() < 0) ||
284  (remainingEvents() >= rfb->tree()->GetEntries()));
// NOTE(review): this warning also fires when fb is not a RootFileBlock
// (rfb == nullptr), although the message only mentions an empty tree or
// event limits.
285  if (fastCloningEnabled_ && !fastCloneThisOne) {
286  mf::LogWarning("FastCloning")
287  << "Fast cloning deactivated for this input file due to "
288  << "empty event tree and/or event limits.";
289  }
// The file block itself can also veto fast cloning.
290  if (fastCloneThisOne && !rfb->fastClonable()) {
291  mf::LogWarning("FastCloning")
292  << "Fast cloning deactivated for this input file due to "
293  << "information in FileBlock.";
294  fastCloneThisOne = false;
295  }
296  rootOutputFile_->beginInputFile(rfb, fastCloneThisOne);
298 }
299 
300 void
302 {
303  rpm_.for_each_RPWorker([&resp](RPWorker& w) { w.rp().doReadResults(resp); });
304 }
305 
306 void
308 {
309  if (isFileOpen()) {
310  rootOutputFile_->respondToCloseInputFile(fb);
311  }
312 }
313 
// write(EventPrincipal& ep): writes one event through rootOutputFile_ unless
// dropAllEvents_ is set, and records the event ID in fstats_.
// NOTE(review): the signature (orig. 315) and orig. line 320 are missing. The
// brace closed on 322 belongs to a conditional opened on 320, presumably
// `if (hasNewlyDroppedBranch()[InEvent]) {` by analogy with writeRun --
// confirm.
314 void
316 {
317  if (dropAllEvents_) {
318  return;
319  }
321  ep.addToProcessHistory();
322  }
323  rootOutputFile_->writeOne(ep);
324  fstats_.recordEvent(ep.id());
325 }
326 
327 void
329 {
330  rootOutputFile_->setSubRunAuxiliaryRangeSetID(rs);
331 }
332 
// writeSubRun(SubRunPrincipal& sr): writes one subrun through rootOutputFile_
// unless dropAllSubRuns_ is set, and records the subrun ID in fstats_.
// NOTE(review): the signature (orig. 334) and orig. line 339 are missing. The
// brace closed on 341 belongs to a conditional opened on 339, presumably
// `if (hasNewlyDroppedBranch()[InSubRun]) {` by analogy with writeRun --
// confirm.
333 void
335 {
336  if (dropAllSubRuns_) {
337  return;
338  }
340  sr.addToProcessHistory();
341  }
342  rootOutputFile_->writeSubRun(sr);
343  fstats_.recordSubRun(sr.id());
344 }
345 
346 void
348 {
349  rootOutputFile_->setRunAuxiliaryRangeSetID(rs);
350 }
351 
// writeRun(RunPrincipal& r): writes the run through rootOutputFile_ and
// records its ID in fstats_. Unlike write() and writeSubRun(), there is no
// "drop all" early return.
// NOTE(review): the signature (orig. 353) and orig. line 356 are missing. By
// analogy with write()/writeSubRun(), the missing line is presumably
// `r.addToProcessHistory();` -- confirm.
352 void
354 {
355  if (hasNewlyDroppedBranch()[InRun]) {
357  }
358  rootOutputFile_->writeRun(r);
359  fstats_.recordRun(r.id());
360 }
361 
// startEndFile(): does the following, in order:
//   - creates a ResultsPrincipal and attaches the Results product table
//     (producedResultsProducts_);
//   - lets every ResultsProducer fill it via doWriteResults();
//   - writes it with rootOutputFile_->writeResults().
// NOTE(review): the signature (orig. 363) and orig. lines 366, 369 and 372
// are missing. Those lines hold the ResultsPrincipal constructor arguments,
// the second operand of the || condition, and the call that receives the
// lambda (presumably rpm_.for_each_RPWorker, as elsewhere in this file).
362 void
364 {
365  auto resp = std::make_unique<ResultsPrincipal>(
367  resp->setProducedProducts(producedResultsProducts_);
// addToProcessHistory() is called only when Results products are produced,
// or when the condition on the missing orig. 369 holds.
368  if (ProductMetaData::instance().productProduced(InResults) ||
370  resp->addToProcessHistory();
371  }
373  [&resp](RPWorker& w) { w.rp().doWriteResults(*resp); });
374  rootOutputFile_->writeResults(*resp);
375 }
376 
377 void
379 {
380  rootOutputFile_->writeFileFormatVersion();
381 }
382 
383 void
385 {
386  rootOutputFile_->writeFileIndex();
387 }
388 
389 void
391 {
392  rootOutputFile_->writeEventHistory();
393 }
394 
395 void
397 {
398  rootOutputFile_->writeProcessConfigurationRegistry();
399 }
400 
401 void
403 {
404  rootOutputFile_->writeProcessHistoryRegistry();
405 }
406 
407 void
409 {
410  if (writeParameterSets_) {
411  rootOutputFile_->writeParameterSetRegistry();
412  }
413 }
414 
415 void
417 {
418  rootOutputFile_->writeProductDescriptionRegistry();
419 }
420 
421 void
423 {
424  rootOutputFile_->writeParentageRegistry();
425 }
426 
427 void
431 {
432  rootOutputFile_->writeFileCatalogMetadata(fstats_, md, ssmd);
433 }
434 
435 void
437 {
438  rootOutputFile_->writeProductDependencies();
439 }
440 
// finishEndFile(): does the following, in order:
//   - calls writeTTrees() on the output file;
//   - destroys the RootOutputFile, after which isFileOpen() is false;
//   - computes the final file name via fileNameAtClose() and logs it.
// NOTE(review): the signature (orig. 442) and orig. lines 447 and 450 are
// missing from this listing.
441 void
443 {
// Capture the name first: reset() below destroys the object that knows it.
444  std::string const currentFileName{rootOutputFile_->currentFileName()};
445  rootOutputFile_->writeTTrees();
446  rootOutputFile_.reset();
448  lastClosedFileName_ = fileNameAtClose(currentFileName);
449  detail::logFileAction("Closed output file ", lastClosedFileName_);
451 }
452 
// doRegisterProducts(mpr, producedProducts, md): does the following:
//   - for each ResultsProducer worker, builds a ModuleDescription labelled
//     "<md.moduleLabel()>#<rpLabel>" and has the producer register its
//     products into mpr/producedProducts;
//   - then builds producedResultsProducts_ as the InResults ProductTable of
//     producedProducts.
// NOTE(review): the first signature line (orig. 454) and orig. lines 461,
// 465 and 469 are missing. Line 461 presumably opens the
// w.setModuleDescription(...) call whose argument starts on 462 -- confirm.
453 void
455  ProductDescriptions& producedProducts,
456  ModuleDescription const& md)
457 {
458  // Register Results products from ResultsProducers.
459  rpm_.for_each_RPWorker([&mpr, &producedProducts, &md](RPWorker& w) {
460  auto const& params = w.params();
462  ModuleDescription{params.rpPSetID,
463  params.rpPluginType,
464  md.moduleLabel() + '#' + params.rpLabel,
466  md.parentageEnabled(),
467  md.rangesEnabled(),
468  md.dbEnabled(),
470  w.rp().registerProducts(mpr, producedProducts, w.moduleDescription());
471  });
472 
473  // Form product table for Results products. We do this here so we
474  // can appropriately set the product tables for the
475  // ResultsPrincipal.
476  producedResultsProducts_ = ProductTable{producedProducts, InResults};
477 }
478 
479 void
481 {
482  if (isFileOpen())
483  rootOutputFile_->setFileStatus(ofs);
484 }
485 
486 bool
488 {
489  return rootOutputFile_.get() != nullptr;
490 }
491 
492 void
494 {
495  if (isFileOpen())
496  rootOutputFile_->incrementInputFileNumber();
497 }
498 
499 bool
501 {
502  return isFileOpen() ? rootOutputFile_->requestsToCloseFile() : false;
503 }
504 
507 {
508  return fileProperties_.granularity();
509 }
510 
// doOpenFile(): constructs a RootOutputFile named by fileNameAtOpen() and
// logs the configured file pattern. It refuses to proceed when no input file
// has been opened yet (inputFileCount_ == 0); the statement that reports
// this presumably throws -- its start (orig. 515) is missing.
// NOTE(review): the signature (orig. 512) and orig. lines 515, 519, 522-525,
// 528-532 and 534 are missing. They include the assignment target
// (presumably rootOutputFile_) and most constructor arguments.
511 void
513 {
514  if (inputFileCount_ == 0) {
516  << "Attempt to open output file before input file. "
517  << "Please report this to the core framework developers.\n";
518  }
520  std::make_unique<RootOutputFile>(this,
521  fileNameAtOpen(),
526  splitLevel_,
527  basketSize_,
533  description().dbEnabled());
535  detail::logFileAction("Opened output file with pattern ", filePattern_);
536 }
537 
538 string
540 {
541  return filePattern_ == dev_null ? dev_null :
542  unique_filename(tmpDir_ + "/RootOutput");
543 }
544 
545 string
547 {
548  return filePattern_ == dev_null ?
549  dev_null :
550  fRenamer_.maybeRenameFile(currentFileName, filePattern_);
551 }
552 
553 string const&
555 {
556  if (lastClosedFileName_.empty()) {
557  throw Exception(errors::LogicError, "RootOutput::currentFileName(): ")
558  << "called before meaningful.\n";
559  }
560  return lastClosedFileName_;
561 }
562 
// NOTE(review): both the signature (orig. 564) and the body (orig. 566) are
// missing from this listing. The definitions near the end of this listing
// follow the class's beginJob/endJob/event/beginSubRun/... declaration
// order, so this is presumably beginJob() -- confirm against the source.
563 void
565 {
567 }
568 
// NOTE(review): both the signature (orig. 570) and the body statements
// (orig. 572-573) are missing from this listing. By the same declaration
// order, this is presumably endJob() -- confirm against the source.
569 void
571 {
574 }
575 
576 void
578 {
579  rpm_.for_each_RPWorker([&ep](RPWorker& w) { w.rp().doEvent(ep); });
580 }
581 
// beginSubRun(SubRunPrincipal const& srp): forwards the subrun start to every
// ResultsProducer via doBeginSubRun().
// NOTE(review): the signature (orig. 583) and orig. line 586 (the first
// statement inside the lambda) are missing from this listing.
582 void
584 {
585  rpm_.for_each_RPWorker([&srp](RPWorker& w) {
587  w.rp().doBeginSubRun(srp);
588  });
589 }
590 
591 void
593 {
594  rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doEndSubRun(srp); });
595 }
596 
597 void
599 {
600  rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doBeginRun(rp); });
601 }
602 
603 void
605 {
606  rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doEndRun(rp); });
607 }
608 
610 
611 // vim: set sw=2:
void writeParentageRegistry() override
keys
Reco plots.
Definition: caf_analysis.py:46
std::vector< std::pair< std::string, std::string >> collection_type
void respondToCloseInputFile(FileBlock const &) override
Atom< bool > enableLargeFileCatalogMetadata
void startEndFile() override
static constexpr ModuleDescriptionID invalidID()
bool parentageEnabled() const
std::string const filePattern_
bool shouldDropEvents(bool dropAllEventsSet, bool dropAllEvents, bool dropAllSubRuns)
void writeProductDescriptionRegistry() override
void endSubRun(SubRunPrincipal const &) override
void doEvent(EventPrincipal const &)
auto const & get_PSet() const
void doBeginSubRun(SubRunPrincipal const &)
Atom< bool > dropMetaDataForDroppedData
static cet::exempt_ptr< Consumer > non_module_context()
void recordRun(RunID const &id)
ClosingCriteria fileProperties_
std::string fileNameAtOpen() const
void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd) override
void doWriteResults(ResultsPrincipal &)
ModuleDescription const & moduleDescription() const
Definition: RPWorker.h:65
std::string const & fileName() const
Definition: FileBlock.h:38
FileStatsCollector fstats_
void writeFileIndex() override
void registerProducts(MasterProductRegistry &mpr, ProductDescriptions &producedProducts, ModuleDescription const &md)
PostCloseFileRenamer fRenamer_
std::string const moduleLabel_
void doReadResults(ResultsPrincipal const &)
void validateFileNamePattern(bool do_check, std::string const &pattern)
void logFileAction(const char *msg, std::string const &file)
void writeFileFormatVersion() override
void setFileStatus(OutputFileStatus) override
void recordEvent(EventID const &id)
static constexpr char const * default_tmpDir
Definition: config.py:1
void addToProcessHistory()
std::vector< BranchDescription > ProductDescriptions
bool isFileOpen() const override
void openFile(FileBlock const &) override
void writeProcessConfigurationRegistry() override
void doEndRun(RunPrincipal const &)
DEFINE_ART_MODULE(TestTMapFile)
void incrementInputFileNumber() override
void for_each_RPWorker(on_rpworker_t wfunc)
Definition: RPManager.h:75
int64_t const treeMaxVirtualSize_
std::string lastClosedFileName_
OptionalAtom< bool > fastCloning
std::string unique_filename(std::string stem, std::string extension=".root")
void doBeginRun(RunPrincipal const &)
void doRegisterProducts(MasterProductRegistry &mpr, ProductDescriptions &producedProducts, ModuleDescription const &md) override
void writeParameterSetRegistry() override
OutputFileStatus
Atom< int64_t > saveMemoryObjectThreshold
SubRunID id() const
DropMetaData dropMetaData_
void doEndSubRun(SubRunPrincipal const &)
bool requestsToCloseFile() const override
static ProductMetaData const & instance()
ProductTable producedResultsProducts_
auto granularity() const
void write(EventPrincipal &) override
std::unique_ptr< RootOutputFile > rootOutputFile_
std::string const & lastClosedFileName() const override
void setModuleDescription(ModuleDescription const &)
Definition: RPWorker.h:71
void recordSubRun(SubRunID const &id)
RootOutput(Parameters const &)
int const compressionLevel_
fhicl::TableFragment< art::OutputModule::Config > omConfig
caf::StandardRecord * sr
Atom< std::string > tmpDir
void recordInputFile(std::string const &inputFileName)
void writeProductDependencies() override
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
Definition: OutputModule.h:340
void writeSubRun(SubRunPrincipal &) override
ModuleDescription const & description() const
Definition: OutputModule.h:309
void beginRun(RunPrincipal const &) override
Atom< std::string > catalog
int64_t const saveMemoryObjectThreshold_
void invoke(invoke_function_t< void, ARGS... > mfunc, ARGS &&...args)
Definition: RPManager.h:65
std::string const & moduleLabel() const
int remainingEvents() const
Definition: OutputModule.h:321
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void beginJob() override
void setRunAuxiliaryRangeSetID(RangeSet const &) override
void respondToOpenInputFile(FileBlock const &) override
ResultsProducer & rp()
Definition: RPWorker.h:47
::xsd::cxx::tree::string< char, simple_type > string
Definition: Database.h:154
std::set< std::string > operator()() const
std::string parent_path(std::string const &path)
RunID const & id() const
Definition: RunPrincipal.h:48
void readResults(ResultsPrincipal const &resp) override
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void setSubRunAuxiliaryRangeSetID(RangeSet const &) override
std::string fileNameAtClose(std::string const &currentFileName)
void endRun(RunPrincipal const &) override
void showMissingConsumes() const
std::string maybeRenameFile(std::string const &inPath, std::string const &toPattern)
TRandom3 r(0)
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
void endJob() override
EventID const & id() const
Granularity fileGranularity() const override
Atom< int64_t > treeMaxVirtualSize
void beginSubRun(SubRunPrincipal const &) override
void writeProcessHistoryRegistry() override
void postSelectProducts() override
void writeRun(RunPrincipal &) override
fhicl::Table< ClosingCriteria::Config > fileProperties
void writeEventHistory() override
Float_t w
Definition: plot.C:20
bool shouldFastClone(bool fastCloningSet, bool fastCloning, bool wantAllEvents, ClosingCriteria const &fileProperties)
std::string const catalog_
std::string const & processName() const
RPParams const & params() const
Definition: RPWorker.h:59
void set_par_style(par_style const vt)
OptionalAtom< bool > dropAllEvents
void event(EventPrincipal const &) override
ProcessConfiguration const & processConfiguration() const
Atom< std::string > dropMetaData
bool enableLargeFileCatalogMetadata_
void finishEndFile() override