submit_concat_project.py
#!/usr/bin/env python

"""
   submit_concat_project.py:
   Submit jobs for producing concats from a dataset.

   Original author: J. Wolcott <jwolcott@fnal.gov>, rewritten from submit_concat_project.sh
   Date: August 2017
"""
from __future__ import print_function
from __future__ import division

from builtins import str
from builtins import range
from past.utils import old_div
import argparse
import os, os.path
import shlex
import stat
import subprocess
import sys

import samweb_client

try:
    search_path = os.environ["FW_SEARCH_PATH"]
except KeyError:
    print("Release not yet set up. Please set up novasoft before using this script!", file=sys.stderr)
    sys.exit(1)
cafana_dir = ""
for path in search_path.split(":"):
    if os.path.isdir(os.path.join(path, "CAFAna")):
        cafana_dir = path
        break
if cafana_dir == "":
    print("Couldn't find CAFAna directory! Abort.", file=sys.stderr)
    sys.exit(1)

CVMFS_RELEASE_DIR = "/cvmfs/nova.opensciencegrid.org/novasoft/slf6/novasoft/releases"

DECAF_MACROS = {
    "validation"        : cafana_dir + "/CAFAna/nus/reduce_nue_or_numu_or_nus.C",
    "concat"            : "$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
    "nueSA"             : cafana_dir + "/CAFAna/nue/reduce_nue_sa.C",
    "nue2017"           : cafana_dir + "/CAFAna/nue/reducer/reduce_bendecomp2017.C",
    "nue2018"           : cafana_dir + "/CAFAna/nue/reducer/reduce_nue_2018.C",
    "nue2019"           : cafana_dir + "/CAFAna/nue/reducer/reduce_nue_2018.C",
    "numuSA"            : cafana_dir + "/CAFAna/numu/FirstAnalysis/reduce_numu_fa.C",
    "numu2017"          : cafana_dir + "/CAFAna/numu/Analysis2017/reduce_numu_ana2017.C",
    "numu2018"          : cafana_dir + "/CAFAna/numu/Analysis2018/reduce_numu_ana2018.C",
    "numu2019"          : cafana_dir + "/CAFAna/numu/Analysis2018/reduce_numu_ana2018.C",
    "nus"               : cafana_dir + "/CAFAna/nus/reduce_nus.C",
    "nus2019"           : cafana_dir + "/CAFAna/nus/reduce_nus_ana2019.C",
    "numu2020"          : cafana_dir + "/3FlavorAna/Ana2020/reducer/reduce_prod5_numu.C",
    "nue2020"           : cafana_dir + "/3FlavorAna/Ana2020/reducer/reduce_prod5_nue.C",
    "nus2020"           : cafana_dir + "/NuXAna/macros/reduce_nus_ana2020.C",
    "nue_or_numu_SA"    : cafana_dir + "/CAFAna/nue/reduce_nue_or_numu_sa.C",
    "nu_or_numu_or_nus" : cafana_dir + "/CAFAna/nus/reduce_nue_or_numu_or_nus.C",
}

# There's a hard cap on the total number of SAM projects
# allowed to be running experiment-wide (=1000).
# Don't allow a single concatting pass to eat more than
# 10% of that.
MAX_N_JOBS = 100

# this will be the SAM client,
# but we don't want to load it until we've checked
# command-line arguments etc.
SAM = None

def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True):
    # make the output file name stub.
    # this is a concat script, so we want to ensure "caf" -> "sumcaf",
    # and "decaf" -> "sumdecaf"...

    out_name = dataset_name
    replacements = {"_caf": "_sumcaf", "_decaf": "_sumdecaf", "_restrictedcaf": "_sumrestrictedcaf"}
    for repl in replacements:
        out_name = out_name.replace(repl, replacements[repl])

    # ... and if we're actually decaffing, that "caf" -> "decaf"
    if rename_cafs and os.path.basename(decaf_macro) != "concat_dataset.C":
        out_name = out_name.replace("_sumcaf", "_sumdecaf")
        out_name = out_name.replace("_sumrestrictedcaf", "_sumrestricteddecaf")
        if decaf_macro in DECAF_MACROS:
            out_name = out_name + "_" + decaf_macro

    return out_name

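# A quick illustration (not part of the original script; the dataset name below
# is hypothetical) of how the renaming above plays out when a shortcut key such
# as "numu2020" is passed as the decaf macro:
#
#   BuildOutputNameStub("prod_caf_R19-11-18-prod5reco_fd_numi_fhc", "numu2020")
#     -> "prod_sumdecaf_R19-11-18-prod5reco_fd_numi_fhc_numu2020"
#
# With the default concat_dataset.C macro the same input would instead come out
# as "prod_sumcaf_R19-11-18-prod5reco_fd_numi_fhc".
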
91 """
92  Compute number of parts dataset should be split into.
93 
94  snapshot_id = id of snapshot of dataset
95  chunk_size = size of one file (in GB)
96  max_singlefile_size = if dataset size is less than this size in GB, only a single file will be made
97 """
98 def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None):
99  if snapshot_id:
100  dimensions = "snapshot_id=%d" % snapshot_id
101  else:
102  dimensions = "dataset_def_name_newest_snapshot %s" % defname
103  summary = SAM.listFilesSummary(dimensions=dimensions)
104  size = summary["total_file_size"]
105  n_files = summary["file_count"]
106 
107  if requested_num_parts:
108  if requested_num_parts <= n_files:
109  return requested_num_parts
110  else:
111  print("You requested %d parts, but there are only %d files in the dataset. Can't proceed." % (requested_num_parts, n_files), file=sys.stderr)
112  sys.exit(1)
113 
114  GBs = old_div(size, 1024**3) # bytes to gigabytes
115  if GBs < max_singlefile_size:
116  return 1
117  else:
118  return old_div(GBs,chunk_size)
119 
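# Worked example with made-up numbers (illustrative only): with the defaults
# above, a snapshot totalling 120 GB is above max_singlefile_size (20 GB), so
# it is split into old_div(120, 2) = 60 parts of roughly chunk_size = 2 GB
# each; a 15 GB snapshot falls below the threshold and produces a single part,
# however many input files it contains.
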
def CheckProjects(dataset):
    already_running_projects = SAM.listProjects(name='%%_proj_concat_%s%%' % dataset, state='running')
    if len(already_running_projects) + nparts > MAX_N_JOBS:
        print("Current concat request would require %d jobs, but" % nparts, file=sys.stderr)
        if len(already_running_projects) > 0:
            print("there are already %d SAM projects running for this definition," % len(already_running_projects), file=sys.stderr)
            print("and ", end=' ', file=sys.stderr)
        print("there is a maximum of %d running concat projects for the same definition." % MAX_N_JOBS, file=sys.stderr)
        print("Please retry using --parts to restrict how many jobs are submitted.", file=sys.stderr)
        sys.exit(1)


def CheckProxy(role):
    # now check we have a usable proxy
    arg = "-c"
    if role == "Production":
        arg += "p"
    try:
        subprocess.check_call(["setup_fnal_security", arg])
    except subprocess.CalledProcessError:
        print("Please run 'setup_fnal_security' with the correct options to get a valid proxy and try again.", file=sys.stderr)
        sys.exit(2)

def MakeOutputDir(dirname):
    try:
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
            os.chmod(dirname, 0o775)
    except OSError:
        print("Output directory '%s' cannot be created!" % dirname, file=sys.stderr)
        sys.exit(1)

def ParseArgs():
    parser = argparse.ArgumentParser(description="Submit jobs for concatenation of (de)CAFs.")

    parser.add_argument("dataset",
                        metavar="DATASET",
                        help="Dataset containing (de)CAFs to concatenate",
    )
    parser.add_argument("outputdir",
                        metavar="OUTPUTDIR",
                        help="Where to write output",
    )
    parser.add_argument("release",
                        metavar="RELEASE",
                        help="Version of novasoft to use",
    )

    # support old (positional) syntax as well as nicer (option-based) one
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("decaf_positional",
                    metavar="DECAF",
                    nargs="?",
                    default="$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
                    help="Decaffing macro to run. Either specify full path here or one of the following shortcut options: %s. Can also specify as --decaf (below)." % list(DECAF_MACROS.keys()),
    )
    gp.add_argument("--decaf", "--decaf_macro",
                    metavar="DECAF",
                    help="Alternate form of positional DECAF (above).",
    )

    # etc.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("role_positional",
                    metavar="ROLE",
                    nargs="?",
                    default="Analysis",
                    choices=["Analysis", "Production"],
                    help="VOMS role to submit with. (You must have a Production-eligible proxy to use the Production role.) Can also specify as --role or --voms_role (below).",
    )
    gp.add_argument("--role", "--voms_role",
                    choices=["Analysis", "Production"],
                    help="Alternate form of positional ROLE (above).",
    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("nparts_positional",
                    nargs="?",
                    type=int,
                    metavar="NPARTS",
                    help="Divide concats into this many files. If PARTS is also specified, NPARTS must correspond exactly to the number of parts. Can also specify as --nparts (below).",
    )
    gp.add_argument("--nparts",
                    metavar="NPARTS",
                    type=int,
                    help="Alternate form of positional NPARTS (above).",
    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("parts_positional",
                    nargs="?",
                    metavar="PARTS",
                    help="The indices of which concat files to process. Optional form '--parts' (see description below) is preferred.",
    )
    gp.add_argument("--parts",
                    metavar="PARTS",
                    action="append",
                    help="The indices of which concat files to process. Can separate multiple items with commas, or specify --parts multiple times."
                         + " Ranges can be specified using a hyphen. So, '--parts 3,5,7-9 --parts 11-15 --parts 27' is legal.",
    )

    # allow user to opt out of 'caf' -> 'decaf' renaming in case we don't automatically get it right
    parser.add_argument("--no_caf_rename",
                        action="store_true",
                        default=False,
                        help="Do not substitute 'decaf' for 'caf' in the output name. Useful if you're only concatting CAFs with some script other than concat_dataset.C.",
    )

    parser.add_argument("--use_last_snapshot",
                        action="store_true",
                        default=False,
                        help="Use the last snapshot of the definition rather than taking a new one.",
    )

    parser.add_argument("--flatten",
                        action="store_true",
                        default=True,
                        help="Flatten concat file",
    )

    parser.add_argument("--flat_only",
                        action="store_true",
                        default=False,
                        help="Produce only flattened concat file",
    )

    gp = parser.add_mutually_exclusive_group(required=False)
    gp.add_argument('-t', '--testrel', metavar='DIR',
                    help='Use a test release at location TESTREL. It will be tarred up, and sent to the worker node. (Conflicts with --user_tarball)',
                    default=None)

    gp.add_argument("--user_tarball",
                    help="Use existing test release tarball in specified location rather than having jobsub make one for you (conflicts with --testrel, and is redundant with --reuse_tarball)",
                    type=str)

    gp.add_argument('--reuse_tarball',
                    help='Do you want to reuse a tarball that is already in resilient space? If using this option avoid trailing slash in --testrel option. (redundant with --user_tarball)',
                    action='store_true', default=False)

    parser.add_argument("-n", "--dry-run", "--test-only",
                        action="store_true",
                        default=False,
                        help="Test only; don't actually submit any jobs. Print jobsub command and exit.",
    )

    parser.add_argument("--disk",
                        type=int,
                        help="Local disk space requirement for worker node in MB (default is 2400MB without flattening).")  # no default here; see flatten resource logic in __main__

    parser.add_argument("--memory",
                        type=int,
                        help="Local memory requirement for worker node in MB (default is 3000MB without flattening).")  # no default here; see flatten resource logic in __main__

    return parser.parse_args()

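# Example invocations (illustrative; the dataset, output path, and release
# names below are made up). Both the option-based and the old positional
# syntax are accepted:
#
#   submit_concat_project.py my_caf_dataset /pnfs/nova/scratch/users/$USER/concats \
#       R19-11-18-prod5reco.d --decaf numu2020 --nparts 10 --role Production
#
#   submit_concat_project.py my_caf_dataset /pnfs/nova/scratch/users/$USER/concats \
#       R19-11-18-prod5reco.d numu2020 Production 10
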
def ParseParts(parts_string):
    parts = set()
    for tok in parts_string.split(","):
        if "-" in tok:
            start, end = (int(t) for t in tok.split("-"))
            parts |= set(range(start, end+1))
        else:
            parts.add(int(tok))

    return parts

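# For reference (illustrative, not executed): the parser above turns a comma-
# and range-separated PARTS string into a set of integer indices, e.g.
#
#   ParseParts("3,5,7-9")  ->  {3, 5, 7, 8, 9}
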
def Submit(args):
    # can be used for interactive testing if needed
    # cmd = "$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh"

    decafscript = args["decaf_macro"]

    cmd = "jobsub_submit -g -N%(nparts)d --role=%(role)s "

    # Set resource requirements
    cmd += " --disk=%sMB" % args["disk"]
    cmd += " --memory=%sMB" % args["memory"]
    cmd += " --expected-lifetime=23h"

    # files on BlueArc are not accessible directly; we need to copy them to the grid nodes
    if "/nova/" in os.path.expandvars(args["decaf_macro"]):
        print("decaf macro has a BlueArc location, copying it to grid nodes")
        # copy it at submission time
        cmd += " -f dropbox://%(decaf_macro)s"
        # strip the /nova/... path so concat_job_project.sh sees just the file name
        decafscript = args["decaf_macro"].split('/')[-1]

    if args["testrel"]:
        cmd += " --tar_file_name " + args["testrel"]

    cmd += " file://$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \
         + " --dataset %(dataset)s --outname %(outname)s --snapid %(snapshot_id)s" \
         + " --nparts %(nparts)d --release %(release)s --outdir %(outputdir)s"

    cmd %= args
    cmd += " --decafscript %s " % decafscript

    if args["testrel"]:
        cmd += " --testrel"
    if len(args["parts"]):
        cmd += " --parts " + ",".join(str(a) for a in args["parts"])
    if args["flatten"]:
        cmd += " --flatten"
    if args["flat_only"]:
        cmd += " --flat_only"

    cmd = os.path.expandvars(cmd)
    print(cmd)
    if not args["dry_run"]:
        subprocess.check_call(shlex.split(cmd))

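# For orientation, a sketch of what the assembled command looks like after the
# %-substitution and environment-variable expansion above (all values here are
# made up; the expanded NOVAGRIDUTILS_DIR path in particular depends on the
# release setup):
#
#   jobsub_submit -g -N10 --role=Analysis --disk=5000MB --memory=1999MB --expected-lifetime=23h \
#       file:///path/to/NovaGridUtils/bin/concat_job_project.sh \
#       --dataset my_caf_dataset --outname my_sumdecaf_dataset_numu2020 --snapid 1234567 \
#       --nparts 10 --release R19-11-18-prod5reco.d --outdir /pnfs/nova/scratch/users/someuser/concats \
#       --decafscript /cvmfs/nova.opensciencegrid.org/.../reduce_prod5_numu.C --flatten
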
#####################################

if __name__ == "__main__":
    args = ParseArgs()

    # get path to test release
    testrel = ""
    if args.testrel:
        if args.reuse_tarball:
            testrel += 'dropbox://' + os.path.basename(args.testrel) + '.tar'
        else:
            testrel += 'tardir://' + args.testrel + '\\\n'
    elif args.user_tarball:
        if not os.path.isfile(args.user_tarball):
            print("Tarball filename passed to --user_tarball does not exist:", args.user_tarball)
            sys.exit(1)
        testrel += 'dropbox://' + args.user_tarball + ' \\\n'

    # sanity checking
    if args.flat_only and not args.flatten:
        print("Need to provide a --flatten option if you need a flattened concat!")
        sys.exit(1)

    release_path = os.path.expandvars(os.path.join(CVMFS_RELEASE_DIR, args.release))
    if not os.path.isdir(release_path):
        # print(release_path)
        print("Invalid release specified:", args.release, file=sys.stderr)
        sys.exit(1)

    out_name = BuildOutputNameStub(args.dataset, args.decaf or args.decaf_positional, rename_cafs=not args.no_caf_rename)

    # compute the PARTS if commas or ranges
    parts = set()
    parts_args = args.parts or args.parts_positional
    if parts_args:
        # --parts (action="append") gives a list of strings; the positional form gives a single string
        parts_joined = parts_args if isinstance(parts_args, str) else ",".join(parts_args)
        try:
            parts = ParseParts(parts_joined)
        except ValueError as e:
            # raise
            print(e)
            print("Invalid value specified for --parts:", parts_joined)
            sys.exit(1)

    CheckProxy(args.role or args.role_positional)

    SAM = samweb_client.SAMWebClient(experiment='nova')
    if args.use_last_snapshot:
        snapshot_id = None
    else:
        try:
            snapshot_id = int(SAM.takeSnapshot(args.dataset))
            print("Took snapshot %d for defn %s" % (snapshot_id, args.dataset))
        except samweb_client.exceptions.Error as e:
            print(e, file=sys.stderr)
            sys.exit(1)

    nparts = CalculateNParts(args.dataset, snapshot_id, requested_num_parts=(args.nparts or args.nparts_positional))
    if any(p > nparts - 1 for p in parts):
        print("One or more requested parts (you requested: %s) is larger than the number of parts (%d)!" % (list(parts), nparts), file=sys.stderr)
        sys.exit(1)
    print("Requires submitting", nparts, "jobs.")

    # don't think we need this anymore
    # CheckProjects(args.dataset)

    # increase resource requirements for flattening
    if args.flatten:
        if not args.disk:
            args.disk = 5000
        if not args.memory:
            args.memory = 1999

    # check if this is a shortcut string
    concat = args.decaf or args.decaf_positional
    if concat in DECAF_MACROS:
        concat = DECAF_MACROS[concat]
    print(concat)

    MakeOutputDir(args.outputdir)
    print("Output sent to:", args.outputdir)

    print()
    if args.dry_run:
        print("Invoked with --dry-run, so not submitting. But I would have used the following jobsub command:")
    else:
        print("Submitting:")
    Submit({
        "nparts": nparts,
        "role": args.role or args.role_positional,
        "dataset": args.dataset,
        "outname": out_name,
        "snapshot_id": snapshot_id or "latest",
        "release": args.release,
        "testrel": testrel,
        "outputdir": args.outputdir,
        "decaf_macro": concat,
        "parts": parts,
        "dry_run": args.dry_run,
        "flatten": args.flatten,
        "flat_only": args.flat_only,
        "disk": args.disk or 2400,
        "memory": args.memory or 3000,
    })