Source.h
Go to the documentation of this file.
1 #ifndef art_Framework_IO_Sources_Source_h
2 #define art_Framework_IO_Sources_Source_h
3 // vim: set sw=2 expandtab :
4 
5 // ======================================================================
6 //
7 // The Source class template is used to create InputSources which
8 // are capable of reading Runs, SubRuns and Events from non-standard
9 // input files. Sources instantiated from Source are *not* random
10 // access sources.
11 //
12 // The Source class template requires the use of a type T as its
13 // template parameter, satisfying the conditions outlined below. In
14 // one's XXX_module.cc class one must provide a typedef and module macro
15 // call along the lines of:
16 //
17 // namespace arttest {
18 // typedef art::Source<GeneratorTestDetail> GeneratorTest;
19 // }
20 //
21 // DEFINE_ART_INPUT_SOURCE(arttest::GeneratorTest)
22 //
23 // However, there are several "flavors" of InputSource possible using
24 // this template, and one may wish to specify them using the "type
25 // traits" found in SourceTraits.h. Type traits are simple class
26 // templates that are used to signal properties of the classes used as
27 // their template arguments. Specialization is common. There are many
28 // examples of type traits in the standard, such as std::is_const<T> or
29 // std::is_integral<T>. Any traits you wish to specialize must be
30 // defined *after* the definition of your detail class T, but *before*
31 // the typedef above which will attempt to instantiate them. See
32 // SourceTraits.h for descriptions of the different traits one might
33 // wish to apply.
34 //
35 // The type T must supply the following non-static member functions:
36 //
37 // * Construct an object of type T. The ParameterSet provided will be
38 // that constructed by the 'source' statement in the job
39 // configuration file. The ProductRegistryHelper must be used to
40 // register products to be reconstituted by this source.
41 //
42 // T(fhicl::ParameterSet const&,
43 // art::ProductRegistryHelper&,
44 // art::SourceHelper const&);
45 //
46 // * Open the file of the given name, returning a new fileblock in
47 // fb. If readFile is unable to return a valid FileBlock it should
48 // throw. Suggestions for suitable exceptions are:
49 // art::Exception(art::errors::FileOpenError) or
50 // art::Exception(art::errors::FileReadError).
51 //
52 // void readFile(std::string const& filename,
53 // art::FileBlock*& fb);
54 //
55 // * Read the next part of the current file. Return false if nothing
56 // was read; return true and set the appropriate 'out' arguments if
57 // something was read.
58 //
59 // bool readNext(art::RunPrincipal const* const inR,
60 // art::SubRunPrincipal const* const inSR,
61 // art::RunPrincipal*& outR,
62 // art::SubRunPrincipal*& outSR,
63 // art::EventPrincipal*& outE);
64 //
65 // * After readNext has returned false, the behavior differs
66 // depending on whether Source_Generator<XXX>::value is true or
67 // false. If false (the default), then readFile(...) will be called
68 // provided there is an unused string remaining in
69 // source.fileNames. If true, then the source will finish unless
70 // there exists an *optional* function:
71 //
72 // bool hasMoreData(); // or
73 //
74 // bool hasMoreData() const;
75 //
76 // which returns true.
77 //
78 // * Close the current input file.
79 //
80 // void closeCurrentFile();
81 //
82 // ======================================================================
83 
102 #include "cetlib/exempt_ptr.h"
103 #include "cetlib/metaprogramming.h"
104 #include "fhiclcpp/ParameterSet.h"
105 
106 #include <algorithm>
107 #include <memory>
108 #include <type_traits>
109 
110 namespace art {
111 
112  template <class T>
113  class Source;
114 
115  namespace detail {
116 
117  // Template metaprogramming.
118 
119  template <typename T, typename = void>
120  struct has_hasMoreData : std::false_type {};
121 
122  template <typename T>
124  T,
125  cet::enable_if_function_exists_t<bool (T::*)(), &T::hasMoreData>>
126  : std::true_type {};
127 
128  template <typename T>
130  T,
131  cet::enable_if_function_exists_t<bool (T::*)() const, &T::hasMoreData>>
132  : std::true_type {};
133 
134  template <typename T>
136  bool
138  {
139  return t.hasMoreData();
140  }
141  };
142 
143  template <typename T>
145  bool
147  {
148  return false;
149  }
150  };
151 
152  } // namespace detail
153 
154  // No-one gets to override this class.
155  template <class T>
156  class Source final : public InputSource {
157 
158  public: // TYPES
159  using SourceDetail = T;
160 
161  public: // MEMBER FUNCTIONS -- Special Member Functions
163 
164  Source(Source<T> const&) = delete;
165 
166  Source(Source<T>&&) = delete;
167 
168  Source<T>& operator=(Source<T> const&) = delete;
169 
170  Source<T>& operator=(Source<T>&&) = delete;
171 
172  public: // MEMBER FUNCTIONS
173  input::ItemType nextItemType() override;
174 
175  std::unique_ptr<FileBlock> readFile() override;
176 
177  void closeFile() override;
178 
180 
181  std::unique_ptr<RunPrincipal> readRun() override;
182 
183  std::unique_ptr<SubRunPrincipal> readSubRun(
185 
186  std::unique_ptr<EventPrincipal> readEvent(
188 
189  std::unique_ptr<RangeSetHandler> runRangeSetHandler() override;
190 
191  std::unique_ptr<RangeSetHandler> subRunRangeSetHandler() override;
192 
193  private: // MEMBER FUNCTIONS
194  // Called in the constructor, to finish the process of product
195  // registration.
196  void finishProductRegistration_(InputSourceDescription& d);
197 
198  // Make detail_ try to read more stuff from its file. Cache any new
199  // run/subrun/event. Throw an exception if we detect an error in the
200  // data stream, or logic of the detail_ class. Move to the
201  // appropriate new state.
202  bool readNext_();
203 
204  // Check to see whether we have a new file to attempt to read,
205  // moving to either the IsStop or IsFile state.
206  void checkForNextFile_();
207 
208  // Call readNext_() and throw if we have not moved to the IsRun state.
209  void readNextAndRequireRun_();
210 
211  // Call readNext_() and throw if we have moved to the IsEvent state.
212  void readNextAndRefuseEvent_();
213 
214  // Test the newly read data for validity, given our current state.
215  void throwIfInsane_(bool result,
216  RunPrincipal* newR,
217  SubRunPrincipal* newSR,
218  EventPrincipal* newE) const;
219 
220  // Throw an Exception(errors::DataCorruption), with the given
221  // message text.
222  static void throwDataCorruption_(const char* msg);
223 
224  private: // MEMBER DATA
226 
228 
230 
231  // So it can be used by detail.
233 
235 
237 
239 
240  std::string currentFileName_{};
241 
242  std::unique_ptr<RunPrincipal> newRP_{};
243 
244  std::unique_ptr<SubRunPrincipal> newSRP_{};
245 
246  std::unique_ptr<EventPrincipal> newE_{};
247 
248  // Cached Run and SubRun Principals used for users creating new
249  // SubRun and Event Principals. These are non owning!
250 
251  cet::exempt_ptr<RunPrincipal> cachedRP_{nullptr};
252 
253  cet::exempt_ptr<SubRunPrincipal> cachedSRP_{nullptr};
254 
255  bool pendingSubRun_{false};
256 
257  bool pendingEvent_{false};
258 
259  bool subRunIsNew_{false};
260 
261  SubRunNumber_t remainingSubRuns_{1};
262 
263  bool haveSRLimit_{false};
264 
265  EventNumber_t remainingEvents_{1};
266 
267  bool haveEventLimit_{false};
268  };
269 
270  template <class T>
273  , outputCallbacks_{d.productRegistry}
274  , sourceHelper_{d.moduleDescription}
276  , fh_{p.get<std::vector<std::string>>("fileNames",
277  std::vector<std::string>())}
278  {
279  // Handle maxSubRuns parameter.
280  int64_t maxSubRuns_par = p.get<int64_t>("maxSubRuns", -1);
281  if (maxSubRuns_par > -1) {
282  remainingSubRuns_ = maxSubRuns_par;
283  haveSRLimit_ = true;
284  }
285  // Handle maxEvents parameter.
286  int64_t maxEvents_par = p.get<int64_t>("maxEvents", -1);
287  if (maxEvents_par > -1) {
288  remainingEvents_ = maxEvents_par;
289  haveEventLimit_ = true;
290  }
291  // Finish product registration.
293  }
294 
295  template <class T>
296  void
298  {
300  }
301 
302  template <class T>
303  void
305  RunPrincipal* newR,
306  SubRunPrincipal* newSR,
307  EventPrincipal* newE) const
308  {
309  std::ostringstream errMsg;
310  if (result) {
311  if (!newR && !newSR && !newE) {
313  << "readNext returned true but created no new data\n";
314  }
315  if (!cachedRP_ && !newR) {
317  << "readNext returned true but no RunPrincipal has been set, and no "
318  "cached RunPrincipal exists.\n"
319  "This can happen if a new input file has been opened and the "
320  "RunPrincipal has not been appropriately assigned.";
321  }
322  if (!cachedSRP_ && !newSR) {
324  << "readNext returned true but no SubRunPrincipal has been set, and "
325  "no cached SubRunPrincipal exists.\n"
326  "This can happen if a new input file has been opened and the "
327  "SubRunPrincipal has not been appropriately assigned.";
328  }
329  if (cachedRP_ && newR && cachedRP_.get() == newR) {
330  errMsg << "readNext returned a new Run which is the old Run for "
331  << cachedRP_->runID()
332  << ".\nIf you don't have a new run, don't return one!\n";
333  }
334  if (cachedSRP_ && newSR && cachedSRP_.get() == newSR) {
335  errMsg << "readNext returned a new SubRun which is the old SubRun for "
336  << cachedSRP_->subRunID()
337  << ".\nIf you don't have a new subRun, don't return one!\n";
338  }
339  // Either or both of the above cases could be true and we need
340  // to make both of them safe before we throw:
341  if (!errMsg.str().empty())
342  throw Exception(errors::LogicError) << errMsg.str();
343  if (cachedRP_ && cachedSRP_) {
344  if (!newR && newSR && newSR->subRunID() == cachedSRP_->subRunID())
345  throwDataCorruption_("readNext returned a 'new' SubRun "
346  "that was the same as the previous "
347  "SubRun\n");
348  if (newR && newR->runID() == cachedRP_->runID())
349  throwDataCorruption_("readNext returned a 'new' Run "
350  "that was the same as the previous "
351  "Run\n");
352  if (newR && !newSR && newE)
353  throwDataCorruption_("readNext returned a new Run and "
354  "Event without a SubRun\n");
355  if (newR && newSR && newSR->subRunID() == cachedSRP_->subRunID())
356  throwDataCorruption_("readNext returned a new Run with "
357  "a SubRun from the wrong Run\n");
358  }
359  RunID rID;
360  SubRunID srID;
361  EventID eID;
362  if (newR) {
363  rID = newR->runID();
364  if (!rID.isValid()) {
366  "readNext returned a Run with an invalid RunID.\n");
367  }
368  } else if (cachedRP_) {
369  rID = cachedRP_->runID();
370  }
371  if (newSR) {
372  srID = newSR->subRunID();
373  if (rID != srID.runID()) {
374  errMsg << "readNext returned a SubRun " << srID
375  << " which is a mismatch to " << rID << "\n";
376  throwDataCorruption_(errMsg.str().c_str());
377  }
378  if (!srID.isValid()) {
380  "readNext returned a SubRun with an invalid SubRunID.\n");
381  }
382  if (newSR->runPrincipalExemptPtr()) {
384  "readNext returned a SubRun with a non-null embedded Run.\n");
385  }
386  } else if (cachedSRP_) {
387  srID = cachedSRP_->subRunID();
388  }
389  if (newE) {
390  eID = newE->eventID();
391  if (srID != eID.subRunID()) {
392  errMsg << "readNext returned an Event " << eID
393  << " which is a mismatch to " << srID << "\n";
394  throwDataCorruption_(errMsg.str().c_str());
395  }
396  if (!eID.isValid()) {
398  "readNext returned an Event with an invalid EventID.\n");
399  }
400  if (newE->subRunPrincipalPtr()) {
402  "readNext returned an Event with a non-null embedded SubRun.\n");
403  }
404  }
405  } else {
406  if (newR || newSR || newE)
408  << "readNext returned false but created new data\n";
409  }
410  }
411 
412  template <class T>
413  bool
415  {
416  std::unique_ptr<RunPrincipal> newR{nullptr};
417  std::unique_ptr<SubRunPrincipal> newSR{nullptr};
418  std::unique_ptr<EventPrincipal> newE{nullptr};
419  bool result{false};
420  {
421  RunPrincipal* nR{nullptr};
422  SubRunPrincipal* nSR{nullptr};
423  EventPrincipal* nE{nullptr};
424  result = detail_.readNext(cachedRP_.get(), cachedSRP_.get(), nR, nSR, nE);
425  newR.reset(nR);
426  newSR.reset(nSR);
427  newE.reset(nE);
428  throwIfInsane_(result, newR.get(), newSR.get(), newE.get());
429  }
430  if (result) {
431  subRunIsNew_ =
432  newSR && ((!cachedSRP_) || newSR->subRunID() != cachedSRP_->subRunID());
433  pendingSubRun_ = newSR.get() != nullptr;
434  pendingEvent_ = newE.get() != nullptr;
435  if (newR) {
436  newRP_ = std::move(newR);
437  }
438  if (newSR) {
439  auto rp = newRP_ ? newRP_.get() : cachedRP_.get();
440  newSR->setRunPrincipal(rp);
441  newSRP_ = std::move(newSR);
442  }
443  if (newE) {
444  auto srp = newSRP_ ? newSRP_.get() : cachedSRP_.get();
445  newE->setSubRunPrincipal(srp);
446  newE_ = std::move(newE);
447  }
448  if (newRP_) {
450  } else if (newSRP_) {
452  } else if (newE_) {
454  }
455  }
456  return result;
457  }
458 
459  template <class T>
460  void
462  {
463  state_ = input::IsStop; // Default -- may change below.
468  generatorHasMoreData;
469  if (generatorHasMoreData(detail_)) {
471  }
472  } else {
473  currentFileName_ = fh_.next();
474  if (!currentFileName_.empty()) {
476  }
477  }
478  }
479 
480  template <class T>
483  {
484  if (remainingEvents_ == 0) {
486  }
487  switch (state_) {
488  case input::IsInvalid:
490  state_ = input::IsFile; // Once.
491  } else {
493  }
494  break;
495  case input::IsFile:
497  break;
498  case input::IsRun:
501  pendingSubRun_ = false;
502  } else if (pendingEvent_)
504  << "Input file '" << currentFileName_ << "' contains an Event "
505  << newE_->eventID() << " that belongs to no SubRun\n";
506  else {
508  }
509  break;
510  case input::IsSubRun:
511  if (pendingEvent_) {
513  pendingEvent_ = false;
514  } else {
516  }
517  break;
518  case input::IsEvent:
519  if (!readNext_()) {
521  }
522  break;
523  case input::IsStop:
524  break;
525  }
526  if ((state_ == input::IsRun || state_ == input::IsSubRun) &&
527  remainingSubRuns_ == 0) {
529  }
530  if (state_ == input::IsStop) {
531  // FIXME: upon the advent of a catalog system which can do something
532  // intelligent with the difference between whole-file success,
533  // partial-file success, partial-file failure and whole-file failure
534  // (such as file-open failure), we will need to communicate that
535  // difference here. The file disposition options as they are now
536  // (and the mapping to any concrete implementation we are are aware
537  // of currently) are not sufficient to the task, so we deliberately
538  // do not distinguish here between partial-file and whole-file
539  // success in particular.
540  fh_.finish();
541  }
542  return state_;
543  }
544 
545  template <class T>
546  void
548  {
549  if (readNext_()) {
550  if (state_ != input::IsRun) {
551  if (cachedRP_) {
552  state_ = input::IsRun; // Regurgitate existing cached run.
553  } else {
555  << "Input file '" << currentFileName_ << "' has a"
556  << (state_ == input::IsSubRun ? " SubRun" : "n Event")
557  << " where a Run is expected\n";
558  }
559  }
560  } else {
562  }
563  }
564 
565  template <class T>
566  void
568  {
569  if (readNext_()) {
570  if (state_ == input::IsEvent) {
572  << "Input file '" << currentFileName_
573  << "' has an Event where a Run or SubRun is expected\n";
574  }
575  } else {
577  }
578  }
579 
580  template <class T>
581  std::unique_ptr<RangeSetHandler>
583  {
584  return std::make_unique<OpenRangeSetHandler>(cachedRP_->run());
585  }
586 
587  template <class T>
588  std::unique_ptr<RangeSetHandler>
590  {
591  return std::make_unique<OpenRangeSetHandler>(cachedSRP_->run());
592  }
593 
594  template <class T>
595  std::unique_ptr<FileBlock>
597  {
598  FileBlock* newF{nullptr};
599  detail_.readFile(currentFileName_, newF);
600  if (!newF) {
602  << "detail_::readFile() failed to return a valid FileBlock object\n";
603  }
604  return std::unique_ptr<FileBlock>(newF);
605  }
606 
607  template <class T>
608  void
610  {
611  detail_.closeCurrentFile();
612  // Cached pointers are no longer valid since the PrincipalCache is
613  // cleared after file close.
614  cachedRP_ = nullptr;
615  cachedSRP_ = nullptr;
616  }
617 
618  template <class T>
619  std::unique_ptr<RunPrincipal>
621  {
622  if (!newRP_)
624  << "Error in Source<T>\n"
625  << "readRun() called when no RunPrincipal exists\n"
626  << "Please report this to the art developers\n";
627  cachedRP_ = newRP_.get();
628  return std::move(newRP_);
629  }
630 
631  template <class T>
632  std::unique_ptr<SubRunPrincipal>
634  {
635  if (!newSRP_)
637  << "Error in Source<T>\n"
638  << "readSubRun() called when no SubRunPrincipal exists\n"
639  << "Please report this to the art developers\n";
640  if (subRunIsNew_) {
641  if (haveSRLimit_) {
643  }
644  subRunIsNew_ = false;
645  }
646  cachedSRP_ = newSRP_.get();
647  return std::move(newSRP_);
648  }
649 
650  template <class T>
651  std::unique_ptr<EventPrincipal>
653  {
654  if (haveEventLimit_) {
656  }
657  return std::move(newE_);
658  }
659 
660  template <class T>
661  void
663  {
664  // These _xERROR_ strings should never appear in branch names; they
665  // are here as tracers to help identify any failures in coding.
666  ProductDescriptions descriptions;
668  descriptions,
670  "_NAMEERROR_",
671  "_LABELERROR_",
674  true /*isEmulated*/});
675  presentProducts_ = ProductTables{descriptions};
676  sourceHelper_.setPresentProducts(cet::make_exempt_ptr(&presentProducts_));
678  }
679 
680 } // namespace art
681 
682 #endif /* art_Framework_IO_Sources_Source_h */
683 
684 // Local Variables:
685 // mode: c++
686 // End:
void checkForNextFile_()
Definition: Source.h:461
T SourceDetail
Definition: Source.h:159
EventID const & eventID() const
std::unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const > srp) override
Definition: Source.h:652
bool isValid() const
Definition: EventID.h:122
const char * p
Definition: xmltok.h:285
bool readNext_()
Definition: Source.h:414
bool pendingSubRun_
Definition: Source.h:255
virtual std::unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const > srp)=0
bool subRunIsNew_
Definition: Source.h:259
EventNumber_t remainingEvents_
Definition: Source.h:265
static void throwDataCorruption_(const char *msg)
Definition: Source.h:297
std::vector< BranchDescription > ProductDescriptions
enable_if_same_t< FT, decltype(f), R > enable_if_function_exists_t
ParameterSetID id() const
std::unique_ptr< RangeSetHandler > runRangeSetHandler() override
Definition: Source.h:582
bool isValid() const
Definition: SubRunID.h:95
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler() override
Definition: Source.h:589
void readNextAndRequireRun_()
Definition: Source.h:547
void closeFile() override
Definition: Source.h:609
input::ItemType state_
Definition: Source.h:236
RunID const & runID() const
Definition: SubRunID.h:77
cet::exempt_ptr< SubRunPrincipal > cachedSRP_
Definition: Source.h:253
void registerProducts(ProductDescriptions &productsToRegister, ModuleDescription const &md)
std::unique_ptr< SubRunPrincipal > readSubRun(cet::exempt_ptr< RunPrincipal const > rp) override
Definition: Source.h:633
bool pendingEvent_
Definition: Source.h:257
static const char *const errMsg[]
Definition: Error.h:69
Source(fhicl::ParameterSet const &p, InputSourceDescription &d)
Definition: Source.h:271
const XML_Char int const XML_Char * value
Definition: expat.h:331
SourceDetail detail_
Definition: Source.h:234
std::unique_ptr< SubRunPrincipal > newSRP_
Definition: Source.h:244
exempt_ptr< E > make_exempt_ptr(E *) noexcept
input::ItemType nextItemType() override
Definition: Source.h:482
SubRunID const & subRunID() const
Definition: EventID.h:104
SourceHelper sourceHelper_
Definition: Source.h:232
cet::exempt_ptr< RunPrincipal const > runPrincipalExemptPtr() const
cet::exempt_ptr< RunPrincipal > cachedRP_
Definition: Source.h:251
SubRunID subRunID() const
ProductTables presentProducts_
Definition: Source.h:229
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:119
Float_t d
Definition: plot.C:236
static ProductTables invalid()
void invoke(ProductTables const &)
std::unique_ptr< EventPrincipal > newE_
Definition: Source.h:246
RunID const & runID() const
void throwIfInsane_(bool result, RunPrincipal *newR, SubRunPrincipal *newSR, EventPrincipal *newE) const
Definition: Source.h:304
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void finishProductRegistration_(InputSourceDescription &d)
Definition: Source.h:662
std::unique_ptr< RunPrincipal > newRP_
Definition: Source.h:242
std::unique_ptr< FileBlock > readFile() override
Definition: Source.h:596
ProductRegistryHelper h_
Definition: Source.h:225
SubRunPrincipal const * subRunPrincipalPtr() const
detail::FileNamesHandler< Source_wantFileServices< T >::value > fh_
Definition: Source.h:238
bool haveEventLimit_
Definition: Source.h:267
SubRunNumber_t remainingSubRuns_
Definition: Source.h:261
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:118
UpdateOutputCallbacks & outputCallbacks_
Definition: Source.h:227
bool isValid() const
Definition: RunID.h:70
ModuleDescription const & moduleDescription
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
bool haveSRLimit_
Definition: Source.h:263
int nE
double T
Definition: Xdiff_gwt.C:5
void readNextAndRefuseEvent_()
Definition: Source.h:567
std::unique_ptr< RunPrincipal > readRun() override
Definition: Source.h:620
std::string currentFileName_
Definition: Source.h:240
ProcessConfiguration const & processConfiguration() const
enum BeamMode string