TimeTracker_service.cc
Go to the documentation of this file.
1 // vim: set sw=2 expandtab :
2 
13 #include "art/Utilities/Globals.h"
15 #include "boost/format.hpp"
18 #include "cetlib/HorizontalRule.h"
20 #include "cetlib/sqlite/Ntuple.h"
21 #include "cetlib/sqlite/helpers.h"
23 #include "fhiclcpp/types/Atom.h"
24 #include "fhiclcpp/types/Name.h"
25 #include "fhiclcpp/types/Table.h"
26 #include "hep_concurrency/RecursiveMutex.h"
28 #include "tbb/concurrent_unordered_map.h"
29 
30 #include <algorithm>
31 #include <chrono>
32 #include <iomanip>
33 #include <iostream>
34 #include <memory>
35 #include <sstream>
36 #include <string>
37 #include <vector>
38 
39 using namespace std;
40 using namespace cet;
41 using namespace hep::concurrency;
42 
43 using chrono::steady_clock;
44 
45 namespace art {
46 
47  namespace {
48 
49  using ConcurrentKey = std::pair<ScheduleID, std::string>;
50  auto
51  key(ScheduleID const sid)
52  {
53  return ConcurrentKey{sid, {}};
54  }
55  auto
56  key(ModuleContext const& mc)
57  {
58  return ConcurrentKey{mc.scheduleID(), mc.moduleLabel()};
59  }
60 
/// Current reading of the monotonic steady clock; every interval recorded
/// by TimeTracker is the difference of two calls to this function.
/// (A plain function replaces the former std::bind wrapper, which added an
/// indirection and depended on <functional> without including it.)
std::chrono::steady_clock::time_point
now()
{
  return std::chrono::steady_clock::now();
}
62 
63  struct Statistics {
64  explicit Statistics() = default;
65 
66  explicit Statistics(string const& p,
67  string const& label,
68  string const& type,
69  sqlite3* const db,
70  string const& table,
71  string const& column)
72  : path{p}
73  , mod_label{label}
74  , mod_type{type}
75  , min{sqlite::min(db, table, column)}
76  , mean{sqlite::mean(db, table, column)}
77  , max{sqlite::max(db, table, column)}
78  , median{sqlite::median(db, table, column)}
79  , rms{sqlite::rms(db, table, column)}
80  , n{sqlite::nrows(db, table)}
81  {}
82 
83  string path{};
84  string mod_label{};
85  string mod_type{};
86  double min{-1.};
87  double mean{-1.};
88  double max{-1.};
89  double median{-1.};
90  double rms{-1.};
91  unsigned n{0u};
92  };
93 
94  ostream&
95  operator<<(ostream& os, Statistics const& info)
96  {
97  string label{info.path};
98  if (!info.mod_label.empty()) {
99  label += ':' + info.mod_label;
100  }
101  if (!info.mod_type.empty()) {
102  label += ':' + info.mod_type;
103  }
104  os << label << " " << boost::format(" %=12g ") % info.min
105  << boost::format(" %=12g ") % info.mean
106  << boost::format(" %=12g ") % info.max
107  << boost::format(" %=12g ") % info.median
108  << boost::format(" %=12g ") % info.rms
109  << boost::format(" %=10d ") % info.n;
110  return os;
111  }
112 
113  } // unnamed namespace
114 
115  class TimeTracker {
116  public:
117  static constexpr bool service_handle_allowed{false};
118 
119  struct Config {
120  fhicl::Atom<bool> printSummary{fhicl::Name{"printSummary"}, true};
121  struct DBoutput {
124  };
125  fhicl::Table<DBoutput> dbOutput{fhicl::Name{"dbOutput"}};
126  };
128  explicit TimeTracker(Parameters const&, ActivityRegistry&);
129 
130  private:
133  steady_clock::time_point eventStart;
134  steady_clock::time_point moduleStart;
135  };
136  template <unsigned SIZE>
138  using timeSource_t =
140  using timeEvent_t =
142  using timeModule_t = cet::sqlite::
143  Ntuple<uint32_t, uint32_t, uint32_t, string, string, string, double>;
144 
145  void postSourceConstruction(ModuleDescription const&);
146  void postEndJob();
147  void preEventReading(ScheduleContext);
148  void postEventReading(Event const&, ScheduleContext);
149  void preEventProcessing(Event const&, ScheduleContext);
150  void postEventProcessing(Event const&, ScheduleContext);
151  void startTime(ModuleContext const& mc);
152  void recordTime(ModuleContext const& mc, string const& suffix);
153  void logToDestination_(Statistics const& evt,
154  vector<Statistics> const& modules);
155 
156  tbb::concurrent_unordered_map<ConcurrentKey, PerScheduleData> data_;
157  bool const printSummary_;
158  unique_ptr<cet::sqlite::Connection> const db_;
159  bool const overwriteContents_;
160  string sourceType_{};
167  };
168 
169  TimeTracker::TimeTracker(Parameters const& config, ActivityRegistry& areg)
170  : printSummary_{config().printSummary()}
172  config().dbOutput().filename())}
173  , overwriteContents_{config().dbOutput().overwrite()}
174  , timeSourceColumnNames_{{"Run", "SubRun", "Event", "Source", "Time"}}
175  , timeEventColumnNames_{{"Run", "SubRun", "Event", "Time"}}
176  , timeModuleColumnNames_{{"Run",
177  "SubRun",
178  "Event",
179  "Path",
180  "ModuleLabel",
181  "ModuleType",
182  "Time"}}
184  "TimeSource",
187  , timeEventTable_{*db_,
188  "TimeEvent",
190  overwriteContents_}
192  "TimeModule",
194  overwriteContents_}
195  {
196  areg.sPostSourceConstruction.watch(this,
198  areg.sPostEndJob.watch(this, &TimeTracker::postEndJob);
199  // Event reading
200  areg.sPreSourceEvent.watch(this, &TimeTracker::preEventReading);
201  areg.sPostSourceEvent.watch(this, &TimeTracker::postEventReading);
202  // Event execution
203  areg.sPreProcessEvent.watch(this, &TimeTracker::preEventProcessing);
204  areg.sPostProcessEvent.watch(this, &TimeTracker::postEventProcessing);
205  // Module execution
206  areg.sPreModule.watch(this, &TimeTracker::startTime);
207  areg.sPostModule.watch(
208  [this](auto const& mc) { this->recordTime(mc, ""s); });
209  areg.sPreWriteEvent.watch(this, &TimeTracker::startTime);
210  areg.sPostWriteEvent.watch(
211  [this](auto const& mc) { this->recordTime(mc, "(write)"s); });
212  }
213 
214  void
216  {
220  if (!printSummary_) {
221  return;
222  }
223  using namespace cet::sqlite;
224  query_result<size_t> rEvents;
225  rEvents << select("count(*)").from(*db_, timeEventTable_.name());
226  query_result<size_t> rModules;
227  rModules << select("count(*)").from(*db_, timeModuleTable_.name());
228  auto const nEventRows = unique_value(rEvents);
229  auto const nModuleRows = unique_value(rModules);
230  if ((nEventRows == 0) && (nModuleRows == 0)) {
231  logToDestination_(Statistics{}, vector<Statistics>{});
232  return;
233  }
234  if (nEventRows == 0 && nModuleRows != 0) {
235  string const errMsg{
236  "Malformed TimeTracker database. The TimeEvent table is empty, but\n"
237  "the TimeModule table is not. This can happen if an exception has\n"
238  "been thrown from a module while processing the first event. Any\n"
239  "saved database file is suspect and should not be used."};
240  mf::LogAbsolute("TimeTracker") << errMsg;
241  return;
242  }
243  // Gather statistics for full Event
244  // -- Unfortunately, this is not a simple query since the (e.g.)
245  // 'RootOutput(write)' times and the source time are not
246  // recorded in the TimeEvent rows. They must be added in.
247  string const fullEventTime_ddl =
248  "CREATE TABLE temp.fullEventTime AS "
249  "SELECT Run,Subrun,Event,SUM(Time) AS FullEventTime FROM ("
250  " SELECT Run,Subrun,Event,Time FROM TimeEvent"
251  " UNION"
252  " SELECT Run,Subrun,Event,Time FROM TimeModule WHERE ModuleType "
253  "LIKE '%(write)'"
254  " UNION"
255  " SELECT Run,Subrun,Event,Time FROM TimeSource"
256  ") GROUP BY Run,Subrun,Event";
257  using namespace cet::sqlite;
258  exec(*db_, fullEventTime_ddl);
259  Statistics const evtStats{
260  "Full event", "", "", *db_, "temp.fullEventTime", "FullEventTime"};
261  drop_table(*db_, "temp.fullEventTime");
263  r << select_distinct("Path", "ModuleLabel", "ModuleType")
264  .from(*db_, timeModuleTable_.name());
265  vector<Statistics> modStats;
266  modStats.emplace_back(
267  "source", sourceType_ + "(read)", "", *db_, "TimeSource", "Time");
268  for (auto const& row : r) {
269  auto const& [path, mod_label, mod_type] = row;
270  create_table_as("temp.tmpModTable",
271  select("*")
272  .from(*db_, "TimeModule")
273  .where("Path='"s + path + "'"s + " AND ModuleLabel='"s +
274  mod_label + "'"s + " AND ModuleType='"s +
275  mod_type + "'"s));
276  modStats.emplace_back(
277  path, mod_label, mod_type, *db_, "temp.tmpModTable", "Time");
278  drop_table(*db_, "temp.tmpModTable");
279  }
280  logToDestination_(evtStats, modStats);
281  }
282 
283  void
285  {
286  sourceType_ = md.moduleName();
287  }
288 
289  void
291  {
292  auto& d = data_[key(sc.id())];
293  d.eventID = EventID::invalidEvent();
294  d.eventStart = now();
295  }
296 
297  void
299  {
300  auto& d = data_[key(sc.id())];
301  d.eventID = e.id();
302  auto const t = chrono::duration<double>{now() - d.eventStart}.count();
304  d.eventID.run(), d.eventID.subRun(), d.eventID.event(), sourceType_, t);
305  }
306 
307  void
308  TimeTracker::preEventProcessing(Event const& e [[maybe_unused]],
309  ScheduleContext const sc)
310  {
311  auto& d = data_[key(sc.id())];
312  assert(d.eventID == e.id());
313  d.eventStart = now();
314  }
315 
316  void
318  {
319  auto const& d = data_[key(sc.id())];
320  auto const t = chrono::duration<double>{now() - d.eventStart}.count();
322  d.eventID.run(), d.eventID.subRun(), d.eventID.event(), t);
323  }
324 
325  void
327  {
328  data_[key(mc)].eventID = data_[key(mc.scheduleID())].eventID;
329  data_[key(mc)].moduleStart = now();
330  }
331 
332  void
333  TimeTracker::recordTime(ModuleContext const& mc, string const& suffix)
334  {
335  auto const& d = data_[key(mc)];
336  auto const t = chrono::duration<double>{now() - d.moduleStart}.count();
337  timeModuleTable_.insert(d.eventID.run(),
338  d.eventID.subRun(),
339  d.eventID.event(),
340  mc.pathName(),
341  mc.moduleLabel(),
342  mc.moduleName() + suffix,
343  t);
344  }
345 
346  void
348  vector<Statistics> const& modules)
349  {
350  size_t width{30};
351  auto identifier_size = [](Statistics const& s) {
352  return s.path.size() + s.mod_label.size() + s.mod_type.size() +
353  2; // Don't forget the two ':'s.
354  };
355  cet::for_all(modules, [&identifier_size, &width](auto const& mod) {
356  width = max(width, identifier_size(mod));
357  });
358  ostringstream msgOss;
359  HorizontalRule const rule{width + 4 + 5 * 14 + 12};
360  msgOss << '\n'
361  << rule('=') << '\n'
362  << std::setw(width + 2) << std::left << "TimeTracker printout (sec)"
363  << boost::format(" %=12s ") % "Min"
364  << boost::format(" %=12s ") % "Avg"
365  << boost::format(" %=12s ") % "Max"
366  << boost::format(" %=12s ") % "Median"
367  << boost::format(" %=12s ") % "RMS"
368  << boost::format(" %=10s ") % "nEvts"
369  << "\n";
370  msgOss << rule('=') << '\n';
371  if (evt.n == 0u) {
372  msgOss << "[ No processed events ]\n";
373  } else {
374  // N.B. setw(width) applies to the first field in
375  // ostream& operator<<(ostream&, Statistics const&).
376  msgOss << setw(width) << evt << '\n' << rule('-') << '\n';
377  for (auto const& mod : modules) {
378  msgOss << setw(width) << mod << '\n';
379  }
380  }
381  msgOss << rule('=');
382  mf::LogAbsolute("TimeTracker") << msgOss.str();
383  }
384 
385 } // namespace art
386 
void insert(Args const ...)
Definition: Ntuple.h:233
T max(const caf::Proxy< T > &a, T b)
const XML_Char XML_Encoding * info
Definition: expat.h:530
void logToDestination_(Statistics const &evt, vector< Statistics > const &modules)
void preEventReading(ScheduleContext)
T unique_value(query_result< T > const &r)
Definition: query_result.h:94
auto const & pathName() const
Definition: ModuleContext.h:33
T * get() const
Definition: ServiceHandle.h:63
#define DEFINE_ART_SERVICE(svc)
Definition: ServiceMacros.h:88
const char * p
Definition: xmltok.h:285
auto select_distinct(T const &...t)
Definition: select.h:154
static constexpr EventID invalidEvent() noexcept
Definition: EventID.h:202
void preEventProcessing(Event const &, ScheduleContext)
auto scheduleID() const
Definition: ModuleContext.h:28
tbb::concurrent_unordered_map< ConcurrentKey, PerScheduleData > data_
Int_t eventID
Definition: plot.C:81
#define DECLARE_ART_SERVICE(svc, scope)
Definition: ServiceMacros.h:86
Definition: config.py:1
string filename
Definition: shutoffs.py:106
name_array< 4u > const timeEventColumnNames_
std::string const & name() const
Definition: Ntuple.h:147
void postEventProcessing(Event const &, ScheduleContext)
void postSourceConstruction(ModuleDescription const &)
const char * label
void create_table_as(std::string const &tablename, SelectStmt const &stmt)
Definition: create_table.h:127
cet::sqlite::name_array< SIZE > name_array
const XML_Char * s
Definition: expat.h:262
static const char *const errMsg[]
Definition: Error.h:69
auto const & moduleName() const
Definition: ModuleContext.h:48
void postEventReading(Event const &, ScheduleContext)
timeModule_t timeModuleTable_
std::void_t< T > n
std::string const & moduleName() const
int evt
Float_t d
Definition: plot.C:236
unique_ptr< cet::sqlite::Connection > const db_
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14
double median(TH1D *h)
Definition: absCal.cxx:524
name_array< 7u > const timeModuleColumnNames_
timeSource_t timeSourceTable_
auto select(T const &...t)
Definition: select.h:146
steady_clock::time_point moduleStart
void recordTime(ModuleContext const &mc, string const &suffix)
static float min(const float a, const float b, const float c)
Definition: absgeo.cxx:45
std::ostream & operator<<(std::ostream &o, skim::ParametersNue const &p)
const std::string path
Definition: plot_BEN.C:43
void drop_table(sqlite3 *db, std::string const &tablename)
name_array< 5u > const timeSourceColumnNames_
constexpr auto const & left(const_AssnsIter< L, R, D, Dir > const &a, const_AssnsIter< L, R, D, Dir > const &b)
Definition: AssnsIter.h:96
void startTime(ModuleContext const &mc)
auto const & moduleLabel() const
Definition: ModuleContext.h:43
std::array< std::string, N > name_array
Definition: column.h:40
auto for_all(FwdCont &, Func)
assert(nhit_max >=nhit_nbins)
TRandom3 r(0)
Service to store calibration data products (CDP) in the SQLite3 metadatabase of a file...
Definition: FillParentInfo.h:8
T min(const caf::Proxy< T > &a, T b)
Float_t e
Definition: plot.C:35
T max(sqlite3 *const db, std::string const &table_name, std::string const &column_name)
Definition: statistics.h:66
void exec(sqlite3 *db, std::string const &ddl)
MaybeLogger_< ELseverityLevel::ELsev_severe, true > LogAbsolute
EventID id() const