22 : callable(callable) {}
31 while (tasksLeft > 0) {
32 threadLocalContext.
parentPool->processTask(
false);
35 std::unique_lock<std::mutex> lock(waitMutex);
38 waitVar.wait(lock, [
this] {
return tasksLeft == 0; });
43 if (caughtException) {
44 std::rethrow_exception(caughtException);
49 return tasksLeft == 0;
53 return parent ==
nullptr;
61 return threadLocalContext.
current;
77 caughtException = exception;
88 auto guard =
finally([
this, callingTask] {
89 threadLocalContext.
current = callingTask;
90 this->removeReference();
101 void Task::addReference() {
102 std::unique_lock<std::mutex> lock(waitMutex);
107 void Task::removeReference() {
108 std::unique_lock<std::mutex> lock(waitMutex);
112 if (tasksLeft == 0) {
114 parent->removeReference();
116 waitVar.notify_all();
121 : threads(numThreads == 0 ?
std::thread::hardware_concurrency() : numThreads)
122 , granularity(granularity) {
124 auto loop = [
this](
const Size index) {
127 threadLocalContext.
index = index;
131 this->processTask(
true);
139 for (
auto& t : threads) {
140 t = makeAuto<std::thread>(loop, index);
148 taskVar.notify_all();
150 for (
auto& t : threads) {
162 std::unique_lock<std::mutex> lock(waitMutex);
166 std::unique_lock<std::mutex> lock(taskMutex);
167 tasks.emplace(handle);
169 taskVar.notify_all();
173 void ThreadPool::processTask(
const bool wait) {
179 std::unique_lock<std::mutex> lock(waitMutex);
184 waitVar.notify_one();
188 std::unique_lock<std::mutex> lock(waitMutex);
190 waitVar.wait(lock, [
this] {
return tasksLeft == 0; });
204 return threadLocalContext.
index;
208 return threads.
size();
212 if (!globalInstance) {
213 globalInstance = makeShared<ThreadPool>();
215 return globalInstance;
219 std::unique_lock<std::mutex> lock(taskMutex);
223 taskVar.wait(lock, [
this] {
return !tasks.empty() || stop; });
227 if (!stop && !tasks.empty()) {
#define SPH_ASSERT(x,...)
Wraps a functor and executes it once the wrapper goes out of scope.
uint32_t Size
Integral type used to index arrays (by default).
#define NAMESPACE_SPH_END
const NothingType NOTHING
Simple thread pool with fixed number of threads.
INLINE TCounter size() const noexcept
INLINE bool empty() const noexcept
Non-owning wrapper of pointer.
SharedPtr< Task > sharedFromThis() const
INLINE RawPtr< T > get() const
virtual bool completed() const override
Checks whether the task has already finished.
static SharedPtr< Task > getCurrent()
Returns the currently executing task, or nullptr if no task is currently being executed on this thread.
SharedPtr< Task > getParent() const
Task(const Function< void()> &callable)
virtual void wait() override
Waits till the task and all the child tasks are completed.
bool isRoot() const
Returns true if this is the top-most task.
void setException(std::exception_ptr exception)
Saves exception into the task.
void setParent(SharedPtr< Task > parent)
Assigns a task that spawned this task.
Thread pool capable of executing tasks concurrently.
ThreadPool(const Size numThreads=0, const Size granularity=1000)
Initialize thread pool given the number of threads to use.
virtual SharedPtr< ITask > submit(const Function< void()> &task) override
Submits a task into the thread pool.
static SharedPtr< ThreadPool > getGlobalInstance()
Returns the global instance of the thread pool.
void waitForAll()
Blocks until all submitted tasks have finished.
virtual Size getRecommendedGranularity() const override
Returns a value of granularity that is expected to perform well with the current thread count.
virtual Optional< Size > getThreadIdx() const override
Returns the index of this thread, or NOTHING if this thread was not invoked by the thread pool.
virtual Size getThreadCnt() const override
Returns the number of threads used by this thread pool.
Overload of std::swap for Sph::Array.
ThreadPool * parentPool
Owner of this thread.
Size index
Index of this thread within the parent thread pool (this is not std::this_thread::get_id()!)
SharedPtr< Task > current
Task currently processed by this thread.