map_rect_concurrent.hpp
Go to the documentation of this file.
1 #ifndef STAN_MATH_PRIM_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP
2 #define STAN_MATH_PRIM_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP
3 
5 
8 
9 #include <vector>
10 #include <thread>
11 #include <future>
12 #include <cstdlib>
13 
14 namespace stan {
15 namespace math {
16 namespace internal {
17 
18 /**
19  * Get number of threads to use for num_jobs jobs. The function uses
20  * the environment variable STAN_NUM_THREADS and follows these
21  * conventions:
22  *
23  * - STAN_NUM_THREADS is not defined or is not a number => num_threads=1
24  * - STAN_NUM_THREADS is positive => num_threads is set to the
25  * specified number
26  * - STAN_NUM_THREADS is set to -1 => num_threads is the number of
27  * available cores on the machine
28  * - STAN_NUM_THREADS < -1 => num_threads is 1
29  *
30  * Should num_threads exceed the number of jobs, then num_threads will
31  * be set equal to the number of jobs.
32  *
33  * @param num_jobs number of jobs
34  * @return number of threads to use
35  */
36 inline int get_num_threads(int num_jobs) {
37  int num_threads = 1;
38 #ifdef STAN_THREADS
39  const char* env_stan_num_threads = std::getenv("STAN_NUM_THREADS");
40  if (env_stan_num_threads != nullptr) {
41  const int env_num_threads = std::atoi(env_stan_num_threads);
42  if (env_num_threads > 0)
43  num_threads = env_num_threads;
44  else if (env_num_threads == -1)
45  num_threads = std::thread::hardware_concurrency();
46  // anything else will use 1 thread.
47  }
48  if (num_threads > num_jobs)
49  num_threads = num_jobs;
50 #endif
51  return num_threads;
52 }
53 
54 template <int call_id, typename F, typename T_shared_param,
55  typename T_job_param>
57  Eigen::Dynamic, 1>
59  const Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1>& shared_params,
60  const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
61  job_params,
62  const std::vector<std::vector<double>>& x_r,
63  const std::vector<std::vector<int>>& x_i, std::ostream* msgs = nullptr) {
66 
67  const int num_jobs = job_params.size();
68  const vector_d shared_params_dbl = value_of(shared_params);
69  std::vector<std::future<std::vector<matrix_d>>> futures;
70 
71  auto execute_chunk = [&](int start, int size) -> std::vector<matrix_d> {
72  const int end = start + size;
73  std::vector<matrix_d> chunk_f_out;
74  chunk_f_out.reserve(size);
75  for (int i = start; i != end; i++)
76  chunk_f_out.push_back(ReduceF()(
77  shared_params_dbl, value_of(job_params[i]), x_r[i], x_i[i], msgs));
78  return chunk_f_out;
79  };
80 
81  int num_threads = get_num_threads(num_jobs);
82  int num_jobs_per_thread = num_jobs / num_threads;
83  futures.emplace_back(
84  std::async(std::launch::deferred, execute_chunk, 0, num_jobs_per_thread));
85 
86 #ifdef STAN_THREADS
87  if (num_threads > 1) {
88  const int num_big_threads
89  = (num_jobs - num_jobs_per_thread) % (num_threads - 1);
90  const int first_big_thread = num_threads - num_big_threads;
91  for (int i = 1, job_start = num_jobs_per_thread, job_size = 0;
92  i < num_threads; ++i, job_start += job_size) {
93  job_size = i >= first_big_thread ? num_jobs_per_thread + 1
94  : num_jobs_per_thread;
95  futures.emplace_back(
96  std::async(std::launch::async, execute_chunk, job_start, job_size));
97  }
98  }
99 #endif
100 
101  // collect results
102  std::vector<int> world_f_out;
103  world_f_out.reserve(num_jobs);
104  matrix_d world_output(0, 0);
105 
106  int offset = 0;
107  for (std::size_t i = 0; i < futures.size(); ++i) {
108  const std::vector<matrix_d>& chunk_result = futures[i].get();
109  if (i == 0)
110  world_output.resize(chunk_result[0].rows(),
111  num_jobs * chunk_result[0].cols());
112 
113  for (const auto& job_result : chunk_result) {
114  const int num_job_outputs = job_result.cols();
115  world_f_out.push_back(num_job_outputs);
116 
117  if (world_output.cols() < offset + num_job_outputs)
118  world_output.conservativeResize(Eigen::NoChange,
119  2 * (offset + num_job_outputs));
120 
121  world_output.block(0, offset, world_output.rows(), num_job_outputs)
122  = job_result;
123 
124  offset += num_job_outputs;
125  }
126  }
127  CombineF combine(shared_params, job_params);
128  return combine(world_output, world_f_out);
129 }
130 
131 } // namespace internal
132 } // namespace math
133 } // namespace stan
134 
135 #endif
#define F(x, y, z)
int rows(const Eigen::Matrix< T, R, C > &m)
Definition: rows.hpp:20
int get_num_threads(int num_jobs)
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Definition: typedefs.hpp:24
T value_of(const fvar< T > &v)
Definition: value_of.hpp:16
std::string getenv(std::string const &name)
Eigen::Matrix< typename stan::return_type< T_shared_param, T_job_param >::type, Eigen::Dynamic, 1 > map_rect_concurrent(const Eigen::Matrix< T_shared_param, Eigen::Dynamic, 1 > &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 >> &job_params, const std::vector< std::vector< double >> &x_r, const std::vector< std::vector< int >> &x_i, std::ostream *msgs=nullptr)
int cols(const Eigen::Matrix< T, R, C > &m)
Definition: cols.hpp:20
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Definition: typedefs.hpp:19
int size(const std::vector< T > &x)
Definition: size.hpp:17