55// / @author Peter Cucka
66
77#include " Queue.h"
8-
98#include " File.h"
109#include " Stream.h"
11- #include < openvdb/Exceptions.h>
12- #include < openvdb/util/logging.h>
10+ #include " openvdb/Exceptions.h"
11+ #include " openvdb/util/logging.h"
12+
1313#include < tbb/concurrent_hash_map.h>
14- #include < tbb/task .h>
15- # include < tbb/tbb_thread.h > // for tbb::this_tbb_thread::sleep()
16- #include < tbb/tick_count.h >
14+ #include < tbb/task_arena .h>
15+
16+ #include < thread >
1717#include < algorithm> // for std::max()
1818#include < atomic>
1919#include < iostream>
2020#include < map>
2121#include < mutex>
22+ #include < chrono>
23+
2224
2325namespace openvdb {
2426OPENVDB_USE_VERSION_NAMESPACE
@@ -28,18 +30,19 @@ namespace io {
2830namespace {
2931
3032// Abstract base class for queuable TBB tasks that adds a task completion callback
31- class Task : public tbb ::task
33+ class Task
3234{
3335public:
3436 Task (Queue::Id id): mId (id) {}
35- ~Task () override {}
37+ virtual ~Task () {}
3638
3739 Queue::Id id () const { return mId ; }
3840
3941 void setNotifier (Queue::Notifier& notifier) { mNotify = notifier; }
42+ virtual void execute () const = 0;
4043
4144protected:
42- void notify (Queue::Status status) { if (mNotify ) mNotify (this ->id (), status); }
45+ void notify (Queue::Status status) const { if (mNotify ) mNotify (this ->id (), status); }
4346
4447private:
4548 Queue::Id mId ;
@@ -48,18 +51,18 @@ class Task: public tbb::task
4851
4952
5053// Queuable TBB task that writes one or more grids to a .vdb file or an output stream
51- class OutputTask : public Task
54+ class OutputTask : public Task
5255{
5356public:
5457 OutputTask (Queue::Id id, const GridCPtrVec& grids, const Archive& archive,
5558 const MetaMap& metadata)
5659 : Task(id)
5760 , mGrids (grids)
5861 , mArchive (archive.copy())
59- , mMetadata (metadata)
60- {}
62+ , mMetadata (metadata) {}
63+ ~OutputTask () override {}
6164
62- tbb::task* execute () override
65+ void execute () const override
6366 {
6467 Queue::Status status = Queue::FAILED;
6568 try {
@@ -69,10 +72,8 @@ class OutputTask: public Task
6972 if (const char * msg = e.what ()) {
7073 OPENVDB_LOG_ERROR (msg);
7174 }
72- } catch (...) {
73- }
75+ } catch (...) {}
7476 this ->notify (status);
75- return nullptr ; // no successor to this task
7677 }
7778
7879private:
@@ -94,7 +95,6 @@ struct Queue::Impl
9495 // / @todo Provide more information than just "succeeded" or "failed"?
9596 using StatusMap = tbb::concurrent_hash_map<Queue::Id, Queue::Status>;
9697
97-
9898 Impl ()
9999 : mTimeout (Queue::DEFAULT_TIMEOUT)
100100 , mCapacity (Queue::DEFAULT_CAPACITY)
@@ -159,12 +159,15 @@ struct Queue::Impl
159159
160160 bool canEnqueue () const { return mNumTasks < Int64 (mCapacity ); }
161161
162- void enqueue (Task & task)
162+ void enqueue (OutputTask & task)
163163 {
164- tbb::tick_count start = tbb::tick_count ::now ();
164+ auto start = std::chrono::steady_clock ::now ();
165165 while (!canEnqueue ()) {
166- tbb::this_tbb_thread::sleep (tbb::tick_count::interval_t (0.5 /* sec*/ ));
167- if ((tbb::tick_count::now () - start).seconds () > double (mTimeout )) {
166+ std::this_thread::sleep_for (/* 0.5s*/ std::chrono::milliseconds (500 ));
167+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
168+ std::chrono::steady_clock::now () - start);
169+ const double seconds = double (duration.count ()) / 1000.0 ;
170+ if (seconds > double (mTimeout )) {
168171 OPENVDB_THROW (RuntimeError,
169172 " unable to queue I/O task; " << mTimeout << " -second time limit expired" );
170173 }
@@ -173,7 +176,10 @@ struct Queue::Impl
173176 std::placeholders::_1, std::placeholders::_2);
174177 task.setNotifier (notify);
175178 this ->setStatus (task.id (), Queue::PENDING);
176- tbb::task::enqueue (task);
179+
180+ // get the global task arena
181+ tbb::task_arena arena (tbb::task_arena::attach{});
182+ arena.enqueue ([task = std::move (task)] { task.execute (); });
177183 ++mNumTasks ;
178184 }
179185
@@ -204,7 +210,7 @@ Queue::~Queue()
204210 // / (e.g., by keeping a static registry of queues that also dispatches
205211 // / or blocks notifications)?
206212 while (mImpl ->mNumTasks > 0 ) {
207- tbb::this_tbb_thread::sleep ( tbb::tick_count::interval_t ( 0.5 /* sec */ ));
213+ std::this_thread::sleep_for ( /* 0.5s */ std::chrono::milliseconds ( 500 ));
208214 }
209215}
210216
@@ -290,16 +296,8 @@ Queue::Id
290296Queue::writeGridVec (const GridCPtrVec& grids, const Archive& archive, const MetaMap& metadata)
291297{
292298 const Queue::Id taskId = mImpl ->mNextId ++;
293- // From the "GUI Thread" chapter in the TBB Design Patterns guide
294- OutputTask* task =
295- new (tbb::task::allocate_root ()) OutputTask (taskId, grids, archive, metadata);
296- try {
297- mImpl ->enqueue (*task);
298- } catch (openvdb::RuntimeError&) {
299- // Destroy the task if it could not be enqueued, then rethrow the exception.
300- tbb::task::destroy (*task);
301- throw ;
302- }
299+ OutputTask task (taskId, grids, archive, metadata);
300+ mImpl ->enqueue (task);
303301 return taskId;
304302}
305303
0 commit comments