7 def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, genie_batch_idx, totuniv, totfiles):
13 nfiles_last_batch = totfiles%nfiles_per_rank
15 if nnodes > 1
and int(procid) == nnodes - 1:
16 ifilefirst = totfiles - nfiles_last_batch
17 ifilelast = ifilefirst + nfiles_last_batch -1
19 ifilefirst =
int(procid) * nfiles_per_rank
20 ifilelast = ifilefirst + nfiles_per_rank - 1
28 for irank
in range(ranks_per_node):
29 iunivfirst = genie_batch_idx * totuniv + irank * nuniv_per_rank
30 iunivlast = iunivfirst + nuniv_per_rank - 1
32 icmd =
"cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/specprod_numuccinc.C genie {} {} -1 -1 0 {} {}".
format(
33 iunivfirst, iunivlast, ifilefirst, ifilelast)
34 iname_cmd =
"ProcID: {} , Genie batch index: {}, cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/specprod_numuccinc.C genie {} {} -1 -1 0 {} {}".
format(
35 procid, genie_batch_idx, iunivfirst, iunivlast, ifilefirst, ifilelast)
37 name_cmd.append(iname_cmd)
39 return (name_cmd, cmd)
43 multiprocessing.Process.__init__(self)
50 next_task = self.task_queue.get()
56 self.result_queue.put(answer)
67 subprocess.call(self.
b, shell=
True)
72 return '%s : %s' % (self.
a, self.
b)
77 tasks = multiprocessing.Queue()
78 results = multiprocessing.Queue()
81 if num_consumers == 0:
82 num_consumers = multiprocessing.cpu_count()
83 print(
'Creating {} consumers'.
format(num_consumers))
84 consumers = [
Consumer(tasks, results)
for i
in range(num_consumers)]
89 for i
in range(num_jobs):
90 tasks.put(
Task(nCMD[i], lCMD[i]))
93 for i
in range(num_consumers):
97 result = results.get()
103 if __name__ ==
"__main__":
104 parser = argparse.ArgumentParser(
105 prog=
'make_cafe_cmd',
106 description=
"""Form cafe cmd in shifter jobs. """,
107 epilog=
"Questions and Comments to dingpf@fnal.gov")
110 parser.add_argument(
'--nnodes', default=1, type=int,
111 help=
"number of nodes;")
112 parser.add_argument(
'--nuniv_per_rank', default=5, type=int,
113 help=
"number of universes per rank;")
114 parser.add_argument(
'--nfiles_per_rank', default=100, type=int,
115 help=
"number of files per rank;")
118 parser.add_argument(
'--totuniv', default=1000, type=int,
119 help=
"total number of universes;")
120 parser.add_argument(
'--genie_batch_idx', default=0, type=int,
121 help=
"Batch index for Genie universes;")
122 parser.add_argument(
'--totfiles', default=56324, type=int,
123 help=
"total number of files;")
124 parser.add_argument(
'--nranks_per_node', default=25, type=int,
125 help=
"number of ranks per node;")
127 args = parser.parse_args()
129 if args.nranks_per_node == -1:
130 nranks_per_node = multiprocessing.cpu_count()
132 nranks_per_node = args.nranks_per_node
134 if 'JOBSUBJOBID' in os.environ:
135 jsub_id = os.environ[
'JOBSUBJOBID']
136 jid = jsub_id.split(
'@')[0].
split(
'.')[0]
137 procid = jsub_id.split(
'@')[0].
split(
'.')[1]
139 jid = os.environ[
"SLURM_JOB_ID"]
140 procid = os.environ[
"SLURM_PROCID"]
142 (ncmd, cmd) =
make_cmd(args.nnodes, nranks_per_node, procid, args.nuniv_per_rank, args.nfiles_per_rank, args.genie_batch_idx, args.totuniv, args.totfiles)
144 for idx
in range(len(cmd)):
145 print(ncmd[idx], cmd[idx])
def make_cmd(nnodes, ranks_per_node, procid, nuniv_per_rank, nfiles_per_rank, genie_batch_idx, totuniv, totfiles)
void split(double tt, double *fr)
def __init__(self, task_queue, result_queue)
std::string format(const int32_t &value, const int &ndigits=8)
def RunMultiProcess(nCMD, lCMD, num_consumers=0)