def make_cmd(nnodes, ranks_per_node, procid, nfiles_per_rank, systs_batch_idx, idx_first_file, totfiles):
    """Build one ``cafe`` command line per rank for the node identified by *procid*.

    Each rank is assigned a contiguous, inclusive range of file indices
    ``[ifilefirst, ifilelast]`` which is appended to the command line.

    Args:
        nnodes: Total number of nodes in the job.
        ranks_per_node: Number of commands (ranks) to generate for this node.
        procid: Index of this node; anything accepted by ``int()`` (the
            caller passes it as a string read from the environment).
        nfiles_per_rank: Number of files handled by a regular rank.
        systs_batch_idx: Systematics batch index; only used in the label.
        idx_first_file: Offset added to every computed file index.
        totfiles: Total number of files to distribute.

    Returns:
        A tuple ``(name_cmd, cmd)`` of two equally long lists: ``cmd`` holds
        the bare command lines, ``name_cmd`` the same commands prefixed with
        the node index and systematics batch index.
    """
    cafe_template = (
        "cafe -bq -nr --numuccinc /development/NDAna/numucc_inc/specprod_numuccinc.C"
        " systs -1 -1 -1 -1 1 {} {}"
    )
    label_template = "ProcID: {} , Systs batch index: {}, {}"

    # procid usually arrives as a string; convert once for the arithmetic
    # below but keep the original value for the label.
    node_idx = int(procid)

    # Size of the trailing partial batch given to the very last rank.
    # NOTE(review): when totfiles is an exact multiple of nfiles_per_rank this
    # is 0, so the last rank of a multi-node job gets ifilelast == ifilefirst - 1
    # (an empty range). Kept as in the original; confirm this is intended.
    nfiles_last_batch = totfiles % nfiles_per_rank

    name_cmd = []
    cmd = []
    for irank in range(ranks_per_node):
        is_final_rank = (
            nnodes > 1
            and node_idx == nnodes - 1
            and irank == ranks_per_node - 1
        )
        if is_final_rank:
            # Last rank of the last node picks up the remainder at the tail.
            ifilefirst = totfiles - nfiles_last_batch + idx_first_file
            ifilelast = ifilefirst + nfiles_last_batch - 1
        else:
            ifilefirst = (
                node_idx * nfiles_per_rank * ranks_per_node
                + irank * nfiles_per_rank
                + idx_first_file
            )
            ifilelast = ifilefirst + nfiles_per_rank - 1

        icmd = cafe_template.format(ifilefirst, ifilelast)
        cmd.append(icmd)
        name_cmd.append(label_template.format(procid, systs_batch_idx, icmd))

    return (name_cmd, cmd)
40 multiprocessing.Process.__init__(self)
47 next_task = self.task_queue.get()
53 self.result_queue.put(answer)
64 subprocess.call(self.
b, shell=
True)
69 return '%s : %s' % (self.
a, self.
b)
74 tasks = multiprocessing.Queue()
75 results = multiprocessing.Queue()
78 if num_consumers == 0:
79 num_consumers = multiprocessing.cpu_count()
80 print(
'Creating {} consumers'.
format(num_consumers))
81 consumers = [
Consumer(tasks, results)
for i
in range(num_consumers)]
86 for i
in range(num_jobs):
87 tasks.put(
Task(nCMD[i], lCMD[i]))
90 for i
in range(num_consumers):
94 result = results.get()
if __name__ == "__main__":
    # Script entry point: parse the job layout from the command line, work out
    # which node this process is from the batch-system environment, then print
    # every labelled command that make_cmd() generates for this node.
    parser = argparse.ArgumentParser(
        prog='make_cafe_cmd',
        description="""Form cafe cmd in shifter jobs. """,
        epilog="Questions and Comments to dingpf@fnal.gov")

    parser.add_argument('--nnodes', default=1, type=int,
                        help="number of nodes;")
    parser.add_argument('--nfiles_per_rank', default=100, type=int,
                        help="number of files per rank;")
    parser.add_argument('--systs_batch_idx', default=0, type=int,
                        help="Batch index for Systs universes;")
    parser.add_argument('--idx_first_file', default=0, type=int,
                        help="index of the first file to run over;")
    parser.add_argument('--totfiles', default=56324, type=int,
                        help="total number of files;")
    parser.add_argument('--nranks_per_node', default=25, type=int,
                        help="number of ranks per node;")

    args = parser.parse_args()

    # -1 is a sentinel meaning "one rank per CPU on this machine".
    if args.nranks_per_node == -1:
        nranks_per_node = multiprocessing.cpu_count()
    else:
        nranks_per_node = args.nranks_per_node

    if 'JOBSUBJOBID' in os.environ:
        # The part before '@' is split on '.': field 0 is taken as the job id,
        # field 1 as the process index.
        jsub_id = os.environ['JOBSUBJOBID']
        job_fields = jsub_id.split('@')[0].split('.')
        jid = job_fields[0]
        procid = job_fields[1]
    else:
        # Otherwise the SLURM variables must be set; a missing one raises
        # KeyError.
        jid = os.environ["SLURM_JOB_ID"]
        procid = os.environ["SLURM_PROCID"]

    (ncmd, cmd) = make_cmd(args.nnodes, nranks_per_node, procid,
                           args.nfiles_per_rank, args.systs_batch_idx,
                           args.idx_first_file, args.totfiles)

    for label, command in zip(ncmd, cmd):
        print(label, command)
void split(double tt, double *fr)
def RunMultiProcess(nCMD, lCMD, num_consumers=0)
def __init__(self, task_queue, result_queue)
def make_cmd(nnodes, ranks_per_node, procid, nfiles_per_rank, systs_batch_idx, idx_first_file, totfiles)
std::string format(const int32_t &value, const int &ndigits=8)