submit_cafana.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import argparse
4 import os
5 import sys
6 import string
7 import re
8 from glob import glob
9 
10 # Various utility functions
11 import NovaGridUtils
12 from NovaGridUtils import warn, fail, sleep
13 
# Offsite OSG sites known to run NOvA jobs successfully; consumed by the
# --recommended_sites option in the main driver below. The commented-out
# entries are sites that are currently disabled.
recommended_sites=["BNL",
 "Caltech",
 "Clemson",
 "Cornell",
 "FZU",
# "Harvard",
 "Hyak_CE",
 "Michigan",
# "MIT",
# "MWT2",
 "Nebraska",
 "NotreDame",
 "Omaha",
 "OSC",
 "SMU_HPC",
 "SU-OG",
 "SU-ITS",
# "UChicago",
 "UCSD",
# "TTU",
 "Wisconsin"]

# Map from the --os choice (sl6/sl7/el8) to the corresponding Singularity
# image published in CVMFS; used to fill opts.singularity in the main driver.
known_os_containers = {
 "sl6": "/cvmfs/singularity.opensciencegrid.org/fermilab/fnal-wn-sl6:latest",
 "sl7": "/cvmfs/singularity.opensciencegrid.org/fermilab/fnal-wn-sl7:latest",
 "el8": "/cvmfs/singularity.opensciencegrid.org/fermilab/fnal-wn-el8:latest",
}
41 
def remove_comments(src):
    """Return *src* truncated at the first '#', or unchanged if none present."""
    cut = src.find('#')
    return src if cut < 0 else src[:cut]
45 
46 def test_not_dcache(l, warnOnly = False):
47  loc = os.path.expandvars(l)
48  for bad in ['/nova/app/', '/nova/ana/', '/nova/data/', '/grid']:
49  if loc.startswith(bad):
50  txt = "Location %s cannot be on %s it must be in dCache /pnfs/nova/" % (loc, bad)
51  if loc.startswith('/nova/app'):
52  txt = "Jobs can no longer access BlueArc directly. Test releases will be tarred up and sent to worker nodes, however input files should be moved to dCache."
53  if warnOnly:
54  warn(txt)
55  else:
56  fail(txt)
57 
58 
def find_file(paths, filename):
    """Return *filename* if it is already a file; otherwise walk each tree in
    *paths* (env vars expanded) and return the first match. Calls fail() if
    the file cannot be located anywhere."""
    if os.path.isfile(filename):
        return filename
    for search_root in paths:
        for dirpath, _dirnames, filenames in os.walk(os.path.expandvars(search_root)):
            if filename in filenames:
                return os.path.join(dirpath, filename)
    fail("Cannot find file "+filename)
67 
68 
def find_file_in_list(filepath, pathlist):
    """Return True if any entry of *pathlist* shares its basename with *filepath*."""
    wanted = os.path.basename(filepath)
    return any(os.path.basename(candidate) == wanted for candidate in pathlist)
74 
75 
# === Main driver ============================================================
# Parse command-line options (optionally expanded from -f/--file argument
# files), run sanity checks on the release/output/tarball locations, build a
# 'jobsub_submit' command string piece by piece, and execute it with
# os.system() unless --test was given.
# NOTE(review): this listing uses Python 2 print statements throughout; it
# will not run under Python 3 without conversion.
76 if __name__ == '__main__':
77 
78  input_files=[]
79 
80  NovaGridUtils.prog = 'submit_cafana.py' # for error messages etc
81 
82  # Start this parser up just to extract the -f or --file argument if it's
83  # there, and push all of those args into the command line for the real
84  # parser to deal with.
85  if "-f" in sys.argv or "--file" in sys.argv:
86  prelim_parser = argparse.ArgumentParser()
87 
88  prelim_parser.add_argument('-f', '--file', type=str, action='append')
89  pre_args, unknown = prelim_parser.parse_known_args()
90 
91  if pre_args.file:
92  for filepath in pre_args.file:
93  NovaGridUtils.check_file(filepath)
# Each line of an argument file is stripped of '#' comments, split on
# whitespace, and appended to sys.argv for the real parser below.
94  for line in open(filepath, 'r'):
95  sys.argv += remove_comments(line).split()
96 
97 
98 
99  parser = argparse.ArgumentParser(description = 'Submit a CAFAna macro with datasets split between N jobs')
100 
101  ###required options
102  # So it'll show up in the help text, but it was already handled above
103  parser.add_argument('-f', '--file', type=str,
104  help='Text file containing any arguments to this utility',
105  action='append')
106 
107  parser.add_argument('-n', '--njobs', type=int, required=True, metavar='N',
108  help='Number of grid processes')
109 
110  parser.add_argument('-r', '--rel', required=True,
111  help='Release to use')
112 
113  parser.add_argument('-i', '--input_file', action="append",
114  help="Copy this input file to work area on worker node")
115 
116  parser.add_argument('-o', '--outdir', required=True, metavar='DIR',
117  help='Directory output files will go to')
118 
119  parser.add_argument('-ss', '--snapshot', action='store_true',
120  help='Use latest snapshot instead of requerying')
121 
122  parser.add_argument('-off', '--offsite', action='store_true',
123  help='Run this cafana job offsite')
124 
125  parser.add_argument('-d', '--drain', action='store_true',
126  help='Recover files missing from your output directory')
127 
128  parser.add_argument('-x', '--xrootdebug', action='store_true',
129  help='Add extra xrootd debugging information',
130  default=False)
131 
# Per-analysis library-loading flags; each is simply forwarded to the grid
# script as a same-named option (see the cmd construction near the end).
132  parser.add_argument('--numuccinc', action = 'store_true',
133  help = 'Load libraries for specific xsec analysis')
134  parser.add_argument('--numubarccinc', action = 'store_true',
135  help = 'Load libraries for specific xsec analysis')
136  parser.add_argument('--numucc2p2h', action = 'store_true',
137  help = 'Load libraries for specific xsec analysis')
138  parser.add_argument('--numucc0pi', action = 'store_true',
139  help = 'Load libraries for specific xsec analysis')
140  parser.add_argument('--nuebarccinc', action = 'store_true',
141  help = 'Load libraries for specific xsec analysis')
142  parser.add_argument('--numubarccpi0', action = 'store_true',
143  help = 'Load libraries for specific xsec analysis')
144  parser.add_argument('--no3flavor', action = 'store_true',
145  help = 'Do *not* load libraries for 3-flavor analysis')
146  parser.add_argument('--nuxana', action="store_true",
147  help="Load libraries for NuX analysis")
148 
# Positional arguments: the macro (retrieved later via vars(opts)['macro.C'],
# since 'macro.C' is not a valid attribute name) plus pass-through macro args.
149  parser.add_argument('macro.C',
150  help='The CAFAna macro to run')
151 
152  parser.add_argument('args', nargs='*',
153  help='Arguments to the macro')
154 
155  ###Define job_control_args
156  job_control_args = parser.add_argument_group("Job control options", "These optional arguments help control where and how your jobs land.")
157  ###tarball control options.
158  tarball_gp = job_control_args.add_mutually_exclusive_group(required=False)
159  tarball_gp.add_argument('-t', '--testrel', metavar='DIR',
160  help='Use a test release at location TESTREL. It will be tarred up, and sent to the worker node. (Conflicts with --user_tarball)',
161  default=None)
162 
163  tarball_gp.add_argument("--user_tarball",
164  help="Use existing test release tarball in specified location rather than having jobsub make one for you (conflicts with --testrel, and is redunant with --reuse_tarball)",
165  type=str)
166 
167  ###general job control
168  job_control_args.add_argument('--reuse_tarball',
169  help='Do you want to reuse a tarball that is already in resilient space? If using this option avoid trailing slash in --testrel option. (redundant with --user_tarball)',
170  action='store_true',default=False)
171 
172  job_control_args.add_argument('--dedicated',
173  help='Only run on dedicated nodes on fermigrid (default is to run opportunistically)',
174  action='store_true',default=False)
175 
176  job_control_args.add_argument('--site',
177  help='Specify allowed offsite locations. Omit to allow running at any offsite location',
178  type=str,action='append')
179 
180  job_control_args.add_argument('--exclude_site',
181  help='Specify an offsite location to exclude.',
182  metavar='SITE',
183  type=str,action='append')
184 
185  job_control_args.add_argument('--recommended_sites',
186  help='Specify known working offsite locations.',
187  action='store_true',default=False)
188 
189  job_control_args.add_argument('--disk',
190  help='Local disk space requirement for worker node in MB (default is 2000MB).',
191  type=int, default=2000)
192 
193  job_control_args.add_argument('--memory',
194  help='Local memory requirement for worker node in MB (default is 1900MB).',
195  type=int, default=1900)
196 
# NOTE(review): default is the string "10800" despite type=int; argparse only
# applies type= to command-line strings, so the default stays a str — it is
# later used only in a %s format, which works either way, but the
# inconsistency is worth confirming upstream.
197  job_control_args.add_argument('--lifetime',
198  help='Expected job lifetime. Valid values are an integer number of seconds. (default is 10800=3h)',
199  type=int, default="10800")
200 
201  job_control_args.add_argument('--source',
202  help='Source script SOURCE:par1:par2:..',
203  type=str, action='append')
204 
205  job_control_args.add_argument('--ngu_test',
206  help='Setup the test version of NovaGridUtils in the grid jobs.',
207  action='store_true')
208 
209  job_control_args.add_argument('--ngu_version',
210  help='Setup a specific NovaGridUtils version in the grid jobs.', metavar='VERSION',
211  type=str)
212 
213  job_control_args.add_argument('--testrel_ngu',
214  help="Must be used with --testrel, with NGU checked out. After unpacking tarball will setup the local version of NGU you are using on the work.",
215  action='store_true')
216 
217  job_control_args.add_argument('-ep', '--extproduct', type=str, metavar='PRODUCT:VERSION',
218  help='Setup this external product on the worker node in format <product>:<version>',
219  default='NONE')
220 
221  job_control_args.add_argument("--mail_always",
222  help="Do you want an email whenever every jobs finishes?",
223  default=False, action="store_true")
224 
225  job_control_args.add_argument("--mail_on_error",
226  help="Do you want an email whenever a job fails on an error?",
227  default=False, action="store_true")
228 
229  # --singularity and --os are mutually exclusive
230  sing_group = job_control_args.add_mutually_exclusive_group()
231  sing_group.add_argument('--singularity',
232  help='Location in CVMFS of a singularity container to launch the job into. ' +\
233  "If you're looking for a stock Scientific Linux image, try --os instead. " +\
234  "(This option is mutually exclusive with --os.)",
235  type=str,
236  default=None)
237  sing_group.add_argument("--os",
238  help="Run this job inside a Scientific Linux singularity image with given OS. " +\
239  "(This option is mutually exclusive with --singularity.)",
240  choices=known_os_containers)
241 
242  job_control_args.add_argument('--jobfactory',
243  help='Use the specified JobFactoryType. Only use with --singularity',
244  default="")
245 
246  job_control_args.add_argument("--gpu",
247  help="Request a node with a GPU",
248  default=False, action="store_true")
249 
250  job_control_args.add_argument("--export",
251  help='Export variable EXPORT to jobsub_submit',
252  type=str, action='append')
253 
254  job_control_args.add_argument('-c',
255  help='Append Condor requirements',
256  type=str, action='append')
257 
258  job_control_args.add_argument('--cpu',
259  help='Request worker nodes to have at least NUMBER cpus',
260  type=int)
261 
262  NovaGridUtils.add_node_features_arg(job_control_args)
263 
264  ###debugging
265  debugging_args = parser.add_argument_group("Debugging options", "These optional arguments can help debug your submission.")
266 
267  debugging_args.add_argument('--print_jobsub',
268  help='Print jobsub command',
269  action='store_true',default=False)
270 
271  debugging_args.add_argument('--test',
272  help='Do not actually do anything, just run tests and print jobsub cmd',
273  action='store_true',default=False)
274 
275  debugging_args.add_argument('--ifdh_debug',
276  help='Verbose output for pinning down IFDH/dCache issues',
277  action='store_true', default=False)
278 
279  opts = parser.parse_args()
280 
281  macro = os.path.abspath(vars(opts)['macro.C'])
282 
283  # Some sanity checks
# NOTE(review): listing lines 284, 286 and 289 are absent from this dump —
# most likely blank or hyperlinked lines in the original source; confirm.
285  rel = NovaGridUtils.check_tag(opts.rel)
287  test_not_dcache(opts.outdir)
288  NovaGridUtils.check_dir(opts.outdir)
290  if opts.testrel:
291  test_not_dcache(opts.testrel, True)
292  NovaGridUtils.check_dir(opts.testrel)
293  if not os.path.isdir(opts.testrel+'/lib/'+os.getenv('SRT_ARCH')+'-GCC-maxopt'):
294  NovaGridUtils.fail(opts.testrel+' has never been built maxopt')
295  if opts.user_tarball:
296  test_not_dcache(opts.user_tarball, True)
297 
298  # If draining, check what is missing in the output dir
299  draincmd = ""
300  if opts.drain:
301  outfiles = os.listdir(opts.outdir)
302  completejobs = set()
303  missingjobs = []
304  for filename in outfiles:
305  # Log files aren't a sign of success of a job
306  if filename.endswith('.log.txt'): continue
307  # Nor are core files
308  if filename.startswith('core.'): continue
309 
310  # Any other type of file output will be counted as a success
# Output files are named like '<stem>_<i>_of_<N>.<ext>': group(1) is the job
# index, group(2) the total job count. (Pattern should ideally be a raw
# string to avoid the '\d' escape-sequence deprecation in newer Pythons.)
311  regex = re.compile(".*?(\d+)_of_(\d+)\..*")
312  reresult = regex.match(filename)
313  if reresult:
314  thisjob = int(reresult.group(1))
315  thismax = int(reresult.group(2))
316  completejobs.add(thisjob)
317  if opts.njobs != thismax:
318  print "You specified {} total jobs, but output files have the form x_of_{}. These must be consistent! Bailing.".format(opts.njobs, thismax)
319  sys.exit(5)
# Job indices are 1-based: any index in 1..njobs without an output file is
# queued for resubmission via repeated '-d <i>' flags in draincmd.
320  for jobno in range(1,opts.njobs+1):
321  if jobno not in completejobs:
322  missingjobs.append(jobno)
323  if len(missingjobs) == 0:
324  print "No jobs to drain. Exiting."
325  sys.exit(0)
326  else:
327  print "Found jobs to drain:"
328  print " ",missingjobs
329  if opts.njobs != len(missingjobs):
330  print " Adjusting njobs from %i to %i to match." % (opts.njobs, len(missingjobs))
331  draincmd = ' '
332  for jobno in missingjobs:
333  draincmd += '-d %i ' % jobno
334  draincmd += '\\\n'
335 
336  # Singularity, offsite checks
337  if opts.os:
338  # just redirect to the known container path
339  opts.singularity = known_os_containers[opts.os]
340 
341  if opts.singularity:
342  if not os.path.exists(opts.singularity):
343  fail("Requested singularity image cannot be found: %s" % opts.singularity)
344 
345  if opts.gpu and not opts.singularity:
346  warn("Requested GPU, but did not request singularity. This is not likely to succeed.")
347 
348  if opts.gpu and not opts.offsite:
349  warn("GPUs are only available offsite, and you have not chosen --offsite")
350 
# The worker-node entry point shipped with NovaGridUtils; all --source,
# --macro, --outdir etc. options below are arguments to this script.
351  grid_script = os.getenv('NOVAGRIDUTILS_DIR')+'/bin/cafe_grid_script.sh'
352 
353  NovaGridUtils.check_file(grid_script)
354 
355  if opts.input_file:
356  for inFile in opts.input_file:
357  test_not_dcache(inFile)
358  input_files += opts.input_file
359 
360  # grab user supplied source scripts
361  source_scripts=[]
362  if opts.source:
363  source_scripts += opts.source
364 
# Each --source entry is 'script:par1:par2:...'; verify the script part is
# either among the shipped input files or findable on $PATH.
365  for script in source_scripts:
366  if ":" in script:
367  script_path = script.split(":")[0]
368  else:
369  script_path = script
370 
371  if not find_file_in_list(os.path.expandvars(script_path), input_files):
372  if not find_file(os.environ["PATH"].split(os.pathsep), os.path.expandvars(script_path)):
373  fail("Script %s does not exist!" % script_path)
374 
375  # Process the job_control_args
376  jobsub_opts = ""
377 
378  #usage model
379  usage_models=["DEDICATED"]
380  if not opts.dedicated:
381  usage_models.append("OPPORTUNISTIC")
382  if opts.offsite:
383  usage_models.append("OFFSITE")
384  jobsub_opts += " --resource-provides=usage_model=%s \\\n" % (",".join(usage_models))
385 
386  #disk
387  if opts.disk:
388  disk_opt=" --disk=%sMB \\\n" % (opts.disk)
389  jobsub_opts += disk_opt
390  #memory
391  if opts.memory:
392  mem_opt=" --memory=%sMB \\\n" % (opts.memory)
393  jobsub_opts += mem_opt
394 
395  #cpu
396  if opts.cpu:
397  cpu_opt=" --cpu=%s \\\n" %(opts.cpu)
398  jobsub_opts += cpu_opt
399 
400  #extra condor requirements
401  if opts.c:
402  append_condor_opt = " -c %s \\\n" % (",".join(opts.c))
403  jobsub_opts += append_condor_opt
404 
405  #emails about jobs?
406  if opts.mail_always:
407  jobsub_opts += " --mail_always \\\n"
408  elif opts.mail_on_error:
409  jobsub_opts += " --mail_on_error \\\n"
410  else:
411  jobsub_opts += " --mail_never \\\n"
412 
413  if opts.node_features:
414  jobsub_opts += " %s \\\n" % (
# NOTE(review): listing line 415 (the argument to this % format) is missing
# from this dump — presumably a call like
# NovaGridUtils.make_jobsub_node_features_arg(opts.node_features)
# (that helper is referenced in this module's cross-references); confirm
# against the original source before relying on this block.
416  )
417 
418  # Never kill me for being over time. Find a node with enough time left in
419  # its glidein for what I requested, but don't penalize me for going over
420  # time.
421  life_opt=(" --expected-lifetime=0s \\\n --append_condor_requirements='(((TARGET.GLIDEIN_ToDie-CurrentTime)>%s)||isUndefined(TARGET.GLIDEIN_ToDie))' \\\n" % opts.lifetime)
422  jobsub_opts += life_opt
423 
424  # xrootd debugging
425  if opts.xrootdebug:
426  jobsub_opts += " -e XrdSecDEBUG=2 \\\n"
427 
428  # Singularity
429  if opts.singularity:
430  jobsub_opts += " --line='+SingularityImage=\\\"%s\\\"' \\\n" % opts.singularity
431  jobsub_opts += " --append_condor_requirements='(TARGET.HAS_SINGULARITY=?=true)' \\\n"
432  if opts.jobfactory:
433  jobsub_opts += " --line='+JobFactoryType=\\\"%s\\\"' \\\n" % opts.jobfactory
434 
435  # GPUs
436  if opts.gpu:
437  jobsub_opts += " --line='+RequestGPUs=1' \\\n"
438 
# Build the comma-separated --site list from the recommended set and/or the
# user's explicit --site entries; unknown sites get a warning plus a 5 s
# pause so the user can notice it.
439  if opts.recommended_sites or opts.site:
440  site_opt=" --site="
441 
442  if opts.recommended_sites:
443  for isite in recommended_sites:
444  site_opt += isite + ","
445  if opts.site:
446  for isite in opts.site:
447  if isite not in recommended_sites:
448  warn("Site "+isite+" is not known to work. Your jobs may fail at that site. Sleeping for 5 seconds")
449  sleep(5)
450  site_opt += isite + ","
451 
# Drop the trailing comma added by the loops above.
452  site_opt=site_opt[:-1] + " \\\n"
453  jobsub_opts += site_opt
454 
455  if opts.exclude_site:
456  for isite in opts.exclude_site:
457  jobsub_opts += " --append_condor_requirements='(TARGET.GLIDEIN_Site\ isnt\ \\\"%s\\\")' \\\n" % isite
458 
459  #Process the debugging_args
460  test=opts.test
461  print_jobsub=opts.print_jobsub
462  if test :
463  print_jobsub=True
464  print ""
465  warn("--test was specified, so all we do is run checks and print jobsub cmd.")
466 
# --ifdh_debug writes a temporary .rootrc in the CWD (shipped to the job via
# dropbox below) and injects extra env/setup scripts; the file is removed at
# the very end of this script.
467  if opts.ifdh_debug:
468  if os.path.isfile(".rootrc"):
469  fail("--ifdh_debug creates a temporary .rootrc file, but there is already one in this directory. Submit from somewhere else.")
470  else:
471  rc=open(".rootrc","w+")
472  rc.write("XNet.UseOldClient: yes\n") # root6 bug with new client, supresses other XNet configs like Debug
473  rc.write("XNet.Debug: 2\n")
474  rc.write("XNet.MaxRedirectCount: 255\n")
475  rc.close()
476  source_scripts += ['setup_test_product:NovaGridUtils']
477  source_scripts += ['export_var:IFDH_SILENT=0']
478  source_scripts += ['export_var:IFDH_DEBUG=1']
479  print_jobsub=True
480 
481  # Export
482  if opts.export:
483  for export in opts.export:
484  source_scripts += ['export_var:%s' % export]
485 
# Assemble the final jobsub_submit command. When draining, the job count is
# the number of missing jobs rather than the full --njobs.
486  cmd = 'jobsub_submit '
487  if opts.drain:
488  cmd += '-N '+str(len(missingjobs))+' '
489  else:
490  cmd += '-N '+str(opts.njobs)+' '
491  cmd += "\\\n"+jobsub_opts
492  cmd += ' -f dropbox://%s \\\n' % (macro)
493  for f in input_files:
494  cmd += ' -f %s \\\n' % (f)
495  if opts.ifdh_debug:
496  cmd += ' -f dropbox://%s \\\n' % (os.path.abspath(".rootrc"))
497 
498  # Test release options.
499  if opts.testrel:
500  if opts.reuse_tarball:
501  cmd += ' --tar_file_name dropbox://'+os.path.basename(opts.testrel)+'.tar'
502  else:
503  cmd += ' --tar_file_name tardir://'+opts.testrel+'\\\n'
504  elif opts.user_tarball:
505  if not os.path.isfile(opts.user_tarball):
506  print "Tarball filename passed to --user_tarball does not exit:", opts.user_tarball
507  sys.exit(5)
508  cmd += ' --tar_file_name dropbox://' + opts.user_tarball + ' \\\n'
509 
510  # cafe_grid_script.sh and arguments
511  cmd += ' file://%s \\\n' % (grid_script)
512  for source_script in source_scripts:
513  cmd += ' --source %s \\\n' % (source_script)
514 
515  cmd += ' --release %s \\\n' % (rel)
516  if opts.testrel or opts.user_tarball:
517  cmd += ' --tarball \\\n'
518 
519  # NovaGridUtils options.
520  if opts.ngu_test:
521  cmd += ' --source setup_test_product:NovaGridUtils \\\n'
522 
523  if opts.ngu_version:
524  cmd += ' --source setup_product:NovaGridUtils:%s \\\n' %(opts.ngu_version)
525 
526  if opts.testrel_ngu:
527  cmd += ' --testrel_ngu \\\n'
528 
529  cmd += ' --macro %s \\\n' % (os.path.basename(macro))
530  cmd += ' --outdir %s \\\n' % (opts.outdir)
531  cmd += ' --njobs %s \\\n' % (str(opts.njobs))
532 
533  if opts.snapshot: cmd += ' --snapshot \\\n'
534 
# Forward the per-analysis library flags to the grid script verbatim.
535  if opts.numuccinc: cmd += ' --numuccinc \\\n'
536  if opts.numubarccinc: cmd += ' --numubarccinc \\\n'
537  if opts.numucc2p2h: cmd += ' --numucc2p2h \\\n'
538  if opts.numucc0pi: cmd += ' --numucc0pi \\\n'
539  if opts.nuebarccinc: cmd += ' --nuebarccinc \\\n'
540  if opts.numubarccpi0: cmd += ' --numubarccpi0 \\\n'
541  if opts.no3flavor: cmd += ' --no3flavor \\\n'
542  if opts.nuxana: cmd += ' --nuxana \\\n'
543 
544 
545  if opts.extproduct != 'NONE': cmd += ' --extproduct %s \\\n' % (opts.extproduct)
546  if opts.drain:
547  if not draincmd:
548  print "Jobs to drain, but no draining command. This shouldn't happen."
549  sys.exit(5)
550  cmd += draincmd
551  for a in opts.args: cmd += ' '+a
552 
553  if print_jobsub:
554  print cmd
555  sys.stdout.flush()
556  sys.stderr.flush()
557 
# Actually submit, unless --test limited us to checks + printing.
558  if not test:
559  os.system(cmd)
560 
# Clean up the temporary .rootrc created for --ifdh_debug.
561  if opts.ifdh_debug:
562  os.remove("./.rootrc")
void split(double tt, double *fr)
def fail(msg)
Definition: NovaGridUtils.py:7
def check_file(fname)
def check_is_group_writable(fname)
def find_file(paths, filename)
def test_not_dcache(l, warnOnly=False)
def make_jobsub_node_features_arg(features)
def check_tag(tag)
const std::map< std::pair< std::string, std::string >, Variable > vars
def fail(msg)
print a failure message, from: https://cdcvs.fnal.gov/redmine/projects/novaart/repository/entry/trunk...
Definition: common_tools.py:7
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14
def warn(msg)
print a warning message, from: https://cdcvs.fnal.gov/redmine/projects/novaart/repository/entry/trunk...
Definition: common_tools.py:16
procfile open("FD_BRL_v0.txt")
def check_dir(dname)
def add_node_features_arg(parser)
def remove_comments(src)
def get_credentials(role)
def find_file_in_list(filepath, pathlist)