runNovaSAM_cmake.py
#!/bin/env python

from __future__ import print_function
from builtins import str
import os, sys
import shutil
import samweb_client, ifdh
import pprint
import argparse
import subprocess
import ROOT
import re
import md5, json

from samweb_client.utility import fileEnstoreChecksum

# Stop ROOT from complaining about not having dictionaries, 3000 is below kError but above kWarning.
ROOT.gErrorIgnoreLevel = 3000

detectors = dict()
detectors["fd"] = "fardet"
detectors["nd"] = "neardet"
detectors["ndos"] = "ndos"

#Necessary for createMetadata function
_lineregex = re.compile(r'\d+: ([^ \t]+)[ \t]+(.+)')
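# The parser in createMetadata expects sam_metadata_dumper output lines of the
# form "<number>: <key><whitespace><value>". Illustrative example (assuming
# that output format): a line such as
#   "12: fileFormat      artroot"
# would match with key = "fileFormat" and value = "artroot".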

# Pattern for files that should not be copied out (hist and RootOutput files)
_skip_pattern = re.compile(r"^.*(hist|RootOutput).*\.root", re.IGNORECASE)
#_skip_pattern = re.compile(r"^.*(RootOutput).*\.root", re.IGNORECASE)
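# Illustrative matches for the skip pattern above (file names are hypothetical):
#   "fardet_r00012345_s02_t02_R19-02-23_v1_data.hist.root"  -> matches (skipped / cleaned up)
#   "RootOutput-1234.root"                                   -> matches (skipped / cleaned up)
#   "fardet_r00012345_s02_t02_R19-02-23_v1_data.reco.root"   -> no match (eligible for copy-out)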

def unCamelCase(s):
    """ Convert CamelCase to camel_case """
    # \B matches an empty string which is NOT at the beginning of a word,
    # so requiring this means no _ will be inserted at the start
    return re.sub(r'\B([A-Z])', r'_\1', s).lower()

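# Quick illustration of unCamelCase (examples only, not part of the original script):
#   unCamelCase("FileFormat") -> "file_format"
#   unCamelCase("StartDate")  -> "start_date"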

def createMetadata(inFile):
    filename = os.path.basename(inFile)
    filesize = os.path.getsize(inFile)

    try:
        data = subprocess.check_output(["sam_metadata_dumper", str(inFile)])
    except subprocess.CalledProcessError:
        print("sam_metadata_dumper failed!")
        return None

    md = {'file_name': filename, 'group': 'nova', 'file_size': filesize, 'file_format': 'root'}

    # An ad-hoc parser for the sam_metadata_dumper output
    mode = 'new'
    for line in data.split('\n'):
        m = _lineregex.match(line)
        if mode == 'json':
            # Accumulate a multi-line JSON value until the next key or a separator line
            if not m and not line.startswith('-'):
                data += line
            else:
                md[key] = json.loads(data)
                mode = 'new'

        if mode == 'new':
            if m:
                key = m.group(1)
                value = m.group(2)

                if '.' not in key:
                    key = unCamelCase(key)

                # ART does not appear to set the application name
                # but does provide a process name
                if key == 'process_name' and md.get('application', {}).get('name') is None:
                    key = 'application_name'
                elif key == 'stream_name':
                    key = 'data_stream'

                if key in ('start_date', 'end_date'):
                    value = int(value)

                if key == 'runs' or key == 'parents':
                    mode = 'json'
                    data = value
                elif key.startswith('application_'):
                    # Special handling for application metadata
                    if 'application' not in md:
                        md['application'] = {}
                    md['application'][key[12:]] = value
                else:
                    md[key] = value

    md['crc'] = fileEnstoreChecksum(inFile)

    return md

def getOutDir(pathname, hashDirs=False):
    if "DEST" in os.environ:
        dest = os.environ["DEST"]
    else:
        print("No destination set!")
        exit(1)
    dirs = [dest]
    if hashDirs:
        head, tail = os.path.split(pathname)
        hash = md5.new(tail)
        dirs += list(hash.hexdigest()[:3])
    return os.path.join(*dirs)
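# Illustration of the hash-directory layout produced by getOutDir above
# (DEST and the digest are hypothetical): with DEST=/pnfs/nova/output and an
# output file whose md5 hexdigest begins with "a3f", getOutDir returns
#   /pnfs/nova/output/a/3/f
# Using the first three hex characters gives 16^3 = 4096 possible
# subdirectories, which keeps any single directory from growing too large.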

def checkAndMoveFiles(inFile, declareFiles):
    """Check all root files in the current directory for zombie or recovered status. Bad files are deleted, while good files are moved to the results subdirectory for ease of copy-out."""
    inFileBase = os.path.basename(inFile)
    baseDir = "."
    for root, dirs, filenames in os.walk(baseDir):
        if root == baseDir:
            for file in filenames:
                if file.endswith(".root") and file != inFileBase:
                    fileWPath = os.path.join(root, file)
                    rootFile = ROOT.TFile(fileWPath)
                    if rootFile.IsZombie() or rootFile.TestBit(ROOT.TFile.kRecovered):
                        # Do not copy it over, just delete it
                        print("File", fileWPath, "is Zombie - remove it")
                        os.remove(fileWPath)
                    else:
                        if declareFiles:
                            declareFile(fileWPath)
                        newFilePath = os.path.join(root, "results", file)
                        os.renames(fileWPath, newFilePath)
                    rootFile.Close()
    return

def declareFile(fileWPath):
    """Check the file for a TKey of RootFileDB. If it exists, run sam_metadata_dumper and construct the appropriate metadata for the file, then use that metadata to declare the file to SAM."""
    samweb = samweb_client.SAMWebClient(experiment='nova')
    rootFile = ROOT.TFile(fileWPath)
    if rootFile.FindKey("RootFileDB"):
        print("Found key (RootFileDB): constructing metadata for", fileWPath)
        md = createMetadata(fileWPath)
        if md == None:
            print("No metadata found!")
        else:
            pprint.pprint(md)
            print('')
            print("Declaring", fileWPath, "to SAM")
            try:
                samweb.declareFile(md)
            except:
                print(fileWPath, "already exists in SAM")
    else:
        print(fileWPath, "does not contain RootFileDB, do not try to declare")
    rootFile.Close()
    return

def copyOutFiles(hashDirs=False):
    """Built-in facility to copy out art files (automatically excludes hist files). This adds subdirectories, each named with a single hex digit corresponding to one of the first three digits in the hash of the output file name. This splits files across 4096 separate subdirectories, preventing overfull directories. Copy-out does not happen if the file already exists in the output location."""
    print("Copying out files")
    dh = ifdh.ifdh("http://samweb.fnal.gov:8480/sam/nova/api")
    baseDir = "./results"
    for root, dirs, filenames in os.walk(baseDir):
        if root == baseDir:
            for file in filenames:
                # Note: inFileBase is the module-level variable set in __main__
                if file.endswith(".root") and file != inFileBase:
                    fileWPath = os.path.join(root, file)
                    dest = getOutDir(file, hashDirs)
                    skip_match = _skip_pattern.match(file)
                    if skip_match == None:
                        try:
                            print("Checking if", dest + "/" + file, "exists")
                            dh.ls(dest + "/" + file, 1, "")
                        except RuntimeError:
                            print("It doesn't - copy", fileWPath, "to", dest)
                            dh.cp(["-D", fileWPath, dest])
                    print("Removing", fileWPath)
                    os.remove(fileWPath)
    dh.cleanup()
    return

def getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt):
    if dataFlag == "sim":
        pattern = re.compile(r"^.*?_(.*?)_\d+.*_v\d+_(.*?)(\.|\_)sim\..+")
        match = pattern.match(inFileBase)
        if match != None:
            simLabel = match.groups()[0]
            uniq = match.groups()[1]
            outName = '{0}_{1}_{2}_r{3:08d}_s{4:02d}_{5}_v{6}_{7}_{8}.{9}.root'.format(detector, simLabel, nevt, runNum, subNum, release, subversion, uniq, dataFlag, tier)
        else:
            print("regex couldn't find the sim label and the uniquifier")
            exit(1)
    else:
        outName = '{0}_r{1:08d}_s{2:02d}_{3}_{4}_v{5}_{6}.{7}.root'.format(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier)
    return outName
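# Example of the non-sim naming scheme above, with hypothetical inputs
# (detector="fardet", runNum=12345, subNum=2, streamEntry="t02",
#  release="R19-02-23", subversion=1, dataFlag="data", tier="reco"):
#   fardet_r00012345_s02_t02_R19-02-23_v1_data.reco.root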

def resolveFclPath(fcl):
    # If we were given an absolute path, return it as-is
    if fcl[0] == "/":
        return fcl

    # SRT sets up FHICL_FILE_PATH with an extra / at the end of each directory;
    # cmake does not. Add the "/" explicitly below to account for this.
    fclPaths = os.environ["FHICL_FILE_PATH"].split(":")
    for path in fclPaths:
        if os.path.isfile(path + "/" + fcl):
            return path + "/" + fcl

    # If we haven't found it, we have a problem.
    raise IOError(sys.argv[0] + ": config file " + fcl + " not found in FHICL_FILE_PATH: " + os.environ["FHICL_FILE_PATH"])
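# Illustrative lookup (paths are hypothetical): with FHICL_FILE_PATH=/cvmfs/nova/fcl:/local/fcl,
# resolveFclPath("reco.fcl") returns "/cvmfs/nova/fcl/reco.fcl" if that file exists,
# otherwise "/local/fcl/reco.fcl" if it exists, and raises IOError if neither does.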


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Run the nova command using SAM metadata')
    parser.add_argument('inFile', help='The input file to run over', type=str)
    parser.add_argument('--config', '-c', help='FHiCL file to use as configuration for the nova executable', type=str)
    parser.add_argument('--outTier', help='Data tier of the output file, multiple allowed, formatted as <name_in_fcl_outputs>:<data_tier>', type=str, action='append')
    parser.add_argument('--cafTier', help='Module label for CAF output, multiple allowed. Format as <cafmaker_module_label>:<data_tier>', type=str, action='append')
    parser.add_argument('-n', help='Number of events to run over', type=int)
    parser.add_argument('--copyOut', help='Use the built-in copy-out mechanism', action='store_true')
    parser.add_argument('--hashDirs', help='Use a hash directory structure in the destination directory', action='store_true')
    parser.add_argument('--declareFiles', help='Declare files with metadata on the worker node', action='store_true')
    parser.add_argument('--noCleanup', help='Skip the working directory cleanup step, useful for interactive debugging', action='store_true')

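    # Example invocation (illustrative only; the fcl and input file names are hypothetical,
    # the input file must be declared to SAM, $NOVASOFT_VERSION must be set, and $DEST must
    # be set because --copyOut is used):
    #   python runNovaSAM_cmake.py -c myrecojob.fcl \
    #       --outTier out1:reco --cafTier cafmaker:caf \
    #       --copyOut --hashDirs --declareFiles \
    #       fardet_r00012345_s02_t02_R19-02-23_v1_data.artdaq.root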
    args = parser.parse_args()

    inFile = args.inFile

    samweb = samweb_client.SAMWebClient(experiment='nova')

    inFileBase = os.path.basename(inFile)

    metadata = samweb.getMetadata(inFileBase)

    # Make all metadata keys lower case
    metadata = {k.lower(): v for k, v in list(metadata.items())}

    print("Input File = ", inFile, "with metadata:")
    pprint.pprint(metadata)
    print('')

    # If the new-format "runs" metadata is present, use it; otherwise get run/subrun from the file name
    if "runs" in metadata and len(metadata["runs"][0]) == 3:
        runNum = int(metadata["runs"][0][0])
        subNum = int(metadata["runs"][0][1])
    else:
        run_pattern = re.compile(r"^.*?_r([0-9]+)_s([0-9]+).+")
        run_match = run_pattern.match(inFileBase)
        if run_match != None:
            runNum = int(run_match.groups()[0])
            subNum = int(run_match.groups()[1])
        else:
            print("No run number/subrun number found!")
            exit(1)

    print("Run Number: ", runNum)
    print("Subrun Number: ", subNum)

    if "data_stream" in metadata:
        try:
            stream = int(metadata["data_stream"])
            streamEntry = 't{0:02d}'.format(stream)
        except ValueError:
            stream = metadata["data_stream"]
            streamEntry = stream
    else:
        print("No data stream found!")
        exit(1)

    # Check for a non-standard data stream in the file name
    stream_pattern = re.compile(r"^.*?_(dd.*?)[_,.].+", re.IGNORECASE)
    stream_match = stream_pattern.match(inFileBase)
    if stream_match != None:
        stream = stream_match.groups()[0]
        streamEntry = stream
        print("Found match!", stream)
    else:
        print("No match, using", stream)

    if "online.detector" in metadata:
        detector = metadata["online.detector"].lower()
    elif "nova.detectorid" in metadata:
        detId = metadata["nova.detectorid"].lower()
        if detId in list(detectors.keys()):
            detector = detectors[detId]
        else:
            print("No detector name found for detectorid:", detId)
            exit(1)
    else:
        print("No detector name found!")
        exit(1)

    # Changed for cmake. Assumed to have no effect beyond the
    # release entry in the metadata
    if "NOVASOFT_VERSION" in os.environ:
        release = os.environ["NOVASOFT_VERSION"]
    else:
        print("No release set!")
        exit(1)

    if "file_type" in metadata:
        file_type = metadata["file_type"]
    else:
        print("No file_type set!")
        exit(1)

    if file_type == "importedDetector":
        dataFlag = "data"
    elif file_type == "importedSimulated":
        dataFlag = "sim"
    else:
        print("Unrecognized file_type:", file_type)
        exit(1)

    if "NPASS" in os.environ:
        subversion = os.environ["NPASS"]
    elif "nova.subversion" in metadata:
        subversion = metadata["nova.subversion"]
    else:
        subversion = 1

    nevt = None
    if "simulated.number_of_spills" in metadata:
        nevt = metadata["simulated.number_of_spills"]

    fclFile = args.config

    # Get a copy of the fcl file and bring it here
    # First step, find the fcl
    fclPath = resolveFclPath(fclFile)
    print("Found fcl file here: ", fclPath)
    # Next step, make a temporary name for the fcl in the local directory and copy it here
    tmpFclName = os.path.basename(fclPath).replace(".fcl", "_" + os.path.splitext(inFileBase)[0] + ".fcl")
    print("Creating local copy : ", tmpFclName)
    shutil.copy(fclPath, tmpFclName)

    # Open the fcl file so that we can append output filenames to it
    fclFileObj = open(tmpFclName, 'a')

    # Start setting up the nova command, add SAM parameters
    cmdList = ['nova']
    cmdList.append('-c')
    cmdList.append(tmpFclName)
    cmdList.append('--sam-application-family=nova')
    cmdList.append('--sam-application-version=' + release)
    cmdList.append('--sam-file-type=' + file_type)

    # if not (args.outTier or args.cafTier):
    #     raise ValueError("No output tiers or CAF module labels specified.")
    if not args.outTier:
        args.outTier = []
    if not args.cafTier:
        args.cafTier = []

    outList = []
    # Loop over output tiers
    for outTier in args.outTier:
        try:
            output = outTier.split(":")[0]
            tier = outTier.split(":")[1]
        except IndexError:
            raise ValueError("Output data tier: " + outTier + " not formatted correctly, should be <output_name>:<data_tier>")
        cmdList.append('--sam-data-tier=' + outTier)
        cmdList.append('--sam-stream-name=' + output + ':' + str(stream))
        outName = getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)
        outList.append(outName)
        print("Output file name: ", outName, " for tier ", tier, " and output ", output)
        fclFileObj.write("outputs." + output + '.fileName: "' + outName + '"\n')

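    # With hypothetical values, an appended override from the loop above might read:
    #   outputs.out1.fileName: "fardet_r00012345_s02_t02_R19-02-23_v1_data.reco.root"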

    for cafTier in args.cafTier:
        try:
            cafLabel = cafTier.split(":")[0]
            tier = cafTier.split(":")[1]
        except IndexError:
            raise ValueError("CAF output tier: " + cafTier + " not formatted correctly, should be <cafmaker_module_label>:<data_tier>")
        cafName = getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)
        print(cafLabel, tier, cafName)
        fclFileObj.write("physics.producers." + cafLabel + '.CAFFilename: "' + cafName + '" \n')
        fclFileObj.write("physics.producers." + cafLabel + '.DataTier: "' + tier + '" \n')

    fclFileObj.close()

    print("Config: ")
    for line in open(tmpFclName):
        print(line.strip())

    if args.n != None:
        cmdList.append('-n')
        cmdList.append(str(args.n))
    cmdList.append(inFile)

    cmd = ' '.join(cmdList)

    print('Running:', cmd)

    retCode = subprocess.call(cmdList)

    if retCode == 0:
        if args.declareFiles:
            checkAndMoveFiles(inFile, True)
        else:
            checkAndMoveFiles(inFile, False)

        if args.copyOut:
            copyOutFiles(args.hashDirs)
    else:
        # The job didn't succeed; remove the output files if they exist
        for file in outList:
            try:
                os.remove("./" + file)
            except OSError:
                pass

    # Clean-up section
    dirList = os.listdir(".")
    for file in dirList:
        skip_match = _skip_pattern.match(file)
        if skip_match != None:
            print(file, "contains *hist*.root or RootOutput*.root: clean up")
            os.remove("./" + file)
    if not args.noCleanup:
        os.remove(tmpFclName)
        os.remove(inFile)
        dh = ifdh.ifdh("http://samweb.fnal.gov:8480/sam/nova/api")
        dh.cleanup()

    exit(retCode)
