Classes | Public Types | Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
ana::ThreadPool Class Reference

A very simple thread pool for use by Surface. More...

#include "/cvmfs/nova.opensciencegrid.org/externals/cafanacore/v01.11/src/CAFAna/Core/ThreadPool.h"

Classes

class  TaskQueue
 
class  ThreadsafeProgress
 

Public Types

typedef std::function< void(void)> func_t
 The type of the user's worker functions. More...
 

Public Member Functions

 ThreadPool (unsigned int maxThreads=0)
 
virtual ~ThreadPool ()
 
void ShowProgress (const std::string &title)
 
void AddTask (const func_t &func)
 
void Finish ()
 Wait for all threads to complete before returning. More...
 
int NThreads () const
 

Protected Member Functions

void WorkerFunc ()
 

Protected Attributes

unsigned int fMaxThreads
 
TaskQueue fTasks
 
std::vector< std::thread > fThreads
 All threads we ever created. More...
 
std::atomic< unsigned intfNumLiveThreads
 Number of threads that are running. More...
 
std::atomic< intfTasksCompleted
 
std::atomic< intfTotalTasks
 How many tasks have we ever seen? More...
 
ThreadsafeProgressfProgress
 

Detailed Description

A very simple thread pool for use by Surface.

With great power comes great responsibility. Use caution on the grid or on shared interactive machines before you get yelled at or banned.

Definition at line 18 of file ThreadPool.h.

Member Typedef Documentation

typedef std::function<void(void)> ana::ThreadPool::func_t

The type of the user's worker functions.

Use a lambda function or std::bind etc to pass arguments

Definition at line 33 of file ThreadPool.h.

Constructor & Destructor Documentation

ana::ThreadPool::ThreadPool ( unsigned int  maxThreads = 0)
explicit
Parameters
maxThreadsMaximum number of threads to use at one time. If unspecified, uses number of cores in machine. Use great caution on the grid or on shared interactive machines.

Definition at line 26 of file ThreadPool.cxx.

References fMaxThreads.

27  : fMaxThreads(maxThreads), fNumLiveThreads(0),
29  {
30  if(maxThreads == 0){
31  fMaxThreads = sysconf(_SC_NPROCESSORS_ONLN);
32  }
33  }
std::atomic< int > fTasksCompleted
Definition: ThreadPool.h:75
std::atomic< unsigned int > fNumLiveThreads
Number of threads that are running.
Definition: ThreadPool.h:73
std::atomic< int > fTotalTasks
How many tasks have we ever seen?
Definition: ThreadPool.h:76
unsigned int fMaxThreads
Definition: ThreadPool.h:45
ThreadsafeProgress * fProgress
Definition: ThreadPool.h:78
ana::ThreadPool::~ThreadPool ( )
virtual

Definition at line 36 of file ThreadPool.cxx.

References Finish().

37  {
38  // This is safe, even if it's already been explicitly called
39  Finish();
40  }
void Finish()
Wait for all threads to complete before returning.
Definition: ThreadPool.cxx:50

Member Function Documentation

void ana::ThreadPool::AddTask ( const func_t func)

Definition at line 89 of file ThreadPool.cxx.

References ana::ThreadPool::TaskQueue::add(), fMaxThreads, fNumLiveThreads, fTasks, fThreads, fTotalTasks, and WorkerFunc().

Referenced by ana::FrequentistSurface::FillSurface().

90  {
91  fTasks.add(func);
92 
93  ++fTotalTasks;
94 
96  fThreads.emplace_back([this](){WorkerFunc();});
98  }
99  }
TaskQueue fTasks
Definition: ThreadPool.h:70
double func(double x, double y)
std::atomic< unsigned int > fNumLiveThreads
Number of threads that are running.
Definition: ThreadPool.h:73
std::vector< std::thread > fThreads
All threads we ever created.
Definition: ThreadPool.h:72
std::atomic< int > fTotalTasks
How many tasks have we ever seen?
Definition: ThreadPool.h:76
void add(const func_t &x)
Definition: ThreadPool.h:50
unsigned int fMaxThreads
Definition: ThreadPool.h:45
void ana::ThreadPool::Finish ( )

Wait for all threads to complete before returning.

Definition at line 50 of file ThreadPool.cxx.

References ana::assert(), fNumLiveThreads, fProgress, and fThreads.

Referenced by ana::FrequentistSurface::FillSurface(), and ~ThreadPool().

51  {
52  // TODO switch to std::jthread once we are on C++20
53  for(std::thread& th: fThreads) th.join();
54 
55  assert(fNumLiveThreads == 0);
56 
57  fThreads.clear(); // Make it safe to call a second time
58 
59  if(fProgress){
60  delete fProgress;
61  fProgress = 0;
62  }
63  }
std::atomic< unsigned int > fNumLiveThreads
Number of threads that are running.
Definition: ThreadPool.h:73
std::vector< std::thread > fThreads
All threads we ever created.
Definition: ThreadPool.h:72
assert(nhit_max >=nhit_nbins)
ThreadsafeProgress * fProgress
Definition: ThreadPool.h:78
int ana::ThreadPool::NThreads ( ) const
inline

Definition at line 40 of file ThreadPool.h.

References fMaxThreads, and WorkerFunc().

Referenced by ana::FrequentistSurface::FillSurface().

40 {return fMaxThreads;}
unsigned int fMaxThreads
Definition: ThreadPool.h:45
void ana::ThreadPool::ShowProgress ( const std::string title)

Definition at line 43 of file ThreadPool.cxx.

References fProgress.

Referenced by ana::FrequentistSurface::FillSurface().

44  {
45  if(fProgress) delete fProgress;
46  fProgress = new ThreadsafeProgress(title);
47  }
ThreadsafeProgress * fProgress
Definition: ThreadPool.h:78
void ana::ThreadPool::WorkerFunc ( )
protected

Definition at line 66 of file ThreadPool.cxx.

References ana::ThreadPool::TaskQueue::consume(), fNumLiveThreads, fProgress, fTasks, fTasksCompleted, fTotalTasks, and ana::ThreadPool::ThreadsafeProgress::SetProgress().

Referenced by AddTask(), and NThreads().

67  {
68  while(true){
69  func_t task = fTasks.consume();
70 
71  if(!task){
72  // Nothing left to do, commit suicide
74  return;
75  }
76 
77  // Actually do the user's work
78  task();
79 
81 
82  if(fProgress){
84  }
85  }
86  }
func_t consume()
Null if the queue is empty.
Definition: ThreadPool.h:57
std::function< void(void)> func_t
The type of the user&#39;s worker functions.
Definition: ThreadPool.h:33
std::atomic< int > fTasksCompleted
Definition: ThreadPool.h:75
TaskQueue fTasks
Definition: ThreadPool.h:70
std::atomic< unsigned int > fNumLiveThreads
Number of threads that are running.
Definition: ThreadPool.h:73
std::atomic< int > fTotalTasks
How many tasks have we ever seen?
Definition: ThreadPool.h:76
ThreadsafeProgress * fProgress
Definition: ThreadPool.h:78

Member Data Documentation

unsigned int ana::ThreadPool::fMaxThreads
protected

Definition at line 45 of file ThreadPool.h.

Referenced by AddTask(), NThreads(), and ThreadPool().

std::atomic<unsigned int> ana::ThreadPool::fNumLiveThreads
protected

Number of threads that are running.

Definition at line 73 of file ThreadPool.h.

Referenced by AddTask(), Finish(), and WorkerFunc().

ThreadsafeProgress* ana::ThreadPool::fProgress
protected

Definition at line 78 of file ThreadPool.h.

Referenced by Finish(), ShowProgress(), and WorkerFunc().

TaskQueue ana::ThreadPool::fTasks
protected

Definition at line 70 of file ThreadPool.h.

Referenced by AddTask(), and WorkerFunc().

std::atomic<int> ana::ThreadPool::fTasksCompleted
protected

Definition at line 75 of file ThreadPool.h.

Referenced by WorkerFunc().

std::vector<std::thread> ana::ThreadPool::fThreads
protected

All threads we ever created.

Definition at line 72 of file ThreadPool.h.

Referenced by AddTask(), and Finish().

std::atomic<int> ana::ThreadPool::fTotalTasks
protected

How many tasks have we ever seen?

Definition at line 76 of file ThreadPool.h.

Referenced by AddTask(), and WorkerFunc().


The documentation for this class was generated from the following files: