runNovaSAM_cmake.py
#!/bin/env python

import os, sys
import shutil
import samweb_client, ifdh
import pprint
import argparse
import subprocess
import ROOT
import re
import md5, json

from samweb_client.utility import fileEnstoreChecksum

# Stop ROOT from complaining about not having dictionaries; 3000 is below kError but above kWarning.
ROOT.gErrorIgnoreLevel = 3000

detectors = dict()
detectors["fd"] = "fardet"
detectors["nd"] = "neardet"
detectors["ndos"] = "ndos"

# Needed by the createMetadata function to parse the sam_metadata_dumper output
_lineregex = re.compile(r'\d+: ([^ \t]+)[ \t]+(.+)')

# Files matching this pattern (hist and RootOutput files) are skipped during
# copy out and removed during cleanup
_skip_pattern = re.compile(r"^.*(hist|RootOutput).*\.root", re.IGNORECASE)
#_skip_pattern = re.compile(r"^.*(RootOutput).*\.root", re.IGNORECASE)

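# Illustration (file names below are hypothetical): _skip_pattern matches any
# ROOT file whose name contains "hist" or "RootOutput", e.g.
#   fardet_r00012345_s02_data_hist.root     -> skipped on copy out, removed at cleanup
#   RootOutput-abc123.root                  -> skipped on copy out, removed at cleanup
#   fardet_r00012345_s02_t02_data.reco.root -> kept and copied out
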
def unCamelCase(s):
    """ Convert CamelCase to camel_case """
    # \B matches an empty string which is NOT at the beginning of a word,
    # so requiring this means no _ will be inserted at the start
    return re.sub( r'\B([A-Z])', r'_\1', s).lower()

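# Quick illustration (not part of the script logic): the substitution only
# touches capitals that sit inside a word, e.g.
#   unCamelCase("StartTime") -> "start_time"
#   unCamelCase("Run")       -> "run"
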
def createMetadata(inFile):
    filename = os.path.basename(inFile)
    filesize = os.path.getsize(inFile)

    try:
        data = subprocess.check_output(["sam_metadata_dumper", str(inFile)])
    except subprocess.CalledProcessError:
        print "sam_metadata_dumper failed!"
        return None

    md = {'file_name': filename, 'group': 'nova', 'file_size': filesize, 'file_format': 'root'}

    # An ad-hoc parser for the sam_metadata_dumper output
    mode = 'new'
    for line in data.split('\n'):
        m = _lineregex.match(line)
        if mode == 'json':
            # Accumulate continuation lines of a multi-line JSON value.
            # (Reusing 'data' here is safe: the loop iterates over the list
            # already produced by data.split('\n') above.)
            if not m and not line.startswith('-'):
                data += line
            else:
                md[key] = json.loads(data)
                mode = 'new'

        if mode == 'new':
            if m:
                key = m.group(1)
                value = m.group(2)

                if '.' not in key:
                    key = unCamelCase(key)

                # ART does not appear to set the application name
                # but does provide a process name
                if key == 'process_name' and md.get('application', {}).get('name') is None:
                    key = 'application_name'
                elif key == 'stream_name':
                    key = 'data_stream'

                if key in ('start_date', 'end_date'):
                    value = long(value)

                if key == 'runs' or key == 'parents':
                    # These values are JSON and may continue on following lines
                    mode = 'json'
                    data = value
                elif key.startswith('application_'):
                    # Special handling for applications: collect the
                    # application_* keys into a nested dictionary
                    if 'application' not in md:
                        md['application'] = {}
                    md['application'][key[12:]] = value
                else:
                    md[key] = value

    md['crc'] = fileEnstoreChecksum(inFile)

    return md

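# Sketch of the kind of dictionary createMetadata() returns (the values shown
# here are invented for illustration; the real keys and values come from
# sam_metadata_dumper and fileEnstoreChecksum):
#   {'file_name': 'fardet_r00012345_s02_t02_data.reco.root',
#    'file_size': 123456789, 'file_format': 'root', 'group': 'nova',
#    'application': {'name': '...', 'version': '...'},
#    'data_stream': '...', 'runs': [...], 'crc': {...}}
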
def getOutDir(pathname, hashDirs=False):
    if "DEST" in os.environ:
        dest = os.environ["DEST"]
    else:
        print "No destination set!"
        exit(1)
    dirs = [dest]
    if hashDirs:
        head, tail = os.path.split(pathname)
        hash = md5.new(tail)
        dirs += list(hash.hexdigest()[:3])
    return os.path.join(*dirs)

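# Illustration with made-up values: if DEST=/pnfs/nova/scratch/users/someuser
# and the md5 hex digest of the file name starts with "a1b", then hashDirs=True
# gives /pnfs/nova/scratch/users/someuser/a/1/b. Three single-hex-digit levels
# give 16^3 = 4096 possible subdirectories.
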
def checkAndMoveFiles(inFile, declareFiles):
    """Checks all root files in the current directory for zombie or recovered status. Bad files are deleted while good files are moved to the results subdirectory for ease of copy out"""
    inFileBase = os.path.basename(inFile)
    baseDir = "."
    for root, dirs, filenames in os.walk(baseDir):
        if root == baseDir:
            for file in filenames:
                if file.endswith(".root") and file != inFileBase:
                    fileWPath = os.path.join(root, file)
                    rootFile = ROOT.TFile(fileWPath)
                    # A zombie TFile could not be opened properly; kRecovered is
                    # set when ROOT had to recover the file's keys because it was
                    # not closed cleanly. Either way the output is not trusted.
                    if rootFile.IsZombie() or rootFile.TestBit(ROOT.TFile.kRecovered):
                        # do not copy over, just delete
                        print "File", fileWPath, "is Zombie - remove it"
                        os.remove(fileWPath)
                    else:
                        if declareFiles:
                            declareFile(fileWPath)
                        newFilePath = os.path.join(root, "results", file)
                        os.renames(fileWPath, newFilePath)
                    rootFile.Close()
    return

def declareFile(fileWPath):
    """Checks the file for a TKey of RootFileDB. If it exists, run sam_metadata_dumper and construct the appropriate metadata for the file. Use that metadata to declare the file to SAM"""
    samweb = samweb_client.SAMWebClient(experiment='nova')
    rootFile = ROOT.TFile(fileWPath)
    # The RootFileDB key is what art writes into its event output files;
    # histogram-only files lack it and are therefore never declared.
    if rootFile.FindKey("RootFileDB"):
        print "Found key (RootFileDB): constructing metadata for", fileWPath
        md = createMetadata(fileWPath)
        if md is None:
            print "No metadata found!"
        else:
            pprint.pprint(md)
            print ''
            print "Declaring", fileWPath, "to SAM"
            try:
                samweb.declareFile(md)
            except:
                print fileWPath, "already exists in SAM"
    else:
        print fileWPath, "does not contain RootFileDB, do not try to declare"
    rootFile.Close()
    return

def copyOutFiles(hashDirs=False):
    """Built-in facility to copy out art files (hist files are automatically excluded). With hashDirs, subdirectories with a single hex digit each are added, corresponding to the first three digits of the hash of the output file name. This splits the files across 4096 separate subdirectories, preventing overfull directories. Copy out does not happen if the file already exists at the destination"""
    print "Copying out files"
    dh = ifdh.ifdh("http://samweb.fnal.gov:8480/sam/nova/api")
    baseDir = "./results"
    for root, dirs, filenames in os.walk(baseDir):
        if root == baseDir:
            for file in filenames:
                # inFileBase is the global set in the __main__ block below
                if file.endswith(".root") and file != inFileBase:
                    fileWPath = os.path.join(root, file)
                    dest = getOutDir(file, hashDirs)
                    skip_match = _skip_pattern.match(file)
                    if skip_match is None:
                        try:
                            print "Checking if", dest + "/" + file, "exists"
                            dh.ls(dest + "/" + file, 1, "")
                        except RuntimeError:
                            print "It doesn't - copy", fileWPath, "to", dest
                            dh.cp(["-D", fileWPath, dest])
                            print "Removing", fileWPath
                            os.remove(fileWPath)
    dh.cleanup()
    return

def getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt):
    if dataFlag == "sim":
        # Pull the simulation label and the uniquifier out of the input file
        # name (inFileBase is the global set in the __main__ block below)
        pattern = re.compile(r"^.*?_(.*?)_\d+.*_v\d+_(.*?)(\.|\_)sim\..+")
        match = pattern.match(inFileBase)
        if match is not None:
            simLabel = match.groups()[0]
            uniq = match.groups()[1]
            outName = '{0}_{1}_{2}_r{3:08d}_s{4:02d}_{5}_v{6}_{7}_{8}.{9}.root'.format(detector, simLabel, nevt, runNum, subNum, release, subversion, uniq, dataFlag, tier)
        else:
            print "regex couldn't find the sim label and the uniquifier"
            exit(1)
    else:
        outName = '{0}_r{1:08d}_s{2:02d}_{3}_{4}_v{5}_{6}.{7}.root'.format(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier)
    return outName

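# Example of the non-sim naming convention with made-up inputs: detector
# "fardet", run 12345, subrun 2, streamEntry "t02", release "R1" (placeholder),
# subversion 1, dataFlag "data", tier "reco" gives
#   fardet_r00012345_s02_t02_R1_v1_data.reco.root
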
def resolveFclPath(fcl):
    # Check if we have an absolute path name, return it if so
    if fcl[0] == "/":
        return fcl

    # SRT sets up FHICL_FILE_PATH with an extra / at the end of directories;
    # cmake does not. Need to add + "/" + below to account for this.
    fclPaths = os.environ["FHICL_FILE_PATH"].split(":")
    for path in fclPaths:
        if os.path.isfile(path + "/" + fcl):
            return path + "/" + fcl

    # If we haven't found it, we have a problem.
    raise IOError(sys.argv[0] + ": config file " + fcl + " not found in FHICL_FILE_PATH: $FHICL_FILE_PATH")

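# Resolution example with a made-up environment: if
#   FHICL_FILE_PATH=/cvmfs/nova/releases/R1/fcl:/home/user/fcl
# then resolveFclPath("reco.fcl") returns the first of
# /cvmfs/nova/releases/R1/fcl/reco.fcl or /home/user/fcl/reco.fcl that exists,
# and raises IOError if neither does.
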
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Run the nova command using SAM metadata')
    parser.add_argument('inFile', help='The input file to run over', type=str)
    parser.add_argument('--config', '-c', help='FHiCL file to use as configuration for the nova executable', type=str)
    parser.add_argument('--outTier', help='Data tier of the output file, multiple allowed, formatted as <name_in_fcl_outputs>:<data_tier>', type=str, action='append')
    parser.add_argument('--cafTier', help='Module label for CAF output, multiple allowed. Format as <cafmaker_module_label>:<data_tier>', type=str, action='append')
    parser.add_argument('-n', help='Number of events to run over', type=int)
    parser.add_argument('--copyOut', help='Use the built-in copy out mechanism', action='store_true')
    parser.add_argument('--hashDirs', help='Use hash directory structure in the destination directory.', action='store_true')
    parser.add_argument('--declareFiles', help='Declare files with metadata on the worker node', action='store_true')
    parser.add_argument('--noCleanup', help='Skip the working directory cleanup step, good for interactive debugging', action='store_true')

    args = parser.parse_args()
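    # Example invocation (every value here is illustrative only):
    #   runNovaSAM_cmake.py --config recojob.fcl \
    #       --outTier out1:reco --cafTier cafmaker:caf \
    #       --copyOut --hashDirs --declareFiles \
    #       fardet_r00012345_s02_t02_data.artdaq.root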

    inFile = args.inFile

    samweb = samweb_client.SAMWebClient(experiment='nova')

    inFileBase = os.path.basename(inFile)

    metadata = samweb.getMetadata(inFileBase)

    # make all keys lower case
    metadata = {k.lower(): v for k, v in metadata.items()}

    print "Input File = ", inFile, "with metadata:"
    pprint.pprint(metadata)
    print ''

    # If the new-format "runs" metadata is present, use it; otherwise get the
    # run and subrun from the file name
    if "runs" in metadata and len(metadata["runs"][0]) == 3:
        runNum = int(metadata["runs"][0][0])
        subNum = int(metadata["runs"][0][1])
    else:
        run_pattern = re.compile(r"^.*?_r([0-9]+)_s([0-9]+).+")
        run_match = run_pattern.match(inFileBase)
        if run_match is not None:
            runNum = int(run_match.groups()[0])
            subNum = int(run_match.groups()[1])
        else:
            print "No run number/subrun number found!"
            exit(1)

    print "Run Number: ", runNum
    print "Subrun Number: ", subNum

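    # Fallback example with a hypothetical file name: an input called
    # fardet_r00012345_s02_t02_data.artdaq.root (no usable "runs" metadata)
    # yields runNum = 12345 and subNum = 2 from the regex above.
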
    if "data_stream" in metadata:
        try:
            stream = int(metadata["data_stream"])
            streamEntry = 't{0:02d}'.format(stream)
        except ValueError:
            stream = metadata["data_stream"]
            streamEntry = stream
    else:
        print "No data stream found!"
        exit(1)

    # Check for a non-standard data stream in the file name
    stream_pattern = re.compile(r"^.*?_(dd.*?)[_,.].+", re.IGNORECASE)
    stream_match = stream_pattern.match(inFileBase)
    if stream_match is not None:
        stream = stream_match.groups()[0]
        streamEntry = stream
        print "Found match!", stream
    else:
        print "No match, using", stream

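    # The pattern above picks out a stream token beginning with "dd" from the
    # file name, e.g. a hypothetical fardet_r00012345_s02_ddexample_data.raw
    # would override the stream with "ddexample".
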
    if "online.detector" in metadata:
        detector = metadata["online.detector"].lower()
    elif "nova.detectorid" in metadata:
        detId = metadata["nova.detectorid"].lower()
        if detId in detectors.keys():
            detector = detectors[detId]
        else:
            print "No detector name found for detectorid:", detId
            exit(1)
    else:
        print "No detector name found!"
        exit(1)

    # Changed for cmake. Assumed to have no effect beyond the
    # release entry in the metadata
    if "NOVASOFT_VERSION" in os.environ:
        release = os.environ["NOVASOFT_VERSION"]
    else:
        print "No release set!"
        exit(1)

    if "file_type" in metadata:
        file_type = metadata["file_type"]
    else:
        print "No file_type set!"
        exit(1)

    if file_type == "importedDetector":
        dataFlag = "data"
    elif file_type == "importedSimulated":
        dataFlag = "sim"
    else:
        print "Unrecognized file_type:", file_type
        exit(1)

    if "NPASS" in os.environ:
        subversion = os.environ["NPASS"]
    elif "nova.subversion" in metadata:
        subversion = metadata["nova.subversion"]
    else:
        subversion = 1

    nevt = None
    if "simulated.number_of_spills" in metadata:
        nevt = metadata["simulated.number_of_spills"]

    fclFile = args.config

    # Get a copy of the fcl file and bring it here.
    # First step: find the fcl
    fclPath = resolveFclPath(fclFile)
    print "Found fcl file here: ", fclPath
    # Next step: make a temporary name for the fcl in the local directory and copy it here
    tmpFclName = os.path.basename(fclPath).replace(".fcl", "_" + os.path.splitext(inFileBase)[0] + ".fcl")
    print "Creating local copy : ", tmpFclName
    shutil.copy(fclPath, tmpFclName)

    # Open the fcl file so that we can append output filenames to it
    fclFileObj = open(tmpFclName, 'a')

    # Start setting up the nova command, add SAM parameters
    cmdList = ['nova']
    cmdList.append('-c')
    cmdList.append(tmpFclName)
    cmdList.append('--sam-application-family=nova')
    cmdList.append('--sam-application-version=' + release)
    cmdList.append('--sam-file-type=' + file_type)

    # if not (args.outTier or args.cafTier):
    #     raise ValueError("No output tiers or CAF module labels specified.")
    if not args.outTier:
        args.outTier = []
    if not args.cafTier:
        args.cafTier = []

    outList = []
    # Loop over output tiers
    for outTier in args.outTier:
        try:
            output = outTier.split(":")[0]
            tier = outTier.split(":")[1]
        except:
            raise ValueError("Output data tier: " + outTier + " not formatted correctly, should be <output_name>:<data_tier>")
        cmdList.append('--sam-data-tier=' + outTier)
        cmdList.append('--sam-stream-name=' + output + ':' + str(stream))
        outName = getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)
        outList.append(outName)
        print "Output file name: ", outName, " for tier ", tier, " and output ", output
        fclFileObj.write("outputs." + output + '.fileName: "' + outName + '"\n')

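    # With a hypothetical argument --outTier out1:reco, the loop above appends
    # a line like
    #   outputs.out1.fileName: "fardet_r00012345_s02_t02_<release>_v1_data.reco.root"
    # to the local fcl copy; the CAF loop below does the analogous thing via
    # physics.producers.<label>.CAFFilename.
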
    for cafTier in args.cafTier:
        try:
            cafLabel = cafTier.split(":")[0]
            tier = cafTier.split(":")[1]
        except:
            raise ValueError("CAF data tier: " + cafTier + " not formatted correctly, should be <cafmaker_module_label>:<data_tier>")
        cafName = getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)
        print cafLabel, tier, cafName
        fclFileObj.write("physics.producers." + cafLabel + '.CAFFilename: "' + cafName + '" \n')
        fclFileObj.write("physics.producers." + cafLabel + '.DataTier: "' + tier + '" \n')

    fclFileObj.close()

    print "Config: "
    for line in open(tmpFclName):
        print line.strip()

    if args.n is not None:
        cmdList.append('-n')
        cmdList.append(str(args.n))
    cmdList.append(inFile)

    cmd = ' '.join(cmdList)

    print 'Running:', cmd

    retCode = subprocess.call(cmdList)

    if retCode == 0:
        checkAndMoveFiles(inFile, args.declareFiles)

        if args.copyOut:
            copyOutFiles(args.hashDirs)
    else:
        # job didn't succeed, remove output files if they exist
        for file in outList:
            try:
                os.remove("./" + file)
            except OSError:
                pass

    # clean up section
    dirList = os.listdir(".")
    for file in dirList:
        skip_match = _skip_pattern.match(file)
        if skip_match is not None:
            print file, "contains *hist*.root or RootOutput*.root: clean up"
            os.remove("./" + file)
    if not args.noCleanup:
        os.remove(tmpFclName)
        os.remove(inFile)
        dh = ifdh.ifdh("http://samweb.fnal.gov:8480/sam/nova/api")
        dh.cleanup()

    exit(retCode)