Classes | Functions
stan::math::internal Namespace Reference

Classes

struct  bounded
 
struct  bounded< T_y, T_low, T_high, true >
 
class  broadcast_array
 
class  empty_broadcast_array
 
class  empty_broadcast_array< ViewElt, Eigen::Matrix< OpElt, R, C > >
 
class  falling_factorial_vd_vari
 
class  map_rect_combine
 
class  map_rect_reduce
 
class  map_rect_reduce< F, double, double >
 
struct  map_rect_reduce< F, double, var >
 
struct  map_rect_reduce< F, var, double >
 
struct  map_rect_reduce< F, var, var >
 
class  ops_partials_edge
 
class  ops_partials_edge< double, Eigen::Matrix< var, R, C > >
 
class  ops_partials_edge< double, std::vector< Eigen::Matrix< var, R, C > > >
 
class  ops_partials_edge< double, std::vector< std::vector< var > > >
 
class  ops_partials_edge< double, std::vector< var > >
 
class  ops_partials_edge< double, var >
 
class  ops_partials_edge< Dx, Eigen::Matrix< fvar< Dx >, R, C > >
 
class  ops_partials_edge< Dx, fvar< Dx > >
 
class  ops_partials_edge< Dx, std::vector< Eigen::Matrix< fvar< Dx >, R, C > > >
 
class  ops_partials_edge< Dx, std::vector< fvar< Dx > > >
 
class  ops_partials_edge< Dx, std::vector< std::vector< fvar< Dx > > > >
 
class  ops_partials_edge< ViewElt, Eigen::Matrix< Op, R, C > >
 
class  ops_partials_edge< ViewElt, std::vector< Eigen::Matrix< Op, R, C > > >
 
class  ops_partials_edge< ViewElt, std::vector< std::vector< Op > > >
 
class  rising_factorial_vd_vari
 

Functions

int get_num_threads (int num_jobs)
 
template<int call_id, typename F , typename T_shared_param , typename T_job_param >
Eigen::Matrix< typename stan::return_type< T_shared_param, T_job_param >::type, Eigen::Dynamic, 1 > map_rect_concurrent (const Eigen::Matrix< T_shared_param, Eigen::Dynamic, 1 > &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 >> &job_params, const std::vector< std::vector< double >> &x_r, const std::vector< std::vector< int >> &x_i, std::ostream *msgs=nullptr)
 

Function Documentation

int stan::math::internal::get_num_threads ( int  num_jobs)
inline

Get number of threads to use for num_jobs jobs. The function uses the environment variable STAN_NUM_THREADS and follows these conventions:

  • STAN_NUM_THREADS is not defined or is not a number => num_threads=1
  • STAN_NUM_THREADS is positive => num_threads is set to the specified number
  • STAN_NUM_THREADS is set to -1 => num_threads is the number of available cores on the machine
  • STAN_NUM_THREADS is 0 or < -1 => num_threads is 1

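Should num_threads exceed the number of jobs, then num_threads will be set equal to the number of jobs. The environment variable is only consulted when the code is compiled with STAN_THREADS defined; otherwise the function always returns 1.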

Parameters
num_jobs: number of jobs
Returns
number of threads to use

Definition at line 36 of file map_rect_concurrent.hpp.

References F, cet::getenv(), and generate_CCQE_events::num_threads.

Referenced by map_rect_concurrent().

36  {
37  int num_threads = 1;
38 #ifdef STAN_THREADS
39  const char* env_stan_num_threads = std::getenv("STAN_NUM_THREADS");
40  if (env_stan_num_threads != nullptr) {
41  const int env_num_threads = std::atoi(env_stan_num_threads);
42  if (env_num_threads > 0)
43  num_threads = env_num_threads;
44  else if (env_num_threads == -1)
45  num_threads = std::thread::hardware_concurrency();
46  // anything else will use 1 thread.
47  }
48  if (num_threads > num_jobs)
49  num_threads = num_jobs;
50 #endif
51  return num_threads;
52 }
std::string getenv(std::string const &name)
template<int call_id, typename F , typename T_shared_param , typename T_job_param >
Eigen::Matrix<typename stan::return_type<T_shared_param, T_job_param>::type, Eigen::Dynamic, 1> stan::math::internal::map_rect_concurrent ( const Eigen::Matrix< T_shared_param, Eigen::Dynamic, 1 > &  shared_params,
const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 >> &  job_params,
const std::vector< std::vector< double >> &  x_r,
const std::vector< std::vector< int >> &  x_i,
std::ostream *  msgs = nullptr 
)

Definition at line 58 of file map_rect_concurrent.hpp.

References stan::math::cols(), febshutoff_auto::end, get_num_threads(), MECModelEnuComparisons::i, lem_server::msgs, generate_CCQE_events::num_threads, PandAna.reco_validation.add_data::offset, stan::math::rows(), stan::math::size(), febshutoff_auto::start, and stan::math::value_of().

63  {
64  typedef map_rect_reduce<F, T_shared_param, T_job_param> ReduceF;
65  typedef map_rect_combine<F, T_shared_param, T_job_param> CombineF;
66 
67  const int num_jobs = job_params.size();
68  const vector_d shared_params_dbl = value_of(shared_params);
69  std::vector<std::future<std::vector<matrix_d>>> futures;
70 
71  auto execute_chunk = [&](int start, int size) -> std::vector<matrix_d> {
72  const int end = start + size;
73  std::vector<matrix_d> chunk_f_out;
74  chunk_f_out.reserve(size);
75  for (int i = start; i != end; i++)
76  chunk_f_out.push_back(ReduceF()(
77  shared_params_dbl, value_of(job_params[i]), x_r[i], x_i[i], msgs));
78  return chunk_f_out;
79  };
80 
81  int num_threads = get_num_threads(num_jobs);
82  int num_jobs_per_thread = num_jobs / num_threads;
83  futures.emplace_back(
84  std::async(std::launch::deferred, execute_chunk, 0, num_jobs_per_thread));
85 
86 #ifdef STAN_THREADS
87  if (num_threads > 1) {
88  const int num_big_threads
89  = (num_jobs - num_jobs_per_thread) % (num_threads - 1);
90  const int first_big_thread = num_threads - num_big_threads;
91  for (int i = 1, job_start = num_jobs_per_thread, job_size = 0;
92  i < num_threads; ++i, job_start += job_size) {
93  job_size = i >= first_big_thread ? num_jobs_per_thread + 1
94  : num_jobs_per_thread;
95  futures.emplace_back(
96  std::async(std::launch::async, execute_chunk, job_start, job_size));
97  }
98  }
99 #endif
100 
101  // collect results
102  std::vector<int> world_f_out;
103  world_f_out.reserve(num_jobs);
104  matrix_d world_output(0, 0);
105 
106  int offset = 0;
107  for (std::size_t i = 0; i < futures.size(); ++i) {
108  const std::vector<matrix_d>& chunk_result = futures[i].get();
109  if (i == 0)
110  world_output.resize(chunk_result[0].rows(),
111  num_jobs * chunk_result[0].cols());
112 
113  for (const auto& job_result : chunk_result) {
114  const int num_job_outputs = job_result.cols();
115  world_f_out.push_back(num_job_outputs);
116 
117  if (world_output.cols() < offset + num_job_outputs)
118  world_output.conservativeResize(Eigen::NoChange,
119  2 * (offset + num_job_outputs));
120 
121  world_output.block(0, offset, world_output.rows(), num_job_outputs)
122  = job_result;
123 
124  offset += num_job_outputs;
125  }
126  }
127  CombineF combine(shared_params, job_params);
128  return combine(world_output, world_f_out);
129 }
int get_num_threads(int num_jobs)
T value_of(const fvar< T > &v)
Definition: value_of.hpp:16
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Definition: newton.hpp:14
const int cols[3]
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Definition: newton.hpp:15