run_unfold_genie_multiprocess.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import subprocess
4 import multiprocessing
5 import os
6 import argparse
def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, genie_batch_idx, totuniv, totfiles, mode):
    """Build the command lines one process has to run, one per rank.

    All ranks of a process share the same file range (selected by
    ``procid``); each rank gets its own contiguous range of universes.

    Args:
        nnodes: Number of nodes. Accepted for interface compatibility;
            it does not influence the generated commands.
        ranks_per_node: Number of commands to generate.
        procid: Index of this process; anything ``int()`` accepts.
        nuniv_per_rank: Number of universes assigned to each rank.
        nfiles_per_rank: Size of the file range of this process (>= 1).
        genie_batch_idx: Batch index; shifts the first universe by
            ``genie_batch_idx * totuniv``.
        totuniv: Number of universes in one batch.
        totfiles: Total number of files in the dataset.
        mode: Run mode; must contain ``'genie'`` or ``'ppfx'``.

    Returns:
        A tuple ``(name_cmd, cmd)`` of two lists of equal length:
        ``cmd`` holds the command lines, ``name_cmd`` the same commands
        prefixed with the process id and batch index.

    Exits:
        Calls ``sys.exit(-1)`` after printing a message when
        ``nfiles_per_rank`` is smaller than 1, when the first file index
        lies beyond the dataset, or when ``mode`` is not recognized.
    """
    # A non-positive slice size cannot describe a valid file range.
    if nfiles_per_rank < 1:
        print('Number of files per rank must be at least 1')
        sys.exit(-1)

    # Inclusive file range handled by this process.
    ifilefirst = int(procid) * nfiles_per_rank
    ifilelast = ifilefirst + nfiles_per_rank - 1
    if ifilefirst > totfiles - 1:
        print('First file index exceeds total number of files in the dataset')
        sys.exit(-1)
    if ifilelast > totfiles - 1:
        print('Reached the end of the dataset. Setting last file index to the last file')
        ifilelast = totfiles - 1

    # The mode decides which pair of positional slots carries the universe
    # range; the other pair is filled with -1. Validate it once, up front.
    if 'genie' in mode:
        arg_template = " {0} {1} -1 -1 {2} {3}"
    elif 'ppfx' in mode:
        arg_template = " -1 -1 {0} {1} {2} {3}"
    else:
        print('Mode {} not recognized. Must run in mode genie or ppfx'.format(mode))
        sys.exit(-1)

    # Identical for every rank, so build it only once.
    cmd_base = '/usr/bin/time -v cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/nersc/unfolding_macros/genie_ppfx/specprod_numuccinc.C {0}'.format(mode)

    cmd = []
    name_cmd = []
    for irank in range(ranks_per_node):
        # Inclusive universe range handled by this rank.
        iunivfirst = genie_batch_idx * totuniv + irank * nuniv_per_rank
        iunivlast = iunivfirst + nuniv_per_rank - 1

        icmd = cmd_base + arg_template.format(iunivfirst, iunivlast, ifilefirst, ifilelast)
        cmd.append(icmd)
        name_cmd.append("ProcID: {0} , Genie batch index: {1}, {2}".format(procid, genie_batch_idx, icmd))

    return (name_cmd, cmd)
52 
class Consumer(multiprocessing.Process):
    """Worker process that executes callables taken from a task queue.

    The worker keeps pulling items from ``task_queue`` until it receives
    ``None``. Each item is called with no arguments and exactly one entry
    per item is put on ``result_queue``: the return value of the call, or
    a string describing the failure if the call raised.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queue to read tasks from and the queue to report to."""
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until a ``None`` sentinel is received."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print('{}: Exiting'.format(proc_name))
                break
            print('{}: {}'.format(proc_name, next_task))
            try:
                answer = next_task()
            except Exception as exc:
                # Always report something: a consumer of result_queue that
                # expects one result per task would otherwise wait forever.
                answer = '{} failed: {!r}'.format(next_task, exc)
                print('{}: {}'.format(proc_name, answer))
            self.result_queue.put(answer)
        return
70 
class Task(object):
    """A shell command together with a label used in log messages."""

    def __init__(self, a, b):
        """Create a task; ``a`` is the command name, ``b`` the command."""
        self.a = a
        self.b = b

    def __call__(self):
        """Run the command through the shell and return its exit status.

        Returning the status (instead of discarding it) lets whoever
        collects the results tell failed commands from successful ones.
        """
        print('Starting -- {} : {}'.format(self.a, self.b))
        # The command is a single string, so it is interpreted by the shell.
        status = subprocess.call(self.b, shell=True)
        print('Finishing -- {} : {}'.format(self.a, self.b))
        return status

    def __str__(self):
        """Return ``'<name> : <command>'``."""
        return '{} : {}'.format(self.a, self.b)
85 
86 
def RunMultiProcess(nCMD, lCMD, num_consumers=0):
    """Run shell commands in parallel on a pool of Consumer processes.

    Args:
        nCMD: Sequence of command names, parallel to ``lCMD``.
        lCMD: Sequence of shell command strings; one Task is created for
            each entry.
        num_consumers: Number of worker processes to start. ``0`` means
            ``multiprocessing.cpu_count()``.

    Raises:
        IndexError: If ``nCMD`` has fewer entries than ``lCMD``. This is
            detected before any worker process is started.

    The function prints one ``Result: ...`` line per command and returns
    ``None`` once every result has been received and all workers exited.
    """
    # Build the tasks first so a length mismatch fails before workers exist.
    num_jobs = len(lCMD)
    if len(nCMD) < num_jobs:
        raise IndexError('nCMD has fewer entries than lCMD')
    jobs = [Task(name, command) for name, command in zip(nCMD, lCMD)]

    # Establish communication queues
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    # Start consumers
    if num_consumers == 0:
        num_consumers = multiprocessing.cpu_count()
    print('Creating {} consumers'.format(num_consumers))
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for worker in consumers:
        worker.start()

    for job in jobs:
        tasks.put(job)

    # Add a poison pill for each consumer
    for _ in range(num_consumers):
        tasks.put(None)

    # Start printing results: exactly one result is expected per job.
    while num_jobs:
        result = results.get()
        print('Result: {}'.format(result))
        num_jobs -= 1

    # Every result has been drained, so the workers can finish; wait for
    # them so no child process is left behind when this function returns.
    for worker in consumers:
        worker.join()
    return
113 
114 
if __name__ == "__main__":
    # Entry point: parse the options, work out which process of the batch
    # job this is, build its commands and (unless --test) run them.
    parser = argparse.ArgumentParser(
        prog='make_cafe_cmd',
        description="""Form cafe cmd in shifter jobs. """,
        epilog="Questions and Comments to dingpf@fnal.gov")

    # Every option has a default, so none of them is strictly required.
    parser.add_argument('--nnodes', default=1, type=int,
                        help="number of nodes;")
    parser.add_argument('--nuniv_per_rank', default=5, type=int,
                        help="number of universes per rank;")
    parser.add_argument('--nfiles_per_rank', default=100, type=int,
                        help="number of files per rank;")
    parser.add_argument('--totuniv', default=1000, type=int,
                        help="total number of universes;")
    parser.add_argument('--genie_batch_idx', default=0, type=int,
                        help="Batch index for Genie universes;")
    parser.add_argument('--totfiles', default=56324, type=int,
                        help="total number of files in the full dataset;")
    # A value of -1 selects multiprocessing.cpu_count() (see below).
    parser.add_argument('--nranks_per_node', default=25, type=int,
                        help="number of ranks per node;")
    parser.add_argument('--mode', default='genie', type=str,
                        help='Mode in which to run the job (genie/ppfx)')
    parser.add_argument('--test', default=False, action='store_true',
                        help='Just output the commands for each subprocess')

    args = parser.parse_args()

    if args.nranks_per_node == -1:
        nranks_per_node = multiprocessing.cpu_count()
    else:
        nranks_per_node = args.nranks_per_node

    # The job id and process id come from the batch system environment:
    # JOBSUBJOBID ("<jid>.<procid>@...") if present, SLURM variables otherwise.
    if 'JOBSUBJOBID' in os.environ:
        jsub_id = os.environ['JOBSUBJOBID']
        id_fields = jsub_id.split('@')[0].split('.')
        if len(id_fields) < 2:
            parser.error('cannot parse JOBSUBJOBID {!r}; expected <jid>.<procid>@...'.format(jsub_id))
        jid = id_fields[0]
        procid = id_fields[1]
    else:
        try:
            jid = os.environ["SLURM_JOB_ID"]
            procid = os.environ["SLURM_PROCID"]
        except KeyError as missing:
            # Report a clear message instead of a bare KeyError traceback.
            parser.error('environment variable {} is not set; set JOBSUBJOBID or '
                         'SLURM_JOB_ID and SLURM_PROCID'.format(missing.args[0]))

    (ncmd, cmd) = make_cmd(args.nnodes, nranks_per_node, procid, args.nuniv_per_rank,
                           args.nfiles_per_rank, args.genie_batch_idx, args.totuniv,
                           args.totfiles, args.mode)

    for label, command in zip(ncmd, cmd):
        print(label, command)
    if not args.test:
        RunMultiProcess(ncmd, cmd, nranks_per_node)

    # Staging of the output files is currently disabled:
    # subprocess.call("mkdir -p /output/{}/{}/{}".format(args.mode, jid, procid), shell=True)
    # subprocess.call("mv /dev/shm/{}/{}/*.root /output/{}/{}/{}".format(jid, procid, args.mode, jid, procid), shell=True)
    # subprocess.call("rm -rf /dev/shm/{}".format(jid), shell=True)
def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, genie_batch_idx, totuniv, totfiles)
void split(double tt, double *fr)
def __init__(self, task_queue, result_queue)
Definition: novas.h:112
bool print
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14
def RunMultiProcess(nCMD, lCMD, num_consumers=0)