Functions | Variables
submit_concat_project Namespace Reference

Functions

def BuildOutputNameStub (dataset_name, decaf_macro, rename_cafs=True)
 
def CalculateNParts (defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None)
 
def CheckProjects (dataset)
 
def CheckProxy (role)
 
def MakeOutputDir (dirname)
 
def ParseArgs ()
 
def ParseParts (parts_string)
 
def Submit (args)
 

Variables

 search_path
 
 cafana_dir
 
 CVMFS_RELEASE_DIR
 
 DECAF_MACROS
 
 MAX_N_JOBS
 
 SAM
 
 args
 
 testrel
 
 release_path
 
 out_name
 
 decaf_positional
 
 rename_cafs
 
 parts
 
 parts_args
 
 parts_joined
 
 experiment
 
 snapshot_id
 
 nparts
 
 dataset
 
 requested_num_parts
 
 concat
 

Detailed Description

  submit_concat_project.py:
      Submit jobs for producing concats from a dataset.
    
    Original author: J. Wolcott <jwolcott@fnal.gov>, rewritten from submit_concat_project.sh
       Date: August 2017

Function Documentation

def submit_concat_project.BuildOutputNameStub (   dataset_name,
  decaf_macro,
  rename_cafs = True 
)

Definition at line 67 of file submit_concat_project.py.

def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True):
    """Derive the output-file name stub from the input dataset name.

    Concatenation turns "caf" into "sumcaf" (likewise for decaf and
    restrictedcaf); when an actual decaffing macro (anything other than
    plain concat_dataset.C) is being run, "caf" is further renamed to
    "decaf", and a known shortcut macro name is appended to the stub.
    """
    stub = dataset_name

    # this is a concat script, so "caf" -> "sumcaf", "decaf" -> "sumdecaf", ...
    for old, new in (("_caf", "_sumcaf"),
                     ("_decaf", "_sumdecaf"),
                     ("_restrictedcaf", "_sumrestrictedcaf")):
        stub = stub.replace(old, new)

    # ... and if we're actually decaffing, "caf" -> "decaf" as well
    is_decaffing = rename_cafs and os.path.basename(decaf_macro) != "concat_dataset.C"
    if is_decaffing:
        stub = stub.replace("_sumcaf", "_sumdecaf")
        stub = stub.replace("_sumrestrictedcaf", "_sumrestricteddecaf")
        if decaf_macro in DECAF_MACROS:
            stub += "_" + decaf_macro

    return stub
85 
86 """
87  Compute number of parts dataset should be split into.
88 
89  snapshot_id = id of snapshot of dataset
90  chunk_size = size of one file (in GB)
91  max_singlefile_size = if dataset size is less than this size in GB, only a single file will be made
92 """
def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True)
def submit_concat_project.CalculateNParts (   defname,
  snapshot_id,
  chunk_size = 2,
  max_singlefile_size = 20,
  requested_num_parts = None 
)

Definition at line 93 of file submit_concat_project.py.

def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None):
    """Compute the number of parts the dataset should be split into.

    defname             = SAM dataset definition name
    snapshot_id         = id of snapshot of dataset (takes precedence over defname)
    chunk_size          = target size of one output file (in GB)
    max_singlefile_size = datasets smaller than this (in GB) become a single file
    requested_num_parts = explicit part count requested by the user; must not
                          exceed the number of files in the dataset
    """
    # Ask SAM for the total size and file count of the snapshot (or of the
    # newest snapshot of the definition when no snapshot id was supplied).
    if snapshot_id:
        dims = "snapshot_id=%d" % snapshot_id
    else:
        dims = "dataset_def_name_newest_snapshot %s" % defname
    summary = SAM.listFilesSummary(dimensions=dims)
    total_bytes = summary["total_file_size"]
    file_count = summary["file_count"]

    # An explicit request wins, provided it does not exceed the file count.
    if requested_num_parts:
        if requested_num_parts > file_count:
            sys.stderr.write("You requested %d parts, but there are only %d files in the dataset. Can't proceed.\n" % (requested_num_parts, file_count))
            sys.exit(1)
        return requested_num_parts

    total_gb = total_bytes // 1024**3  # bytes -> gigabytes (integer)
    if total_gb < max_singlefile_size:
        return 1
    return total_gb // chunk_size
114 
def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None)
def submit_concat_project.CheckProjects (   dataset)

Definition at line 115 of file submit_concat_project.py.

def CheckProjects(dataset):
    """Exit if this request plus already-running concat projects would exceed MAX_N_JOBS.

    NOTE(review): relies on the module-level globals SAM, nparts and
    MAX_N_JOBS being set before this is called -- confirm call order.
    """
    running = SAM.listProjects(name='%%_proj_concat_%s%%' % dataset, state='running')
    n_running = len(running)
    if n_running + nparts <= MAX_N_JOBS:
        return

    # Too many jobs: explain why and bail out.
    print >> sys.stderr, "Current concat request would require %d jobs, but" % nparts
    if n_running > 0:
        print >> sys.stderr, "there are already %d SAM projects running for this definition," % n_running
        print >> sys.stderr, "and ",
    print >> sys.stderr, "there is a maximum of %d running concat projects for the same definition." % MAX_N_JOBS
    print >> sys.stderr, "Please retry using --parts to restrict how many jobs are submitted."
    sys.exit(1)
125 
126 
def submit_concat_project.CheckProxy (   role)

Definition at line 127 of file submit_concat_project.py.

def CheckProxy(role):
    """Verify a usable proxy exists, exiting with status 2 if not.

    role = VOMS role; "Production" asks setup_fnal_security for a
    production-capable proxy ("-cp" instead of "-c").
    """
    flags = "-c" + ("p" if role == "Production" else "")
    try:
        subprocess.check_call(["setup_fnal_security", flags])
    except subprocess.CalledProcessError:
        sys.stderr.write("Please run 'setup_fnal_security' with the correct options to get a valid proxy and try again.\n")
        sys.exit(2)
137 
def submit_concat_project.MakeOutputDir (   dirname)

Definition at line 138 of file submit_concat_project.py.

def MakeOutputDir(dirname):
    """Create dirname (mode 0775) if it does not already exist; exit(1) on failure."""
    try:
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
            os.chmod(dirname, 0o775)  # same value as the old 0775 octal literal
    except OSError:
        sys.stderr.write("Output directory '%s' cannot be created!\n" % dirname)
        sys.exit(1)
146 
def submit_concat_project.ParseArgs ( )

Definition at line 147 of file submit_concat_project.py.

def ParseArgs():
    """Build the argument parser and return the parsed command-line arguments.

    Several arguments accept both an old positional form and a newer
    option form (--decaf, --role, --nparts, --parts); each such pair is
    placed in a mutually exclusive group so only one form may be used.

    Fixes two user-facing typos in help text ("concatentation" ->
    "concatenation", "redunant" -> "redundant"); all other strings are
    unchanged.
    """
    parser = argparse.ArgumentParser(description="Submit jobs for concatenation of (de)CAFs.")

    parser.add_argument("dataset",
                        metavar="DATASET",
                        help="Dataset containing (de)CAFs to concatenate",
    )
    parser.add_argument("outputdir",
                        metavar="OUTPUTDIR",
                        help="Where to write output",
    )
    parser.add_argument("release",
                        metavar="RELEASE",
                        help="Version of novasoft to use",
    )

    # support old (positional) syntax as well as nicer (option-based) one
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("decaf_positional",
                    metavar="DECAF",
                    nargs="?",
                    default="$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
                    help="Decaffing macro to run. Either specify full path here or one of the following shortcut options: %s. Can also specify as --decaf (below)." % DECAF_MACROS.keys(),
    )
    gp.add_argument("--decaf", "--decaf_macro",
                    metavar="DECAF",
                    help="Alternate form of positional DECAF (above).",
    )

    # etc.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("role_positional",
                    metavar="ROLE",
                    nargs="?",
                    default="Analysis",
                    choices=["Analysis", "Production"],
                    help="VOMS role to submit with. (You must have Production-eligible proxy to use the Production role.) Can also specify as --role or --voms_role (below).",
    )
    gp.add_argument("--role", "--voms_role",
                    choices=["Analysis", "Production"],
                    help="Alternate form of positional ROLE (above).",
    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("nparts_positional",
                    nargs="?",
                    type=int,
                    metavar="NPARTS",
                    help="Divide concats into this many files. If PARTS is also specified, NPARTS must correspond exactly to the number of parts. Can also specify as --nparts (below).",
    )
    gp.add_argument("--nparts",
                    metavar="NPARTS",
                    type=int,
                    help="Alternate form of positional NPARTS (above).",
    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("parts_positional",
                    nargs="?",
                    metavar="PARTS",
                    help="The indices of which concat files to process. Optional form '--parts' (see description below) is preferred. ",
    )
    gp.add_argument("--parts",
                    metavar="PARTS",
                    action="append",
                    help="The indices of which concat files to process. Can separate multiple items with commas, or specify --parts multiple times." \
                         + " Ranges can be specified using a hyphen. So, '--parts 3,5,7-9 --parts 11-15 --parts 27' is legal.",
    )

    # allow user to opt out of 'caf' -> 'decaf' renaming in case we don't automatically get it right
    parser.add_argument("--no_caf_rename",
                        action="store_true",
                        default=False,
                        help="Do not substitute 'decaf' for 'caf' in the output name. Useful if you're only concatting CAFs with some script other than concat_dataset.C.",
    )

    parser.add_argument("--use_last_snapshot",
                        action="store_true",
                        default=False,
                        help="Use the last snapshot of the definition rather than taking a new one.",
    )

    parser.add_argument("--flatten",
                        action="store_true",
                        default=False,
                        help="Flatten concat file",
    )

    parser.add_argument("--flat_only",
                        action="store_true",
                        default=False,
                        help="Produce only flattened concat file",
    )

    gp = parser.add_mutually_exclusive_group(required=False)
    gp.add_argument('-t', '--testrel', metavar='DIR',
                    help='Use a test release at location TESTREL. It will be tarred up, and sent to the worker node. (Conflicts with --user_tarball)',
                    default=None)

    gp.add_argument("--user_tarball",
                    help="Use existing test release tarball in specified location rather than having jobsub make one for you (conflicts with --testrel, and is redundant with --reuse_tarball)",
                    type=str)

    gp.add_argument('--reuse_tarball',
                    help='Do you want to reuse a tarball that is already in resilient space? If using this option avoid trailing slash in --testrel option. (redundant with --user_tarball)',
                    action='store_true', default=False)

    parser.add_argument("-n", "--dry-run", "--test-only",
                        action="store_true",
                        default=False,
                        help="Test only; don't actually submit any jobs. Print jobsub command and exit."
    )

    return parser.parse_args()
263 
def submit_concat_project.ParseParts (   parts_string)

Definition at line 264 of file submit_concat_project.py.

References makeTrainCVSamples.int, and PandAna.Demos.demo1.range.

def ParseParts(parts_string):
    """Expand a comma-separated list of part indices into a set of ints.

    Each token is either a single index ("7") or an inclusive range ("7-9").
    """
    indices = set()
    for token in parts_string.split(","):
        if "-" in token:
            lo, hi = (int(piece) for piece in token.split("-"))
            indices.update(range(lo, hi + 1))
        else:
            indices.add(int(token))
    return indices
274 
def ParseParts(parts_string)
def submit_concat_project.Submit (   args)

Definition at line 275 of file submit_concat_project.py.

References split(), and runNovaSAM.str.

def Submit(args):
    """Build and launch the jobsub_submit command for the concat jobs.

    args = dict of submission parameters (dataset, outname, snapshot_id,
           nparts, release, outputdir, role, decaf_macro, testrel, parts,
           flatten, flat_only, dry_run).  When dry_run is set, the command
           is only printed, not executed.

    BUGFIX: "--flatten" and "--flat_only" were previously appended without
    a leading space, fusing them onto the preceding token (e.g.
    "--parts 3,5--flatten" or "--testrel--flatten"), which broke the
    generated command line.
    """
    decafscript = args["decaf_macro"]

    cmd = "jobsub_submit -g --OS=SL6" \
          + " -N%(nparts)d --role=%(role)s "

    # increase resource requirements for flattening
    if args["flatten"]:
        cmd += " --memory=1999MB --disk=5GB --expected-lifetime=23h"
    else:
        cmd += " --expected-lifetime=23h"

    # files on blue arc are not accessible directly, we need to copy them
    # to the grid nodes via the jobsub dropbox, and pass only the basename
    # on to concat_job_project.sh
    if "/nova/" in os.path.expandvars(args["decaf_macro"]):
        print("decaf macro has a blue arc location, copying it to grid nodes")
        cmd += " -f dropbox://%(decaf_macro)s"
        decafscript = args["decaf_macro"].split('/')[-1]

    if args["testrel"]:
        cmd += " --tar_file_name " + args["testrel"]

    cmd += " file://$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \
           + " --dataset %(dataset)s --outname %(outname)s --snapid %(snapshot_id)s" \
           + " --nparts %(nparts)d --release %(release)s --outdir %(outputdir)s"

    # fill in the %(...)s placeholders now; later additions must contain none
    cmd %= args
    cmd += " --decafscript %s " % decafscript

    if args["testrel"]:
        cmd += " --testrel"
    if len(args["parts"]):
        cmd += " --parts " + ",".join(str(a) for a in args["parts"])
    if args["flatten"]:
        cmd += " --flatten"
    if args["flat_only"]:
        cmd += " --flat_only"

    cmd = os.path.expandvars(cmd)
    print(cmd)
    if not args["dry_run"]:
        subprocess.check_call(shlex.split(cmd))
322 
void split(double tt, double *fr)

Variable Documentation

submit_concat_project.args

Definition at line 326 of file submit_concat_project.py.

submit_concat_project.cafana_dir

Definition at line 25 of file submit_concat_project.py.

submit_concat_project.concat
submit_concat_project.CVMFS_RELEASE_DIR

Definition at line 34 of file submit_concat_project.py.

submit_concat_project.dataset

Definition at line 380 of file submit_concat_project.py.

submit_concat_project.DECAF_MACROS

Definition at line 36 of file submit_concat_project.py.

submit_concat_project.decaf_positional

Definition at line 352 of file submit_concat_project.py.

submit_concat_project.experiment

Definition at line 369 of file submit_concat_project.py.

submit_concat_project.MAX_N_JOBS

Definition at line 60 of file submit_concat_project.py.

submit_concat_project.nparts
submit_concat_project.out_name
submit_concat_project.parts
submit_concat_project.parts_args

Definition at line 356 of file submit_concat_project.py.

submit_concat_project.parts_joined

Definition at line 358 of file submit_concat_project.py.

submit_concat_project.release_path

Definition at line 346 of file submit_concat_project.py.

submit_concat_project.rename_cafs

Definition at line 352 of file submit_concat_project.py.

submit_concat_project.requested_num_parts

Definition at line 380 of file submit_concat_project.py.

submit_concat_project.SAM

Definition at line 65 of file submit_concat_project.py.

submit_concat_project.search_path

Definition at line 21 of file submit_concat_project.py.

Referenced by create_search_path(), is_parsable(), and TEST().

submit_concat_project.snapshot_id

Definition at line 371 of file submit_concat_project.py.

submit_concat_project.testrel

Definition at line 329 of file submit_concat_project.py.