run_unfold_systs_multiprocess.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import subprocess
4 import multiprocessing
5 import os
6 import argparse
def make_cmd(nnodes, ranks_per_node, procid, nfiles_per_rank, systs_batch_idx, idx_first_file, totfiles):
    """Build the cafe command lines for every rank running on one node.

    Each rank is given an inclusive range of file indices
    ``[ifilefirst, ifilelast]``, which is appended as the last two arguments
    of the cafe command. Ranks normally receive ``nfiles_per_rank`` files,
    laid out consecutively by node and then by rank and shifted by
    ``idx_first_file``. When more than one node is used, the last rank of the
    last node instead receives the trailing batch that ends at file
    ``totfiles - 1`` (before the ``idx_first_file`` shift).

    Args:
        nnodes: Total number of nodes in the job.
        ranks_per_node: Number of commands to generate for this node.
        procid: Index of this node; anything accepted by ``int()``.
        nfiles_per_rank: Number of files in a regular batch.
        systs_batch_idx: Batch index; only used in the descriptive label.
        idx_first_file: Offset added to every computed file index.
        totfiles: Total number of files.

    Returns:
        A tuple ``(name_cmd, cmd)`` of two equally long lists: ``cmd`` holds
        the command strings, ``name_cmd`` the same commands prefixed with the
        process id and systs batch index.
    """
    template = "cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/specprod_numuccinc.C systs -1 -1 -1 -1 1 {} {}"
    node = int(procid)

    # Size of the trailing batch. A zero remainder means the files divide
    # evenly, so the trailing batch is a full one; using 0 here would give the
    # last rank an inverted (empty) range and silently drop its files.
    nfiles_last_batch = totfiles % nfiles_per_rank or nfiles_per_rank

    cmd = []
    name_cmd = []
    for irank in range(ranks_per_node):
        is_last_rank = nnodes > 1 and node == nnodes - 1 and irank == ranks_per_node - 1
        if is_last_rank:
            ifilefirst = totfiles - nfiles_last_batch + idx_first_file
            ifilelast = ifilefirst + nfiles_last_batch - 1
        else:
            ifilefirst = node * nfiles_per_rank * ranks_per_node + irank * nfiles_per_rank + idx_first_file
            ifilelast = ifilefirst + nfiles_per_rank - 1

        icmd = template.format(ifilefirst, ifilelast)
        cmd.append(icmd)
        name_cmd.append("ProcID: {} , Systs batch index: {}, {}".format(procid, systs_batch_idx, icmd))

    return (name_cmd, cmd)
37 
class Consumer(multiprocessing.Process):
    """Worker process that executes callables pulled from a task queue.

    Tasks are fetched one at a time from ``task_queue``; each is called and
    its return value is placed on ``result_queue``. A ``None`` entry on the
    task queue tells the worker to stop.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queues this worker reads tasks from and writes results to."""
        super(Consumer, self).__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until a ``None`` sentinel is received."""
        worker_name = self.name
        task = self.task_queue.get()
        while task is not None:
            print('{}: {}'.format(worker_name, task))
            self.result_queue.put(task())
            task = self.task_queue.get()
        # Sentinel received: announce shutdown and leave.
        print('{}: Exiting'.format(worker_name))
55 
class Task(object):
    """A named shell command that can be called to execute it.

    Attributes:
        a: Descriptive name of the command (used in log output only).
        b: The command line, run through the shell.
    """

    def __init__(self, a, b):
        self.a, self.b = a, b

    def __call__(self):
        """Run the command via the shell, logging start and finish.

        The exit status of the command is not returned; the call always
        yields ``None``.
        """
        label, command = self.a, self.b
        print('Starting -- {} : {}'.format(label, command))
        subprocess.call(command, shell=True)
        print('Finishing -- {} : {}'.format(label, command))

    def __str__(self):
        return '%s : %s' % (self.a, self.b)
70 
71 
def RunMultiProcess(nCMD, lCMD, num_consumers=0):
    """Run shell commands in parallel on a pool of Consumer processes.

    One Task is created per entry of ``lCMD`` and handed to the workers
    through a shared queue. The function prints one ``Result:`` line per
    task as results arrive and returns once every worker has exited.

    Args:
        nCMD: Descriptive names, one per command; must be at least as long
            as ``lCMD``.
        lCMD: Command strings to execute.
        num_consumers: Number of worker processes to start; ``0`` means one
            per CPU as reported by ``multiprocessing.cpu_count()``.

    Raises:
        IndexError: If ``nCMD`` is shorter than ``lCMD``.
    """
    # Build every task before any worker is started, so that a length
    # mismatch between nCMD and lCMD fails here rather than after workers
    # are already blocked on a queue that would never get its poison pills.
    jobs = [Task(nCMD[i], lCMD[i]) for i in range(len(lCMD))]

    # Establish communication queues
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    # Start consumers
    if num_consumers == 0:
        num_consumers = multiprocessing.cpu_count()
    print('Creating {} consumers'.format(num_consumers))
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for worker in consumers:
        worker.start()

    for job in jobs:
        tasks.put(job)

    # Add a poison pill for each consumer
    for _ in consumers:
        tasks.put(None)

    # Print results as they arrive; exactly one result is produced per job.
    for _ in jobs:
        print('Result: {}'.format(results.get()))

    # Both queues are drained at this point, so joining cannot block on
    # unflushed queue data; it ensures no worker outlives this call.
    for worker in consumers:
        worker.join()
    return
98 
99 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='make_cafe_cmd',
        description="""Form cafe cmd in shifter jobs. """,
        epilog="Questions and Comments to dingpf@fnal.gov")

    # Every option is an integer with a default, so they are declared from a
    # single (flag, default, help) table, in the order shown by --help.
    int_options = (
        ('--nnodes', 1, "number of nodes;"),
        ('--nfiles_per_rank', 100, "number of files per rank;"),
        ('--systs_batch_idx', 0, "Batch index for Systs universes;"),
        ('--idx_first_file', 0, "index of the first file to run over;"),
        ('--totfiles', 56324, "total number of files;"),
        ('--nranks_per_node', 25, "number of ranks per node;"),
    )
    for flag, default_value, help_text in int_options:
        parser.add_argument(flag, default=default_value, type=int, help=help_text)

    args = parser.parse_args()

    # -1 requests one rank per CPU on this machine.
    nranks_per_node = (multiprocessing.cpu_count()
                       if args.nranks_per_node == -1
                       else args.nranks_per_node)

    # The job id and process id come from JOBSUBJOBID when it is set
    # (taking the "<jid>.<procid>" part before '@'), otherwise from the
    # SLURM environment variables.
    if 'JOBSUBJOBID' in os.environ:
        id_fields = os.environ['JOBSUBJOBID'].split('@')[0].split('.')
        jid = id_fields[0]
        procid = id_fields[1]
    else:
        jid = os.environ["SLURM_JOB_ID"]
        procid = os.environ["SLURM_PROCID"]

    (ncmd, cmd) = make_cmd(args.nnodes, nranks_per_node, procid,
                           args.nfiles_per_rank, args.systs_batch_idx,
                           args.idx_first_file, args.totfiles)

    # Echo the full work list before anything is launched.
    for label, command in zip(ncmd, cmd):
        print(label, command)
    RunMultiProcess(ncmd, cmd, nranks_per_node)

    # Output staging is currently disabled:
    # subprocess.call("mkdir -p /output/{}/{}/{}".format(args.mode, jid, procid), shell=True)
    # subprocess.call("mv /dev/shm/{}/{}/*.root /output/{}/{}/{}".format(jid, procid, args.mode, jid, procid), shell=True)
    # subprocess.call("rm -rf /dev/shm/{}".format(jid), shell=True)
void split(double tt, double *fr)
def RunMultiProcess(nCMD, lCMD, num_consumers=0)
def __init__(self, task_queue, result_queue)
Definition: novas.h:112
def make_cmd(nnodes, ranks_per_node, procid, nfiles_per_rank, systs_batch_idx, idx_first_file, totfiles)
bool print
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14