SPH
Tbb.cpp
Go to the documentation of this file.
1 #include "thread/Tbb.h"
2 #include "math/MathUtils.h"
4 
5 #ifdef SPH_USE_TBB
6 #include <tbb/tbb.h>
7 #endif
8 
10 
11 #ifdef SPH_USE_TBB
12 
13 SharedPtr<Tbb> Tbb::globalInstance = nullptr;
14 
15 class TbbTask;
16 
17 struct TbbThreadContext {
18  SharedPtr<TbbTask> task;
19 };
20 
21 static thread_local TbbThreadContext tbbThreadContext;
22 
23 class TbbTask : public ITask, public ShareFromThis<TbbTask> {
24 private:
25  tbb::task_arena& arena;
26  tbb::task_group group;
27  std::atomic<int> taskCnt;
28 
29 public:
30  explicit TbbTask(tbb::task_arena& arena)
31  : arena(arena) {
32  // set task to 1 before submitting the root task, to avoid the task being completed
33  taskCnt = 1;
34  }
35 
36  ~TbbTask() noexcept { // needs noexcept because of tbb::task_group member
37  SPH_ASSERT(this->completed());
38  }
39 
40  virtual void wait() override {
41  SPH_ASSERT(!tbbThreadContext.task, "waiting on child tasks is currently not implemented");
42  arena.execute([this] { group.wait(); });
43  }
44 
45  virtual bool completed() const override {
46  return taskCnt == 0;
47  }
48 
49  void submit(const Function<void()>& task) {
50  arena.execute([this, task] {
51  group.run([this, task] {
52  tbbThreadContext.task = this->sharedFromThis();
53  task();
54  tbbThreadContext.task = nullptr;
55 
56  taskCnt--;
57  });
58  });
59  }
60 
61  void submitChild(const Function<void()>& task) {
62  SPH_ASSERT(!this->completed()); // cannot add children to already finished task
63  taskCnt++;
64  this->submit(task);
65  }
66 };
67 
68 struct TbbData {
69  tbb::task_arena arena;
70  Size granularity;
71 
72  TbbData(const Size numThreads, const Size granularity)
73  : granularity(granularity) {
74  arena.initialize(numThreads == 0 ? tbb::task_scheduler_init::default_num_threads() : numThreads);
75  }
76 };
77 
78 Tbb::Tbb(const Size numThreads, const Size granularity) {
79  data = makeAuto<TbbData>(numThreads, granularity);
80 }
81 
82 Tbb::~Tbb() = default;
83 
84 void Tbb::setGranularity(const Size newGranularity) {
85  data->granularity = newGranularity;
86 }
87 
88 void Tbb::setThreadCnt(const Size numThreads) {
89  data = makeAuto<TbbData>(numThreads, data->granularity);
90 }
91 
92 SharedPtr<ITask> Tbb::submit(const Function<void()>& task) {
94  if (tbbThreadContext.task) {
95  tbbThreadContext.task->submitChild(task);
98  return tbbThreadContext.task;
99  } else {
100  SharedPtr<TbbTask> handle = makeShared<TbbTask>(data->arena);
101  handle->submit(task);
102  return handle;
103  }
104 }
105 
107  const int index = tbb::this_task_arena::current_thread_index();
108  if (index >= 0) {
109  return index;
110  } else {
111  return NOTHING;
112  }
113 }
114 
115 Size Tbb::getThreadCnt() const {
116  return tbb::this_task_arena::max_concurrency();
117 }
118 
120  return data->granularity;
121 }
122 
123 void Tbb::parallelFor(const Size from,
124  const Size to,
125  const Size granularity,
126  const Function<void(Size, Size)>& functor) {
127  data->arena.execute([from, to, granularity, &functor] {
128  tbb::parallel_for(tbb::blocked_range<Size>(from, to, granularity),
129  [&functor](const tbb::blocked_range<Size> range) { functor(range.begin(), range.end()); });
130  });
131 }
132 
134  if (!globalInstance) {
135  globalInstance = makeShared<Tbb>();
136  }
137  return globalInstance;
138 }
139 
140 #endif
141 
#define SPH_ASSERT(x,...)
Definition: Assert.h:94
NAMESPACE_SPH_BEGIN
Definition: BarnesHut.cpp:13
uint32_t Size
Integral type used to index arrays (by default).
Definition: Globals.h:16
Additional math routines (with more includes).
#define NAMESPACE_SPH_END
Definition: Object.h:12
Wrapper of a type, the value of which may or may not be present.
const NothingType NOTHING
Definition: Optional.h:16
Implements IScheduler interface using TBB.
Handle used to control tasks submitted into the scheduler.
Definition: Scheduler.h:15
virtual bool completed() const =0
Checks if the task already finished.
virtual void wait()=0
Waits till the task and all the child tasks are completed.
SharedPtr< T > sharedFromThis() const
Definition: SharedPtr.h:426
virtual Optional< Size > getThreadIdx() const override
Returns the index of the calling thread.
void setGranularity(const Size newGranularity)
virtual void parallelFor(const Size from, const Size to, const Size granularity, const Function< void(Size n1, Size n2)> &functor) override
Processes the given range concurrently.
void setThreadCnt(const Size numThreads)
Tbb(const Size numThreads=0, const Size granularity=1000)
~Tbb()
virtual Size getThreadCnt() const override
Returns the number of threads used by this scheduler.
virtual SharedPtr< ITask > submit(const Function< void()> &task) override
Submits a task to be potentially executed asynchronously.
static SharedPtr< Tbb > getGlobalInstance()
virtual Size getRecommendedGranularity() const override
Returns a value of granularity that is expected to perform well with the current thread count.