Functions | Variables
submit_concat_project Namespace Reference

Functions

def BuildOutputNameStub (dataset_name, decaf_macro, rename_cafs=True)
 
def CalculateNParts (defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None)
 
def CheckProjects (dataset)
 
def CheckProxy (role)
 
def MakeOutputDir (dirname)
 
def ParseArgs ()
 
def ParseParts (parts_string)
 
def Submit (args)
 

Variables

 search_path
 
 file
 
 cafana_dir
 
 CVMFS_RELEASE_DIR
 
 DECAF_MACROS
 
 MAX_N_JOBS
 
 SAM
 
 args
 
 testrel
 
 release_path
 
 release
 
 out_name
 
 decaf_positional
 
 rename_cafs
 
 parts
 
 parts_args
 
 parts_joined
 
 experiment
 
 snapshot_id
 
 e
 
 nparts
 
 dataset
 
 requested_num_parts
 
 disk
 
 memory
 
 concat
 

Detailed Description

  submit_concat_project.py:
      Submit jobs for producing concats from a dataset.
    
    Original author: J. Wolcott <jwolcott@fnal.gov>, rewritten from submit_concat_project.sh
       Date: August 2017

Function Documentation

def submit_concat_project.BuildOutputNameStub (   dataset_name,
  decaf_macro,
  rename_cafs = True 
)

Definition at line 72 of file submit_concat_project.py.

def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True):
    """Construct the output file name stub for a concat job.

    Since this is a concat script, every "caf" token in the dataset name is
    promoted to its "sumcaf" form (likewise decaf / restrictedcaf).  When a
    genuine decaf macro is being run, "sumcaf" is further promoted to
    "sumdecaf", and a known shortcut macro name is appended to the stub.
    """
    replacements = (
        ("_caf", "_sumcaf"),
        ("_decaf", "_sumdecaf"),
        ("_restrictedcaf", "_sumrestrictedcaf"),
    )
    out_name = dataset_name
    for old, new in replacements:
        out_name = out_name.replace(old, new)

    # ... and if we're actually decaffing, that "caf" -> "decaf"
    if rename_cafs and os.path.basename(decaf_macro) != "concat_dataset.C":
        out_name = out_name.replace("_sumcaf", "_sumdecaf")
        out_name = out_name.replace("_sumrestrictedcaf", "_sumrestricteddecaf")
        if decaf_macro in DECAF_MACROS:
            out_name += "_" + decaf_macro

    return out_name
91 """
92  Compute number of parts dataset should be split into.
93 
94  snapshot_id = id of snapshot of dataset
95  chunk_size = size of one file (in GB)
96  max_singlefile_size = if dataset size is less than this size in GB, only a single file will be made
97 """
def BuildOutputNameStub(dataset_name, decaf_macro, rename_cafs=True)
def submit_concat_project.CalculateNParts (   defname,
  snapshot_id,
  chunk_size = 2,
  max_singlefile_size = 20,
  requested_num_parts = None 
)

Definition at line 98 of file submit_concat_project.py.

References print.

def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None):
    """Compute number of parts the dataset should be split into.

    snapshot_id = id of snapshot of dataset (query by snapshot when given,
                  otherwise by the definition's newest snapshot)
    chunk_size = size of one file (in GB)
    max_singlefile_size = if dataset size is less than this size in GB,
                          only a single file will be made
    requested_num_parts = explicit part count; must not exceed the number
                          of files in the dataset (exits with status 1)
    """
    if snapshot_id:
        dimensions = "snapshot_id=%d" % snapshot_id
    else:
        dimensions = "dataset_def_name_newest_snapshot %s" % defname

    summary = SAM.listFilesSummary(dimensions=dimensions)
    size = summary["total_file_size"]
    n_files = summary["file_count"]

    if requested_num_parts:
        # can't make more parts than there are input files
        if requested_num_parts > n_files:
            print("You requested %d parts, but there are only %d files in the dataset. Can't proceed." % (requested_num_parts, n_files), file=sys.stderr)
            sys.exit(1)
        return requested_num_parts

    GBs = old_div(size, 1024**3)  # bytes to gigabytes
    return 1 if GBs < max_singlefile_size else old_div(GBs, chunk_size)
bool print
def CalculateNParts(defname, snapshot_id, chunk_size=2, max_singlefile_size=20, requested_num_parts=None)
def submit_concat_project.CheckProjects (   dataset)

Definition at line 120 of file submit_concat_project.py.

References print.

def CheckProjects(dataset):
    """Abort (exit 1) if this request would exceed the concurrent concat-project limit.

    Counts SAM projects already running for this dataset and adds the
    module-level `nparts` being requested now; compares against MAX_N_JOBS.
    """
    already_running_projects = SAM.listProjects(name='%%_proj_concat_%s%%' % dataset, state='running')
    n_running = len(already_running_projects)
    if n_running + nparts <= MAX_N_JOBS:
        return

    print("Current concat request would require %d jobs, but" % nparts, file=sys.stderr)
    if n_running > 0:
        print("there are already %d SAM projects running for this definition," % n_running, file=sys.stderr)
        print("and ", end=' ', file=sys.stderr)
    print("there is a maximum of %d running concat projects for the same definition." % MAX_N_JOBS, file=sys.stderr)
    print("Please retry using --parts to restrict how many jobs are submitted.", file=sys.stderr)
    sys.exit(1)
131 
bool print
def submit_concat_project.CheckProxy (   role)

Definition at line 132 of file submit_concat_project.py.

References print.

def CheckProxy(role):
    """Verify we have a usable proxy, exiting with status 2 if not.

    Runs `setup_fnal_security -c` (or `-cp` for the Production role) and
    relies on its exit status to detect a missing/invalid proxy.
    """
    flags = "-cp" if role == "Production" else "-c"
    try:
        subprocess.check_call(["setup_fnal_security", flags])
    except subprocess.CalledProcessError:
        print("Please run 'setup_fnal_security' with the correct options to get a valid proxy and try again.", file=sys.stderr)
        sys.exit(2)
bool print
def submit_concat_project.MakeOutputDir (   dirname)

Definition at line 143 of file submit_concat_project.py.

References print.

def MakeOutputDir(dirname):
    """Ensure the output directory exists with group-writable (0775) permissions.

    Creates `dirname` (including intermediate directories) if needed; exits
    with status 1 if it cannot be created, or if the path exists but is not
    a directory.

    Fix: the original checked `os.path.isdir` before `os.makedirs`, which is
    racy (TOCTOU) — another process creating the directory between the check
    and the call would trigger a spurious error.  We now attempt creation
    directly and treat "already exists as a directory" as success.
    """
    try:
        os.makedirs(dirname)
        # makedirs' mode argument is masked by the umask, so set perms explicitly
        os.chmod(dirname, 0o775)
    except FileExistsError:
        # Already present (or lost a creation race): fine only if it's a real directory.
        if not os.path.isdir(dirname):
            print("Output directory '%s' cannot be created!" % dirname, file=sys.stderr)
            sys.exit(1)
    except OSError:
        print("Output directory '%s' cannot be created!" % dirname, file=sys.stderr)
        sys.exit(1)
bool print
def submit_concat_project.ParseArgs ( )

Definition at line 152 of file submit_concat_project.py.

References parse_dependency_file_t.list.

def ParseArgs():
    """Define and parse the command-line interface; return the parsed namespace.

    Several parameters accept both an old positional form and a newer
    option-based form; each such pair is placed in a mutually exclusive
    group so the user can supply at most one of them.

    Fixes: user-facing typos "concatentation" -> "concatenation" in the
    parser description and "redunant" -> "redundant" in the --user_tarball
    help (matching the correctly spelled --reuse_tarball help).
    """
    parser = argparse.ArgumentParser(description="Submit jobs for concatenation of (de)CAFs.")

    parser.add_argument("dataset",
                        metavar="DATASET",
                        help="Dataset containing (de)CAFs to concatenate",
                        )
    parser.add_argument("outputdir",
                        metavar="OUTPUTDIR",
                        help="Where to write output",
                        )
    parser.add_argument("release",
                        metavar="RELEASE",
                        help="Version of novasoft to use",
                        )

    # support old (positional) syntax as well as nicer (option-based) one
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("decaf_positional",
                    metavar="DECAF",
                    nargs="?",
                    default="$NOVAGRIDUTILS_DIR/bin/concat_dataset.C",
                    help="Decaffing macro to run. Either specify full path here or one of the following shortcut options: %s. Can also specify as --decaf (below)." % list(DECAF_MACROS.keys()),
                    )
    gp.add_argument("--decaf", "--decaf_macro",
                    metavar="DECAF",
                    help="Alternate form of positional DECAF (above).",
                    )

    # etc.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("role_positional",
                    metavar="ROLE",
                    nargs="?",
                    default="Analysis",
                    choices=["Analysis", "Production"],
                    help="VOMS role to submit with. (You must have Production-eligible proxy to use the Production role.) Can also specify as --role or --voms_role (below).",
                    )
    gp.add_argument("--role", "--voms_role",
                    choices=["Analysis", "Production"],
                    help="Alternate form of positional ROLE (above).",
                    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("nparts_positional",
                    nargs="?",
                    type=int,
                    metavar="NPARTS",
                    help="Divide concats into this many files. If PARTS is also specified, NPARTS must correspond exactly to the number of parts. Can also specify as --nparts (below).",
                    )
    gp.add_argument("--nparts",
                    metavar="NPARTS",
                    type=int,
                    help="Alternate form of positional NPARTS (above).",
                    )

    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("parts_positional",
                    nargs="?",
                    metavar="PARTS",
                    help="The indices of which concat files to process. Optional form '--parts' (see description below) is preferred. ",
                    )
    gp.add_argument("--parts",
                    metavar="PARTS",
                    action="append",
                    help="The indices of which concat files to process. Can separate multiple items with commas, or specify --parts multiple times."
                         + " Ranges can be specified using a hyphen. So, '--parts 3,5,7-9 --parts 11-15 --parts 27' is legal.",
                    )

    # allow user to opt out of 'caf' -> 'decaf' renaming in case we don't automatically get it right
    parser.add_argument("--no_caf_rename",
                        action="store_true",
                        default=False,
                        help="Do not substitute 'decaf' for 'caf' in the output name. Useful if you're only concatting CAFs with some script other than concat_dataset.C.",
                        )

    parser.add_argument("--use_last_snapshot",
                        action="store_true",
                        default=False,
                        help="Use the last snapshot of the definition rather than taking a new one.",
                        )

    # NOTE(review): store_true with default=True means this flag can never be
    # disabled from the command line; kept as-is for backward compatibility.
    parser.add_argument("--flatten",
                        action="store_true",
                        default=True,
                        help="Flatten concat file",
                        )

    parser.add_argument("--flat_only",
                        action="store_true",
                        default=False,
                        help="Produce only flattened concat file",
                        )

    gp = parser.add_mutually_exclusive_group(required=False)
    gp.add_argument('-t', '--testrel', metavar='DIR',
                    help='Use a test release at location TESTREL. It will be tarred up, and sent to the worker node. (Conflicts with --user_tarball)',
                    default=None)

    gp.add_argument("--user_tarball",
                    help="Use existing test release tarball in specified location rather than having jobsub make one for you (conflicts with --testrel, and is redundant with --reuse_tarball)",
                    type=str)

    gp.add_argument('--reuse_tarball',
                    help='Do you want to reuse a tarball that is already in resilient space? If using this option avoid trailing slash in --testrel option. (redundant with --user_tarball)',
                    action='store_true', default=False)

    parser.add_argument("-n", "--dry-run", "--test-only",
                        action="store_true",
                        default=False,
                        help="Test only; don't actually submit any jobs. Print jobsub command and exit."
                        )

    parser.add_argument("--disk",
                        type=int,
                        help="Local disk space requirement for worker node in MB (default is 2400MB without flattening).",)  # No default here for flatten resource logic

    parser.add_argument("--memory",
                        type=int,
                        help="Local memory requirement for worker node in MB (default is 3000MB without flattening).")  # No default here for flatten resource logic

    return parser.parse_args()
def submit_concat_project.ParseParts (   parts_string)

Definition at line 277 of file submit_concat_project.py.

References makeTrainCVSamples.int, and PandAna.Demos.demo1.range.

def ParseParts(parts_string):
    """Expand a comma-separated list of indices and ranges into a set of ints.

    A token is either a single integer ("27") or an inclusive hyphenated
    range ("7-9" -> 7, 8, 9).  E.g. "3,5,7-9" -> {3, 5, 7, 8, 9}.
    """
    indices = set()
    for token in parts_string.split(","):
        if "-" in token:
            lo, hi = (int(piece) for piece in token.split("-"))
            indices.update(range(lo, hi + 1))
        else:
            indices.add(int(token))
    return indices
def ParseParts(parts_string)
def submit_concat_project.Submit (   args)

Definition at line 288 of file submit_concat_project.py.

References print, split(), and submit_syst.str.

def Submit(args):
    """Build the jobsub_submit command line from `args` (a dict of parsed
    options) and execute it, unless args["dry_run"] is set.

    NOTE: the command string mixes two interpolation styles: some values
    (disk, memory, decafscript) are interpolated immediately with `%`,
    while named `%(key)s` placeholders are left in the string and resolved
    all at once by `cmd %= args` below.  Keep that ordering intact.
    """
    # can be used for interactive testing if needed
#    cmd = "$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \

    decafscript = args["decaf_macro"]

    # named placeholders here are filled in later by `cmd %= args`
    cmd = "jobsub_submit -g -N%(nparts)d --role=%(role)s "

    # Set resource requirements
    cmd += " --disk=%sMB" % args["disk"]
    cmd += " --memory=%sMB" % args["memory"]
    cmd += " --expected-lifetime=23h"

    # files on blue arc are not accessible directly, we need to copy them to the grid nodes
    if "/nova/" in os.path.expandvars(args["decaf_macro"]):
        print("decaf macro has a blue arc location, copying it to grid nodes")
        # copy it at submission time
        cmd += " -f dropbox://%(decaf_macro)s"
        # remove path string containing /nova/ for concat_job_project.sh
        # (the worker script sees the bare file name after the dropbox copy)
        decafscript = args["decaf_macro"].split('/')[-1]

    if args["testrel"]:
        cmd += " --tar_file_name " + args["testrel"]

    cmd += " file://$NOVAGRIDUTILS_DIR/bin/concat_job_project.sh" \
        + " --dataset %(dataset)s --outname %(outname)s --snapid %(snapshot_id)s" \
        + " --nparts %(nparts)d --release %(release)s --outdir %(outputdir)s"

    # resolve all remaining %(key)s placeholders against the args dict
    cmd %= args
    # added after the `%=` so a '%' in the script path can't break formatting
    cmd += " --decafscript %s " % decafscript

    if args["testrel"]:
        cmd += " --testrel"
    if len(args["parts"]):
        cmd += " --parts " + ",".join(str(a) for a in args["parts"])
    if args["flatten"]:
        cmd += " --flatten"
    if args["flat_only"]:
        cmd += " --flat_only"

    # expand $NOVAGRIDUTILS_DIR etc. just before printing/submitting
    cmd = os.path.expandvars(cmd)
    print(cmd)
    if not args["dry_run"]:
        subprocess.check_call(shlex.split(cmd))
void split(double tt, double *fr)
bool print

Variable Documentation

submit_concat_project.args

Definition at line 338 of file submit_concat_project.py.

submit_concat_project.cafana_dir

Definition at line 30 of file submit_concat_project.py.

submit_concat_project.concat
submit_concat_project.CVMFS_RELEASE_DIR

Definition at line 39 of file submit_concat_project.py.

submit_concat_project.dataset

Definition at line 392 of file submit_concat_project.py.

submit_concat_project.DECAF_MACROS

Definition at line 41 of file submit_concat_project.py.

submit_concat_project.decaf_positional

Definition at line 364 of file submit_concat_project.py.

submit_concat_project.disk

Definition at line 404 of file submit_concat_project.py.

Referenced by plot_abstree().

submit_concat_project.e

Definition at line 389 of file submit_concat_project.py.

submit_concat_project.experiment

Definition at line 381 of file submit_concat_project.py.

submit_concat_project.file

Definition at line 28 of file submit_concat_project.py.

submit_concat_project.MAX_N_JOBS

Definition at line 65 of file submit_concat_project.py.

submit_concat_project.memory

Definition at line 406 of file submit_concat_project.py.

submit_concat_project.nparts
submit_concat_project.out_name
submit_concat_project.parts
submit_concat_project.parts_args

Definition at line 368 of file submit_concat_project.py.

submit_concat_project.parts_joined

Definition at line 370 of file submit_concat_project.py.

submit_concat_project.release

Definition at line 361 of file submit_concat_project.py.

submit_concat_project.release_path

Definition at line 358 of file submit_concat_project.py.

submit_concat_project.rename_cafs

Definition at line 364 of file submit_concat_project.py.

submit_concat_project.requested_num_parts

Definition at line 392 of file submit_concat_project.py.

submit_concat_project.SAM

Definition at line 70 of file submit_concat_project.py.

submit_concat_project.search_path

Definition at line 26 of file submit_concat_project.py.

submit_concat_project.snapshot_id

Definition at line 383 of file submit_concat_project.py.

submit_concat_project.testrel

Definition at line 341 of file submit_concat_project.py.