submit_cafana.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 from builtins import str
5 from builtins import range
6 import argparse
7 import os
8 import sys
9 import string
10 import re
11 from glob import glob
12 
13 # Various utility functions
14 import NovaGridUtils
15 from NovaGridUtils import warn, fail, sleep
16 import recommended_sites as rs
17 
def remove_comments(src):
    """Return *src* truncated just before its first '#' character.

    A string containing no '#' is returned unchanged.
    """
    # partition() yields the whole string as the head when '#' is absent.
    head, _hash, _comment = src.partition('#')
    return head
21 
22 def test_not_dcache(l, warnOnly = False):
23  loc = os.path.expandvars(l)
24  for bad in ['/nova/app/', '/nova/ana/', '/nova/data/', '/grid']:
25  if loc.startswith(bad):
26  txt = "Location %s cannot be on %s it must be in dCache /pnfs/nova/" % (loc, bad)
27  if loc.startswith('/nova/app'):
28  txt = "Jobs can no longer access BlueArc directly. Test releases will be tarred up and sent to worker nodes, however input files should be moved to dCache."
29  if warnOnly:
30  warn(txt)
31  else:
32  fail(txt)
33 
34 
def find_file(paths, filename):
    """Locate *filename*, either as given or somewhere beneath *paths*.

    If *filename* already names an existing file it is returned as-is.
    Otherwise each entry of *paths* (environment variables expanded) is
    walked recursively and the first directory containing a file with
    that name wins.  When nothing is found, fail() is called.
    """
    if os.path.isfile(filename):
        return filename

    for search_root in paths:
        top = os.path.expandvars(search_root)
        for dirpath, _subdirs, names in os.walk(top):
            if filename in names:
                return os.path.join(dirpath, filename)

    fail("Cannot find file "+filename)
43 
44 
def find_file_in_list(filepath, pathlist):
    """Return True if any entry of *pathlist* has the same basename as *filepath*.

    Only the final path component is compared; directories are ignored.
    """
    return any(
        os.path.basename(filepath) == os.path.basename(candidate)
        for candidate in pathlist
    )
50 
51 
# Output files are expected to be named like <anything><job>_of_<total>.<ext>.
# Compiled once at import time; raw string so that \d and \. reach the regex
# engine untouched.
_JOB_OUTPUT_RE = re.compile(r".*?(\d+)_of_(\d+)\..*")


def find_missing_jobs(filenames, njobs):
    """Return the job numbers in 1..njobs that have no output file.

    *filenames* is a listing of the output directory.  Log files
    (``*.log.txt``) and core dumps (``core.*``) are ignored; any other
    file whose name matches ``<job>_of_<total>.`` counts that job as
    complete.  If a file's ``<total>`` disagrees with *njobs* a message
    is printed and the process exits with status 5.
    """
    completejobs = set()
    for filename in filenames:
        # Log files aren't a sign of success of a job, nor are core files
        if filename.endswith('.log.txt') or filename.startswith('core.'):
            continue

        # Any other type of file output will be counted as a success
        reresult = _JOB_OUTPUT_RE.match(filename)
        if not reresult:
            continue
        thisjob = int(reresult.group(1))
        thismax = int(reresult.group(2))
        completejobs.add(thisjob)
        if njobs != thismax:
            print("You specified {} total jobs, but output files have the form x_of_{}. These must be consistent! Bailing.".format(njobs, thismax))
            sys.exit(5)

    return [jobno for jobno in range(1, njobs + 1) if jobno not in completejobs]


# Script entry point: parse the command line (optionally extended by -f
# argument files), sanity-check the request, assemble a jobsub_submit command
# line around cafe_grid_script.sh and run it.
if __name__ == '__main__':

    input_files=[]

    NovaGridUtils.prog = 'submit_cafana.py' # for error messages etc

    # Start this parser up just to extract the -f or --file argument if it's
    # there, and push all of those args into the command line for the real
    # parser to deal with.
    if "-f" in sys.argv or "--file" in sys.argv:
        # add_help=False: leave -h/--help to the real parser below so the
        # user sees the full option list rather than this stub's.
        prelim_parser = argparse.ArgumentParser(add_help=False)

        prelim_parser.add_argument('-f', '--file', type=str, action='append')
        pre_args, _unknown = prelim_parser.parse_known_args()

        if pre_args.file:
            for filepath in pre_args.file:
                NovaGridUtils.check_file(filepath)
                with open(filepath, 'r') as arg_file:
                    for line in arg_file:
                        sys.argv += remove_comments(line).split()

    parser = argparse.ArgumentParser(description = 'Submit a CAFAna macro with datasets split between N jobs')

    ###required options
    # So it'll show up in the help text, but it was already handled above
    parser.add_argument('-f', '--file', type=str,
                        help='Text file containing any arguments to this utility',
                        action='append')

    parser.add_argument('-n', '--njobs', type=int, required=True, metavar='N',
                        help='Number of grid processes')

    parser.add_argument('-r', '--rel', required=True,
                        help='Release to use')

    parser.add_argument('-i', '--input_file', action="append",
                        help="Copy this input file to work area on worker node")

    parser.add_argument('-o', '--outdir', required=True, metavar='DIR',
                        help='Directory output files will go to')

    parser.add_argument('-ss', '--snapshot', action='store_true',
                        help='Use latest snapshot instead of requerying')

    parser.add_argument('-off', '--offsite', action='store_true',
                        help='Run this cafana job offsite')

    parser.add_argument('-d', '--drain', action='store_true',
                        help='Recover files missing from your output directory')

    parser.add_argument('-x', '--xrootdebug', action='store_true',
                        help='Add extra xrootd debugging information',
                        default=False)

    # The cross-section analyses all share the same help text.
    for xsec_flag in ('--numuccinc', '--numubarccinc', '--numucc2p2h',
                      '--numucc0pi', '--nuebarccinc', '--numubarccpi0'):
        parser.add_argument(xsec_flag, action = 'store_true',
                            help = 'Load libraries for specific xsec analysis')
    parser.add_argument('--no3flavor', action = 'store_true',
                        help = 'Do *not* load libraries for 3-flavor analysis')
    parser.add_argument('--nuxana', action="store_true",
                        help="Load libraries for NuX analysis")

    parser.add_argument('macro.C',
                        help='The CAFAna macro to run')

    parser.add_argument('args', nargs='*',
                        help='Arguments to the macro')

    ###Define job_control_args
    job_control_args = parser.add_argument_group("Job control options", "These optional arguments help control where and how your jobs land.")
    ###tarball control options.
    tarball_gp = job_control_args.add_mutually_exclusive_group(required=False)
    tarball_gp.add_argument('-t', '--testrel', metavar='DIR',
                            help='Use a test release at location TESTREL. It will be tarred up, and sent to the worker node. (Conflicts with --user_tarball)',
                            default=None)

    tarball_gp.add_argument("--user_tarball",
                            help="Use existing test release tarball in specified location rather than having jobsub make one for you (conflicts with --testrel, and is redunant with --reuse_tarball)",
                            type=str)

    ###general job control
    job_control_args.add_argument('--reuse_tarball',
                                  help='Do you want to reuse a tarball that is already in resilient space? If using this option avoid trailing slash in --testrel option. (redundant with --user_tarball)',
                                  action='store_true',default=False)

    job_control_args.add_argument('--dedicated',
                                  help='Only run on dedicated nodes on fermigrid (default is to run opportunistically)',
                                  action='store_true',default=False)

    job_control_args.add_argument('--site',
                                  help='Specify allowed offsite locations. Omit to allow running at any offsite location',
                                  type=str,action='append')

    job_control_args.add_argument('--exclude_site',
                                  help='Specify an offsite location to exclude.',
                                  metavar='SITE',
                                  type=str,action='append')

    job_control_args.add_argument('--recommended_sites',
                                  help='Specify known working offsite locations.',
                                  action='store_true',default=False)

    job_control_args.add_argument('--disk',
                                  help='Local disk space requirement for worker node in MB (default is 2000MB).',
                                  type=int, default=2000)

    job_control_args.add_argument('--memory',
                                  help='Local memory requirement for worker node in MB (default is 1900MB).',
                                  type=int, default=1900)

    job_control_args.add_argument('--lifetime',
                                  help='Expected job lifetime. Valid values are an integer number of seconds. (default is 10800=3h)',
                                  type=int, default=10800)

    job_control_args.add_argument('--source',
                                  help='Source script SOURCE:par1:par2:..',
                                  type=str, action='append')

    job_control_args.add_argument('--ngu_test',
                                  help='Setup the test version of NovaGridUtils in the grid jobs.',
                                  action='store_true')

    job_control_args.add_argument('--ngu_version',
                                  help='Setup a specific NovaGridUtils version in the grid jobs.', metavar='VERSION',
                                  type=str)

    job_control_args.add_argument('--testrel_ngu',
                                  help="Must be used with --testrel, with NGU checked out. After unpacking tarball will setup the local version of NGU you are using on the work.",
                                  action='store_true')

    job_control_args.add_argument('-ep', '--extproduct', type=str, metavar='PRODUCT:VERSION',
                                  help='Setup this external product on the worker node in format <product>:<version>',
                                  default='NONE')

    job_control_args.add_argument("--mail_always",
                                  help="Do you want an email whenever every jobs finishes?",
                                  default=False, action="store_true")

    job_control_args.add_argument("--mail_on_error",
                                  help="Do you want an email whenever a job fails on an error?",
                                  default=False, action="store_true")

    # --singularity and --os are mutually exclusive
    sing_group = job_control_args.add_mutually_exclusive_group()
    sing_group.add_argument('--singularity',
                            help='Location in CVMFS of a singularity container to launch the job into. ' +\
                                 "If you're looking for a stock Scientific Linux image, try --os instead. " +\
                                 "(This option is mutually exclusive with --os.)",
                            type=str,
                            default=None)
    sing_group.add_argument("--os",
                            help="Run this job inside a Scientific Linux singularity image with given OS. " +\
                                 "(This option is mutually exclusive with --singularity.)",
                            choices=NovaGridUtils.KNOWN_OS_CONTAINERS)

    job_control_args.add_argument('--jobfactory',
                                  help='Use the specified JobFactoryType. Only use with --singularity',
                                  default="")

    job_control_args.add_argument("--gpu",
                                  help="Request a node with a GPU",
                                  default=False, action="store_true")

    job_control_args.add_argument("--export",
                                  help='Export variable EXPORT to jobsub_submit',
                                  type=str, action='append')

    job_control_args.add_argument('-c',
                                  help='Append Condor requirements',
                                  type=str, action='append')

    job_control_args.add_argument('--cpu',
                                  help='Request worker nodes to have at least NUMBER cpus',
                                  type=int)

    NovaGridUtils.add_node_features_arg(job_control_args)

    ###debugging
    debugging_args = parser.add_argument_group("Debugging options", "These optional arguments can help debug your submission.")

    debugging_args.add_argument('--print_jobsub',
                                help='Print jobsub command',
                                action='store_true',default=False)

    debugging_args.add_argument('--test',
                                help='Do not actually do anything, just run tests and print jobsub cmd',
                                action='store_true',default=False)

    debugging_args.add_argument('--ifdh_debug',
                                help='Verbose output for pinning down IFDH/dCache issues',
                                action='store_true', default=False)

    opts = parser.parse_args()

    # 'macro.C' is not a valid attribute name, so go through vars().
    macro = os.path.abspath(vars(opts)['macro.C'])

    # Currently can't pass non-tarred files (such as macros) through the new
    # jobsub release. Until there's a patch, catch that case and ask the user
    # to set up the old jobsub.
    if opts.testrel is None and opts.user_tarball is None:
        import subprocess
        try:
            # universal_newlines=True gives text (not bytes) on Python 3, so
            # the comparison with "v1_3_3" below can actually match.
            ups_line = subprocess.check_output("ups active | grep jobsub_client",
                                               shell=True, universal_newlines=True)
            # The version is the fifth field from the end of the ups line.
            jobsub_client_ver = ups_line.split()[-5]
        except (subprocess.CalledProcessError, IndexError):
            # grep found nothing, or the line had an unexpected layout.
            warn("Could not determine the active jobsub_client version; skipping the v1_3_3 compatibility check.")
            jobsub_client_ver = None
        if jobsub_client_ver == "v1_3_3":
            print("WARNING: Can't pass macro with no testrel in jobsub_client v1_3_3, run")
            print(" unsetup jobsub_client; setup jobsub_client v1_3_2_1")
            print(" and resubmit")
            sys.exit(1)

    # Some sanity checks
    # NOTE(review): the listing this was recovered from skips three lines in
    # this section; any checks they contained are not reproduced here --
    # confirm against the repository copy.
    rel = NovaGridUtils.check_tag(opts.rel)
    test_not_dcache(opts.outdir)
    NovaGridUtils.check_dir(opts.outdir)
    if opts.testrel:
        test_not_dcache(opts.testrel, True)
        NovaGridUtils.check_dir(opts.testrel)
        srt_arch = os.getenv('SRT_ARCH')
        if srt_arch is None:
            # Without this the path concatenation below dies with a TypeError.
            fail("SRT_ARCH is not set; cannot check that "+opts.testrel+" has been built maxopt")
        if not os.path.isdir(opts.testrel+'/lib/'+srt_arch+'-GCC-maxopt'):
            NovaGridUtils.fail(opts.testrel+' has never been built maxopt')
    if opts.user_tarball:
        test_not_dcache(opts.user_tarball, True)

    # If draining, check what is missing in the output dir
    draincmd = ""
    if opts.drain:
        missingjobs = find_missing_jobs(os.listdir(opts.outdir), opts.njobs)
        if not missingjobs:
            print("No jobs to drain. Exiting.")
            sys.exit(0)

        print("Found jobs to drain:")
        print(" ",missingjobs)
        if opts.njobs != len(missingjobs):
            print(" Adjusting njobs from %i to %i to match." % (opts.njobs, len(missingjobs)))
        draincmd = ' '
        for jobno in missingjobs:
            draincmd += '-d %i ' % jobno
        draincmd += '\\\n'

    # Singularity, offsite checks
    if opts.os:
        # just redirect to the known container path
        opts.singularity = NovaGridUtils.KNOWN_OS_CONTAINERS[opts.os]

    if opts.singularity:
        if not os.path.exists(opts.singularity):
            fail("Requested singularity image cannot be found: %s" % opts.singularity)

    if opts.gpu and not opts.singularity:
        warn("Requested GPU, but did not request singularity. This is not likely to succeed.")

    if opts.gpu and not opts.offsite:
        warn("GPUs are only available offsite, and you have not chosen --offsite")

    ngu_dir = os.getenv('NOVAGRIDUTILS_DIR')
    if ngu_dir is None:
        # Without this the path concatenation below dies with a TypeError.
        fail("NOVAGRIDUTILS_DIR is not set; cannot locate cafe_grid_script.sh")
    grid_script = ngu_dir+'/bin/cafe_grid_script.sh'

    NovaGridUtils.check_file(grid_script)

    if opts.input_file:
        for inFile in opts.input_file:
            test_not_dcache(inFile)
        input_files += opts.input_file

    # grab user supplied source scripts
    source_scripts=[]
    if opts.source:
        source_scripts += opts.source

    for script in source_scripts:
        # SOURCE:par1:par2:... -- only the part before the first ':' is a path
        script_path = script.split(":")[0]

        # The script must either be shipped as an input file or be on PATH.
        if not find_file_in_list(os.path.expandvars(script_path), input_files):
            if not find_file(os.environ["PATH"].split(os.pathsep), os.path.expandvars(script_path)):
                fail("Script %s does not exist!" % script_path)

    # Process the job_control_args
    jobsub_opts = ""

    #usage model
    usage_models=["DEDICATED"]
    if not opts.dedicated:
        usage_models.append("OPPORTUNISTIC")
    if opts.offsite:
        usage_models.append("OFFSITE")
    jobsub_opts += " --resource-provides=usage_model=%s \\\n" % (",".join(usage_models))

    #disk
    if opts.disk:
        jobsub_opts += " --disk=%sMB \\\n" % (opts.disk)

    #memory
    if opts.memory:
        jobsub_opts += " --memory=%sMB \\\n" % (opts.memory)

    #cpu
    if opts.cpu:
        jobsub_opts += " --cpu=%s \\\n" %(opts.cpu)

    #extra condor requirements
    if opts.c:
        jobsub_opts += " -c %s \\\n" % (",".join(opts.c))

    #emails about jobs?
    if opts.mail_always:
        jobsub_opts += " --mail_always \\\n"
    elif opts.mail_on_error:
        jobsub_opts += " --mail_on_error \\\n"
    else:
        jobsub_opts += " --mail_never \\\n"

    if opts.node_features:
        # NOTE(review): the argument to this format was missing from the
        # recovered listing; make_jobsub_node_features_arg is the
        # NovaGridUtils helper that listing cross-references -- confirm.
        jobsub_opts += " %s \\\n" % (
            NovaGridUtils.make_jobsub_node_features_arg(opts.node_features)
        )

    # Never kill me for being over time. Find a node with enough time left in
    # its glidein for what I requested, but don't penalize me for going over
    # time.
    life_opt=(" --expected-lifetime=0s \\\n --append_condor_requirements='(((TARGET.GLIDEIN_ToDie-CurrentTime)>%s)||isUndefined(TARGET.GLIDEIN_ToDie))' \\\n" % opts.lifetime)
    jobsub_opts += life_opt

    # xrootd debugging
    if opts.xrootdebug:
        jobsub_opts += " -e XrdSecDEBUG=2 \\\n"

    # Singularity
    if opts.singularity:
        jobsub_opts += " --line='+SingularityImage=\\\"%s\\\"' \\\n" % opts.singularity
        jobsub_opts += " --append_condor_requirements='(TARGET.HAS_SINGULARITY=?=true)' \\\n"
        # NOTE(review): nested under --singularity because the --jobfactory
        # help says "Only use with --singularity"; the recovered listing had
        # lost its indentation -- confirm.
        if opts.jobfactory:
            jobsub_opts += " --line='+JobFactoryType=\\\"%s\\\"' \\\n" % opts.jobfactory

    # GPUs
    if opts.gpu:
        jobsub_opts += " --line='+RequestGPUs=1' \\\n"

    if opts.recommended_sites or opts.site:
        # Always fetch the recommended list: it is needed both to populate
        # --site and to vet user-supplied sites.  (Previously it was only
        # defined under --recommended_sites, so --site alone hit a NameError.)
        istarball = opts.testrel or opts.user_tarball
        recommended_sites=rs.get_recommended_sites('default', istarball, False)

        sites = []
        if opts.recommended_sites:
            sites.extend(recommended_sites)
        if opts.site:
            for isite in opts.site:
                if isite not in recommended_sites:
                    warn("Site "+isite+" is not known to work. Your jobs may fail at that site. Sleeping for 5 seconds")
                    sleep(5)
                sites.append(isite)

        # Skip the option entirely rather than emit a malformed "--site".
        if sites:
            jobsub_opts += " --site=" + ",".join(sites) + " \\\n"

    if opts.exclude_site:
        for isite in opts.exclude_site:
            # "\\ " is a literal backslash-space for the shell; written with
            # a doubled backslash so Python does not see an invalid escape.
            jobsub_opts += " --append_condor_requirements='(TARGET.GLIDEIN_Site\\ isnt\\ \\\"%s\\\")' \\\n" % isite

    #Process the debugging_args
    test=opts.test
    print_jobsub=opts.print_jobsub
    if test :
        print_jobsub=True
        print("")
        warn("--test was specified, so all we do is run checks and print jobsub cmd.")

    if opts.ifdh_debug:
        # A temporary .rootrc is written just before submission (see the
        # end of the script); refuse to clobber one that is already here.
        if os.path.isfile(".rootrc"):
            fail("--ifdh_debug creates a temporary .rootrc file, but there is already one in this directory. Submit from somewhere else.")
        source_scripts += ['setup_test_product:NovaGridUtils']
        source_scripts += ['export_var:IFDH_SILENT=0']
        source_scripts += ['export_var:IFDH_DEBUG=1']
        print_jobsub=True

    # Export
    if opts.export:
        for export in opts.export:
            source_scripts += ['export_var:%s' % export]

    cmd = 'jobsub_submit '
    if opts.drain:
        # Only submit as many processes as there are jobs to recover.
        cmd += '-N '+str(len(missingjobs))+' '
    else:
        cmd += '-N '+str(opts.njobs)+' '
    cmd += "\\\n"+jobsub_opts
    cmd += ' -f dropbox://%s \\\n' % (macro)
    for f in input_files:
        cmd += ' -f %s \\\n' % (f)
    if opts.ifdh_debug:
        cmd += ' -f dropbox://%s \\\n' % (os.path.abspath(".rootrc"))

    # Test release options.
    if opts.testrel:
        if opts.reuse_tarball:
            cmd += ' --tar_file_name dropbox://'+os.path.basename(opts.testrel)+'.tar'
        else:
            cmd += ' --tar_file_name tardir://'+opts.testrel+'\\\n'
    elif opts.user_tarball:
        if not os.path.isfile(opts.user_tarball):
            print("Tarball filename passed to --user_tarball does not exist:", opts.user_tarball)
            sys.exit(5)
        cmd += ' --tar_file_name dropbox://' + opts.user_tarball + ' \\\n'

    # cafe_grid_script.sh and arguments
    cmd += ' file://%s \\\n' % (grid_script)
    for source_script in source_scripts:
        cmd += ' --source %s \\\n' % (source_script)

    cmd += ' --release %s \\\n' % (rel)
    if opts.testrel or opts.user_tarball:
        cmd += ' --tarball \\\n'

    # NovaGridUtils options.
    if opts.ngu_test:
        cmd += ' --source setup_test_product:NovaGridUtils \\\n'

    if opts.ngu_version:
        cmd += ' --source setup_product:NovaGridUtils:%s \\\n' %(opts.ngu_version)

    if opts.testrel_ngu:
        cmd += ' --testrel_ngu \\\n'

    cmd += ' --macro %s \\\n' % (os.path.basename(macro))
    cmd += ' --outdir %s \\\n' % (opts.outdir)
    cmd += ' --njobs %s \\\n' % (str(opts.njobs))

    # Boolean switches that are forwarded verbatim to cafe_grid_script.sh.
    for passthrough in ('snapshot',
                        'numuccinc', 'numubarccinc', 'numucc2p2h',
                        'numucc0pi', 'nuebarccinc', 'numubarccpi0',
                        'no3flavor', 'nuxana'):
        if getattr(opts, passthrough):
            cmd += ' --%s \\\n' % passthrough

    if opts.extproduct != 'NONE':
        cmd += ' --extproduct %s \\\n' % (opts.extproduct)
    if opts.drain:
        if not draincmd:
            print("Jobs to drain, but no draining command. This shouldn't happen.")
            sys.exit(5)
        cmd += draincmd
    for a in opts.args:
        cmd += ' '+a

    if print_jobsub:
        print(cmd)
        sys.stdout.flush()
        sys.stderr.flush()

    if not test:
        try:
            if opts.ifdh_debug:
                # Written as late as possible and removed in the finally
                # clause, so no exit path can leave it behind.
                with open(".rootrc", "w") as rc:
                    rc.write("XNet.UseOldClient: yes\n") # root6 bug with new client, suppresses other XNet configs like Debug
                    rc.write("XNet.Debug: 2\n")
                    rc.write("XNet.MaxRedirectCount: 255\n")
            os.system(cmd)
        finally:
            if opts.ifdh_debug and os.path.isfile(".rootrc"):
                os.remove("./.rootrc")
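# ---------------------------------------------------------------------------
# Doxygen cross-reference residue from the source-browser page this listing
# was captured from. It is not part of submit_cafana.py.
# ---------------------------------------------------------------------------
# void split(double tt, double *fr)
# def fail(msg)
# def check_file(fname)
# def check_is_group_writable(fname)
# def find_file(paths, filename)
# def test_not_dcache(l, warnOnly=False)
# def make_jobsub_node_features_arg(features)
# bool print
# def check_tag(tag)
# const std::map< std::pair< std::string, std::string >, Variable > vars
# std::string format(const int32_t &value, const int &ndigits=8)
# Definition: HexUtils.cpp:14
# procfile open("FD_BRL_v0.txt")
# def check_dir(dname)
# def add_node_features_arg(parser)
# def remove_comments(src)
# def get_credentials(role)
# def warn(msg)
# def find_file_in_list(filepath, pathlist)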