"""
submit_concat_project.py: Submit jobs for producing concats from a dataset.

Original author: J. Wolcott <jwolcott@fnal.gov>, rewritten from submit_concat_project.sh
"""

# Locate the CAFAna area of the user's release via FW_SEARCH_PATH.
# NOTE(review): this span was garbled in extraction; the try/except and
# loop-exit scaffolding below is reconstructed -- confirm against VCS.
try:
    search_path = os.environ["FW_SEARCH_PATH"]
except KeyError:
    sys.stderr.write("Release not yet set up. Please set up novasoft before using this script!\n")
    sys.exit(1)

cafana_dir = None
for path in search_path.split(":"):
    if os.path.isdir(os.path.join(path, "CAFAna")):
        cafana_dir = path
        break

if cafana_dir is None:
    sys.stderr.write("Couldn't find CAFAna directory! Abort.\n")
    sys.exit(1)
# Base directory of released novasoft builds on CVMFS.
CVMFS_RELEASE_DIR = "/cvmfs/nova.opensciencegrid.org/novasoft/slf6/novasoft/releases"

# Shortcut name -> reduction ("decaf") macro path.
# NOTE: the 2019 three-flavor entries deliberately reuse the 2018 macros.
DECAF_MACROS = {
    "validation":        cafana_dir + "/CAFAna/nus/reduce_nue_or_numu_or_nus.C",
    "concat":            "$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
    "nueSA":             cafana_dir + "/CAFAna/nue/reduce_nue_sa.C",
    "nue2017":           cafana_dir + "/CAFAna/nue/reducer/reduce_bendecomp2017.C",
    "nue2018":           cafana_dir + "/CAFAna/nue/reducer/reduce_nue_2018.C",
    "nue2019":           cafana_dir + "/CAFAna/nue/reducer/reduce_nue_2018.C",
    "numuSA":            cafana_dir + "/CAFAna/numu/FirstAnalysis/reduce_numu_fa.C",
    "numu2017":          cafana_dir + "/CAFAna/numu/Analysis2017/reduce_numu_ana2017.C",
    "numu2018":          cafana_dir + "/CAFAna/numu/Analysis2018/reduce_numu_ana2018.C",
    "numu2019":          cafana_dir + "/CAFAna/numu/Analysis2018/reduce_numu_ana2018.C",
    "nus":               cafana_dir + "/CAFAna/nus/reduce_nus.C",
    "nus2019":           cafana_dir + "/CAFAna/nus/reduce_nus_ana2019.C",
    "numu2020":          cafana_dir + "/3FlavorAna/Ana2020/reducer/reduce_prod5_numu.C",
    "nue2020":           cafana_dir + "/3FlavorAna/Ana2020/reducer/reduce_prod5_nue.C",
    "nus2020":           cafana_dir + "/NuXAna/macros/reduce_nus_ana2020.C",
    "nue_or_numu_SA":    cafana_dir + "/CAFAna/nue/reduce_nue_or_numu_sa.C",
    "nu_or_numu_or_nus": cafana_dir + "/CAFAna/nus/reduce_nue_or_numu_or_nus.C",
}
def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True):
    """Derive the output-file name stub for a concat job from the dataset name.

    dataset_name: SAM dataset definition name being concatenated.
    decaf_macro:  macro path, or a DECAF_MACROS shortcut name.
    rename_cafs:  when True and a genuine decaffing macro (anything other
                  than plain concat_dataset.C) is in use, 'caf' in the name
                  becomes 'decaf'.
    """
    out_name = dataset_name

    # The patterns don't overlap, so order doesn't matter; an explicit tuple
    # is still used instead of the original's (unordered py2) dict so the
    # pass is visibly deterministic.
    for old, new in (("_caf", "_sumcaf"),
                     ("_decaf", "_sumdecaf"),
                     ("_restrictedcaf", "_sumrestrictedcaf")):
        out_name = out_name.replace(old, new)

    # A real reduction macro produces decafs, so rename accordingly.
    if rename_cafs and os.path.basename(decaf_macro) != "concat_dataset.C":
        out_name = out_name.replace("_sumcaf", "_sumdecaf")
        out_name = out_name.replace("_sumrestrictedcaf", "_sumrestricteddecaf")

    # Tag with the shortcut name so different reductions of the same dataset
    # don't collide.
    if decaf_macro in DECAF_MACROS:
        out_name = out_name + "_" + decaf_macro

    # NOTE(review): the original return line was lost in extraction; callers
    # assign the result, so returning out_name is implied.
    return out_name
87 Compute number of parts dataset should be split into. 89 snapshot_id = id of snapshot of dataset 90 chunk_size = size of one file (in GB) 91 max_singlefile_size = if dataset size is less than this size in GB, only a single file will be made 93 def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None):
95 dimensions =
"snapshot_id=%d" % snapshot_id
97 dimensions =
"dataset_def_name_newest_snapshot %s" % defname
98 summary = SAM.listFilesSummary(dimensions=dimensions)
99 size = summary[
"total_file_size"]
100 n_files = summary[
"file_count"]
102 if requested_num_parts:
103 if requested_num_parts <= n_files:
104 return requested_num_parts
106 print >> sys.stderr,
"You requested %d parts, but there are only %d files in the dataset. Can't proceed." % (requested_num_parts, n_files)
110 if GBs < max_singlefile_size:
113 return GBs/chunk_size
def CheckProjects(dataset):
    """Refuse to submit when this request would exceed the running-project cap.

    dataset: SAM dataset definition name.
    Reads module globals nparts and MAX_N_JOBS set by the driver code.
    """
    already_running_projects = SAM.listProjects(name='%%_proj_concat_%s%%' % dataset,
                                                state='running')
    if len(already_running_projects) + nparts > MAX_N_JOBS:
        sys.stderr.write("Current concat request would require %d jobs, but\n" % nparts)
        if len(already_running_projects) > 0:
            sys.stderr.write("there are already %d SAM projects running for this definition,\n"
                             % len(already_running_projects))
            sys.stderr.write("and ")
        sys.stderr.write("there is a maximum of %d running concat projects for the same definition.\n" % MAX_N_JOBS)
        sys.stderr.write("Please retry using --parts to restrict how many jobs are submitted.\n")
        # NOTE(review): the exit line was lost in extraction; a hard stop is
        # implied by "Can't proceed"-style messaging -- confirm against VCS.
        sys.exit(1)
# Acquire a grid proxy appropriate to the requested VOMS role.
# NOTE(review): the lines choosing 'arg' and the try: were lost in
# extraction; '-p' for Production is a conservative reconstruction -- verify.
if role == "Production":
    arg = "-p"
else:
    arg = ""
try:
    subprocess.check_call(["setup_fnal_security", arg])
except subprocess.CalledProcessError:
    sys.stderr.write("Please run 'setup_fnal_security' with the correct options to get a valid proxy and try again.\n")
    sys.exit(1)
def MakeOutputDir(dirname):
    """Ensure dirname exists with group-writable permissions; exit on failure.

    dirname: output directory path to create if missing.
    """
    if not os.path.isdir(dirname):
        try:
            # NOTE(review): the creation call was lost in extraction;
            # makedirs() is implied by the chmod that follows.
            os.makedirs(dirname)
            os.chmod(dirname, 0o775)  # 0o775: py2.6+/py3-portable octal
        except OSError:
            sys.stderr.write("Output directory '%s' cannot be created!\n" % dirname)
            sys.exit(1)
def ParseArguments():
    """Build the argument parser and parse the command line.

    NOTE(review): the original function name and several add_argument()
    keyword lines (nargs/type/action/default) were lost in extraction and
    are reconstructed from the help text -- confirm against VCS.
    """
    parser = argparse.ArgumentParser(description="Submit jobs for concatenation of (de)CAFs.")

    parser.add_argument("dataset",
                        help="Dataset containing (de)CAFs to concatenate")
    parser.add_argument("outputdir",
                        help="Where to write output")
    parser.add_argument("release",
                        help="Version of novasoft to use")

    # Decaf macro: positional or flag form, mutually exclusive.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("decaf_positional",
                    nargs="?",
                    default="$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
                    help="Decaffing macro to run. Either specify full path here or one of the following shortcut options: %s. Can also specify as --decaf (below)." % DECAF_MACROS.keys())
    gp.add_argument("--decaf", "--decaf_macro",
                    help="Alternate form of positional DECAF (above).")

    # VOMS role: positional or flag form.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("role_positional",
                    nargs="?",
                    choices=["Analysis", "Production"],
                    help="VOMS role to submit with. (You must have Production-eligible proxy to use the Production role.) Can also specify as --role or --voms_role (below).")
    gp.add_argument("--role", "--voms_role",
                    choices=["Analysis", "Production"],
                    help="Alternate form of positional ROLE (above).")

    # Number of parts: positional or flag form.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("nparts_positional",
                    nargs="?", type=int,
                    help="Divide concats into this many files. If PARTS is also specified, NPARTS must correspond exactly to the number of parts. Can also specify as --nparts (below).")
    gp.add_argument("--nparts",
                    type=int,
                    help="Alternate form of positional NPARTS (above).")

    # Part indices: positional or (preferred) repeatable flag form.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("parts_positional",
                    nargs="*",
                    help="The indices of which concat files to process. Optional form '--parts' (see description below) is preferred. ")
    gp.add_argument("--parts",
                    action="append", default=[],
                    help="The indices of which concat files to process. Can separate multiple items with commas, or specify --parts multiple times."
                         + " Ranges can be specified using a hyphen. So, '--parts 3,5,7-9 --parts 11-15 --parts 27' is legal.")

    parser.add_argument("--no_caf_rename",
                        action="store_true",
                        help="Do not substitute 'decaf' for 'caf' in the output name. Useful if you're only concatting CAFs with some script other than concat_dataset.C.")
    parser.add_argument("--use_last_snapshot",
                        action="store_true",
                        help="Use the last snapshot of the definition rather than taking a new one.")
    parser.add_argument("--flatten",
                        action="store_true",
                        help="Flatten concat file")
    parser.add_argument("--flat_only",
                        action="store_true",
                        help="Produce only flattened concat file")

    # Test-release / tarball options are mutually exclusive.
    gp = parser.add_mutually_exclusive_group(required=False)
    gp.add_argument('-t', '--testrel', metavar='DIR',
                    help='Use a test release at location TESTREL. It will be tarred up, and sent to the worker node. (Conflicts with --user_tarball)')
    gp.add_argument("--user_tarball",
                    help="Use existing test release tarball in specified location rather than having jobsub make one for you (conflicts with --testrel, and is redundant with --reuse_tarball)")
    gp.add_argument('--reuse_tarball',
                    action='store_true', default=False,
                    help='Do you want to reuse a tarball that is already in resilient space? If using this option avoid trailing slash in --testrel option. (redundant with --user_tarball)')

    parser.add_argument("-n", "--dry-run", "--test-only",
                        action="store_true",
                        help="Test only; don't actually submit any jobs. Print jobsub command and exit.")

    return parser.parse_args()
def ParseParts(parts_string):
    """Expand a parts spec like '3,5,7-9' into a set of integer part indices.

    parts_string: comma-separated indices and hyphenated ranges.
    Raises ValueError on malformed tokens (the caller catches it).
    """
    parts = set()
    for tok in parts_string.split(","):
        if "-" in tok:
            start, end = (int(t) for t in tok.split("-"))
        else:
            # Bare index. NOTE(review): the single-token branch was lost in
            # extraction, but '--parts 27' is documented as legal in the help
            # text, so it must be handled.
            start = end = int(tok)
        parts |= set(range(start, end + 1))
    return parts
def SubmitJobs(args):
    """Build the jobsub_submit command and execute it unless dry-run.

    args: dict of submission parameters ('nparts', 'role', 'dataset',
          'outname', 'snapshot_id', 'release', 'outputdir', 'decaf_macro',
          'testrel', 'parts', 'flatten', 'flat_only', 'dry_run').
    NOTE(review): the original function name and several branch conditions
    were lost in extraction; reconstructed conservatively -- confirm.
    """
    decafscript = args["decaf_macro"]

    cmd = "jobsub_submit -g -N%(nparts)d --role=%(role)s "
    # NOTE(review): the condition selecting between these two resource
    # requests was lost; flattening jobs plausibly need the extra disk.
    if args["flatten"]:
        cmd += " --memory=1999MB --disk=5GB --expected-lifetime=23h"
    else:
        cmd += " --memory=2400MB --expected-lifetime=23h"

    # Macros on BlueArc aren't visible from grid nodes; ship via dropbox.
    if "/nova/" in os.path.expandvars(args["decaf_macro"]):
        print("decaf macro has a blue arc location, copying it to grid nodes")
        cmd += " -f dropbox://%(decaf_macro)s"
        # On the worker the macro is referenced by bare filename.
        decafscript = args["decaf_macro"].split('/')[-1]

    if args["testrel"]:
        cmd += " --tar_file_name " + args["testrel"]

    cmd += " file://$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \
         + " --dataset %(dataset)s --outname %(outname)s --snapid %(snapshot_id)s" \
         + " --nparts %(nparts)d --release %(release)s --outdir %(outputdir)s"
    cmd += " --decafscript %s " % decafscript

    if len(args["parts"]):
        cmd += " --parts " + ",".join(str(a) for a in args["parts"])

    # NOTE(review): the exact flags passed for flattening were lost in
    # extraction; these names mirror the argparse options -- verify.
    if args["flatten"]:
        cmd += " --flatten"
    if args["flat_only"]:
        cmd += " --flat_only"

    # Fill the %(...)s placeholders, then expand $VARs.
    cmd = cmd % args
    cmd = os.path.expandvars(cmd)
    print(cmd)

    if not args["dry_run"]:
        subprocess.check_call(shlex.split(cmd))
if __name__ == "__main__":
    # NOTE(review): large parts of this driver were lost in extraction; the
    # call names ParseArguments/SubmitJobs and the sys.exit() calls are
    # reconstructed -- confirm against VCS.
    args = ParseArguments()

    # Test-release / tarball handling for jobsub.
    testrel = ""
    if args.testrel:
        if args.reuse_tarball:
            testrel += 'dropbox://' + os.path.basename(args.testrel) + '.tar'
        else:
            testrel += 'tardir://' + args.testrel + '\\\n'
    elif args.user_tarball:
        if not os.path.isfile(args.user_tarball):
            print("Tarball filename passed to --user_tarball does not exist: %s" % args.user_tarball)
            sys.exit(1)
        testrel += 'dropbox://' + args.user_tarball + ' \\\n'

    if args.flat_only and not args.flatten:
        print("Need to provide a --flatten option if you need a flattened concat!")
        sys.exit(1)

    release_path = os.path.expandvars(os.path.join(CVMFS_RELEASE_DIR, args.release))
    if not os.path.isdir(release_path):
        sys.stderr.write("Invalid release specified: %s\n" % args.release)
        sys.exit(1)

    out_name = BuildOutputNameStub(args.dataset, args.decaf or args.decaf_positional,
                                   rename_cafs=not args.no_caf_rename)
    MakeOutputDir(args.outputdir)

    # Join with "," (the extraction showed "".join(), which would fuse
    # separate --parts tokens like '3' + '5' into '35').
    parts = set()
    parts_args = args.parts or args.parts_positional
    if parts_args:
        parts_joined = ",".join(parts_args)
        try:
            parts = ParseParts(parts_joined)
        except ValueError:
            print("Invalid value specified for --parts: %s" % parts_joined)
            sys.exit(1)

    SAM = samweb_client.SAMWebClient(experiment='nova')
    snapshot_id = None
    if not args.use_last_snapshot:
        try:
            snapshot_id = int(SAM.takeSnapshot(args.dataset))
            print("Took snapshot %d for defn %s" % (snapshot_id, args.dataset))
        except samweb_client.exceptions.Error as e:
            sys.stderr.write("%s\n" % e)
            sys.exit(1)

    nparts = CalculateNParts(args.dataset, snapshot_id,
                             requested_num_parts=(args.nparts or args.nparts_positional))
    if any(p > nparts - 1 for p in parts):
        sys.stderr.write("One or more requested parts (you requested: %s) is larger than the number of parts (%d)!\n"
                         % (list(parts), nparts))
        sys.exit(1)

    print("Requires submitting %d jobs." % nparts)
    CheckProjects(args.dataset)

    concat = args.decaf or args.decaf_positional
    if concat in DECAF_MACROS:
        concat = DECAF_MACROS[concat]

    print("Output sent to: %s" % args.outputdir)
    if args.dry_run:
        print("Invoked with --dry-run, so not submitting. But I would have used the following jobsub command:")

    SubmitJobs({
        # NOTE(review): lines for 'outname', 'testrel', 'nparts' and 'parts'
        # keys were lost in extraction; reconstructed from usage above.
        "role": args.role or args.role_positional,
        "dataset": args.dataset,
        "outname": out_name,
        "snapshot_id": snapshot_id or "latest",
        "release": args.release,
        "testrel": testrel,
        "outputdir": args.outputdir,
        "decaf_macro": concat,
        "nparts": nparts,
        "parts": sorted(parts),
        "dry_run": args.dry_run,
        "flatten": args.flatten,
        "flat_only": args.flat_only,
    })
# --- extraction appendix: signatures of functions defined in this file ---
# (commented out so it cannot be mistaken for code; the stray C prototype
#  "void split(double tt, double *fr)" does not belong to this Python script
#  and appears to be residue from the extraction process)
# def MakeOutputDir(dirname)
# def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None)
# def ParseParts(parts_string)
# def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True)
# def CheckProjects(dataset)