submit_concat_project.py
#!/usr/bin/env python

"""
  submit_concat_project.py:
  Submit jobs for producing concats from a dataset.

  Original author: J. Wolcott <jwolcott@fnal.gov>, rewritten from submit_concat_project.sh
  Date: August 2017
"""

import argparse
import os, os.path
import shlex
import stat
import subprocess
import sys

import samweb_client

try:
    search_path = os.environ["FW_SEARCH_PATH"]
except KeyError:
    print >> sys.stderr, "Release not yet set up. Please set up novasoft before using this script!"
    sys.exit(1)

cafana_dir = ""
for path in search_path.split(":"):
    if os.path.isdir(os.path.join(path, "CAFAna")):
        cafana_dir = path
        break
if cafana_dir == "":
    print >> sys.stderr, "Couldn't find CAFAna directory! Abort."
    sys.exit(1)

CVMFS_RELEASE_DIR = "/cvmfs/nova.opensciencegrid.org/novasoft/slf6/novasoft/releases"

DECAF_MACROS = {
    "validation"        : cafana_dir + "/CAFAna/nus/reduce_nue_or_numu_or_nus.C",
    "concat"            : "$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
    "nueSA"             : cafana_dir + "/CAFAna/nue/reduce_nue_sa.C",
    "nue2017"           : cafana_dir + "/CAFAna/nue/reducer/reduce_bendecomp2017.C",
    "nue2018"           : cafana_dir + "/CAFAna/nue/reducer/reduce_nue_2018.C",
    "nue2019"           : cafana_dir + "/CAFAna/nue/reducer/reduce_nue_2018.C",
    "numuSA"            : cafana_dir + "/CAFAna/numu/FirstAnalysis/reduce_numu_fa.C",
    "numu2017"          : cafana_dir + "/CAFAna/numu/Analysis2017/reduce_numu_ana2017.C",
    "numu2018"          : cafana_dir + "/CAFAna/numu/Analysis2018/reduce_numu_ana2018.C",
    "numu2019"          : cafana_dir + "/CAFAna/numu/Analysis2018/reduce_numu_ana2018.C",
    "nus"               : cafana_dir + "/CAFAna/nus/reduce_nus.C",
    "nus2019"           : cafana_dir + "/CAFAna/nus/reduce_nus_ana2019.C",
    "numu2020"          : cafana_dir + "/CAFAna/3flavor/Ana2020/reducer/reduce_prod5_numu.C",
    "nue2020"           : cafana_dir + "/CAFAna/3flavor/Ana2020/reducer/reduce_prod5_nue.C",
    "nus2020"           : cafana_dir + "/CAFAna/nus/reduce_nus_ana2020.C",
    "nue_or_numu_SA"    : cafana_dir + "/CAFAna/nue/reduce_nue_or_numu_sa.C",
    "nu_or_numu_or_nus" : cafana_dir + "/CAFAna/nus/reduce_nue_or_numu_or_nus.C",
}
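# e.g., passing the shortcut "nue2018" on the command line resolves to
# <cafana_dir>/CAFAna/nue/reducer/reduce_nue_2018.C via the table above.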

# There's a hard cap on the total number of SAM projects
# allowed to be running experiment-wide (=1000).
# Don't allow a single concatting pass to eat more than
# 10% of that.
MAX_N_JOBS = 100

# this will be the SAM client,
# but we don't want to load it until we've checked
# command-line arguments etc.
SAM = None

def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True):
    # make the output file name stub.
    # this is a concat script, so we want to ensure "caf" -> "sumcaf",
    # and "decaf" -> "sumdecaf"...
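    #
    # Illustrative example (the dataset name is hypothetical): with the "nue2018"
    # shortcut and rename_cafs=True,
    #   "prod_caf_R17-11-14_fd_genie_nonswap" -> "prod_sumdecaf_R17-11-14_fd_genie_nonswap_nue2018"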

    out_name = dataset_name
    replacements = {"_caf": "_sumcaf", "_decaf": "_sumdecaf", "_restrictedcaf": "_sumrestrictedcaf"}
    for repl in replacements:
        out_name = out_name.replace(repl, replacements[repl])

    # ... and if we're actually decaffing, that "caf" -> "decaf"
    if rename_cafs and os.path.basename(decaf_macro) != "concat_dataset.C":
        out_name = out_name.replace("_sumcaf", "_sumdecaf")
        out_name = out_name.replace("_sumrestrictedcaf", "_sumrestricteddecaf")
    if decaf_macro in DECAF_MACROS:
        out_name = out_name + "_" + decaf_macro

    return out_name

86 """
87  Compute number of parts dataset should be split into.
88 
89  snapshot_id = id of snapshot of dataset
90  chunk_size = size of one file (in GB)
91  max_singlefile_size = if dataset size is less than this size in GB, only a single file will be made
92 """
93 def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None):
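    # Worked example (illustrative numbers): a 100 GB snapshot exceeds the
    # 20 GB single-file threshold, so with chunk_size=2 it is split into
    # 100/2 = 50 parts.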
    if snapshot_id:
        dimensions = "snapshot_id=%d" % snapshot_id
    else:
        dimensions = "dataset_def_name_newest_snapshot %s" % defname
    summary = SAM.listFilesSummary(dimensions=dimensions)
    size = summary["total_file_size"]
    n_files = summary["file_count"]

    if requested_num_parts:
        if requested_num_parts <= n_files:
            return requested_num_parts
        else:
            print >> sys.stderr, "You requested %d parts, but there are only %d files in the dataset. Can't proceed." % (requested_num_parts, n_files)
            sys.exit(1)

    GBs = size / 1024**3  # bytes to gigabytes
    if GBs < max_singlefile_size:
        return 1
    else:
        return GBs / chunk_size

def CheckProjects(dataset, nparts):
    already_running_projects = SAM.listProjects(name='%%_proj_concat_%s%%' % dataset, state='running')
    if len(already_running_projects) + nparts > MAX_N_JOBS:
        print >> sys.stderr, "Current concat request would require %d jobs, but" % nparts
        if len(already_running_projects) > 0:
            print >> sys.stderr, "there are already %d SAM projects running for this definition," % len(already_running_projects)
            print >> sys.stderr, "and ",
        print >> sys.stderr, "there is a maximum of %d running concat projects for the same definition." % MAX_N_JOBS
        print >> sys.stderr, "Please retry using --parts to restrict how many jobs are submitted."
        sys.exit(1)


def CheckProxy(role):
    # now check we have a usable proxy
    arg = "-c"
    if role == "Production":
        arg += "p"
    try:
        subprocess.check_call(["setup_fnal_security", arg])
    except subprocess.CalledProcessError:
        print >> sys.stderr, "Please run 'setup_fnal_security' with the correct options to get a valid proxy and try again."
        sys.exit(2)

def MakeOutputDir(dirname):
    try:
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
            os.chmod(dirname, 0775)
    except OSError:
        print >> sys.stderr, "Output directory '%s' cannot be created!" % dirname
        sys.exit(1)

def ParseArgs():
    parser = argparse.ArgumentParser(description="Submit jobs for concatenation of (de)CAFs.")

    parser.add_argument("dataset",
                        metavar="DATASET",
                        help="Dataset containing (de)CAFs to concatenate",
    )
    parser.add_argument("outputdir",
                        metavar="OUTPUTDIR",
                        help="Where to write output",
    )
    parser.add_argument("release",
                        metavar="RELEASE",
                        help="Version of novasoft to use",
    )

    # support old (positional) syntax as well as nicer (option-based) one
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("decaf_positional",
                    metavar="DECAF",
                    nargs="?",
                    default="$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
                    help="Decaffing macro to run. Either specify the full path here or one of the following shortcut options: %s. Can also specify as --decaf (below)." % DECAF_MACROS.keys(),
    )
    gp.add_argument("--decaf", "--decaf_macro",
                    metavar="DECAF",
                    help="Alternate form of positional DECAF (above).",
    )

    # etc.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("role_positional",
                    metavar="ROLE",
                    nargs="?",
                    default="Analysis",
                    choices=["Analysis", "Production"],
                    help="VOMS role to submit with. (You must have a Production-eligible proxy to use the Production role.) Can also specify as --role or --voms_role (below).",
    )
    gp.add_argument("--role", "--voms_role",
                    choices=["Analysis", "Production"],
                    help="Alternate form of positional ROLE (above).",
    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("nparts_positional",
                    nargs="?",
                    type=int,
                    metavar="NPARTS",
                    help="Divide concats into this many files. If PARTS is also specified, NPARTS must correspond exactly to the number of parts. Can also specify as --nparts (below).",
    )
    gp.add_argument("--nparts",
                    metavar="NPARTS",
                    type=int,
                    help="Alternate form of positional NPARTS (above).",
    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("parts_positional",
                    nargs="?",
                    metavar="PARTS",
                    help="The indices of which concat files to process. The optional form '--parts' (see description below) is preferred.",
    )
    gp.add_argument("--parts",
                    metavar="PARTS",
                    action="append",
                    help="The indices of which concat files to process. Separate multiple items with commas, or specify --parts multiple times."
                        + " Ranges can be specified using a hyphen. So, '--parts 3,5,7-9 --parts 11-15 --parts 27' is legal.",
    )

    # allow user to opt out of 'caf' -> 'decaf' renaming in case we don't automatically get it right
    parser.add_argument("--no_caf_rename",
                        action="store_true",
                        default=False,
                        help="Do not substitute 'decaf' for 'caf' in the output name. Useful if you're only concatting CAFs with some script other than concat_dataset.C.",
    )

    parser.add_argument("--use_last_snapshot",
                        action="store_true",
                        default=False,
                        help="Use the last snapshot of the definition rather than taking a new one.",
    )

    parser.add_argument("--flatten",
                        action="store_true",
                        default=False,
                        help="Flatten the concat file",
    )

    parser.add_argument("--flat_only",
                        action="store_true",
                        default=False,
                        help="Produce only the flattened concat file",
    )

    gp = parser.add_mutually_exclusive_group(required=False)
    gp.add_argument('-t', '--testrel', metavar='DIR',
                    help='Use a test release at location TESTREL. It will be tarred up and sent to the worker node. (Conflicts with --user_tarball)',
                    default=None)
    gp.add_argument("--user_tarball",
                    help="Use an existing test release tarball in the specified location rather than having jobsub make one for you (conflicts with --testrel, and is redundant with --reuse_tarball)",
                    type=str)
    gp.add_argument('--reuse_tarball',
                    help='Reuse a tarball that is already in resilient space. If using this option, avoid a trailing slash in the --testrel option. (Redundant with --user_tarball)',
                    action='store_true', default=False)

    parser.add_argument("-n", "--dry-run", "--test-only",
                        action="store_true",
                        default=False,
                        help="Test only; don't actually submit any jobs. Print the jobsub command and exit.",
    )

    return parser.parse_args()

def ParseParts(parts_string):
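    # e.g., ParseParts("3,5,7-9") -> set([3, 5, 7, 8, 9])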
    parts = set()
    for tok in parts_string.split(","):
        if "-" in tok:
            start, end = (int(t) for t in tok.split("-"))
            parts |= set(range(start, end+1))
        else:
            parts.add(int(tok))

    return parts

def Submit(args):
    # can be used for interactive testing if needed
#    cmd = "$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \

    decafscript = args["decaf_macro"]

    cmd = "jobsub_submit -g --OS=SL6" \
        + " -N%(nparts)d --role=%(role)s "

    # increase resource requirements for flattening
    if args["flatten"]:
        cmd += " --memory=1999MB --disk=5GB --expected-lifetime=23h"
    else:
        cmd += " --expected-lifetime=23h"

    # files on BlueArc are not accessible from the grid directly; we need to copy them to the worker nodes
    if "/nova/" in os.path.expandvars(args["decaf_macro"]):
        print "decaf macro has a BlueArc location; copying it to the grid nodes"
        # copy it at submission time
        cmd += " -f dropbox://%(decaf_macro)s"
        # strip the path containing /nova/ so concat_job_project.sh sees just the filename
        decafscript = args["decaf_macro"].split('/')[-1]

    if args["testrel"]:
        cmd += " --tar_file_name " + args["testrel"]

    cmd += " file://$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \
        + " --dataset %(dataset)s --outname %(outname)s --snapid %(snapshot_id)s" \
        + " --nparts %(nparts)d --release %(release)s --outdir %(outputdir)s"

    cmd %= args
    cmd += " --decafscript %s " % decafscript

    if args["testrel"]:
        cmd += " --testrel"
    if len(args["parts"]):
        cmd += " --parts " + ",".join(str(a) for a in args["parts"])
    if args["flatten"]:
        cmd += " --flatten"
    if args["flat_only"]:
        cmd += " --flat_only"
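
    # At this point cmd looks something like (all values illustrative):
    #   jobsub_submit -g --OS=SL6 -N50 --role=Analysis --expected-lifetime=23h \
    #     file://$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh \
    #     --dataset my_caf_defn --outname my_sumdecaf_defn_nue2018 \
    #     --snapid 12345 --nparts 50 --release R17-11-14 \
    #     --outdir /pnfs/nova/scratch/users/me --decafscript reduce_nue_2018.C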

    cmd = os.path.expandvars(cmd)
    print cmd
    if not args["dry_run"]:
        subprocess.check_call(shlex.split(cmd))

#####################################

if __name__ == "__main__":
    args = ParseArgs()

    # get path to test release
    testrel = ""
    if args.testrel:
        if args.reuse_tarball:
            testrel += 'dropbox://'+os.path.basename(args.testrel)+'.tar'
        else:
            testrel += 'tardir://'+args.testrel+'\\\n'
    elif args.user_tarball:
        if not os.path.isfile(args.user_tarball):
            print "Tarball filename passed to --user_tarball does not exist:", args.user_tarball
            sys.exit(1)
        testrel += 'dropbox://' + args.user_tarball + ' \\\n'

    # sanity checking
    if args.flat_only and not args.flatten:
        print "You must also pass --flatten if you want a flattened concat!"
        sys.exit(1)

    release_path = os.path.expandvars(os.path.join(CVMFS_RELEASE_DIR, args.release))
    if not os.path.isdir(release_path):
#        print release_path
        print >> sys.stderr, "Invalid release specified:", args.release
        sys.exit(1)

    out_name = BuildOutputNameStub(args.dataset, args.decaf or args.decaf_positional, rename_cafs=not args.no_caf_rename)

    # expand the PARTS specification (commas and ranges)
    parts = set()
    parts_args = args.parts or args.parts_positional
    if parts_args:
        parts_joined = "".join(parts_args)
        try:
            parts = ParseParts(parts_joined)
        except ValueError as e:
#            raise
            print e
            print "Invalid value specified for --parts:", parts_joined
            sys.exit(1)

    CheckProxy(args.role or args.role_positional)

    SAM = samweb_client.SAMWebClient(experiment='nova')
    if args.use_last_snapshot:
        snapshot_id = None
    else:
        try:
            snapshot_id = int(SAM.takeSnapshot(args.dataset))
            print "Took snapshot %d for defn %s" % (snapshot_id, args.dataset)
        except samweb_client.exceptions.Error as e:
            print >> sys.stderr, e
            sys.exit(1)

    nparts = CalculateNParts(args.dataset, snapshot_id, requested_num_parts=(args.nparts or args.nparts_positional))
    if any(p > nparts-1 for p in parts):
        print >> sys.stderr, "One or more requested parts (you requested: %s) is larger than the number of parts (%d)!" % (list(parts), nparts)
        sys.exit(1)
    print "Requires submitting", nparts, "jobs."

    # don't think we need this anymore
#    CheckProjects(args.dataset, nparts)

    # check if this is a shortcut string
    concat = args.decaf or args.decaf_positional
    if concat in DECAF_MACROS:
        concat = DECAF_MACROS[concat]
    print concat

    MakeOutputDir(args.outputdir)
    print "Output sent to:", args.outputdir

    print
    if args.dry_run:
        print "Invoked with --dry-run, so not submitting. But I would have used the following jobsub command:"
    else:
        print "Submitting:"
    Submit({
        "nparts": nparts,
        "role": args.role or args.role_positional,
        "dataset": args.dataset,
        "outname": out_name,
        "snapshot_id": snapshot_id or "latest",
        "release": args.release,
        "testrel": testrel,
        "outputdir": args.outputdir,
        "decaf_macro": concat,
        "parts": parts,
        "dry_run": args.dry_run,
        "flatten": args.flatten,
        "flat_only": args.flat_only,
    })