Skip to content

Commit 0514627

Browse files
committed
Add: Taskflow backend
1 parent 8ed7080 commit 0514627

File tree

3 files changed

+56
-6
lines changed

3 files changed

+56
-6
lines changed

CMakeLists.txt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ find_package(OpenMP QUIET)
8181
find_package(OpenCL QUIET)
8282
if (NOT APPLE)
8383
find_package(BLAS REQUIRED)
84-
endif()
84+
endif ()
8585

8686
set(FETCHCONTENT_QUIET OFF)
8787
include(FetchContent)
@@ -93,6 +93,13 @@ FetchContent_Declare(
9393
)
9494
FetchContent_MakeAvailable(fmt)
9595

96+
FetchContent_Declare(
97+
taskflow
98+
GIT_REPOSITORY https://github.com/taskflow/taskflow.git
99+
GIT_TAG v3.10.0
100+
)
101+
FetchContent_MakeAvailable(taskflow)
102+
96103
FetchContent_Declare(
97104
fork_union
98105
GIT_REPOSITORY https://github.com/ashvardanian/fork_union.git
@@ -164,13 +171,13 @@ set(CMAKE_CUDA_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
164171
set(CMAKE_GCC_FLAGS "${CMAKE_GCC_FLAGS} -march=native -fopenmp")
165172

166173
add_executable(reduce_bench reduce_bench.cpp)
167-
target_link_libraries(reduce_bench PRIVATE benchmark::benchmark fmt::fmt fork_union Threads::Threads)
174+
target_link_libraries(reduce_bench PRIVATE benchmark::benchmark fmt::fmt fork_union Taskflow Threads::Threads)
168175

169176
if (APPLE)
170177
target_link_libraries(reduce_bench PRIVATE "-framework Accelerate")
171-
else()
178+
else ()
172179
target_compile_definitions(reduce_bench PRIVATE BLAS::BLAS)
173-
endif()
180+
endif ()
174181

175182
if (USE_INTEL_TBB)
176183
target_link_libraries(reduce_bench PRIVATE TBB::tbb)

reduce_bench.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,15 @@ int main(int argc, char **argv) {
337337
register_("std::accumulate/f32", stl_accumulate_gt<float> {}, dataset);
338338
register_("std::accumulate/f64", stl_accumulate_gt<double> {}, dataset);
339339
register_("serial/f32/av::fork_union", fork_union_gt<unrolled_gt<float>> {}, dataset);
340+
register_("serial/f32/taskflow", taskflow_gt<unrolled_gt<float>> {}, dataset);
340341
register_("serial/f64/av::fork_union", fork_union_gt<unrolled_gt<double>> {}, dataset);
342+
register_("serial/f64/taskflow", taskflow_gt<unrolled_gt<double>> {}, dataset);
341343
#if defined(_OPENMP)
342344
register_("serial/f32/openmp", openmp_t {}, dataset);
343345
#endif // defined(_OPENMP)
344346

345-
//! BLAS struggles with zero-strided arguments!
346-
//! register_("blas/f32", blas_dot_t {}, dataset);
347+
// ! BLAS struggles with zero-strided arguments!
348+
// ! register_("blas/f32", blas_dot_t {}, dataset);
347349

348350
#if defined(__cpp_lib_execution)
349351
register_("std::reduce<par>/f32", stl_par_reduce_gt<float> {}, dataset);
@@ -380,6 +382,7 @@ int main(int argc, char **argv) {
380382
#if defined(__ARM_NEON)
381383
register_("neon/f32", neon_f32_t {}, dataset);
382384
register_("neon/f32/av::fork_union", fork_union_gt<neon_f32_t> {}, dataset);
385+
register_("neon/f32/taskflow", taskflow_gt<neon_f32_t> {}, dataset);
383386
register_("neon/f32/std::threads", threads_gt<neon_f32_t> {}, dataset);
384387
register_("neon/f32/openmp", openmp_gt<neon_f32_t> {}, dataset);
385388
#endif
@@ -388,6 +391,7 @@ int main(int argc, char **argv) {
388391
#if defined(__ARM_FEATURE_SVE)
389392
register_("sve/f32", sve_f32_t {}, dataset);
390393
register_("sve/f32/av::fork_union", fork_union_gt<sve_f32_t> {}, dataset);
394+
register_("sve/f32/taskflow", taskflow_gt<sve_f32_t> {}, dataset);
391395
register_("sve/f32/std::threads", threads_gt<sve_f32_t> {}, dataset);
392396
register_("sve/f32/openmp", openmp_gt<sve_f32_t> {}, dataset);
393397
#endif // defined(__ARM_FEATURE_SVE__)

reduce_cpu.hpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#endif
2828

2929
#include <fork_union.hpp>
30+
#include <taskflow/taskflow.hpp>
3031

3132
namespace ashvardanian {
3233

@@ -719,6 +720,44 @@ class fork_union_gt {
719720
}
720721
};
721722

723+
template <typename serial_at = stl_accumulate_gt<float>>
724+
class taskflow_gt {
725+
float const *const begin_ = nullptr;
726+
float const *const end_ = nullptr;
727+
std::size_t const cores_ = 0;
728+
729+
tf::Executor executor_;
730+
tf::Taskflow taskflow_;
731+
732+
struct alignas(128) thread_result_t {
733+
double partial_sum = 0.0;
734+
};
735+
std::vector<thread_result_t> sums_;
736+
737+
public:
738+
taskflow_gt() = default;
739+
taskflow_gt(float const *b, float const *e)
740+
: begin_ {b}, end_ {e}, cores_ {total_cores()}, executor_ {static_cast<unsigned>(cores_)}, sums_(cores_) {}
741+
742+
double operator()() {
743+
auto const input_size = static_cast<std::size_t>(end_ - begin_);
744+
auto const chunk_size = scalars_per_core(input_size, cores_);
745+
746+
taskflow_.clear();
747+
for (std::size_t tid = 0; tid < cores_; ++tid) {
748+
taskflow_.emplace([&, tid] {
749+
std::size_t const start = std::min(tid * chunk_size, input_size);
750+
std::size_t const stop = std::min(start + chunk_size, input_size);
751+
sums_[tid].partial_sum = serial_at {begin_ + start, begin_ + stop}();
752+
});
753+
}
754+
755+
executor_.run(taskflow_).wait();
756+
return std::accumulate(sums_.begin(), sums_.end(), 0.0,
757+
[](double acc, thread_result_t const &x) noexcept { return acc + x.partial_sum; });
758+
}
759+
};
760+
722761
#pragma endregion - Multicore
723762

724763
} // namespace ashvardanian

0 commit comments

Comments
 (0)