run_unfold_genie_multiprocess.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import subprocess
4 import multiprocessing
5 import os
6 import argparse
def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, genie_batch_idx, totuniv, totfiles):
    """Build the cafe command lines that one process should run.

    One command is produced per rank. Every command gets the same file
    index range (chosen from ``procid``) and its own run of
    ``nuniv_per_rank`` consecutive universe indices.

    Args:
        nnodes: Number of nodes; the process with index ``nnodes - 1`` is
            treated as the last one when ``nnodes > 1``.
        ranks_per_node: Number of commands to generate.
        procid: Index of this process. Converted with ``int()``, so a
            numeric string is accepted; it is inserted into the labelled
            command exactly as given.
        nuniv_per_rank: Number of universes assigned to each rank.
        nfiles_per_rank: Number of files in a regular (non-last) batch.
        genie_batch_idx: Batch index; the first universe of rank 0 is
            ``genie_batch_idx * totuniv``.
        totuniv: Universe index stride between consecutive batches.
        totfiles: Total number of files.

    Returns:
        A tuple ``(name_cmd, cmd)`` of two lists with ``ranks_per_node``
        entries each. ``cmd`` holds the command strings; ``name_cmd`` holds
        the same commands prefixed with the process id and batch index.
    """
    cmd_template = ("cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/"
                    "specprod_numuccinc.C genie {} {} -1 -1 0 {} {}")
    name_template = "ProcID: {} , Genie batch index: {}, {}"

    proc_index = int(procid)
    nfiles_last_batch = totfiles % nfiles_per_rank
    if nfiles_last_batch == 0:
        # totfiles divides evenly: the last batch is a full one. Without
        # this the last process would get first = totfiles and
        # last = totfiles - 1, i.e. an inverted (empty) range.
        nfiles_last_batch = nfiles_per_rank

    if nnodes > 1 and proc_index == nnodes - 1:
        # Last process: take the tail of the file list.
        ifilefirst = totfiles - nfiles_last_batch
        ifilelast = totfiles - 1
    else:
        ifilefirst = proc_index * nfiles_per_rank
        ifilelast = ifilefirst + nfiles_per_rank - 1

    cmd = []
    name_cmd = []
    for irank in range(ranks_per_node):
        iunivfirst = genie_batch_idx * totuniv + irank * nuniv_per_rank
        iunivlast = iunivfirst + nuniv_per_rank - 1

        icmd = cmd_template.format(iunivfirst, iunivlast, ifilefirst, ifilelast)
        cmd.append(icmd)
        # The label is derived from the command itself so both always agree.
        name_cmd.append(name_template.format(procid, genie_batch_idx, icmd))

    return (name_cmd, cmd)
40 
class Consumer(multiprocessing.Process):
    """Worker process that runs callables taken from a task queue.

    Each item fetched from ``task_queue`` is called with no arguments and
    its return value is put on ``result_queue``. A ``None`` item tells the
    worker to stop.
    """

    def __init__(self, task_queue, result_queue):
        super(Consumer, self).__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until a ``None`` item is received."""
        worker_name = self.name
        while True:
            job = self.task_queue.get()
            if job is not None:
                print('{}: {}'.format(worker_name, job))
                self.result_queue.put(job())
                continue
            # None is the stop marker: report and leave the loop.
            print('{}: Exiting'.format(worker_name))
            return
58 
class Task(object):
    """A labelled shell command that runs when the instance is called."""

    def __init__(self, a, b):
        # a: label shown in log lines; b: command string passed to the shell.
        self.a, self.b = a, b

    def __call__(self):
        """Run the command through the shell, logging before and after.

        The exit status of the command is not returned.
        """
        label, command = self.a, self.b
        print('Starting -- {} : {}'.format(label, command))
        subprocess.call(command, shell=True)
        print('Finishing -- {} : {}'.format(label, command))

    def __str__(self):
        return '%s : %s' % (self.a, self.b)
73 
74 
def RunMultiProcess(nCMD, lCMD, num_consumers=0):
    """Run a list of commands in parallel on a pool of Consumer processes.

    Args:
        nCMD: Labels for the commands; ``nCMD[i]`` labels ``lCMD[i]``.
        lCMD: Commands to run; one Task is queued per entry.
        num_consumers: Number of worker processes to start. ``0`` (the
            default) means ``multiprocessing.cpu_count()``.

    Blocks until one result per queued command has been received and all
    worker processes have exited. Returns ``None``.
    """
    # Communication queues shared with the workers.
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    if num_consumers == 0:
        num_consumers = multiprocessing.cpu_count()
    print('Creating {} consumers'.format(num_consumers))
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for worker in consumers:
        worker.start()

    num_jobs = len(lCMD)
    for idx, command in enumerate(lCMD):
        tasks.put(Task(nCMD[idx], command))

    # One stop marker per worker so that every worker terminates.
    for _ in consumers:
        tasks.put(None)

    # Drain exactly one result per job.
    for _ in range(num_jobs):
        print('Result: {}'.format(results.get()))

    # Every result has been read, so the workers only have their stop
    # marker left to consume; joining reaps them instead of leaving
    # finished processes around until the parent exits.
    for worker in consumers:
        worker.join()
101 
102 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='make_cafe_cmd',
        description="""Form cafe cmd in shifter jobs. """,
        epilog="Questions and Comments to dingpf@fnal.gov")

    # All options are integers with defaults: (flag, default, help text).
    int_options = (
        ('--nnodes', 1, "number of nodes;"),
        ('--nuniv_per_rank', 5, "number of universes per rank;"),
        ('--nfiles_per_rank', 100, "number of files per rank;"),
        ('--totuniv', 1000, "total number of universes;"),
        ('--genie_batch_idx', 0, "Batch index for Genie universes;"),
        ('--totfiles', 56324, "total number of files;"),
        ('--nranks_per_node', 25, "number of ranks per node;"),
    )
    for flag, default, text in int_options:
        parser.add_argument(flag, default=default, type=int, help=text)

    args = parser.parse_args()

    # -1 means "one rank per CPU reported by multiprocessing".
    nranks_per_node = (multiprocessing.cpu_count()
                       if args.nranks_per_node == -1
                       else args.nranks_per_node)

    # Job and process ids come from JOBSUBJOBID when it is set
    # ("<jid>.<procid>@..."), otherwise from the SLURM variables.
    if 'JOBSUBJOBID' in os.environ:
        id_fields = os.environ['JOBSUBJOBID'].split('@')[0].split('.')
        jid, procid = id_fields[0], id_fields[1]
    else:
        jid = os.environ["SLURM_JOB_ID"]
        procid = os.environ["SLURM_PROCID"]

    (ncmd, cmd) = make_cmd(
        args.nnodes,
        nranks_per_node,
        procid,
        args.nuniv_per_rank,
        args.nfiles_per_rank,
        args.genie_batch_idx,
        args.totuniv,
        args.totfiles)

    # Echo every labelled command before launching the workers.
    for label, command in zip(ncmd, cmd):
        print(label, command)
    RunMultiProcess(ncmd, cmd, nranks_per_node)
147 
148  # subprocess.call("mkdir -p /output/{}/{}/{}".format(args.mode, jid, procid), shell=True)
149  # subprocess.call("mv /dev/shm/{}/{}/*.root /output/{}/{}/{}".format(jid, procid, args.mode, jid, procid), shell=True)
150  # subprocess.call("rm -rf /dev/shm/{}".format(jid), shell=True)
def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, genie_batch_idx, totuniv, totfiles)
void split(double tt, double *fr)
def __init__(self, task_queue, result_queue)
Definition: novas.h:112
bool print
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14
def RunMultiProcess(nCMD, lCMD, num_consumers=0)