3 from __future__
import print_function
4 from builtins
import str
7 import samweb_client, ifdh
# Raise ROOT's message threshold: with gErrorIgnoreLevel at 3000 (ROOT's
# kError level) informational and warning messages are not printed.
ROOT.gErrorIgnoreLevel = 3000

# Short detector ids mapped to the detector names used by this script.
# NOTE(review): `detectors` itself is created outside the lines visible in
# this excerpt; it must exist before these assignments run.
detectors["fd"] = "fardet"
detectors["nd"] = "neardet"
detectors["ndos"] = "ndos"

# One line of sam_metadata_dumper output: "<digits>: <key> <value>".
# Group 1 is the key (no spaces/tabs), group 2 is the rest of the line.
_lineregex = re.compile(r'\d+: ([^ \t]+)[ \t]+(.+)')

# File names that contain "hist" or "RootOutput" ahead of ".root"
# (case-insensitive).  Matching files are excluded from copy-out and are
# removed during cleanup.
_skip_pattern = re.compile(r"^.*(hist|RootOutput).*\.root", re.IGNORECASE)
33 """ Convert CamelCase to camel_case """ 36 return re.sub(
r'\B([A-Z])',
r'_\1', s).lower()
40 filename = os.path.basename(inFile)
41 filesize = os.path.getsize(inFile)
44 data = subprocess.check_output([
"sam_metadata_dumper",
str(inFile)])
45 except subprocess.CalledProcessError:
46 print(
"sam_metadata_dumper failed!")
49 md = {
'file_name': filename,
'group':
'nova',
'file_size': filesize,
'file_format':
'root'}
53 for line
in data.split(
'\n'):
54 m = _lineregex.match(line)
56 if not m
and not line.startswith(
'-'):
59 md[key] = json.loads(data)
72 if key ==
'process_name' and md.get(
'application',[]).
get(
'name')
is None:
73 key =
'application_name' 74 elif key ==
'stream_name':
77 if key
in (
'start_date',
'end_date'):
80 if key ==
'runs' or key ==
'parents':
83 elif key.startswith(
'application_'):
85 if 'application' not in md:
86 md[
'application'] = {}
87 md[
'application'][key[12:]] = value
96 if "DEST" in os.environ:
97 dest = os.environ[
"DEST"]
99 print(
"No destination set!")
103 head, tail = os.path.split(pathname)
105 dirs +=
list(hash.hexdigest()[:3])
106 return os.path.join(*dirs)
109 """Checks all root files in the current directories for zombie or recovered status. Bad files are deleted while good files are moved to the results subdirectory for copy out ease""" 110 inFileBase = os.path.basename(inFile)
112 for root, dirs, filenames
in os.walk(baseDir):
114 for file
in filenames:
115 if file.endswith (
".root")
and file != inFileBase:
116 fileWPath = os.path.join(root, file)
117 rootFile = ROOT.TFile(fileWPath)
118 if rootFile.IsZombie()
or rootFile.TestBit(ROOT.TFile.kRecovered):
120 print(
"File", fileWPath,
"is Zombie - remove it")
123 if declareFiles ==
True:
125 newFilePath = os.path.join(root,
"results", file)
126 os.renames(fileWPath, newFilePath)
131 """Checks file for a TKey of RootFileDB. If it exists, run sam_metadata_dumper and construct appropriate metadata for the file. Use that metadata to declare the file to SAM""" 132 samweb = samweb_client.SAMWebClient(experiment=
'nova')
133 rootFile = ROOT.TFile(fileWPath)
134 if rootFile.FindKey(
"RootFileDB"):
135 print(
"Found key (RootFileDB): constructing metadata for", fileWPath)
138 print(
"No metadata found!")
142 print(
"Declaring", fileWPath,
"to SAM")
144 samweb.declareFile(md)
146 print(fileWPath,
"already exists in SAM")
148 print(fileWPath,
"does not contain RootFileDB, do not try to declare")
153 """Builtin facility to copy out art files (automatically excludes hist files). This adds in a subdirectories with a single hex digit each corresponding to the first three digits in the hash of the output file name. This splits up files into 4096 separate subdirectories, preventing overfull directories. Copy out does not happen if the file already exists in the output""" 154 print(
"Copying out files")
155 dh = ifdh.ifdh(
"http://samweb.fnal.gov:8480/sam/nova/api")
156 baseDir =
"./results" 157 for root, dirs, filenames
in os.walk(baseDir):
159 for file
in filenames:
160 if file.endswith (
".root")
and file != inFileBase:
161 fileWPath = os.path.join(root, file)
163 skip_match = _skip_pattern.match(file)
164 if skip_match ==
None:
166 print(
"Checking if", dest +
"/" + file,
"exists")
167 dh.ls(dest +
"/" + file, 1,
"")
169 print(
"It doesn't - copy", fileWPath,
"to", dest)
170 dh.cp([
"-D", fileWPath, dest])
171 print(
"Removing", fileWPath)
# Extracts the sim label (group 1) and the uniqueifier (group 2) from the
# input file name of a simulation file.  Compiled once because
# getOutputFileName is called once per requested output tier.
_SIM_NAME_PATTERN = re.compile(r"^.*?_(.*?)_\d+.*_v\d+_(.*?)(\.|\_)sim\..+")


def getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt):
    """Build the output file name for one data tier.

    Non-simulation files are named
    ``<detector>_r<run, 8 digits>_s<subrun, 2 digits>_<streamEntry>_<release>_v<subversion>_<dataFlag>.<tier>.root``.

    Simulation files (``dataFlag == "sim"``) are named
    ``<detector>_<simLabel>_<nevt>_r<run>_s<subrun>_<release>_v<subversion>_<uniq>_<dataFlag>.<tier>.root``
    where ``simLabel`` and ``uniq`` are parsed out of the module-level
    ``inFileBase`` (the base name of the input file).  That global must be
    set before this function is called with ``dataFlag == "sim"``.

    Args:
        detector: Detector name placed at the start of the file name.
        runNum: Run number; must be an int (formatted as ``08d``).
        subNum: Subrun number; must be an int (formatted as ``02d``).
        streamEntry: Stream label; only used for non-simulation names.
        release: Software release string.
        subversion: Subversion placed after ``_v``.
        dataFlag: ``"sim"`` selects the simulation naming scheme; any other
            value selects the plain scheme.
        tier: Data tier placed before ``.root``.
        nevt: Event/spill count; only used for simulation names.

    Returns:
        The output file name as a string.

    Raises:
        SystemExit: For a simulation file whose input name does not match
            the expected pattern.
    """
    if dataFlag != "sim":
        return '{0}_r{1:08d}_s{2:02d}_{3}_{4}_v{5}_{6}.{7}.root'.format(
            detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier)

    match = _SIM_NAME_PATTERN.match(inFileBase)
    if match is None:
        print("regex couldn't find the sim label and the uniqueifier")
        # NOTE(review): the statement that follows this message is not visible
        # in the reviewed excerpt; aborting with a non-zero status is assumed,
        # since no file name can be built without the two captured fields.
        sys.exit(1)

    simLabel, uniq = match.group(1, 2)
    return '{0}_{1}_{2}_r{3:08d}_s{4:02d}_{5}_v{6}_{7}_{8}.{9}.root'.format(
        detector, simLabel, nevt, runNum, subNum, release, subversion, uniq, dataFlag, tier)
198 fclPaths = os.environ[
"FHICL_FILE_PATH"].
split(
":")
199 for path
in fclPaths:
200 if os.path.isfile(path +
"/" + fcl):
201 return path +
"/" + fcl
204 raise IOError(sys.argv[0] +
": config file "+ fcl+
" not found in FHICL_FILE_PATH: $FHICL_FILE_PATH")
207 if __name__==
'__main__':
208 parser = argparse.ArgumentParser(description=
'Run the nova command using SAM metadata')
209 parser.add_argument(
'inFile', help=
'The input file to run over', type=str)
210 parser.add_argument(
'--config',
'-c', help=
'FHiCL file to use as configuration for nova executable', type=str)
211 parser.add_argument(
'--outTier', help=
'Data tier of the output file, multiple allowed, formatted as <name_in_fcl_outputs>:<data_tier>', type=str, action=
'append')
212 parser.add_argument(
'--cafTier', help=
'Module label for CAF output, multiple allowed. Format as <cafmaker_module_label>:<data_tier>', type=str, action=
'append')
213 parser.add_argument(
'-n', help=
'Number of events to run over', type=int)
214 parser.add_argument(
'--copyOut', help=
'Use the built in copy out mechanism', action=
'store_true')
215 parser.add_argument(
'--hashDirs', help=
'Use hash directory structure in destination directory.', action=
'store_true')
216 parser.add_argument(
'--declareFiles', help=
'Declare files with metadata on worker node', action=
'store_true')
217 parser.add_argument(
'--noCleanup', help=
'Skip working directory cleanup step, good for interactive debugging', action=
'store_true')
219 args = parser.parse_args()
223 samweb = samweb_client.SAMWebClient(experiment=
'nova')
225 inFileBase = os.path.basename(inFile)
227 metadata = samweb.getMetadata(inFileBase)
230 metadata = {k.lower():v
for k,v
in list(metadata.items())}
232 print(
"Input File = ", inFile,
"with metadata:")
233 pprint.pprint(metadata)
237 if "runs" in metadata
and len(metadata[
"runs"][0]) == 3:
238 runNum =
int(metadata[
"runs"][0][0])
239 subNum =
int(metadata[
"runs"][0][1])
241 run_pattern = re.compile(
r"^.*?_r([0-9]+)_s([0-9]+).+")
242 run_match = run_pattern.match(inFileBase)
243 if run_match !=
None:
244 runNum =
int(run_match.groups()[0])
245 subNum =
int(run_match.groups()[1])
247 print(
"No run number/subrun number found!")
250 print(
"Run Number: ", runNum)
251 print(
"Subrun Number: ", subNum)
253 if "data_stream" in metadata:
255 stream =
int(metadata[
"data_stream"])
258 stream = metadata[
"data_stream"]
261 print(
"No data stream found!")
266 stream_pattern = re.compile(
r"^.*?_(dd.*?)[_,.].+", re.IGNORECASE)
267 stream_match = stream_pattern.match(inFileBase)
268 if stream_match !=
None:
269 stream = stream_match.groups()[0]
271 print(
"Found match!", stream)
273 print(
"No match, using", stream)
275 if "online.detector" in metadata:
276 detector = metadata[
"online.detector"].lower()
277 elif "nova.detectorid" in metadata:
278 detId = metadata[
"nova.detectorid"].lower()
279 if detId
in list(detectors.keys()):
280 detector = detectors[detId]
282 print(
"No detector name found for detectorid:", detId)
285 print(
"No detector name found!")
290 if "NOVASOFT_VERSION" in os.environ:
291 release = os.environ[
"NOVASOFT_VERSION"]
293 print(
"No release set!")
296 if "file_type" in metadata:
297 file_type = metadata[
"file_type"]
299 print(
"No file_type set!")
302 if file_type ==
"importedDetector":
304 elif file_type ==
"importedSimulated":
307 print(
"Unrecognized file_type:", file_type)
310 if "NPASS" in os.environ:
311 subversion = os.environ[
"NPASS"]
312 elif "nova.subversion" in metadata:
313 subversion = metadata[
"nova.subversion"]
318 if "simulated.number_of_spills" in metadata:
319 nevt = metadata[
"simulated.number_of_spills"]
321 fclFile = args.config
326 print(
"Found fcl file here: ", fclPath)
328 tmpFclName = os.path.basename(fclPath).replace(
".fcl",
"_" + os.path.splitext(inFileBase)[0] +
".fcl")
329 print(
"Creating local copy : ", tmpFclName)
330 shutil.copy(fclPath, tmpFclName)
333 fclFileObj =
open(tmpFclName,
'a')
338 cmdList.append(tmpFclName)
339 cmdList.append(
'--sam-application-family=nova')
340 cmdList.append(
'--sam-application-version=' + release)
341 cmdList.append(
'--sam-file-type=' + file_type)
352 for outTier
in args.outTier:
354 output = outTier.split(
":")[0]
355 tier = outTier.split(
":")[1]
357 raise ValueError(
"Output data tier: " + outTier +
"not formatted corectly, should be <output_name>:<data_tier>")
358 cmdList.append(
'--sam-data-tier=' + outTier)
359 cmdList.append(
'--sam-stream-name=' +output +
':' +
str(stream))
360 outName =
getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)
361 outList.append(outName);
362 print(
"Output file name: ", outName,
" for tier ", tier,
" and output ", output)
363 fclFileObj.write(
"outputs." + output +
'.fileName: "'+ outName +
'"\n')
366 for cafTier
in args.cafTier:
368 cafLabel = cafTier.split(
":")[0]
369 tier = cafTier.split(
":")[1]
371 raise ValueError(
"Output data tier: " + outTier +
"not formatted corectly, should be <output_name>:<data_tier>")
372 cafName =
getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)
373 print(cafLabel, tier, cafName)
374 fclFileObj.write(
"physics.producers." + cafLabel +
'.CAFFilename: "' + cafName +
'" \n')
375 fclFileObj.write(
"physics.producers." + cafLabel +
'.DataTier: "' + tier +
'" \n')
381 for line
in open(tmpFclName):
386 cmdList.append(
str(args.n))
387 cmdList.append(inFile)
389 cmd =
' '.join(cmdList)
391 print(
'Running:', cmd)
393 retCode = subprocess.call(cmdList)
396 if args.declareFiles ==
True:
401 if args.copyOut ==
True:
407 os.remove(
"./" + file)
412 dirList = os.listdir(
".")
414 skip_match = _skip_pattern.match(file)
415 if skip_match !=
None:
416 print(file,
"contains *hist*.root or RootOutput*.root: clean up")
417 os.remove(
"./" + file)
418 if not args.noCleanup:
419 os.remove(tmpFclName)
421 dh = ifdh.ifdh(
"http://samweb.fnal.gov:8480/sam/nova/api")
void split(double tt, double *fr)
def createMetadata(inFile)
def declareFile(fileWPath)
def fileEnstoreChecksum(path)
std::string format(const int32_t &value, const int &ndigits=8)
procfile open("FD_BRL_v0.txt")
def checkAndMoveFiles(inFile, declareFiles)
def copyOutFiles(hashDirs=False)
def getOutDir(pathname, hashDirs=False)
def getOutputFileName(detector, runNum, subNum, streamEntry, release, subversion, dataFlag, tier, nevt)