SamplingInput_source.cc
Go to the documentation of this file.
1 // ====================================================================
2 // SamplingInput
3 //
4 // - Only one "input file", one Run and one SubRun are created for a
5 // job that uses the SamplingInput source.
6 //
7 // - Run and SubRun products are available only through the
8 // SampledProduct<T> wrapper. This product wrapper is a container
9 // that retains the (Sub)Run products from each dataset.
10 //
11 // Technical notes:
12 //
13 // - Whereas event principals are created by the individual
14 // SamplingInputFile objects, the (sub)run principals are created by
15 // the input source itself. The goal is to encapsulate as much as
16 // possible the product internals to the source, and not the
17 // underlying files. This is possible for (sub)run products as
18 // there is only one primary (sub)run per job. For event products,
19 // it is more natural to go to the individual input files
20 // themselves. This asymmetry is undesirable, but it might be
21 // required due to the conceptual difference between events and
22 // (sub)runs.
23 // ===================================================================
24 
50 
51 #include "fhiclcpp/types/Atom.h"
55 
56 #include <memory>
57 #include <string>
58 #include <type_traits>
59 #include <vector>
60 
61 using namespace fhicl;
62 using namespace std::string_literals;
63 using namespace art;
64 
66 
67 namespace {
68 
69  std::unique_ptr<EDProduct>
70  make_sampled_product(Products_t& read_products, BranchKey const& original_key)
71  {
72  InputTag const tag{original_key.moduleLabel_,
73  original_key.productInstanceName_,
74  original_key.processName_};
75 
76  auto& datasets_with_product = read_products.at(original_key);
77 
78  auto const first_entry = begin(datasets_with_product);
79  auto const& products = first_entry->second;
80  assert(!products.empty());
81 
82  auto sampled_product =
83  cbegin(products)->second->createEmptySampledProduct(tag);
84  for (auto&& pr : datasets_with_product) {
85  auto const& dataset = pr.first;
86  auto&& products_per_id = std::move(pr.second);
87  for (auto&& pr2 : products_per_id) {
88  sampled_product->insertIfSampledProduct(
89  dataset, pr2.first, move(pr2.second));
90  }
91  }
92  return sampled_product;
93  }
94 
96  sampled_process_configurations(
97  std::map<BranchKey, BranchDescription> const& descriptions,
98  ProcessConfiguration const& pc)
99  {
100  std::set<std::string> processNames;
101  cet::transform_all(descriptions,
102  inserter(processNames, end(processNames)),
103  [](auto const& pr) { return pr.second.processName(); });
104 
107  processNames, back_inserter(result), [&pc](auto const& name) {
108  return ProcessConfiguration{
109  name, pc.parameterSetID(), pc.releaseVersion()};
110  });
111 
112  return result;
113  }
114 
116  sampled_process_history_id(ProcessConfigurations const& pcs)
117  {
118  ProcessHistory const result{pcs};
119  auto const id = result.id();
120  ProcessHistoryRegistry::emplace(id, result);
121  return id;
122  }
123 
124 }
125 
126 namespace art {
127 
128  class SamplingInput : public InputSource {
129  public:
130  struct Config {
131  Atom<std::string> module_type{Name{"module_type"}};
132  Atom<unsigned> maxEvents{Name{"maxEvents"}, 1u};
134  Name{"run"},
135  Comment{
136  "The specified run and subrun numbers are used for all primary "
137  "events\n"
138  "generated in a given art job. If no values are specified, default\n"
139  "values of 1 and 0 are chosen for the run and subrun, "
140  "respectively."}};
142  OptionalAtom<EventNumber_t> firstEvent{Name{"firstEvent"}};
144  Name{"dataSets"},
145  Comment{
146  "The value of the 'dataSets' parameter is a table of the form:\n\n"
147  " dataSets: {\n"
148  " <dataset name>: {\n"
149  " fileNames: [<string>]\n"
150  " weight: <double>\n"
151  " skipToEvent: \"\" #default\n"
152  " }\n"
153  " ...\n"
154  " }\n\n"
155  "where the '<dataset name>' parameter labels a particular\n"
156  "dataset (e.g. 'signal'), the 'fileNames' refers to the files\n"
157  "that contains events of the dataset, and the 'weight'\n"
158  "is a floating-point number used to determine the frequency\n"
159  "with which events from this dataset are sampled, relative to\n"
160  "the sum of weights across all datasets.\n\n"
161  "The 'skipToEvent' parameter specifies the first event that\n"
162  "should be used when sampling from a specific input file. The\n"
163  "correct specification is via the triplet \"run:subrun:event\".\n"
164  "For example, to begin sampling at run 3, subrun 2, event 39\n"
165  "from this dataset, the user should specify:\n\n"
166  " skipToEvent: \"3:2:39\"\n\n"
167  "Specifying the empty string (the default) is equivalent to\n"
168  "beginning at the first event of the file. All other\n"
169  "specifications are ill-formed and will result in an exception\n"
170  "throw at the beginning of the job.\n\n"
171  "The ellipsis indicates that multiple datasets can be configured.\n\n"
172  "N.B. Only one file per dataset is currently allowed."}};
173  Atom<bool> summary{Name{"summary"}, true};
174  Atom<bool> delayedReadEventProducts{Name{"delayedReadEventProducts"},
175  true};
176  Sequence<std::string> inputCommands{Name{"inputCommands"},
177  std::vector<std::string>{"keep *"}};
178  Atom<bool> dropDescendantsOfDroppedBranches{
179  Name{"dropDescendantsOfDroppedBranches"},
180  true};
181  Atom<bool> readParameterSets{Name{"readParameterSets"}, true};
182  Atom<unsigned> treeCacheSize{Name("treeCacheSize"), 0u};
183  Atom<std::int64_t> treeMaxVirtualSize{Name("treeMaxVirtualSize"), -1};
184  Atom<int64_t> saveMemoryObjectThreshold{Name{"saveMemoryObjectThreshold"},
185  -1};
186  Atom<bool> compactRanges{
187  Name{"compactRanges"},
188  Comment{
189  "The 'compactRanges' parameter can be set to 'true'\n"
190  "if the user can guarantee that a SubRun is not spread across\n"
191  "multiple input files. Setting it to 'true' can yield some\n"
192  "memory savings when reading input files created before art 2.01,\n"
193  "but it can also modify the provenance of the workflow. Please\n"
194  "consult your experiment's policy on provenance before setting this\n"
195  "parameter to 'true'."},
196  false};
197 
198  struct KeysToIgnore {
199  std::set<std::string>
200  operator()() const
201  {
202  return {"module_label"};
203  }
204  };
205  };
206 
208  explicit SamplingInput(Parameters const& config,
210 
211  private:
212  template <typename T>
214  putSampledProductsInto_(T& principal,
215  Products_t read_products,
216  RangeSet&& rs) const;
217 
218  input::ItemType nextItemType() override;
219  std::unique_ptr<FileBlock> readFile() override;
220  void closeFile() override;
221  std::unique_ptr<RunPrincipal> readRun() override;
222  std::unique_ptr<SubRunPrincipal> readSubRun(
224  std::unique_ptr<EventPrincipal> readEvent(
226  std::unique_ptr<RangeSetHandler> runRangeSetHandler() override;
227  std::unique_ptr<RangeSetHandler> subRunRangeSetHandler() override;
228 
235  unsigned eventsLeft_;
236  unsigned totalCounts_{};
238  bool const summary_;
240  std::map<BranchKey, BranchDescription> oldKeyToSampledProductDescription_;
244  ProcessConfigurations sampledProcessConfigs_{};
245  ProcessHistoryID sampledProcessHistoryID_{};
248  };
249 }
250 
251 namespace {
252  constexpr auto
253  nullTimestamp()
254  {
255  return art::Timestamp{};
256  }
257 
258  template <typename T>
259  auto
260  type_label_for(std::string const& emulated_module_name)
261  {
262  return art::TypeLabel{
263  art::TypeID{typeid(T)}, {}, false, emulated_module_name};
264  }
265 }
266 
270  , md_{isd.moduleDescription}
272  , eventsLeft_{config().maxEvents()}
273  , dataSetBroker_{config().dataSets.get<ParameterSet>()}
274  , summary_{config().summary()}
275  , delayedReadEventProducts_{config().delayedReadEventProducts()}
276 {
277  auto const readParameterSets = config().readParameterSets();
278  if (!readParameterSets) {
279  mf::LogWarning("PROVENANCE")
280  << "Source parameter readParameterSets was set to false: parameter set "
281  "provenance\n"
282  << "will NOT be available in this or subsequent jobs using output from "
283  "this job.\n"
284  << "Check your experiment's policy on this issue to avoid future "
285  "problems\n"
286  << "with analysis reproducibility.";
287  }
288 
289  RunNumber_t r{};
290  bool const haveFirstRun = config().run(r);
291  runID_ = haveFirstRun ? RunID{r} : RunID::firstRun();
292 
293  SubRunNumber_t sr{};
294  bool const haveFirstSubRun = config().subRun(sr);
295  subRunID_ =
296  haveFirstSubRun ? SubRunID{runID_, sr} : SubRunID::firstSubRun(runID_);
297 
298  EventNumber_t event{};
299  bool const haveFirstEvent = config().firstEvent(event);
300  nextEventID_ =
301  haveFirstEvent ? EventID{subRunID_, event} : EventID::firstEvent(subRunID_);
302 
303  // Register SampledRunInfo, SampledSubRunInfo, and SampledEventInfo data
304  ProductDescriptions presentSampledProducts;
305  auto const emulated_module_name = "SamplingInput"s;
306  presentSampledProducts.emplace_back(
307  InEvent,
308  type_label_for<SampledEventInfo>(emulated_module_name),
309  isd.moduleDescription);
310  sampledEventInfoDesc_ = presentSampledProducts.back();
311  presentSampledProducts.emplace_back(
312  InSubRun,
313  type_label_for<SampledSubRunInfo>(emulated_module_name),
314  isd.moduleDescription);
315  sampledSubRunInfoDesc_ = presentSampledProducts.back();
316  presentSampledProducts.emplace_back(
317  InRun,
318  type_label_for<SampledRunInfo>(emulated_module_name),
319  isd.moduleDescription);
320  sampledRunInfoDesc_ = presentSampledProducts.back();
321 
323  dataSetBroker_.openInputFiles(config().inputCommands(),
324  config().dropDescendantsOfDroppedBranches(),
325  config().treeCacheSize(),
326  config().treeMaxVirtualSize(),
327  config().saveMemoryObjectThreshold(),
329  config().compactRanges(),
330  md_,
331  readParameterSets,
332  isd.productRegistry);
333 
335  sampled_process_configurations(oldKeyToSampledProductDescription_, pc_);
336  sampledProcessHistoryID_ = sampled_process_history_id(sampledProcessConfigs_);
337 
338  for (auto const& pr : oldKeyToSampledProductDescription_) {
339  presentSampledProducts.push_back(pr.second);
340  }
341 
342  // Specify present products for SubRuns and Runs. Only the
343  // Sampled(Sub)RunInfo products are present for the (Sub)Runs.
344  presentSubRunProducts_ = ProductTable{presentSampledProducts, InSubRun};
345  presentRunProducts_ = ProductTable{presentSampledProducts, InRun};
346 
347  isd.productRegistry.addProductsFromModule(
348  ProductDescriptions{presentSampledProducts});
349 }
350 
351 std::unique_ptr<art::FileBlock>
353 {
354  return std::make_unique<art::FileBlock>(
355  art::FileFormatVersion{1, "SamplingInput_2018"}, "SamplingInput");
356 }
357 
358 void
360 {
361  if (!summary_)
362  return;
363 
365 }
366 
369 {
370  switch (currentItemType_) {
371  case input::IsInvalid: {
373  }
374  case input::IsFile: {
376  }
377  case input::IsRun: {
379  }
380  case input::IsSubRun: {
381  // Do not return prematurely when moving to the event.
383  }
384  default: {} // Handle other transitions below.
385  }
386 
387  if (eventsLeft_ == 0u) {
388  return input::IsStop;
389  }
390 
391  bool const inputExhausted = !dataSetBroker_.canReadEvent();
392 
393  if (inputExhausted) {
394  return input::IsStop;
395  }
396 
397  --eventsLeft_;
398  ++totalCounts_;
399  return input::IsEvent;
400 }
401 
402 template <typename T>
405  Products_t read_products,
406  RangeSet&& rs) const
407 {
408  for (auto const& pr : oldKeyToSampledProductDescription_) {
409  auto const& old_key = pr.first;
410  if (old_key.branchType_ != principal.branchType())
411  continue;
412 
413  auto const& sampled_pd = pr.second;
414 
415  principal.put(make_sampled_product(read_products, old_key),
416  sampled_pd,
417  std::make_unique<ProductProvenance const>(
418  sampled_pd.productID(), productstatus::present()),
419  std::move(rs));
420  }
421 }
422 
423 std::unique_ptr<art::RunPrincipal>
425 {
426  Products_t read_products;
427  auto sampledRunInfo = dataSetBroker_.readAllRunProducts(read_products);
428 
429  art::RunAuxiliary aux{runID_, nullTimestamp(), nullTimestamp()};
430  aux.setProcessHistoryID(sampledProcessHistoryID_);
431 
432  auto rp = std::make_unique<art::RunPrincipal>(
434 
436  *rp, std::move(read_products), RangeSet::forRun(runID_));
437 
438  // Place sampled run info onto the run
439  auto wp = std::make_unique<Wrapper<SampledRunInfo>>(move(sampledRunInfo));
440  rp->put(std::move(wp),
442  std::make_unique<ProductProvenance const>(
445  return rp;
446 }
447 
448 std::unique_ptr<art::SubRunPrincipal>
450 {
451  Products_t read_products;
452  auto sampledSubRunInfo = dataSetBroker_.readAllSubRunProducts(read_products);
453 
454  art::SubRunAuxiliary aux{subRunID_, nullTimestamp(), nullTimestamp()};
455  aux.setProcessHistoryID(sampledProcessHistoryID_);
456 
457  auto srp = std::make_unique<SubRunPrincipal>(
459  srp->setRunPrincipal(rp);
460 
462  *srp, std::move(read_products), RangeSet::forSubRun(subRunID_));
463 
464  // Place sampled subrun info onto the subrun
465  auto wp =
466  std::make_unique<Wrapper<SampledSubRunInfo>>(move(sampledSubRunInfo));
467  srp->put(std::move(wp),
469  std::make_unique<ProductProvenance const>(
472  return srp;
473 }
474 
475 std::unique_ptr<art::EventPrincipal>
477 {
479 
480  auto ep =
482  ep->setSubRunPrincipal(srp);
484  ep->readImmediate();
485  }
487  return ep;
488 }
489 
490 std::unique_ptr<art::RangeSetHandler>
492 {
493  return std::make_unique<art::OpenRangeSetHandler>(runID_.run());
494 }
495 
496 std::unique_ptr<art::RangeSetHandler>
498 {
499  return std::make_unique<art::OpenRangeSetHandler>(subRunID_.run());
500 }
501 
::xsd::cxx::tree::id< char, ncname > id
Definition: Database.h:165
BranchDescription sampledRunInfoDesc_
const XML_Char * name
Definition: expat.h:151
std::enable_if_t< detail::RangeSetsSupported< T::branch_type >::value > putSampledProductsInto_(T &principal, Products_t read_products, RangeSet &&rs) const
static RangeSet forSubRun(SubRunID)
std::unique_ptr< FileBlock > readFile() override
bool const delayedReadEventProducts_
std::set< std::string > operator()() const
static EventID firstEvent()
Definition: EventID.h:190
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler() override
input::ItemType currentItemType_
input::ItemType nextItemType() override
ReleaseVersion const & releaseVersion() const
std::unique_ptr< SampledRunInfo > readAllRunProducts(Products_t &read_products)
Definition: config.py:1
std::vector< BranchDescription > ProductDescriptions
ProcessConfigurations sampledProcessConfigs_
std::string productInstanceName_
Definition: BranchKey.h:44
RunNumber_t run() const
Definition: RunID.h:62
ProductTable presentRunProducts_
detail::DataSetBroker dataSetBroker_
ProcessHistoryID id() const
std::unique_ptr< RangeSetHandler > runRangeSetHandler() override
static RangeSet forRun(RunID)
const XML_Char * s
Definition: expat.h:262
std::unique_ptr< SampledSubRunInfo > readAllSubRunProducts(Products_t &read_products)
void issue_reports(unsigned count, EventID const &id)
const XML_Char int const XML_Char * value
Definition: expat.h:331
exempt_ptr< E > make_exempt_ptr(E *) noexcept
RunNumber_t run() const
Definition: SubRunID.h:83
BranchDescription sampledEventInfoDesc_
SamplingInput(Parameters const &config, InputSourceDescription &isd)
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:118
std::unique_ptr< RunPrincipal > readRun() override
caf::StandardRecord * sr
ProductID productID() const
EventID next() const
Definition: EventID.h:134
static SubRunID firstSubRun()
Definition: SubRunID.h:151
auto transform_all(Container &, OutputIt, UnaryOp)
Definition: run.py:1
std::unique_ptr< SubRunPrincipal > readSubRun(cet::exempt_ptr< RunPrincipal const > rp) override
std::map< BranchKey, BranchDescription > openInputFiles(std::vector< std::string > const &inputCommands, bool dropDescendants, unsigned int treeCacheSize, int64_t treeMaxVirtualSize, int64_t saveMemoryObjectThreshold, BranchDescription const &sampledEventInfoDesc, bool const compactRangeSetsForReading, ModuleDescription const &md, bool const readParameterSets, MasterProductRegistry &preg)
ofstream summary
std::vector< ProcessConfiguration > ProcessConfigurations
std::map< BranchKey, ProductsForDataset_t > Products_t
Definition: DataSetBroker.h:19
ProcessConfiguration const & pc_
std::map< BranchKey, BranchDescription > oldKeyToSampledProductDescription_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
std::string processName_
Definition: BranchKey.h:45
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:117
assert(nhit_max >=nhit_nbins)
ModuleDescription const & moduleDescription
TRandom3 r(0)
ProcessHistoryID sampledProcessHistoryID_
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
ModuleDescription const md_
static RunID firstRun()
Definition: RunID.h:114
std::unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const > srp) override
double T
Definition: Xdiff_gwt.C:5
std::string moduleLabel_
Definition: BranchKey.h:43
BranchDescription sampledSubRunInfoDesc_
#define DEFINE_ART_INPUT_SOURCE(klass)
ProductTable presentSubRunProducts_
constexpr ProductStatus present() noexcept
Definition: ProductStatus.h:10
ProcessConfiguration const & processConfiguration() const
std::unique_ptr< EventPrincipal > readNextEvent(EventID const &id, ProcessConfigurations const &sampled_pcs, ProcessConfiguration const &current_pc)
fhicl::ParameterSetID const & parameterSetID() const
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:119
enum BeamMode string