Source.h
Go to the documentation of this file.
1 #ifndef art_Framework_IO_Sources_Source_h
2 #define art_Framework_IO_Sources_Source_h
3 
4 // ======================================================================
5 //
6 // The Source class template is used to create InputSources which
7 // are capable of reading Runs, SubRuns and Events from non-standard
8 // input files. Sources instantiated from Source are *not* random
9 // access sources.
10 //
11 // The Source class template requires the use of a type T as its
12 // template parameter, satisfying the conditions outlined below. In
13 // one's XXX_module.cc class one must provide a typedef and module macro
14 // call along the lines of:
15 //
16 // namespace arttest {
17 // typedef art::Source<GeneratorTestDetail> GeneratorTest;
18 // }
19 //
20 // DEFINE_ART_INPUT_SOURCE(arttest::GeneratorTest)
21 //
22 // However, there are several "flavors" of InputSource possible using
23 // this template, and one may wish to specify them using the "type
24 // traits" found in SourceTraits.h. Type traits are simple class
25 // templates that are used to signal properties of the classes used as
26 // their template arguments. Specialization is common. There are many
27 // examples of type traits in the standard, such as std::is_const<T> or
28 // std::is_integral<T>. Any traits you wish to specialize must be
29 // defined *after* the definition of your detail class T, but *before*
30 // the typedef above which will attempt to instantiate them. See
31 // SourceTraits.h for descriptions of the different traits one might
32 // wish to apply.
33 //
34 // The type T must supply the following non-static member functions:
35 //
36 // * Construct an object of type T. The ParameterSet provided will be
37 // that constructed by the 'source' statement in the job
38 // configuration file. The ProductRegistryHelper must be used to
39 // register products to be reconstituted by this source.
40 //
41 // T(fhicl::ParameterSet const&,
42 // art::ProductRegistryHelper&,
43 // art::SourceHelper const&);
44 //
45 // * Open the file of the given name, returning a new fileblock in
46 // fb. If readFile is unable to return a valid FileBlock it should
47 // throw. Suggestions for suitable exceptions are:
48 // art::Exception(art::errors::FileOpenError) or
49 // art::Exception(art::errors::FileReadError).
50 //
51 // void readFile(std::string const& filename,
52 // art::FileBlock*& fb);
53 //
54 // * Read the next part of the current file. Return false if nothing
55 // was read; return true and set the appropriate 'out' arguments if
56 // something was read.
57 //
58 // bool readNext(art::RunPrincipal const* const inR,
59 // art::SubRunPrincipal const* const inSR,
60 // art::RunPrincipal*& outR,
61 // art::SubRunPrincipal*& outSR,
62 // art::EventPrincipal*& outE);
63 //
64 // * After readNext has returned false, the behavior differs
65 // depending on whether Source_Generator<XXX>::value is true or
66 // false. If false (the default), then readFile(...) will be called
67 // provided there is an unused string remaining in
68 // source.fileNames. If true, then the source will finish unless
69 // there exists an *optional* function:
70 //
71 // bool hasMoreData(); // or
72 //
73 // bool hasMoreData() const;
74 //
75 // which returns true.
76 //
77 // * Close the current input file.
78 //
79 // void closeCurrentFile();
80 //
81 // ======================================================================
82 
100 #include "cetlib/exempt_ptr.h"
101 #include "cetlib/metaprogramming.h"
102 #include "fhiclcpp/ParameterSet.h"
103 
104 #include <algorithm>
105 #include <memory>
106 #include <type_traits>
107 
108 // ----------------------------------------------------------------------
109 
110 namespace art {
111 
112  template <class T>
113  class Source;
114 
115  namespace detail {
116  // Template metaprogramming.
118 
119  template <typename T, typename = void>
120  struct has_hasMoreData : std::false_type {};
121 
122  template <typename T>
124  T,
125  enable_if_function_exists_t<bool (T::*)(), &T::hasMoreData>>
126  : std::true_type {};
127 
128  template <typename T>
130  T,
131  enable_if_function_exists_t<bool (T::*)() const, &T::hasMoreData>>
132  : std::true_type {};
133 
134  template <typename T>
136  bool
138  {
139  return t.hasMoreData();
140  }
141  };
142 
143  template <typename T>
145  bool
147  {
148  return false;
149  }
150  };
151  }
152 }
153 
154 // No-one gets to override this class.
155 template <class T>
156 class art::Source final : public art::InputSource {
157 public:
158  Source(Source<T> const&) = delete;
159  Source<T>& operator=(Source<T> const&) = delete;
160 
161  using SourceDetail = T;
162 
164 
165  input::ItemType nextItemType() override;
166 
167  std::unique_ptr<FileBlock> readFile() override;
168  void closeFile() override;
169 
170  std::unique_ptr<RunPrincipal> readRun() override;
171  std::unique_ptr<SubRunPrincipal> readSubRun(
173  std::unique_ptr<EventPrincipal> readEvent(
175 
176  std::unique_ptr<art::RangeSetHandler> runRangeSetHandler() override;
177  std::unique_ptr<art::RangeSetHandler> subRunRangeSetHandler() override;
178 
179 private:
181 
184  SourceHelper sourceHelper_; // So it can be used by detail.
187 
189  std::string currentFileName_{};
190 
191  std::unique_ptr<RunPrincipal> newRP_{nullptr};
192  std::unique_ptr<SubRunPrincipal> newSRP_{nullptr};
193  std::unique_ptr<EventPrincipal> newE_{nullptr};
194 
195  // Cached Run and SubRun Principals used for users creating new
196  // SubRun and Event Principals. These are non owning!
197  cet::exempt_ptr<RunPrincipal> cachedRP_{nullptr};
198  cet::exempt_ptr<SubRunPrincipal> cachedSRP_{nullptr};
199 
200  bool pendingSubRun_{false};
201  bool pendingEvent_{false};
202 
203  bool subRunIsNew_{false};
204 
205  SubRunNumber_t remainingSubRuns_{1};
206  bool haveSRLimit_{false};
207  EventNumber_t remainingEvents_{1};
208  bool haveEventLimit_{false};
209  bool const parentageEnabled_{true};
210  bool const rangesEnabled_{true};
211 
212  // Called in the constructor, to finish the process of product
213  // registration.
214  void finishProductRegistration_(InputSourceDescription& d);
215 
216  // Make detail_ try to read more stuff from its file. Cache any new
217  // run/subrun/event. Throw an exception if we detect an error in the
218  // data stream, or logic of the detail_ class. Move to the
219  // appropriate new state.
220  bool readNext_();
221 
222  // Check to see whether we have a new file to attempt to read,
223  // moving to either the IsStop or IsFile state.
224  void checkForNextFile_();
225 
226  // Call readNext_() and throw if we have not moved to the IsRun state.
227  void readNextAndRequireRun_();
228 
229  // Call readNext_() and throw if we have moved to the IsEvent state.
230  void readNextAndRefuseEvent_();
231 
232  // Test the newly read data for validity, given our current state.
233  void throwIfInsane_(bool result,
234  RunPrincipal* newR,
235  SubRunPrincipal* newSR,
236  EventPrincipal* newE) const;
237 
238  // Throw an art::Exception(errors::DataCorruption), with the given
239  // message text.
240  static void throwDataCorruption_(const char* msg);
241 };
242 
243 template <class T>
246  , act_{&d.activityRegistry}
247  , sourceHelper_{d.moduleDescription, d.parentageEnabled_, d.rangesEnabled_}
249  , fh_{p.get<std::vector<std::string>>("fileNames",
250  std::vector<std::string>())}
251  , parentageEnabled_{d.parentageEnabled_}
252  , rangesEnabled_{d.rangesEnabled_}
253 {
254  // Handle maxSubRuns parameter.
255  int64_t maxSubRuns_par = p.get<int64_t>("maxSubRuns", -1);
256  if (maxSubRuns_par > -1) {
257  remainingSubRuns_ = maxSubRuns_par;
258  haveSRLimit_ = true;
259  }
260  // Handle maxEvents parameter.
261  int64_t maxEvents_par = p.get<int64_t>("maxEvents", -1);
262  if (maxEvents_par > -1) {
263  remainingEvents_ = maxEvents_par;
264  haveEventLimit_ = true;
265  }
266  // Verify we got a real ActivityRegistry.
267  if (!act_) {
268  throw Exception(errors::LogicError) << "no ActivityRegistry\n";
269  }
270  // Finish product registration.
272 }
273 
274 template <class T>
275 void
277 {
279 }
280 
281 template <class T>
282 void
284  RunPrincipal* newR,
285  SubRunPrincipal* newSR,
286  EventPrincipal* newE) const
287 {
288  std::ostringstream errMsg;
289  if (result) {
290  if (!newR && !newSR && !newE) {
292  << "readNext returned true but created no new data\n";
293  }
294 
295  if (!cachedRP_ && !newR) {
297  << "readNext returned true but no RunPrincipal has been set, and no "
298  "cached RunPrincipal exists.\n"
299  "This can happen if a new input file has been opened and the "
300  "RunPrincipal has not been appropriately assigned.";
301  }
302 
303  if (!cachedSRP_ && !newSR) {
305  << "readNext returned true but no SubRunPrincipal has been set, and no "
306  "cached SubRunPrincipal exists.\n"
307  "This can happen if a new input file has been opened and the "
308  "SubRunPrincipal has not been appropriately assigned.";
309  }
310 
311  if (cachedRP_ && newR && cachedRP_.get() == newR) {
312  errMsg << "readNext returned a new Run which is the old Run for "
313  << cachedRP_->id()
314  << ".\nIf you don't have a new run, don't return one!\n";
315  }
316  if (cachedSRP_ && newSR && cachedSRP_.get() == newSR) {
317  errMsg << "readNext returned a new SubRun which is the old SubRun for "
318  << cachedSRP_->id()
319  << ".\nIf you don't have a new subRun, don't return one!\n";
320  }
321  // Either or both of the above cases could be true and we need
322  // to make both of them safe before we throw:
323  if (!errMsg.str().empty())
324  throw Exception(errors::LogicError) << errMsg.str();
325  if (cachedRP_ && cachedSRP_) {
326  if (!newR && newSR && newSR->id() == cachedSRP_->id())
327  throwDataCorruption_("readNext returned a 'new' SubRun "
328  "that was the same as the previous "
329  "SubRun\n");
330  if (newR && newR->id() == cachedRP_->id())
331  throwDataCorruption_("readNext returned a 'new' Run "
332  "that was the same as the previous "
333  "Run\n");
334  if (newR && !newSR && newE)
335  throwDataCorruption_("readNext returned a new Run and "
336  "Event without a SubRun\n");
337  if (newR && newSR && newSR->id() == cachedSRP_->id())
338  throwDataCorruption_("readNext returned a new Run with "
339  "a SubRun from the wrong Run\n");
340  }
341  RunID rID;
342  SubRunID srID;
343  EventID eID;
344  if (newR) {
345  rID = newR->id();
346  if (!rID.isValid()) {
348  "readNext returned a Run with an invalid RunID.\n");
349  }
350  } else if (cachedRP_) {
351  rID = cachedRP_->id();
352  }
353  if (newSR) {
354  srID = newSR->id();
355  if (rID != srID.runID()) {
356  errMsg << "readNext returned a SubRun " << srID
357  << " which is a mismatch to " << rID << "\n";
358  throwDataCorruption_(errMsg.str().c_str());
359  }
360  if (!srID.isValid()) {
362  "readNext returned a SubRun with an invalid SubRunID.\n");
363  }
364  if (newSR->runPrincipalExemptPtr()) {
366  "readNext returned a SubRun with a non-null embedded Run.\n");
367  }
368  } else if (cachedSRP_) {
369  srID = cachedSRP_->id();
370  }
371  if (newE) {
372  eID = newE->id();
373  if (srID != eID.subRunID()) {
374  errMsg << "readNext returned an Event " << eID
375  << " which is a mismatch to " << srID << "\n";
376  throwDataCorruption_(errMsg.str().c_str());
377  }
378  if (!eID.isValid()) {
380  "readNext returned an Event with an invalid EventID.\n");
381  }
382  if (newE->subRunPrincipalExemptPtr()) {
384  "readNext returned an Event with a non-null embedded SubRun.\n");
385  }
386  }
387  } else {
388  if (newR || newSR || newE)
390  << "readNext returned false but created new data\n";
391  }
392 }
393 
394 template <class T>
395 bool
397 {
398  std::unique_ptr<RunPrincipal> newR{nullptr};
399  std::unique_ptr<SubRunPrincipal> newSR{nullptr};
400  std::unique_ptr<EventPrincipal> newE{nullptr};
401 
402  bool result{false};
403  {
404  RunPrincipal* nR{nullptr};
405  SubRunPrincipal* nSR{nullptr};
406  EventPrincipal* nE{nullptr};
407  result = detail_.readNext(cachedRP_.get(), cachedSRP_.get(), nR, nSR, nE);
408  newR.reset(nR);
409  newSR.reset(nSR);
410  newE.reset(nE);
411  throwIfInsane_(result, newR.get(), newSR.get(), newE.get());
412  }
413 
414  if (result) {
415  subRunIsNew_ = newSR && ((!cachedSRP_) || newSR->id() != cachedSRP_->id());
416  pendingSubRun_ = newSR.get() != nullptr;
417  pendingEvent_ = newE.get() != nullptr;
418  if (newR) {
419  newRP_ = std::move(newR);
420  }
421  if (newSR) {
422  auto rp = newRP_ ? newRP_.get() : cachedRP_.get();
423  newSR->setRunPrincipal(rp);
424  newSRP_ = std::move(newSR);
425  }
426  if (newE) {
427  auto srp = newSRP_ ? newSRP_.get() : cachedSRP_.get();
428  newE->setSubRunPrincipal(srp);
429  newE_ = std::move(newE);
430  }
431  if (newRP_) {
433  } else if (newSRP_) {
435  } else if (newE_) {
437  }
438  }
439  return result;
440 }
441 
442 template <class T>
443 void
445 {
446  state_ = input::IsStop; // Default -- may change below.
451  generatorHasMoreData;
452  if (generatorHasMoreData(detail_)) {
454  }
455  } else {
456  currentFileName_ = fh_.next();
457  if (!currentFileName_.empty()) {
459  }
460  }
461 }
462 
463 template <class T>
466 {
467  if (remainingEvents_ == 0) {
469  }
470  switch (state_) {
471  case input::IsInvalid:
473  state_ = input::IsFile; // Once.
474  } else {
476  }
477  break;
478  case input::IsFile:
480  break;
481  case input::IsRun:
484  pendingSubRun_ = false;
485  } else if (pendingEvent_)
487  << "Input file '" << currentFileName_ << "' contains an Event "
488  << newE_->id() << " that belongs to no SubRun\n";
489  else {
491  }
492  break;
493  case input::IsSubRun:
494  if (pendingEvent_) {
496  pendingEvent_ = false;
497  } else {
499  }
500  break;
501  case input::IsEvent:
502  if (!readNext_()) {
504  }
505  break;
506  case input::IsStop:
507  break;
508  }
509  if ((state_ == input::IsRun || state_ == input::IsSubRun) &&
510  remainingSubRuns_ == 0) {
512  }
513  if (state_ == input::IsStop) {
514  // FIXME: upon the advent of a catalog system which can do something
515  // intelligent with the difference between whole-file success,
516  // partial-file success, partial-file failure and whole-file failure
517  // (such as file-open failure), we will need to communicate that
518  // difference here. The file disposition options as they are now
519  // (and the mapping to any concrete implementation we are are aware
520  // of currently) are not sufficient to the task, so we deliberately
521  // do not distinguish here between partial-file and whole-file
522  // success in particular.
523  fh_.finish();
524  }
525  return state_;
526 }
527 
528 template <class T>
529 void
531 {
532  if (readNext_()) {
533  if (state_ != input::IsRun) {
534  if (cachedRP_) {
535  state_ = input::IsRun; // Regurgitate existing cached run.
536  } else {
538  << "Input file '" << currentFileName_ << "' has a"
539  << (state_ == input::IsSubRun ? " SubRun" : "n Event")
540  << " where a Run is expected\n";
541  }
542  }
543  } else {
545  }
546 }
547 
548 template <class T>
549 void
551 {
552  if (readNext_()) {
553  if (state_ == input::IsEvent) {
555  << "Input file '" << currentFileName_
556  << "' has an Event where a Run or SubRun is expected\n";
557  }
558  } else {
560  }
561 }
562 
563 template <class T>
564 std::unique_ptr<art::RangeSetHandler>
566 {
567  return std::make_unique<OpenRangeSetHandler>(cachedRP_->run());
568 }
569 
570 template <class T>
571 std::unique_ptr<art::RangeSetHandler>
573 {
574  return std::make_unique<OpenRangeSetHandler>(cachedSRP_->run());
575 }
576 
577 template <class T>
578 std::unique_ptr<art::FileBlock>
580 {
581  FileBlock* newF{nullptr};
582  detail_.readFile(currentFileName_, newF);
583  if (!newF) {
585  << "detail_::readFile() failed to return a valid FileBlock object\n";
586  }
587  return std::unique_ptr<FileBlock>(newF);
588 }
589 
590 template <class T>
591 void
593 {
594  detail_.closeCurrentFile();
595  // Cached pointers are no longer valid since the PrincipalCache is
596  // cleared after file close.
597  cachedRP_ = nullptr;
598  cachedSRP_ = nullptr;
599 }
600 
601 template <class T>
602 std::unique_ptr<art::RunPrincipal>
604 {
605  if (!newRP_)
607  << "Error in Source<T>\n"
608  << "readRun() called when no RunPrincipal exists\n"
609  << "Please report this to the art developers\n";
610  cachedRP_ = newRP_.get();
611  return std::move(newRP_);
612 }
613 
614 template <class T>
615 std::unique_ptr<art::SubRunPrincipal>
617 {
618  if (!newSRP_)
620  << "Error in Source<T>\n"
621  << "readSubRun() called when no SubRunPrincipal exists\n"
622  << "Please report this to the art developers\n";
623  if (subRunIsNew_) {
624  if (haveSRLimit_) {
626  }
627  subRunIsNew_ = false;
628  }
629  cachedSRP_ = newSRP_.get();
630  return std::move(newSRP_);
631 }
632 
633 template <class T>
634 std::unique_ptr<art::EventPrincipal>
636 {
637  if (haveEventLimit_) {
639  }
640  return std::move(newE_);
641 }
642 
643 template <class T>
644 void
646 {
647  // These _xERROR_ strings should never appear in branch names; they
648  // are here as tracers to help identify any failures in coding.
649  ProductDescriptions descriptions{};
651  d.productRegistry,
652  descriptions,
653  ModuleDescription{fhicl::ParameterSet{}.id(), // Dummy
654  "_NAMEERROR_",
655  "_LABELERROR_",
661  presentProducts_ = ProductTables{descriptions};
663 }
664 
665 #endif /* art_Framework_IO_Sources_Source_h */
666 
667 // Local Variables:
668 // mode: c++
669 // End:
void checkForNextFile_()
Definition: Source.h:444
MasterProductRegistry & productRegistry
void registerProducts(MasterProductRegistry &mpr, ProductDescriptions &productsToRegister, ModuleDescription const &md)
static constexpr ModuleDescriptionID invalidID()
bool parentageEnabled() const
T SourceDetail
Definition: Source.h:161
bool isValid() const
Definition: EventID.h:122
const char * p
Definition: xmltok.h:285
bool readNext_()
Definition: Source.h:396
bool pendingSubRun_
Definition: Source.h:200
void setPresentProducts(cet::exempt_ptr< ProductTables const > presentProducts)
bool subRunIsNew_
Definition: Source.h:203
EventNumber_t remainingEvents_
Definition: Source.h:207
static void throwDataCorruption_(const char *msg)
Definition: Source.h:276
bool const parentageEnabled_
Definition: Source.h:209
std::vector< BranchDescription > ProductDescriptions
enable_if_same_t< FT, decltype(f), R > enable_if_function_exists_t
bool isValid() const
Definition: SubRunID.h:95
cet::exempt_ptr< SubRunPrincipal const > subRunPrincipalExemptPtr() const
void readNextAndRequireRun_()
Definition: Source.h:530
void closeFile() override
Definition: Source.h:592
input::ItemType state_
Definition: Source.h:186
RunID const & runID() const
Definition: SubRunID.h:77
cet::exempt_ptr< SubRunPrincipal > cachedSRP_
Definition: Source.h:198
bool pendingEvent_
Definition: Source.h:201
static const char *const errMsg[]
Definition: Error.h:69
const XML_Char int const XML_Char * value
Definition: expat.h:331
SubRunID id() const
SourceDetail detail_
Definition: Source.h:185
std::unique_ptr< SubRunPrincipal > newSRP_
Definition: Source.h:192
exempt_ptr< E > make_exempt_ptr(E *) noexcept
SubRunID const & subRunID() const
Definition: EventID.h:104
SourceHelper sourceHelper_
Definition: Source.h:184
cet::exempt_ptr< ActivityRegistry > act_
Definition: Source.h:180
cet::exempt_ptr< RunPrincipal > cachedRP_
Definition: Source.h:197
input::ItemType nextItemType() override
Definition: Source.h:465
ProductTables presentProducts_
Definition: Source.h:183
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:118
Float_t d
Definition: plot.C:236
static ProductTables invalid()
std::unique_ptr< FileBlock > readFile() override
Definition: Source.h:579
std::unique_ptr< SubRunPrincipal > readSubRun(cet::exempt_ptr< RunPrincipal const > rp) override
Definition: Source.h:616
std::unique_ptr< EventPrincipal > newE_
Definition: Source.h:193
void throwIfInsane_(bool result, RunPrincipal *newR, SubRunPrincipal *newSR, EventPrincipal *newE) const
Definition: Source.h:283
ModuleDescriptionID id() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
bool const rangesEnabled_
Definition: Source.h:210
void finishProductRegistration_(InputSourceDescription &d)
Definition: Source.h:645
::xsd::cxx::tree::string< char, simple_type > string
Definition: Database.h:154
Source(Source< T > const &)=delete
std::unique_ptr< RunPrincipal > newRP_
Definition: Source.h:191
std::unique_ptr< RunPrincipal > readRun() override
Definition: Source.h:603
std::unique_ptr< art::RangeSetHandler > subRunRangeSetHandler() override
Definition: Source.h:572
RunID const & id() const
Definition: RunPrincipal.h:48
ProductRegistryHelper h_
Definition: Source.h:182
detail::FileNamesHandler< Source_wantFileServices< T >::value > fh_
Definition: Source.h:188
bool haveEventLimit_
Definition: Source.h:208
SubRunNumber_t remainingSubRuns_
Definition: Source.h:205
cet::exempt_ptr< RunPrincipal const > runPrincipalExemptPtr() const
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:117
bool isValid() const
Definition: RunID.h:68
ModuleDescription const & moduleDescription
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
EventID const & id() const
bool haveSRLimit_
Definition: Source.h:206
int nE
double T
Definition: Xdiff_gwt.C:5
void readNextAndRefuseEvent_()
Definition: Source.h:550
std::string currentFileName_
Definition: Source.h:189
std::unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const > srp) override
Definition: Source.h:635
std::unique_ptr< art::RangeSetHandler > runRangeSetHandler() override
Definition: Source.h:565
ProcessConfiguration const & processConfiguration() const