Skip to content

Commit 1ad4851

Browse files
committed
Add memory order to bounded queue. Revert schedule() implementation
1 parent 7ac1643 commit 1ad4851

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

src/concurrent.hpp

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ limitations under the License.
5656
#elif __GNUC__
5757
#define CPU_PAUSE() __builtin_ia32_pause()
5858
#elif _MSC_VER
59+
#include <immintrin.h>
5960
#define CPU_PAUSE() _mm_pause()
6061
#else
6162
#define CPU_PAUSE() std::this_thread::yield();
@@ -83,7 +84,11 @@ class Task {
8384
ThreadPool(int threads = 8);
8485

8586
template<class F, class... Args>
86-
auto schedule(F&& f, Args&&... args);
87+
#if __cplusplus >= 201703L // result_of deprecated from C++17
88+
std::future<typename std::invoke_result_t<F, Args...>> schedule(F&& f, Args&&... args);
89+
#else
90+
std::future<typename std::result_of<F(Args...)>::type> schedule(F&& f, Args&&... args);
91+
#endif
8792

8893
~ThreadPool() noexcept;
8994

@@ -131,24 +136,26 @@ class Task {
131136

132137

133138
template<class F, class... Args>
134-
auto ThreadPool::schedule(F&& f, Args&&... args)
139+
#if __cplusplus >= 201703L // result_of deprecated from C++17
140+
std::future<typename std::invoke_result_t<F, Args...> > ThreadPool::schedule(F&& f, Args&&... args)
141+
{
142+
using return_type = typename std::invoke_result<F, Args...>::type;
143+
#else
144+
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::schedule(F&& f, Args&&... args)
135145
{
136-
using return_type = std::invoke_result_t<F, Args...>;
146+
using return_type = typename std::result_of<F(Args...)>::type;
147+
#endif
137148

138-
auto task = std::make_shared<std::packaged_task<return_type()>>(
139-
[fn = std::forward<F>(f),
140-
tup = std::make_tuple(std::forward<Args>(args)...)]() mutable
141-
{
142-
return std::apply(std::move(fn), std::move(tup));
143-
}
144-
);
149+
auto task = std::make_shared< std::packaged_task<return_type()> >(
150+
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
151+
);
145152

146153
std::future<return_type> res = task->get_future();
147154

148155
{
149156
std::unique_lock<std::mutex> lock(_mutex);
150157

151-
if (_stop)
158+
if (_stop == true)
152159
throw std::runtime_error("ThreadPool stopped");
153160

154161
_tasks.emplace([task](){ (*task)(); });
@@ -182,7 +189,7 @@ class Task {
182189

183190
~BoundedConcurrentQueue() { }
184191

185-
T* get() { int idx = _index.fetch_add(1); return (idx >= _size) ? nullptr : &_data[idx]; }
192+
T* get() { int idx = _index.fetch_add(1, std::memory_order_acq_rel); return (idx >= _size) ? nullptr : &_data[idx]; }
186193

187194
void clear() { _index.store(_size); }
188195

0 commit comments

Comments
 (0)