7 def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, ppfx_batch_idx, totuniv, totfiles):
13 nfiles_last_batch = totfiles%nfiles_per_rank
15 if nnodes > 1
and int(procid) == nnodes - 1:
16 ifilefirst = totfiles - nfiles_last_batch
17 ifilelast = ifilefirst + nfiles_last_batch -1
19 ifilefirst =
int(procid) * nfiles_per_rank
20 ifilelast = ifilefirst + nfiles_per_rank - 1
22 for irank
in range(ranks_per_node):
23 iunivfirst = ppfx_batch_idx * totuniv + irank * nuniv_per_rank
24 iunivlast = iunivfirst + nuniv_per_rank - 1
26 icmd =
"cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/specprod_numuccinc.C ppfx -1 -1 {} {} 0 {} {}".
format(
27 iunivfirst, iunivlast, ifilefirst, ifilelast)
28 iname_cmd =
"ProcID: {} , Ppfx batch index: {}, cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/specprod_numuccinc.C ppfx -1 -1 {} {} 0 {} {}".
format(
29 procid, ppfx_batch_idx, iunivfirst, iunivlast, ifilefirst, ifilelast)
31 name_cmd.append(iname_cmd)
33 return (name_cmd, cmd)
37 multiprocessing.Process.__init__(self)
44 next_task = self.task_queue.get()
50 self.result_queue.put(answer)
61 subprocess.call(self.
b, shell=
True)
66 return '%s : %s' % (self.
a, self.
b)
71 tasks = multiprocessing.Queue()
72 results = multiprocessing.Queue()
75 if num_consumers == 0:
76 num_consumers = multiprocessing.cpu_count()
77 print(
'Creating {} consumers'.
format(num_consumers))
78 consumers = [
Consumer(tasks, results)
for i
in range(num_consumers)]
83 for i
in range(num_jobs):
84 tasks.put(
Task(nCMD[i], lCMD[i]))
87 for i
in range(num_consumers):
91 result = results.get()
97 if __name__ ==
"__main__":
98 parser = argparse.ArgumentParser(
100 description=
"""Form cafe cmd in shifter jobs. """,
101 epilog=
"Questions and Comments to dingpf@fnal.gov")
104 parser.add_argument(
'--nnodes', default=1, type=int,
105 help=
"number of nodes;")
106 parser.add_argument(
'--nuniv_per_rank', default=5, type=int,
107 help=
"number of universes per rank;")
108 parser.add_argument(
'--nfiles_per_rank', default=100, type=int,
109 help=
"number of files per rank;")
112 parser.add_argument(
'--totuniv', default=1000, type=int,
113 help=
"total number of universes;")
114 parser.add_argument(
'--ppfx_batch_idx', default=0, type=int,
115 help=
"Batch index for Ppfx universes;")
116 parser.add_argument(
'--totfiles', default=56324, type=int,
117 help=
"total number of files;")
118 parser.add_argument(
'--nranks_per_node', default=25, type=int,
119 help=
"number of ranks per node;")
121 args = parser.parse_args()
123 if args.nranks_per_node == -1:
124 nranks_per_node = multiprocessing.cpu_count()
126 nranks_per_node = args.nranks_per_node
128 if 'JOBSUBJOBID' in os.environ:
129 jsub_id = os.environ[
'JOBSUBJOBID']
130 jid = jsub_id.split(
'@')[0].
split(
'.')[0]
131 procid = jsub_id.split(
'@')[0].
split(
'.')[1]
133 jid = os.environ[
"SLURM_JOB_ID"]
134 procid = os.environ[
"SLURM_PROCID"]
136 (ncmd, cmd) =
make_cmd(args.nnodes, nranks_per_node, procid, args.nuniv_per_rank, args.nfiles_per_rank, args.ppfx_batch_idx, args.totuniv, args.totfiles)
138 for idx
in range(len(cmd)):
139 print(ncmd[idx], cmd[idx])
void split(double tt, double *fr)
std::string format(const int32_t &value, const int &ndigits=8)
def RunMultiProcess(nCMD, lCMD, num_consumers=0)
def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, ppfx_batch_idx, totuniv, totfiles)
def __init__(self, task_queue, result_queue)