Skip to content

Commit c40b7f3

Browse files
committed
Add: fork_union parallel version
This is a work in progress light-weight thread-pool variant targeting OpenMP-like use-cases. It doesn't match OpenMP performance on small inputs and is still a long way from our goal #7.
1 parent cce8be4 commit c40b7f3

File tree

4 files changed

+56
-6
lines changed

4 files changed

+56
-6
lines changed

CMakeLists.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ FetchContent_Declare(
8888
)
8989
FetchContent_MakeAvailable(fmt)
9090

91+
FetchContent_Declare(
92+
fork_union
93+
GIT_REPOSITORY https://github.com/ashvardanian/fork_union.git
94+
GIT_TAG main
95+
)
96+
FetchContent_MakeAvailable(fork_union)
97+
9198
# Fetch GBenchmark and suppress internal tests.
9299
# https://github.com/google/benchmark/blob/main/docs/user_guide.md#using-register-benchmark
93100
FetchContent_Declare(
@@ -152,7 +159,7 @@ set(CMAKE_CUDA_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
152159
set(CMAKE_GCC_FLAGS "${CMAKE_GCC_FLAGS} -march=native -fopenmp")
153160

154161
add_executable(reduce_bench reduce_bench.cpp)
155-
target_link_libraries(reduce_bench PRIVATE benchmark::benchmark fmt::fmt Threads::Threads BLAS::BLAS)
162+
target_link_libraries(reduce_bench PRIVATE benchmark::benchmark fmt::fmt fork_union Threads::Threads BLAS::BLAS)
156163

157164
if (USE_INTEL_TBB)
158165
target_link_libraries(reduce_bench PRIVATE TBB::tbb)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ Observations:
195195
- 370 GB/s can be reached in dual-socket DDR5 setups with 12 channel memory.
196196
- Using Kahan-like schemes is 3x slower than pure `float` and 2x slower than `double`.
197197

198-
One of the interesting observations is the effect of latency hiding, interleaving the operations executing on different ports of the same CPU.
198+
One of the interesting observations is the effect of [latency hiding, interleaving the operations executing on different ports of the same CPU](https://ashvardanian.com/posts/cpu-ports).
199199
It is evident when benchmarking AVX-512 kernels on very small arrays:
200200

201201
```sh

reduce_bench.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,9 @@ std::size_t alignment_ram_page() {
188188
* 2. `std::aligned_alloc` aligned to the system page size with optional @b `madvise(MADV_HUGEPAGE)`.
189189
* If NUMA is available (libNUMA on Linux), memory is distributed across NUMA nodes.
190190
*
191-
* @param elements Number of float elements to allocate.
192-
* @return dataset_t A dataset wrapper holding the pointer and type of allocation.
193-
* @throws std::bad_alloc if allocation fails.
191+
* @param[in] needed_elements Number of float elements to allocate.
192+
* @return `dataset_t` A dataset wrapper holding the pointer and type of allocation.
193+
* @throws `std::bad_alloc` if allocation fails.
194194
*
195195
* @see NUMA docs: https://man7.org/linux/man-pages/man3/numa.3.html
196196
* @see MMAP docs: https://man7.org/linux/man-pages/man2/mmap.2.html
@@ -336,7 +336,11 @@ int main(int argc, char **argv) {
336336
register_("unrolled/f64", unrolled_gt<double> {}, dataset);
337337
register_("std::accumulate/f32", stl_accumulate_gt<float> {}, dataset);
338338
register_("std::accumulate/f64", stl_accumulate_gt<double> {}, dataset);
339+
register_("serial/f32/av::fork_union", fork_union_gt<unrolled_gt<float>> {}, dataset);
340+
register_("serial/f64/av::fork_union", fork_union_gt<unrolled_gt<double>> {}, dataset);
341+
#if defined(_OPENMP)
339342
register_("serial/f32/openmp", openmp_t {}, dataset);
343+
#endif // defined(_OPENMP)
340344

341345
//! BLAS struggles with zero-strided arguments!
342346
//! register_("blas/f32", blas_dot_t {}, dataset);
@@ -375,11 +379,17 @@ int main(int argc, char **argv) {
375379
// Arm NEON
376380
#if defined(__ARM_NEON)
377381
register_("neon/f32", neon_f32_t {}, dataset);
382+
register_("neon/f32/av::fork_union", fork_union_gt<neon_f32_t> {}, dataset);
383+
register_("neon/f32/std::threads", threads_gt<neon_f32_t> {}, dataset);
384+
register_("neon/f32/openmp", openmp_gt<neon_f32_t> {}, dataset);
378385
#endif
379386

380387
// Arm SVE
381388
#if defined(__ARM_FEATURE_SVE)
382389
register_("sve/f32", sve_f32_t {}, dataset);
390+
register_("sve/f32/av::fork_union", fork_union_gt<sve_f32_t> {}, dataset);
391+
register_("sve/f32/std::threads", threads_gt<sve_f32_t> {}, dataset);
392+
register_("sve/f32/openmp", openmp_gt<sve_f32_t> {}, dataset);
383393
#endif // defined(__ARM_FEATURE_SVE__)
384394

385395
// CUDA

reduce_cpu.hpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include <arm_sve.h> // ARM SVE intrinsics
2727
#endif
2828

29+
#include <fork_union.hpp>
30+
2931
namespace ashvardanian {
3032

3133
/**
@@ -604,7 +606,7 @@ class openmp_gt {
604606
#endif
605607

606608
/**
607-
* @brief Computes the sum of a sequence of float values using @b std::thread on-CPU
609+
* @brief Computes the sum of a sequence of float values using @b `std::thread` on-CPU
608610
* multi-core reductions acceleration.
609611
* @see https://en.cppreference.com/w/cpp/thread/thread
610612
*/
@@ -661,6 +663,37 @@ class threads_gt {
661663
}
662664
};
663665

666+
/**
667+
* @brief Computes the sum of a sequence of float values using @b `std::thread` on-CPU
668+
* multi-core reductions acceleration, reusing a fixed-size thread pool.
669+
* @see https://github.com/ashvardanian/fork_union
670+
*/
671+
template <typename serial_at = stl_accumulate_gt<float>>
672+
class fork_union_gt {
673+
using pool_t = ::ashvardanian::fork_union_t;
674+
float const *const begin_ = nullptr;
675+
float const *const end_ = nullptr;
676+
pool_t pool_;
677+
std::vector<double> sums_;
678+
679+
public:
680+
fork_union_gt() = default;
681+
fork_union_gt(float const *b, float const *e) : begin_(b), end_(e), sums_() {
682+
auto cores = total_cores();
683+
if (!pool_.try_spawn(cores)) throw std::runtime_error("Failed to fork threads");
684+
sums_.resize(cores);
685+
}
686+
687+
double operator()() {
688+
auto const input_size = static_cast<std::size_t>(end_ - begin_);
689+
pool_.for_each_slice(input_size, [this](pool_t::task_t first_task, std::size_t slice_length) noexcept {
690+
auto const slice_begin = begin_ + first_task.task_index;
691+
sums_[first_task.thread_index] = serial_at {slice_begin, slice_begin + slice_length}();
692+
});
693+
return std::accumulate(sums_.begin(), sums_.end(), 0.0);
694+
}
695+
};
696+
664697
#pragma endregion - Multicore
665698

666699
} // namespace ashvardanian

0 commit comments

Comments
 (0)