SAMQuerySource.cxx
Go to the documentation of this file.
2 
3 #include "CAFAna/Core/Progress.h"
4 #include "CAFAna/Core/UtilsExt.h"
5 
6 #include "ifdh.h"
7 
8 #include <cassert>
9 #include <iostream>
10 #include <map>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 
14 #include "TString.h"
15 
16 namespace ana
17 {
18  //----------------------------------------------------------------------
// NOTE(review): the first line of this constructor's signature is missing
// from this listing; the initializer below uses a `query` argument in
// addition to the three ints - confirm against the header.
20  int stride, int offset, int limit)
21  // Stride and offset already taken account of in the query
// The file list handed to the FileListSource base comes from
// LocationsForSAMQuery(), which receives stride/offset/limit itself; the
// trailing 1, 0 are presumably the base class's own stride and offset.
22  : FileListSource(LocationsForSAMQuery(query, stride, offset, limit), 1, 0)
23  {
24  }
25 
26  //----------------------------------------------------------------------
// NOTE(review): the signature line for this empty body is missing from this
// listing; from its position it is presumably the destructor - confirm.
28  {
29  }
30 
31  //----------------------------------------------------------------------
// NOTE(review): the signature line for this body is missing from this
// listing; presumably RunningOnGrid() - confirm.
//
// Reports whether the _CONDOR_SCRATCH_DIR environment variable is set.
33  {
34  return getenv("_CONDOR_SCRATCH_DIR") != 0;
35  }
36 
37  //----------------------------------------------------------------------
// NOTE(review): the signature line for this body is missing from this
// listing; presumably EnsureDataset(const std::string& query) - confirm.
//
// Builds a dataset definition name out of GRID_USER and the query text,
// creates that definition through the samweb command line when
// SAMDefinitionExists() reports it missing, and returns the name.
39  {
40  const char* user = getenv("GRID_USER");
// assert() is the only guard on GRID_USER; NOTE(review): with asserts
// compiled out a null pointer would reach Format() below.
41  assert(user);
42 
43  TString dset = TString::Format("%s_cafana_%s", user, query.c_str());
44  // Sanitize various special characters that can appear in queries
45  dset.ReplaceAll(" ", "_");
46  dset.ReplaceAll("(", "_OPEN_");
47  dset.ReplaceAll(")", "_CLOSE_");
48  dset.ReplaceAll(":", "_COLON_");
49  dset.ReplaceAll("'", "_SQUOTE_");
50  dset.ReplaceAll("\"", "_DQUOTE_");
51 
52  std::cout << "Creating dataset " << dset << " for query " << query << std::endl;
53 
54  if(!SAMDefinitionExists(dset.Data())){
55  // I would be much much happier to do this in proper code, but I'm not
56  // sure how, there's no samweb C++ API?
// NOTE(review): only the dataset name is sanitized; the query itself is
// pasted into the shell command unquoted, and the exit status of system()
// is discarded. Confirm that queries containing '(' or quotes survive the
// shell, and whether a failed create-definition should be reported.
57  system(TString::Format("samweb create-definition %s %s",
58  dset.Data(), query.c_str()).Data());
59  }
60 
61  return dset.Data();
62  }
63 
64  //----------------------------------------------------------------------
// NOTE(review): the signature line for this body is missing from this
// listing; presumably EnsureSnapshot(const std::string& def) - confirm.
//
// Returns the name of a snapshot definition for `def` that is shared by all
// jobs with the same CLUSTER value. One job creates it; the others poll
// until it exists. Aborts if `def` does not exist or if the wait gets too
// long.
66  {
67  if(!SAMDefinitionExists(def)){
68  std::cout << "SAMQuerySource::EnsureSnapshot(): definition '"
69  << def << "' not found." << std::endl;
70  abort();
71  }
72 
// GRID_USER, CLUSTER and PROCESS must all be set; assert() is the only guard.
73  const char* user = getenv("GRID_USER");
74  assert(user);
75  const char* cluster = getenv("CLUSTER");
76  assert(cluster);
77  const char* process = getenv("PROCESS");
78  assert(process);
79 
80  // Jobs in the same cluster should share the same snapshot of the dataset
81  // so as not to hammer SAM with multiple requests for the same file list,
82  // but so that the dataset snapshot is updated with every new submission.
83  const std::string snap = TString::Format("%s_cafana_snap_%s_%s",
84  user, def.c_str(), cluster).Data();
85 
86  // I'd love to do all this with a proper API, but samweb doesn't seem to
87  // have a C++ one? So we get this stew of system() calls...
88 
89  // Use this name as an indication that someone is working on creating the
90  // snapshot and every one else should stand by.
91  const std::string snaplock = TString::Format("%s_cafana_snap_lock_%s_%s",
92  user, def.c_str(), cluster).Data();
93 
94  // Try to create the lock. Success means we have to create the snapshot,
95  // failure means someone else is working on it. The content of the
96  // definition (the nova.special) doesn't matter, except it has to be unique
97  // between the jobs, because trying to create an exact duplicate of an
98  // existing definition counts as success.
99  std::cout << "Checking lock " << snaplock << std::endl;
// A zero exit status from the shell command is taken to mean the lock was
// acquired; PROCESS supplies the per-job content of the lock definition.
100  if(system(TString::Format("samweb create-definition %s nova.special %s",
101  snaplock.c_str(), process).Data()) == 0){
102  // No one took the lock, it's up to us. Make the actual snapshot
103  std::cout << "Snapshotting " << def << " as " << snap << std::endl;
// NOTE(review): the exit status of this pipeline is not checked. If it
// fails, this job still returns `snap`, and the waiting jobs keep polling
// until they hit the abort() below - confirm that is acceptable.
104  system(TString::Format("samweb take-snapshot %s | xargs samweb create-definition %s snapshot_id",
105  def.c_str(), snap.c_str()).Data());
106  }
107  else{
108  // Lock already exists, just wait for the real snapshot to be created
// Polling back-off: the sleep starts at 1 second and grows by a factor of
// 1.5 after every poll; the job gives up once the next sleep would exceed
// 60*30 seconds.
109  double period = 1;
110  while(!SAMDefinitionExists(snap)){
111  sleep(int(period));
112  period *= 1.5;
113  if(period > 60*30){
114  std::cout << "We've been waiting a real long time for " << snap << " to be created. I don't think it's happening." << std::endl;
115  abort();
116  }
117  }
118  }
119  std::cout << "Will use " << snap << std::endl;
120 
121  return snap;
122  }
123 
124  //----------------------------------------------------------------------
125  std::vector<std::string> SAMQuerySource::
127  {
128  TString query = str;
129 
130  // This is an actual query
131  if(query.Contains(' ') && RunningOnGrid()){
132  // On the grid we want to convert that to a dataset we can snapshot below
133  query = EnsureDataset(query.Data());
134  }
135 
136  // This is a dataset name
137  if(!query.Contains(' ')){
138  if(getenv("CAFANA_USE_SNAPSHOTS")){
139  query = "dataset_def_name_newest_snapshot "+query;
140  }
141  else{
142  // Take one snapshot between all the jobs and share that
143  if(RunningOnGrid()) query = EnsureSnapshot(query.Data());
144 
145  query = "defname: "+query;
146  }
147  }
148 
149  if(stride > 1){
150  query += TString::Format(" with stride %d", stride).Data();
151  if(offset > 0){
152  query += TString::Format(" offset %d", offset).Data();
153  }
154  }
155 
156  if(limit > 0)
157  query += TString::Format(" with limit %d", limit).Data();
158 
159  std::cout << "Looking up files matching '" << query << "' using SAM...\n";
160 
161  std::vector<std::string> files;
162 
163  ifdh i;
164  i.set_debug("0"); // shut up
165  try{
166  files = i.translateConstraints(query.Data());
167  }
168  catch(ifdh_util_ns::WebAPIException& e){
169  // I like my error message below better, since this could well be a
170  // mistyped filename.
171  }
172 
173  if(files.empty()){
174  std::cerr << "\nCan't find any files matching '" << str
175  << "'. Aborting..." << std::endl;
176  abort();
177  }
178 
179  // IFDH likes to give back an empty string as the last response
180  // https://cdcvs.fnal.gov/redmine/issues/6718
181  if(!files.empty() && files.back().empty()){
182  files.pop_back();
183  }
184 
185  return LocateSAMFiles(files);
186  }
187 
188  //----------------------------------------------------------------------
189  std::vector<std::string> SAMQuerySource::
190  LocateSAMFiles(const std::vector<std::string>& fnames)
191  {
192  std::vector<std::string> ret;
193 
194  // We're going to fill this map of locations for all the files
195  std::map<std::string, std::vector<std::string>> locmap;
196 
197  Progress prog(TString::Format("Looking up locations of %ld files using SAM", fnames.size()).Data());
198 
199  ifdh i;
200  i.set_debug("0"); // shut up
201 
202  // locateFiles() saves the roundtrip time of talking to the server about
203  // every file individually, but it seems to bog down for large
204  // queries. Split the query into chunks. Experimentally this is about the
205  // sweet spot.
206  const unsigned int kStep = 50;
207  for(unsigned int fIdx = 0; fIdx < fnames.size(); fIdx += kStep){
208  prog.SetProgress(double(fIdx)/fnames.size());
209 
210  // The files we're looking up right now. Careful not to run off the end
211  // of the vector.
212  const std::vector<std::string> fslice(fnames.begin()+fIdx, fIdx+kStep < fnames.size() ? fnames.begin()+fIdx+kStep : fnames.end());
213 
214  const auto locslice = i.locateFiles(fslice);
215 
216  locmap.insert(locslice.begin(), locslice.end());
217  }
218 
219  prog.Done();
220 
221 
222  // Now go through the map and pick our favourite location for each file,
223  // and do some cleanup.
224  for(auto it: locmap){
225  const std::string& f = it.first;
226  const std::vector<std::string>& locs = it.second;
227 
228  int best = 0;
229 
230  std::string resolved;
231  for(TString loc: locs){
232  // Never try to access bluearc locations from the grid
233  if(!RunningOnGrid() && loc.BeginsWith("novadata:") && best < 3){
234  loc.Remove(0, 9);
235 
236  // Rewrite FNAL bluearc paths so users with matching directory
237  // structures offsite can access their local copies.
238  if(std::getenv("NOVA_ANA" )) loc.ReplaceAll("/nova/ana", std::getenv("NOVA_ANA"));
239  if(std::getenv("NOVA_APP" )) loc.ReplaceAll("/nova/app", std::getenv("NOVA_APP"));
240  if(std::getenv("NOVA_DATA")) loc.ReplaceAll("/nova/data", std::getenv("NOVA_DATA"));
241  if(std::getenv("NOVA_PROD")) loc.ReplaceAll("/nova/prod", std::getenv("NOVA_PROD"));
242 
243  // Check if the file exists at that location. If not, maybe pnfs has
244  // it.
245  struct stat junk;
246  if(stat((resolved+'/'+f).c_str(), &junk) == 0){
247  best = 3; // Prefer bluearc locations
248  resolved = loc;
249  }
250  }
251 
252  if(loc.BeginsWith("dcache:") && best < 2){
253  // Otherwise, used xrootd. Prefer "dache:" to "enstore:" because
254  // "dcache:" probably means /pnfs/nova/persistent/ so no chance of a
255  // huge wait for tape.
256  best = 2;
257 
258  // FileListSource does the actual conversion to xrootd
259  loc.ReplaceAll("dcache:/pnfs/", "/pnfs/");
260  // Strip the bit in brackets from the end
261  if(loc.First('(') >= 0) loc.Resize(loc.First('('));
262  resolved = loc;
263  }
264 
265  if(loc.BeginsWith("enstore:") && best < 1){
266  best = 1;
267 
268  loc.ReplaceAll("enstore:/pnfs/", "/pnfs/");
269  if(loc.First('(') >= 0) loc.Resize(loc.First('('));
270  resolved = loc;
271  }
272 
273  } // end for loc
274 
275  if(best == 0 || resolved.empty()){
276  std::cerr << "\nCouldn't find a usable location for " << f
277  << "'. Aborting..." << std::endl;
278  abort();
279  }
280 
281  ret.push_back((resolved+'/'+f));
282  } // end for fIdx
283 
284  return ret;
285  }
286 }
Cuts and Vars for the 2020 FD DiF Study.
Definition: vars.h:6
system("rm -rf microbeam.root")
set< int >::iterator it
std::vector< std::string > LocationsForSAMQuery(const std::string &str, int stride, int offset, int limit=-1)
bool RunningOnGrid() const
OStream cerr
Definition: OStream.cxx:7
SAMQuerySource(const std::string &query, int stride=-1, int offset=-1, int limit=-1)
std::string getenv(std::string const &name)
std::string EnsureDataset(const std::string &query) const
ifdh
Definition: ProjMan.py:8
OStream cout
Definition: OStream.cxx:6
::xsd::cxx::tree::string< char, simple_type > string
Definition: Database.h:154
std::string EnsureSnapshot(const std::string &def) const
Simple file source based on an explicit list provided by the user.
std::vector< std::string > LocateSAMFiles(const std::vector< std::string > &fnames)
Take filenames, return locations suitable for TFile::Open()
bool SAMDefinitionExists(const std::string &def)
Definition: UtilsExt.cxx:296
assert(nhit_max >=nhit_nbins)
A simple ascii-art progress bar.
Definition: Progress.h:9
void Format(TGraph *gr, int lcol, int lsty, int lwid, int mcol, int msty, double msiz)
Definition: Style.cxx:154
Float_t e
Definition: plot.C:35