SPH
Pool.h
Go to the documentation of this file.
1 #pragma once
2 
7 
11 #include "thread/Scheduler.h"
12 #include <atomic>
13 #include <condition_variable>
14 #include <queue>
15 #include <thread>
16 
18 
20 class Task : public ITask, public ShareFromThis<Task> {
21 private:
22  std::condition_variable waitVar;
23  std::mutex waitMutex;
24 
26  std::atomic<int> tasksLeft{ 1 };
27 
28  Function<void()> callable = nullptr;
29 
30  SharedPtr<Task> parent = nullptr;
31 
32  std::exception_ptr caughtException = nullptr;
33 
34 public:
35  explicit Task(const Function<void()>& callable);
36 
37  ~Task();
38 
39  virtual void wait() override;
40 
41  virtual bool completed() const override;
42 
46  void setParent(SharedPtr<Task> parent);
47 
51  void setException(std::exception_ptr exception);
52 
54  bool isRoot() const;
55 
56  SharedPtr<Task> getParent() const;
57 
59  static SharedPtr<Task> getCurrent();
60 
61  void runAndNotify();
62 
63 private:
64  void addReference();
65 
66  void removeReference();
67 };
68 
70 class ThreadPool : public IScheduler {
71  friend class Task;
72 
73 private:
76 
78  Size granularity;
79 
81  std::queue<SharedPtr<Task>> tasks;
82 
84  std::condition_variable taskVar;
85  std::mutex taskMutex;
86 
88  std::condition_variable waitVar;
89  std::mutex waitMutex;
90 
92  std::atomic<bool> stop;
93 
95  std::atomic<int> tasksLeft;
96 
99  static SharedPtr<ThreadPool> globalInstance;
100 
101 public:
105  ThreadPool(const Size numThreads = 0, const Size granularity = 1000);
106 
107  ~ThreadPool();
108 
112  virtual SharedPtr<ITask> submit(const Function<void()>& task) override;
113 
117  virtual Optional<Size> getThreadIdx() const override;
118 
122  virtual Size getThreadCnt() const override;
123 
124  virtual Size getRecommendedGranularity() const override;
125 
127  void waitForAll();
128 
133  return tasksLeft;
134  }
135 
137  void setGranularity(const Size newGranularity) {
138  granularity = newGranularity;
139  }
140 
145 
146 private:
147  SharedPtr<Task> getNextTask(const bool wait);
148 
149  void processTask(const bool wait);
150 };
151 
Generic dynamically allocated resizable storage.
NAMESPACE_SPH_BEGIN
Definition: BarnesHut.cpp:13
Generic wrappers of lambdas, functors and other callables.
uint32_t Size
Integral type used to index arrays (by default).
Definition: Globals.h:16
#define NAMESPACE_SPH_END
Definition: Object.h:12
Wrapper of type value of which may or may not be present.
Interface for executing tasks (potentially) asynchronously.
Generic dynamically allocated resizable storage.
Definition: Array.h:43
Interface that allows unified implementation of sequential and parallelized versions of algorithms.
Definition: Scheduler.h:27
Handle used to control tasks submitted into the scheduler.
Definition: Scheduler.h:15
Task to be executed by one of available threads.
Definition: Pool.h:20
void runAndNotify()
Definition: Pool.cpp:83
virtual bool completed() const override
Checks if the task already finished.
Definition: Pool.cpp:48
~Task()
Definition: Pool.cpp:24
static SharedPtr< Task > getCurrent()
Returns the currently execute task, or nullptr if no task is currently executed on this thread.
Definition: Pool.cpp:60
SharedPtr< Task > getParent() const
Definition: Pool.cpp:56
Task(const Function< void()> &callable)
Definition: Pool.cpp:21
virtual void wait() override
Waits till the task and all the child tasks are completed.
Definition: Pool.cpp:28
bool isRoot() const
Returns true if this is the top-most task.
Definition: Pool.cpp:52
void setException(std::exception_ptr exception)
Saves exception into the task.
Definition: Pool.cpp:75
void setParent(SharedPtr< Task > parent)
Assigns a task that spawned this task.
Definition: Pool.cpp:64
Thread pool capable of executing tasks concurrently.
Definition: Pool.h:70
ThreadPool(const Size numThreads=0, const Size granularity=1000)
Initialize thread pool given the number of threads to use.
Definition: Pool.cpp:120
Size remainingTaskCnt()
Returns the number of unfinished tasks.
Definition: Pool.h:132
~ThreadPool()
Definition: Pool.cpp:145
virtual SharedPtr< ITask > submit(const Function< void()> &task) override
Submits a task into the thread pool.
Definition: Pool.cpp:157
static SharedPtr< ThreadPool > getGlobalInstance()
Returns the global instance of the thread pool.
Definition: Pool.cpp:211
void waitForAll()
Blocks until all submitted tasks has been finished.
Definition: Pool.cpp:187
virtual Size getRecommendedGranularity() const override
Returns a value of granularity that is expected to perform well with the current thread count.
Definition: Pool.cpp:195
virtual Optional< Size > getThreadIdx() const override
Returns the index of this thread, or NOTHING if this thread was not invoked by the thread pool.
Definition: Pool.cpp:199
virtual Size getThreadCnt() const override
Returns the number of threads used by this thread pool.
Definition: Pool.cpp:207
void setGranularity(const Size newGranularity)
Modifies the default granularity of the thread pool.
Definition: Pool.h:137