ana::SAMQuerySource Class Reference

File source based on a SAM query or dataset (definition).

#include "/cvmfs/nova.opensciencegrid.org/externals/cafanacore/v01.11/src/CAFAna/Core/SAMQuerySource.h"

Inheritance: ana::SAMQuerySource derives from ana::FileListSource, which derives from ana::IFileSource.

Public Member Functions

 SAMQuerySource (const std::string &query, int stride=-1, int offset=-1, int limit=-1)
 
virtual ~SAMQuerySource ()
 
virtual TFile * GetNextFile () override
 Returns the next file in sequence, ready for reading.
 
int NFiles () const override
 May return -1, indicating that the number of files is not known.
 
const std::vector< std::string > & GetFileNames () const
 

Protected Member Functions

std::vector< std::string > LocationsForSAMQuery (const std::string &str, int stride, int offset, int limit=-1)
 
std::vector< std::string > LocateSAMFiles (const std::vector< std::string > &fnames)
 Take filenames, return locations suitable for TFile::Open().
 
bool RunningOnGrid () const
 
std::string EnsureDataset (const std::string &query) const
 
std::string EnsureSnapshot (const std::string &def) const
 

Protected Attributes

std::vector< std::string > fFileNames
 The list of files.

std::vector< std::string >::iterator fIt
 Iterator into fFileNames.

std::vector< std::string > fRetry
 List of files that failed the first attempt.

bool fInRetry
 Did we finish fFileNames and are now in fRetry?

TFile * fFile
 The most-recently-returned file.
 

Static Protected Attributes

static bool fgGotTickets = false
 Have we renewed our tickets?
 

Detailed Description

File source based on a SAM query or dataset (definition)

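Locates each file on BlueArc or /pnfs. BlueArc copies are preferred but are never used when running on the grid. Among /pnfs copies, dCache (disk) is preferred over Enstore (tape).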

Definition at line 10 of file SAMQuerySource.h.

Constructor & Destructor Documentation

ana::SAMQuerySource::SAMQuerySource (const std::string &query, int stride = -1, int offset = -1, int limit = -1)
Parameters
query  May be a SAM dataset (definition) name or a SAM query string. LocationsForSAMQuery() treats a string that contains no spaces as a dataset name.

Definition at line 19 of file SAMQuerySource.cxx.

{
}
ana::SAMQuerySource::~SAMQuerySource ( )
virtual

Definition at line 27 of file SAMQuerySource.cxx.

{
}

Member Function Documentation

std::string ana::SAMQuerySource::EnsureDataset (const std::string &query) const
protected

Definition at line 38 of file SAMQuerySource.cxx.

References ana::SAMDefinitionExists(), getenv(), system(), and TString::Format().

Referenced by LocationsForSAMQuery().

{
  const char* user = getenv("GRID_USER");
  assert(user);

  TString dset = TString::Format("%s_cafana_%s", user, query.c_str());
  // Sanitize various special characters that can appear in queries
  dset.ReplaceAll(" ", "_");
  dset.ReplaceAll("(", "_OPEN_");
  dset.ReplaceAll(")", "_CLOSE_");
  dset.ReplaceAll(":", "_COLON_");
  dset.ReplaceAll("'", "_SQUOTE_");
  dset.ReplaceAll("\"", "_DQUOTE_");

  std::cout << "Creating dataset " << dset << " for query " << query << std::endl;

  if(!SAMDefinitionExists(dset.Data())){
    // I would be much much happier to do this in proper code, but I'm not
    // sure how, there's no samweb C++ API?
    system(TString::Format("samweb create-definition %s %s",
                           dset.Data(), query.c_str()).Data());
  }

  return dset.Data();
}
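The dataset name that EnsureDataset() derives from a query can be reproduced with plain std::string operations. The sketch below is a hypothetical standalone helper. DatasetNameForQuery and ReplaceAllIn are illustrative names, not CAFAna functions. It applies the same six substitutions in the same order, and it omits the getenv("GRID_USER") lookup and the samweb call.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Replace every occurrence of `from` in `s` with `to`, scanning left to right
// (similar in spirit to TString::ReplaceAll).
std::string ReplaceAllIn(std::string s, const std::string& from, const std::string& to)
{
  if(from.empty()) return s;
  std::string::size_type pos = 0;
  while((pos = s.find(from, pos)) != std::string::npos){
    s.replace(pos, from.size(), to);
    pos += to.size(); // continue after the inserted text so it is not rescanned
  }
  return s;
}

// Hypothetical helper: builds "<user>_cafana_<query>" and applies the same
// substitutions, in the same order, as EnsureDataset().
std::string DatasetNameForQuery(const std::string& user, const std::string& query)
{
  assert(!user.empty()); // the original asserts that GRID_USER is set
  const std::vector<std::pair<std::string, std::string>> subs = {
    {" ", "_"}, {"(", "_OPEN_"}, {")", "_CLOSE_"},
    {":", "_COLON_"}, {"'", "_SQUOTE_"}, {"\"", "_DQUOTE_"}};
  std::string dset = user + "_cafana_" + query;
  for(const auto& sub: subs) dset = ReplaceAllIn(dset, sub.first, sub.second);
  return dset;
}
```

For example, the query "defname: foo" submitted by user alice maps to the definition name "alice_cafana_defname_COLON__foo".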
std::string ana::SAMQuerySource::EnsureSnapshot (const std::string &def) const
protected

Definition at line 65 of file SAMQuerySource.cxx.

References ana::SAMDefinitionExists(), getenv(), system(), sleep(), abort(), and TString::Format().

Referenced by LocationsForSAMQuery().

{
  if(!SAMDefinitionExists(def)){
    std::cout << "SAMQuerySource::EnsureSnapshot(): definition '"
              << def << "' not found." << std::endl;
    abort();
  }

  const char* user = getenv("GRID_USER");
  assert(user);
  const char* cluster = getenv("CLUSTER");
  assert(cluster);
  const char* process = getenv("PROCESS");
  assert(process);

  // Jobs in the same cluster should share the same snapshot of the dataset
  // so as not to hammer SAM with multiple requests for the same file list,
  // but so that the dataset snapshot is updated with every new submission.
  const std::string snap = TString::Format("%s_cafana_snap_%s_%s",
                                           user, def.c_str(), cluster).Data();

  // I'd love to do all this with a proper API, but samweb doesn't seem to
  // have a C++ one? So we get this stew of system() calls...

  // Use this name as an indication that someone is working on creating the
  // snapshot and everyone else should stand by.
  const std::string snaplock = TString::Format("%s_cafana_snap_lock_%s_%s",
                                               user, def.c_str(), cluster).Data();

  // Try to create the lock. Success means we have to create the snapshot,
  // failure means someone else is working on it. The content of the
  // definition (the nova.special) doesn't matter, except it has to be unique
  // between the jobs, because trying to create an exact duplicate of an
  // existing definition counts as success.
  std::cout << "Checking lock " << snaplock << std::endl;
  if(system(TString::Format("samweb create-definition %s nova.special %s",
                            snaplock.c_str(), process).Data()) == 0){
    // No one took the lock, it's up to us. Make the actual snapshot
    std::cout << "Snapshotting " << def << " as " << snap << std::endl;
    system(TString::Format("samweb take-snapshot %s | xargs samweb create-definition %s snapshot_id",
                           def.c_str(), snap.c_str()).Data());
  }
  else{
    // Lock already exists, just wait for the real snapshot to be created
    double period = 1;
    while(!SAMDefinitionExists(snap)){
      sleep(int(period));
      period *= 1.5;
      if(period > 60*30){
        std::cout << "We've been waiting a real long time for " << snap << " to be created. I don't think it's happening." << std::endl;
        abort();
      }
    }
  }
  std::cout << "Will use " << snap << std::endl;

  return snap;
}
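EnsureSnapshot() uses the uniqueness of SAM definition names as a lock. The first job in a cluster to create the lock definition takes the snapshot. Every other job polls for the snapshot, and its sleep interval grows by a factor of 1.5 each time. The sketch below isolates that protocol under stated assumptions. The samweb calls and sleep() are replaced by hypothetical callbacks in SnapshotOps, and giving up returns false instead of calling abort().

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-ins for what EnsureSnapshot() does via system() and SAM:
//   tryLock  ~ "samweb create-definition <lock> nova.special <process>" succeeding
//   create   ~ "samweb take-snapshot <def> | xargs samweb create-definition <snap> ..."
//   exists   ~ SAMDefinitionExists(<snap>)
//   sleepFor ~ sleep(seconds)
struct SnapshotOps {
  std::function<bool()> tryLock;
  std::function<void()> create;
  std::function<bool()> exists;
  std::function<void(int)> sleepFor;
};

// Returns true once the snapshot is available (made by us or by another job),
// false if the polling interval grew past maxPeriod seconds.
bool EnsureSnapshotSketch(const SnapshotOps& ops, double maxPeriod = 60*30)
{
  assert(ops.tryLock && ops.create && ops.exists && ops.sleepFor);
  if(ops.tryLock()){
    ops.create(); // we won the lock, so we make the snapshot
    return true;
  }
  // Someone else holds the lock: poll with a 1.5x backoff
  double period = 1;
  while(!ops.exists()){
    ops.sleepFor(int(period));
    period *= 1.5;
    if(period > maxPeriod) return false;
  }
  return true;
}
```

Note that the 60*30 limit caps the sleep interval, not the total wait. If the snapshot never appears, a waiting job sleeps 19 times (1, 1, 2, 3, 5, 7, 11, ... seconds), which sums to 4422 s (about 74 minutes) of sleeping before it gives up.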
const std::vector<std::string>& ana::FileListSource::GetFileNames ( ) const
inline, inherited

Definition at line 22 of file FileListSource.h.

References ana::FileListSource::fFileNames.

{ return fFileNames; }
TFile * ana::FileListSource::GetNextFile ( )
override, virtual, inherited

Returns the next file in sequence, ready for reading.

A null return means that the end of the sequence has been reached. DO NOT close or delete the file that is returned.

Implements ana::IFileSource.

Definition at line 78 of file FileListSource.cxx.

References ana::FileListSource::fFile, ana::FileListSource::fFileNames, ana::FileListSource::fInRetry, ana::FileListSource::fIt, ana::FileListSource::fRetry, ana::pnfs2xrootd(), and TFile::Open().

Referenced by make_dst_cosrejbdttrain(), trimmubarid(), and trimvar().

{
  // Tidy up the last file we gave, which the caller no longer needs
  delete fFile;
  fFile = 0;

  // Did we run out of files?
  if(fInRetry && fIt == fRetry.end()) return 0;
  // Only compare against fFileNames.end() while fIt still points into fFileNames
  if(!fInRetry && fIt == fFileNames.end()){
    if(fRetry.empty()) return 0;
    fIt = fRetry.begin();
    fInRetry = true;
  }

  // If the file is on pnfs rewrite it to an xrootd address
  std::string loc = *fIt;
  loc = pnfs2xrootd(loc); // no-op for non /pnfs locations

  if(fInRetry) std::cout << "Retrying " << loc << " which was previously deferred..." << std::endl;

  fFile = TFile::Open(loc.c_str()); // This pattern allows xrootd

  if(!fFile){
    if(!fInRetry){
      std::cout << "Unable to open " << loc << std::endl;
      std::cout << "Will skip this file and try again later." << std::endl;
      fRetry.push_back(*fIt);
      ++fIt;
      return GetNextFile();
    }
    else{
      const int Nretry = 3;
      std::cout << "Failed to open. Will retry " << Nretry << " more times" << std::endl;
      for(int i = 0; i < Nretry; ++i){
        std::cout << "Attempt " << i << std::endl;
        fFile = TFile::Open(loc.c_str());
        if(fFile){
          std::cout << "Success!" << std::endl;
          break;
        }
      } // end for i
      if(!fFile){
        std::cout << "Still unable to read " << loc << std::endl;
        std::cout << "Aborting" << std::endl;
        abort();
      }
    }
  }

  // The above should have guaranteed this
  assert(fFile);

  ++fIt; // Move on to the next file, for the subsequent call

  return fFile;
}
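GetNextFile() defers any file that fails to open on the first pass. It retries that file (up to three more times) only after every other file has been handed out. The following is a minimal sketch of that ordering, under stated assumptions:

- A hypothetical `open` callback replaces TFile::Open().
- Names are returned instead of TFile pointers, and an empty string plays the role of the null return.
- A std::runtime_error replaces abort().

DeferredRetryList is an illustrative name, not a CAFAna class.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

class DeferredRetryList {
public:
  DeferredRetryList(std::vector<std::string> names,
                    std::function<bool(const std::string&)> open,
                    int nRetry = 3)
    : fNames(std::move(names)), fOpen(std::move(open)), fNRetry(nRetry)
  {
    assert(fOpen);
  }

  // Next name that opened successfully, or "" once both passes are exhausted.
  std::string Next()
  {
    while(true){
      if(!fInRetry && fIdx == fNames.size()){
        fInRetry = true; // first pass done: move on to the deferred files
        fIdx = 0;
      }
      const std::vector<std::string>& list = fInRetry ? fRetry : fNames;
      if(fIdx == list.size()) return "";
      const std::string name = list[fIdx++];
      if(fOpen(name)) return name;
      if(!fInRetry){
        fRetry.push_back(name); // defer it and keep going with the others
        continue;
      }
      for(int i = 0; i < fNRetry; ++i){
        if(fOpen(name)) return name;
      }
      throw std::runtime_error("Still unable to open " + name);
    }
  }

private:
  std::vector<std::string> fNames;
  std::vector<std::string> fRetry;
  std::function<bool(const std::string&)> fOpen;
  int fNRetry;
  std::size_t fIdx = 0;
  bool fInRetry = false;
};
```

The point of deferring is that a transient failure, such as a brief xrootd hiccup, gets time to clear while other files are processed. A file is given up on only after it has failed once on the first pass and again on all of its retries.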
std::vector< std::string > ana::SAMQuerySource::LocateSAMFiles ( const std::vector< std::string > &  fnames)
protected

Take filenames, return locations suitable for TFile::Open()

Definition at line 190 of file SAMQuerySource.cxx.

References RunningOnGrid(), ifdh::locateFiles(), getenv(), stat(), and TString::Format().

Referenced by LocationsForSAMQuery().

{
  std::vector<std::string> ret;

  // We're going to fill this map of locations for all the files
  std::map<std::string, std::vector<std::string>> locmap;

  Progress prog(TString::Format("Looking up locations of %zu files using SAM", fnames.size()).Data());

  ifdh i;
  i.set_debug("0"); // shut up

  // locateFiles() saves the roundtrip time of talking to the server about
  // every file individually, but it seems to bog down for large
  // queries. Split the query into chunks. Experimentally this is about the
  // sweet spot.
  const unsigned int kStep = 50;
  for(unsigned int fIdx = 0; fIdx < fnames.size(); fIdx += kStep){
    prog.SetProgress(double(fIdx)/fnames.size());

    // The files we're looking up right now. Careful not to run off the end
    // of the vector.
    const std::vector<std::string> fslice(fnames.begin()+fIdx, fIdx+kStep < fnames.size() ? fnames.begin()+fIdx+kStep : fnames.end());

    const auto locslice = i.locateFiles(fslice);

    locmap.insert(locslice.begin(), locslice.end());
  }

  prog.Done();

  // Now go through the map and pick our favourite location for each file,
  // and do some cleanup.
  for(const auto& it: locmap){
    const std::string& f = it.first;
    const std::vector<std::string>& locs = it.second;

    int best = 0;

    std::string resolved;
    for(TString loc: locs){
      // This is a short-term hack to prevent files with odd locations (of
      // which JINR is the only example so far) with locations inaccessible
      // from FNAL from gumming things up. Really we need a generic system
      // for deciding on the best file location, but no such thing seems to
      // be readily available.
      if(loc.Contains("jinr.ru") &&
         getenv("CAFANA_ALLOW_ALL_FILE_LOCATIONS") == 0) continue;

      // Never try to access bluearc locations from the grid
      if(!RunningOnGrid() && loc.BeginsWith("novadata:") && best < 3){
        loc.Remove(0, 9);

        // Rewrite FNAL bluearc paths so users with matching directory
        // structures offsite can access their local copies.
        if(std::getenv("NOVA_ANA" )) loc.ReplaceAll("/nova/ana", std::getenv("NOVA_ANA"));
        if(std::getenv("NOVA_APP" )) loc.ReplaceAll("/nova/app", std::getenv("NOVA_APP"));
        if(std::getenv("NOVA_DATA")) loc.ReplaceAll("/nova/data", std::getenv("NOVA_DATA"));
        if(std::getenv("NOVA_PROD")) loc.ReplaceAll("/nova/prod", std::getenv("NOVA_PROD"));

        // Check if the file exists at this candidate location (not at the
        // previously resolved one). If not, maybe pnfs has it.
        struct stat junk;
        if(stat((std::string(loc.Data())+'/'+f).c_str(), &junk) == 0){
          best = 3; // Prefer bluearc locations
          resolved = loc;
        }
      }

      if(loc.BeginsWith("dcache:") && best < 2){
        // Otherwise, use xrootd. Prefer "dcache:" to "enstore:" because
        // "dcache:" probably means /pnfs/nova/persistent/ so no chance of a
        // huge wait for tape.
        best = 2;

        // FileListSource does the actual conversion to xrootd
        loc.ReplaceAll("dcache:/pnfs/", "/pnfs/");
        // Strip the bit in brackets from the end
        if(loc.First('(') >= 0) loc.Resize(loc.First('('));
        resolved = loc;
      }

      if(loc.BeginsWith("enstore:") && best < 1){
        best = 1;

        loc.ReplaceAll("enstore:/pnfs/", "/pnfs/");
        if(loc.First('(') >= 0) loc.Resize(loc.First('('));
        resolved = loc;
      }

    } // end for loc

    if(best == 0 || resolved.empty()){
      std::cerr << "\nCouldn't find a usable location for '" << f
                << "'. Aborting..." << std::endl;
      abort();
    }

    ret.push_back(resolved+'/'+f);
  } // end for it

  return ret;
}
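LocateSAMFiles() does two separable things. First, it asks ifdh about the files in slices of 50. Second, it ranks each file's candidate locations: BlueArc scores 3, dCache 2 and Enstore 1, and JINR copies are skipped. The sketch below reimplements both steps on plain strings, with these simplifications:

- SliceNames and PickLocation are hypothetical names.
- The stat() check is replaced by a caller-supplied dirHasFile callback, which is applied to the candidate BlueArc directory itself.
- The NOVA_* path rewrites and the CAFANA_ALLOW_ALL_FILE_LOCATIONS override are omitted.
- The "dcache:" and "enstore:" prefixes are stripped outright rather than through the exact "dcache:/pnfs/" replacement.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Split `names` into consecutive slices of at most `step` entries.
std::vector<std::vector<std::string>> SliceNames(const std::vector<std::string>& names,
                                                 std::size_t step = 50)
{
  assert(step > 0); // a zero step would never advance
  std::vector<std::vector<std::string>> out;
  for(std::size_t i = 0; i < names.size(); i += step){
    const std::size_t end = std::min(i + step, names.size());
    out.emplace_back(names.begin() + i, names.begin() + end);
  }
  return out;
}

namespace {
bool StartsWith(const std::string& s, const std::string& prefix)
{
  return s.compare(0, prefix.size(), prefix) == 0;
}

// Drop `prefix` and, if requested, everything from the first '(' onwards.
std::string Strip(const std::string& loc, const std::string& prefix, bool dropParen)
{
  std::string out = loc.substr(prefix.size());
  const std::string::size_type paren = out.find('(');
  if(dropParen && paren != std::string::npos) out.resize(paren);
  return out;
}
}

// Preferred directory for one file, or "" if none is usable (the original aborts).
std::string PickLocation(const std::vector<std::string>& locs, bool onGrid,
                         const std::function<bool(const std::string&)>& dirHasFile)
{
  int best = 0;
  std::string resolved;
  for(const std::string& loc: locs){
    if(loc.find("jinr.ru") != std::string::npos) continue; // not reachable from FNAL

    if(!onGrid && StartsWith(loc, "novadata:") && best < 3){
      const std::string dir = Strip(loc, "novadata:", false);
      if(dirHasFile(dir)){ best = 3; resolved = dir; } // BlueArc wins when present
    }
    if(StartsWith(loc, "dcache:") && best < 2){
      best = 2; resolved = Strip(loc, "dcache:", true);  // disk, no tape wait
    }
    if(StartsWith(loc, "enstore:") && best < 1){
      best = 1; resolved = Strip(loc, "enstore:", true); // may need a tape recall
    }
  }
  return resolved;
}
```

Because each branch only fires when `best` is below its own rank, the result does not depend on the order in which SAM lists the locations.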
std::vector< std::string > ana::SAMQuerySource::LocationsForSAMQuery (const std::string &str, int stride, int offset, int limit = -1)
protected

Definition at line 126 of file SAMQuerySource.cxx.

References EnsureDataset(), EnsureSnapshot(), LocateSAMFiles(), RunningOnGrid(), ifdh::translateConstraints(), getenv(), and TString::Format().

Referenced by SAMQuerySource().

{
  TString query = str;

  // This is an actual query
  if(query.Contains(' ') && RunningOnGrid()){
    // On the grid we want to convert that to a dataset we can snapshot below
    query = EnsureDataset(query.Data());
  }

  // This is a dataset name
  if(!query.Contains(' ')){
    if(getenv("CAFANA_USE_SNAPSHOTS")){
      query = "dataset_def_name_newest_snapshot "+query;
    }
    else{
      // Take one snapshot between all the jobs and share that
      if(RunningOnGrid()) query = EnsureSnapshot(query.Data());

      query = "defname: "+query;
    }
  }

  if(stride > 1){
    query += TString::Format(" with stride %d", stride).Data();
    if(offset > 0){
      query += TString::Format(" offset %d", offset).Data();
    }
  }

  if(limit > 0)
    query += TString::Format(" with limit %d", limit).Data();

  std::cout << "Looking up files matching '" << query << "' using SAM...\n";

  std::vector<std::string> files;

  ifdh i;
  i.set_debug("0"); // shut up
  try{
    files = i.translateConstraints(query.Data());
  }
  catch(ifdh_util_ns::WebAPIException&){
    // I like my error message below better, since this could well be a
    // mistyped filename.
  }

  // IFDH likes to give back an empty string as the last response
  // https://cdcvs.fnal.gov/redmine/issues/6718
  // Drop it before the emptiness check, so a lone "" counts as no files.
  if(!files.empty() && files.back().empty()){
    files.pop_back();
  }

  if(files.empty()){
    std::cerr << "\nCan't find any files matching '" << str
              << "'. Aborting..." << std::endl;
    abort();
  }

  return LocateSAMFiles(files);
}
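The string rewriting at the top of LocationsForSAMQuery() can be isolated as a pure function. The sketch below makes these assumptions:

- BuildSAMQuery is a hypothetical name.
- The CAFANA_USE_SNAPSHOTS check becomes a boolean.
- EnsureSnapshot() becomes a callback; pass the identity to mimic running off the grid.
- The grid-only EnsureDataset() conversion of full queries is left out.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Sketch of how LocationsForSAMQuery() turns its inputs into a SAM query string.
std::string BuildSAMQuery(std::string query, int stride, int offset, int limit,
                          bool useNewestSnapshot,
                          const std::function<std::string(const std::string&)>& snapshot)
{
  assert(snapshot);
  // A string without spaces is treated as a dataset definition name
  if(query.find(' ') == std::string::npos){
    if(useNewestSnapshot) query = "dataset_def_name_newest_snapshot " + query;
    else query = "defname: " + snapshot(query);
  }
  // As in the original, the offset is only applied together with a stride > 1
  if(stride > 1){
    query += " with stride " + std::to_string(stride);
    if(offset > 0) query += " offset " + std::to_string(offset);
  }
  if(limit > 0) query += " with limit " + std::to_string(limit);
  return query;
}
```

For example, with an identity callback, BuildSAMQuery("my_def", 4, 2, 10, false, identity) yields "defname: my_def with stride 4 offset 2 with limit 10". A nonzero offset with the default stride is silently ignored.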
int ana::FileListSource::NFiles ( ) const
inline, override, virtual, inherited

May return -1 indicating the number of files is not known.

Reimplemented from ana::IFileSource.

Definition at line 20 of file FileListSource.h.

References ana::FileListSource::fFileNames.

Referenced by trimmubarid(), trimvar(), and ana::SpectrumLoaderBase::WildcardOrSAMQuery().

{ return fFileNames.size(); }
bool ana::SAMQuerySource::RunningOnGrid ( ) const
protected

Definition at line 32 of file SAMQuerySource.cxx.

References getenv().

Referenced by LocateSAMFiles(), and LocationsForSAMQuery().

{
  return getenv("_CONDOR_SCRATCH_DIR") != 0;
}

Member Data Documentation

TFile* ana::FileListSource::fFile
protected, inherited

The most-recently-returned file.

Definition at line 28 of file FileListSource.h.

Referenced by ana::FileListSource::GetNextFile(), and ana::FileListSource::~FileListSource().

std::vector<std::string> ana::FileListSource::fFileNames
protected, inherited
bool ana::FileListSource::fgGotTickets = false
static, protected, inherited

Have we renewed our tickets?

Definition at line 29 of file FileListSource.h.

Referenced by ana::FileListSource::FileListSource().

bool ana::FileListSource::fInRetry
protected, inherited

Did we finish fFileNames and are now in fRetry?

Definition at line 27 of file FileListSource.h.

Referenced by ana::FileListSource::GetNextFile().

std::vector<std::string>::iterator ana::FileListSource::fIt
protected, inherited

Iterator into fFileNames.

Definition at line 25 of file FileListSource.h.

Referenced by ana::FileListSource::FileListSource(), and ana::FileListSource::GetNextFile().

std::vector<std::string> ana::FileListSource::fRetry
protected, inherited

List of files that failed 1st attempt.

Definition at line 26 of file FileListSource.h.

Referenced by ana::FileListSource::GetNextFile().


The documentation for this class was generated from the following files: