From a3d22a506ee2ad4564f76c0f183e77b40de5e260 Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Wed, 7 May 2025 13:53:33 -0700 Subject: [PATCH 1/9] Replace openmp with std::threads: almost entirely written by copilot AI --- src/avx512-16bit-qsort.hpp | 41 +++++---- src/xss-common-qsort.h | 171 +++++++++++++++++++++++++++---------- src/xss-thread-pool.hpp | 127 +++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 63 deletions(-) create mode 100644 src/xss-thread-pool.hpp diff --git a/src/avx512-16bit-qsort.hpp b/src/avx512-16bit-qsort.hpp index 6dbe24d..5ac0416 100644 --- a/src/avx512-16bit-qsort.hpp +++ b/src/avx512-16bit-qsort.hpp @@ -548,25 +548,35 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize) using T = uint16_t; using vtype = zmm_vector; -#ifdef XSS_COMPILE_OPENMP +#ifdef XSS_BUILD_WITH_STD_THREADS bool use_parallel = arrsize > 100000; +#else + bool use_parallel = false; +#endif if (use_parallel) { - // This thread limit was determined experimentally; it may be better for it to be the number of physical cores on the system +#ifdef XSS_BUILD_WITH_STD_THREADS + + // This thread limit was determined experimentally constexpr int thread_limit = 8; - int thread_count = std::min(thread_limit, omp_get_max_threads()); + int thread_count = std::min(thread_limit, + (int)std::thread::hardware_concurrency()); arrsize_t task_threshold = std::max((arrsize_t)100000, arrsize / 100); - // We use omp parallel and then omp single to setup the threads that will run the omp task calls in qsort_ - // The omp single prevents multiple threads from running the initial qsort_ simultaneously and causing problems - // Note that we do not use the if(...) clause built into OpenMP, because it causes a performance regression for small arrays -#pragma omp parallel num_threads(thread_count) -#pragma omp single - qsort_(arr, - 0, - arrsize - 1, - 2 * (arrsize_t)log2(arrsize), - task_threshold); + // Create a thread pool + ThreadPool pool(thread_count); + + // Initial sort task + qsort_threads(arr, + 0, + arrsize - 1, + 2 * (arrsize_t)log2(arrsize), + task_threshold, + pool); + + // Wait for all tasks to complete + pool.wait_all(); +#endif } else { qsort_(arr, @@ -575,11 +585,6 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize) 2 * (arrsize_t)log2(arrsize), std::numeric_limits::max()); } -#pragma omp taskwait -#else - qsort_( - arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); -#endif } [[maybe_unused]] X86_SIMD_SORT_INLINE void diff --git a/src/xss-common-qsort.h b/src/xss-common-qsort.h index cf4a34a..ebb3b29 100644 --- a/src/xss-common-qsort.h +++ b/src/xss-common-qsort.h @@ -11,6 +11,10 @@ #ifndef XSS_COMMON_QSORT #define XSS_COMMON_QSORT +#ifdef XSS_BUILD_WITH_STD_THREADS +#include "xss-thread-pool.hpp" +#endif + /* * Quicksort using AVX-512. The ideas and code are based on these two research * papers [1] and [2]. On a high level, the idea is to vectorize quicksort @@ -533,8 +537,61 @@ static void qsort_(type_t *arr, arrsize_t max_iters, arrsize_t task_threshold) { + UNUSED(task_threshold); /* - * Resort to std::sort if quicksort isnt making any progress + * Resort to std::sort if quicksort isn't making any progress + */ + if (max_iters <= 0) { + std::sort(arr + left, arr + right + 1, comparator::STDSortComparator); + return; + } + /* + * Base case: use bitonic networks to sort arrays <= vtype::network_sort_threshold + */ + if (right + 1 - left <= vtype::network_sort_threshold) { + sort_n( + arr + left, (int32_t)(right + 1 - left)); + return; + } + + auto pivot_result + = get_pivot_smart(arr, left, right); + type_t pivot = pivot_result.pivot; + + if (pivot_result.result == pivot_result_t::Sorted) { return; } + + type_t smallest = vtype::type_max(); + type_t biggest = vtype::type_min(); + + arrsize_t pivot_index = partition_unrolled( + arr, left, right + 1, pivot, &smallest, &biggest); + + if (pivot_result.result == pivot_result_t::Only2Values) { return; } + + type_t leftmostValue = comparator::leftmost(smallest, biggest); + type_t rightmostValue = comparator::rightmost(smallest, biggest); + + // Sequential recursion + if (pivot != leftmostValue) + qsort_(arr, left, pivot_index - 1, max_iters - 1, 0); + if (pivot != rightmostValue) + qsort_(arr, pivot_index, right, max_iters - 1, 0); +} + +// Template function for std::thread-based parallel quicksort implementation +#ifdef XSS_BUILD_WITH_STD_THREADS +template +static void qsort_threads(type_t *arr, + arrsize_t left, + arrsize_t right, + arrsize_t max_iters, + arrsize_t task_threshold, + ThreadPool &thread_pool) +{ + /* + * Resort to std::sort if quicksort isn't making any progress */ if (max_iters <= 0) { std::sort(arr + left, arr + right + 1, comparator::STDSortComparator); @@ -568,41 +625,65 @@ static void qsort_(type_t *arr, type_t leftmostValue = comparator::leftmost(smallest, biggest); type_t rightmostValue = comparator::rightmost(smallest, biggest); -#ifdef XSS_COMPILE_OPENMP + // Process left partition if (pivot != leftmostValue) { bool parallel_left = (pivot_index - left) > task_threshold; if (parallel_left) { -#pragma omp task - qsort_( - arr, left, pivot_index - 1, max_iters - 1, task_threshold); + submit_task(thread_pool, + [arr, + left, + pivot_index, + max_iters, + task_threshold, + &thread_pool]() { + qsort_threads(arr, + left, + pivot_index - 1, + max_iters - 1, + task_threshold, + thread_pool); + }); } else { - qsort_( - arr, left, pivot_index - 1, max_iters - 1, task_threshold); + qsort_threads(arr, + left, + pivot_index - 1, + max_iters - 1, + task_threshold, + thread_pool); } } + + // Process right partition if (pivot != rightmostValue) { bool parallel_right = (right - pivot_index) > task_threshold; - if (parallel_right) { -#pragma omp task - qsort_( - arr, pivot_index, right, max_iters - 1, task_threshold); + submit_task(thread_pool, + [arr, + pivot_index, + right, + max_iters, + task_threshold, + &thread_pool]() { + qsort_threads(arr, + pivot_index, + right, + max_iters - 1, + task_threshold, + thread_pool); + }); } else { - qsort_( - arr, pivot_index, right, max_iters - 1, task_threshold); + qsort_threads(arr, + pivot_index, + right, + max_iters - 1, + task_threshold, + thread_pool); } } -#else - UNUSED(task_threshold); - - if (pivot != leftmostValue) - qsort_(arr, left, pivot_index - 1, max_iters - 1, 0); - if (pivot != rightmostValue) - qsort_(arr, pivot_index, right, max_iters - 1, 0); -#endif } +#endif // XSS_BUILD_WITH_STD_THREADS template X86_SIMD_SORT_INLINE void qselect_(type_t *arr, @@ -667,40 +748,40 @@ X86_SIMD_SORT_INLINE void xss_qsort(T *arr, arrsize_t arrsize, bool hasnan) UNUSED(hasnan); -#ifdef XSS_COMPILE_OPENMP - +#ifdef XSS_BUILD_WITH_STD_THREADS bool use_parallel = arrsize > 100000; +#else + bool use_parallel = false; +#endif if (use_parallel) { - // This thread limit was determined experimentally; it may be better for it to be the number of physical cores on the system +#ifdef XSS_BUILD_WITH_STD_THREADS + // This thread limit was determined experimentally constexpr int thread_limit = 8; - int thread_count = std::min(thread_limit, omp_get_max_threads()); + int thread_count = std::min( + thread_limit, (int)std::thread::hardware_concurrency()); arrsize_t task_threshold = std::max((arrsize_t)100000, arrsize / 100); - // We use omp parallel and then omp single to setup the threads that will run the omp task calls in qsort_ - // The omp single prevents multiple threads from running the initial qsort_ simultaneously and causing problems - // Note that we do not use the if(...) clause built into OpenMP, because it causes a performance regression for small arrays -#pragma omp parallel num_threads(thread_count) -#pragma omp single - qsort_(arr, - 0, - arrsize - 1, - 2 * (arrsize_t)log2(arrsize), - task_threshold); -#pragma omp taskwait + // Create a thread pool + ThreadPool pool(thread_count); + + // Initial sort task + qsort_threads(arr, + 0, + arrsize - 1, + 2 * (arrsize_t)log2(arrsize), + task_threshold, + pool); + // Wait for all tasks to complete + pool.wait_all(); +#endif } else { - qsort_(arr, - 0, - arrsize - 1, - 2 * (arrsize_t)log2(arrsize), - std::numeric_limits::max()); + // For small arrays, just use the sequential version + qsort_( + arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); } -#else - qsort_( - arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); -#endif replace_inf_with_nan(arr, arrsize, nan_count, descending); } diff --git a/src/xss-thread-pool.hpp b/src/xss-thread-pool.hpp new file mode 100644 index 0000000..b998be3 --- /dev/null +++ b/src/xss-thread-pool.hpp @@ -0,0 +1,127 @@ +/******************************************************************* + * Copyright (C) 2025 Intel Corporation + * SPDX-License-Identifier: BSD-3-Clause + * Authors: Raghuveer Devulapalli + * ****************************************************************/ + +#ifndef XSS_THREAD_POOL +#define XSS_THREAD_POOL + +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * ThreadPool class and doc: Generated by copilot + * This thread pool implementation is a simple and efficient way to manage a + * pool of threads for executing tasks concurrently. It uses a std::queue to store + * tasks and a set of worker threads which wait for tasks to be added to the + * queue. When a task is added, one of the worker threads will pick it up and + * execute it. The thread pool can be stopped gracefully, and it also provides + * a way to wait for all tasks to complete before stopping. + * */ +class ThreadPool { +private: + std::vector workers; + std::queue> tasks; + std::mutex queue_mutex; + std::condition_variable condition; // Condition variable for task queue + std::condition_variable done_condition; // Condition variable for waiting + int active_tasks {0}; + bool stop; + +public: + ThreadPool(size_t num_threads) : stop(false) + { + for (size_t i = 0; i < num_threads; ++i) { + // Create a worker thread and add it to the pool + // Each thread will run a lambda function that waits for tasks + workers.emplace_back([this] { + while (true) { + // Lock the queue mutex and wait for a task to be available + std::unique_lock lock(queue_mutex); + // Wait until there is a task or the pool is stopped + condition.wait(lock, + [this] { return stop || !tasks.empty(); }); + + // Check if we need to terminate the thread + if (stop && tasks.empty()) { return; } + + // Extract the next task from the queue + auto task = std::move(tasks.front()); + tasks.pop(); + // Unlock the mutex before executing the task + lock.unlock(); + // Execute the task: + task(); + } + }); + } + } + + template + void enqueue(F &&func) + { + // Add a new task to the queue and notify one of the worker threads + std::unique_lock lock(queue_mutex); + tasks.emplace(std::forward(func)); + condition.notify_one(); + } + + ~ThreadPool() + { + // Stop the thread pool and join all threads + std::unique_lock lock(queue_mutex); + stop = true; + lock.unlock(); + condition.notify_all(); + for (std::thread &worker : workers) { + worker.join(); + } + } + + // Wait for all tasks to complete before stopping the pool + void wait_all() + { + std::unique_lock lock(queue_mutex); + done_condition.wait( + lock, [this] { return tasks.empty() && (active_tasks == 0); }); + // lock is automatically released here + } + + // Track the number of active tasks + void task_start() + { + std::unique_lock lock(queue_mutex); + active_tasks++; + // lock is automatically released here + } + + // Decrement the active task count and notify if all tasks are done + void task_end() + { + std::unique_lock lock(queue_mutex); + active_tasks--; + if (tasks.empty() && active_tasks == 0) { done_condition.notify_all(); } + // lock is automatically released here + } + +}; + +// Wrapper for submitting tasks to the thread pool with automatic tracking +template +void submit_task(ThreadPool &pool, F &&f) +{ + pool.task_start(); + pool.enqueue([f = std::forward(f), &pool]() { + f(); + pool.task_end(); + }); +} + +#endif // XSS_THREAD_POOL From 748fa74be9f987d6c122d43030a968f31c100844 Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Wed, 7 May 2025 15:24:27 -0700 Subject: [PATCH 2/9] Add new meson options to build with std threads --- Makefile | 4 ++++ lib/meson.build | 18 ++++++++---------- meson.build | 18 ++++++++++++++++-- meson_options.txt | 2 ++ tests/meson.build | 16 ++++++---------- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index c51d658..65e6254 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,10 @@ test_openmp: meson setup -Dbuild_tests=true -Duse_openmp=true --warnlevel 2 --werror --buildtype release builddir cd builddir && ninja +test_stdthreads: + meson setup -Dbuild_tests=true -Duse_stdthreads=true --warnlevel 2 --werror --buildtype release builddir + cd builddir && ninja + test_asan: meson setup -Dbuild_tests=true -Duse_openmp=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Db_lundef=false -Dasan_ci_dont_validate=true --warnlevel 0 --buildtype debugoptimized builddir cd builddir && ninja diff --git a/lib/meson.build b/lib/meson.build index 48046b3..ded5134 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -1,18 +1,13 @@ libtargets = [] -# Add compile flags for OpenMP if enabled -openmpflags = [] -if get_option('use_openmp') - openmpflags = ['-DXSS_USE_OPENMP=true', '-fopenmp'] -endif - if cpp.has_argument('-march=haswell') libtargets += static_library('libavx', files( 'x86simdsort-avx2.cpp', ), include_directories : [src], - cpp_args : ['-march=haswell', openmpflags], + cpp_args : ['-march=haswell', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif @@ -23,7 +18,8 @@ if cpp.has_argument('-march=skylake-avx512') 'x86simdsort-skx.cpp', ), include_directories : [src], - cpp_args : ['-march=skylake-avx512', openmpflags], + cpp_args : ['-march=skylake-avx512', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif @@ -34,7 +30,8 @@ if cpp.has_argument('-march=icelake-client') 'x86simdsort-icl.cpp', ), include_directories : [src], - cpp_args : ['-march=icelake-client', openmpflags], + cpp_args : ['-march=icelake-client', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif @@ -45,7 +42,8 @@ if cancompilefp16 'x86simdsort-spr.cpp', ), include_directories : [src], - cpp_args : ['-march=sapphirerapids', openmpflags], + cpp_args : ['-march=sapphirerapids', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif diff --git a/meson.build b/meson.build index c796a0a..ee822ef 100644 --- a/meson.build +++ b/meson.build @@ -29,6 +29,20 @@ if get_option('build_vqsortbench') benchvq = true endif +# build with openmp +omp = [] +omp_dep = [] +if get_option('use_openmp') + omp = dependency('openmp', required : true) + omp_dep = declare_dependency(dependencies: omp, compile_args: ['-DXSS_USE_OPENMP']) +endif + +# build with std::threads +stdthreadsflag = [] +if get_option('use_stdthreads') + stdthreadsflag += ['-DXSS_BUILD_WITH_STD_THREADS'] +endif + fp16code = '''#include int main() { __m512h temp = _mm512_set1_ph(1.0f); @@ -43,8 +57,8 @@ if get_option('lib_type') == 'shared' libsimdsort = shared_library('x86simdsortcpp', 'lib/x86simdsort.cpp', include_directories : [src, utils, lib], - link_args : [openmpflags], link_with : [libtargets], + dependencies: [omp], gnu_symbol_visibility : 'inlineshidden', install : true, soversion : 1, @@ -53,7 +67,7 @@ else libsimdsort = static_library('x86simdsortcpp', 'lib/x86simdsort.cpp', include_directories : [src, utils, lib], - link_args : [openmpflags], + dependencies: [omp], link_with : [libtargets], gnu_symbol_visibility : 'inlineshidden', install : true, diff --git a/meson_options.txt b/meson_options.txt index 6edeb4e..43feadd 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -8,6 +8,8 @@ option('build_vqsortbench', type : 'boolean', value : true, description : 'Add google vqsort to benchmarks (default: "true").') option('use_openmp', type : 'boolean', value : false, description : 'Use OpenMP to accelerate key-value sort (default: "false").') +option('use_stdthreads', type : 'boolean', value : false, + description : 'Use std::threads to accelerate qsort (default: "false").') option('lib_type', type : 'string', value : 'shared', description : 'Library type: shared or static (default: "shared").') option('fatal_sanitizers', type : 'boolean', value : 'false', diff --git a/tests/meson.build b/tests/meson.build index b070bcc..e1132b0 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -1,9 +1,5 @@ libtests = [] -if get_option('use_openmp') - openmpflags = ['-DXSS_USE_OPENMP=true'] -endif - # Add compile flags when needed for the ASAN CI run testargs = [] if get_option('asan_ci_dont_validate') @@ -16,21 +12,21 @@ endif libtests += static_library('tests_qsort', files('test-qsort.cpp', ), - dependencies: gtest_dep, + dependencies: [gtest_dep, omp], include_directories : [src, lib, utils], - cpp_args : [testargs, openmpflags], + cpp_args : [testargs, stdthreadsflag], ) libtests += static_library('tests_kvsort', files('test-keyvalue.cpp', ), - dependencies: gtest_dep, + dependencies: [gtest_dep, omp], include_directories : [src, lib, utils], - cpp_args : [testargs, openmpflags], + cpp_args : [testargs, stdthreadsflag], ) libtests += static_library('tests_objsort', files('test-objqsort.cpp', ), - dependencies: gtest_dep, + dependencies: [gtest_dep, omp], include_directories : [src, lib, utils], - cpp_args : [testargs, openmpflags], + cpp_args : [testargs, stdthreadsflag], ) From 177f70cbce8668a4af40c37182381f59e27d7877 Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Thu, 8 May 2025 12:22:43 -0700 Subject: [PATCH 3/9] update macro in test-qsort --- tests/test-qsort.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test-qsort.cpp b/tests/test-qsort.cpp index f2ce3a6..903f1df 100644 --- a/tests/test-qsort.cpp +++ b/tests/test-qsort.cpp @@ -12,8 +12,7 @@ class simdsort : public ::testing::Test { { std::iota(arrsize.begin(), arrsize.end(), 0); std::iota(arrsize_long.begin(), arrsize_long.end(), 0); -#ifdef XSS_USE_OPENMP - // These extended tests are only needed for the OpenMP logic +#if defined(XSS_BUILD_WITH_STD_THREADS) arrsize_long.push_back(10'000); arrsize_long.push_back(100'000); arrsize_long.push_back(1'000'000); From c1b66a6ba525a2ae9b8e92c5b38ae4a9fd1d8d41 Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Thu, 8 May 2025 12:39:46 -0700 Subject: [PATCH 4/9] Update benchmarking scripts to use new meson flag --- scripts/bench-compare.sh | 2 +- scripts/branch-compare.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/bench-compare.sh b/scripts/bench-compare.sh index a224acd..d25a350 100755 --- a/scripts/bench-compare.sh +++ b/scripts/bench-compare.sh @@ -11,7 +11,7 @@ if [ ! -d .bench/google-benchmark ]; then fi compare=$(realpath .bench/google-benchmark/tools/compare.py) -meson setup -Dbuild_benchmarks=true -Dbuild_ippbench=true --warnlevel 0 --buildtype release builddir-${branch} +meson setup -Dbuild_benchmarks=true -Duse_stdthreads=true -Duse_openmp=true --warnlevel 0 --buildtype release builddir-${branch} cd builddir-${branch} ninja $compare filters ./benchexe $1 $2 --benchmark_repetitions=$3 diff --git a/scripts/branch-compare.sh b/scripts/branch-compare.sh index 0d5057f..90fa99c 100755 --- a/scripts/branch-compare.sh +++ b/scripts/branch-compare.sh @@ -27,7 +27,7 @@ build_branch() { fi fi cd $dir_name - meson setup -Dbuild_benchmarks=true -Duse_openmp=true --warnlevel 0 --buildtype release builddir + meson setup -Dbuild_benchmarks=true -Duse_openmp=true -Duse_stdthreads=true --warnlevel 0 --buildtype release builddir cd builddir ninja cd ../../ From fd2fe8db577d34aa63d4fc4dad229d78076ebaf7 Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Thu, 8 May 2025 15:10:03 -0700 Subject: [PATCH 5/9] CI: update CI to build and test new code --- .github/workflows/c-cpp.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index 73f0a0a..0995c74 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -235,7 +235,7 @@ jobs: CXX: g++-10 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true --warnlevel 2 --werror --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true --Duse_stdthreads=true --warnlevel 2 --werror --buildtype release builddir cd builddir ninja From a6ac4dfa588cf027a1fbe64dc8a73447b924ccec Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Thu, 8 May 2025 15:11:41 -0700 Subject: [PATCH 6/9] Fix typo in the meson build option. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .github/workflows/c-cpp.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index 0995c74..30822f1 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -235,7 +235,7 @@ jobs: CXX: g++-10 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true --Duse_stdthreads=true --warnlevel 2 --werror --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true -Duse_stdthreads=true --warnlevel 2 --werror --buildtype release builddir cd builddir ninja From 07b5a6e888d480c0589f3449e16c125b7980a8fd Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Fri, 9 May 2025 09:59:47 -0700 Subject: [PATCH 7/9] fix lint error --- src/xss-thread-pool.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/xss-thread-pool.hpp b/src/xss-thread-pool.hpp index b998be3..a1361b0 100644 --- a/src/xss-thread-pool.hpp +++ b/src/xss-thread-pool.hpp @@ -110,7 +110,6 @@ class ThreadPool { if (tasks.empty() && active_tasks == 0) { done_condition.notify_all(); } // lock is automatically released here } - }; // Wrapper for submitting tasks to the thread pool with automatic tracking From 6c46d712c834ce95f26dfce2faf180ea79bc598b Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Fri, 9 May 2025 10:39:26 -0700 Subject: [PATCH 8/9] Move threadpool under xss:tp:: namespace --- src/avx512-16bit-qsort.hpp | 2 +- src/xss-common-qsort.h | 62 ++++++------ src/xss-thread-pool.hpp | 187 ++++++++++++++++++++----------------- 3 files changed, 135 insertions(+), 116 deletions(-) diff --git a/src/avx512-16bit-qsort.hpp b/src/avx512-16bit-qsort.hpp index 5ac0416..d428d0a 100644 --- a/src/avx512-16bit-qsort.hpp +++ b/src/avx512-16bit-qsort.hpp @@ -564,7 +564,7 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize) arrsize_t task_threshold = std::max((arrsize_t)100000, arrsize / 100); // Create a thread pool - ThreadPool pool(thread_count); + xss::tp::ThreadPool pool(thread_count); // Initial sort task qsort_threads(arr, diff --git a/src/xss-common-qsort.h b/src/xss-common-qsort.h index ebb3b29..7bd7c39 100644 --- a/src/xss-common-qsort.h +++ b/src/xss-common-qsort.h @@ -588,7 +588,7 @@ static void qsort_threads(type_t *arr, arrsize_t right, arrsize_t max_iters, arrsize_t task_threshold, - ThreadPool &thread_pool) + xss::tp::ThreadPool &thread_pool) { /* * Resort to std::sort if quicksort isn't making any progress @@ -629,20 +629,21 @@ static void qsort_threads(type_t *arr, if (pivot != leftmostValue) { bool parallel_left = (pivot_index - left) > task_threshold; if (parallel_left) { - submit_task(thread_pool, - [arr, - left, - pivot_index, - max_iters, - task_threshold, - &thread_pool]() { - qsort_threads(arr, - left, - pivot_index - 1, - max_iters - 1, - task_threshold, - thread_pool); - }); + xss::tp::submit_task(thread_pool, + [arr, + left, + pivot_index, + max_iters, + task_threshold, + &thread_pool]() { + qsort_threads( + arr, + left, + pivot_index - 1, + max_iters - 1, + task_threshold, + thread_pool); + }); } else { qsort_threads(arr, @@ -658,20 +659,21 @@ static void qsort_threads(type_t *arr, if (pivot != rightmostValue) { bool parallel_right = (right - pivot_index) > task_threshold; if (parallel_right) { - submit_task(thread_pool, - [arr, - pivot_index, - right, - max_iters, - task_threshold, - &thread_pool]() { - qsort_threads(arr, - pivot_index, - right, - max_iters - 1, - task_threshold, - thread_pool); - }); + xss::tp::submit_task(thread_pool, + [arr, + pivot_index, + right, + max_iters, + task_threshold, + &thread_pool]() { + qsort_threads( + arr, + pivot_index, + right, + max_iters - 1, + task_threshold, + thread_pool); + }); } else { qsort_threads(arr, @@ -764,7 +766,7 @@ X86_SIMD_SORT_INLINE void xss_qsort(T *arr, arrsize_t arrsize, bool hasnan) = std::max((arrsize_t)100000, arrsize / 100); // Create a thread pool - ThreadPool pool(thread_count); + xss::tp::ThreadPool pool(thread_count); // Initial sort task qsort_threads(arr, diff --git a/src/xss-thread-pool.hpp b/src/xss-thread-pool.hpp index a1361b0..a8e10e7 100644 --- a/src/xss-thread-pool.hpp +++ b/src/xss-thread-pool.hpp @@ -16,7 +16,10 @@ #include #include -/* +namespace xss { +namespace tp { + + /* * ThreadPool class and doc: Generated by copilot * This thread pool implementation is a simple and efficient way to manage a * pool of threads for executing tasks concurrently. It uses a std::queue to store @@ -25,102 +28,116 @@ * execute it. The thread pool can be stopped gracefully, and it also provides * a way to wait for all tasks to complete before stopping. * */ -class ThreadPool { -private: - std::vector workers; - std::queue> tasks; - std::mutex queue_mutex; - std::condition_variable condition; // Condition variable for task queue - std::condition_variable done_condition; // Condition variable for waiting - int active_tasks {0}; - bool stop; + class ThreadPool { + private: + std::vector workers; + std::queue> tasks; + std::mutex queue_mutex; + std::condition_variable condition; // Condition variable for task queue + std::condition_variable + done_condition; // Condition variable for waiting + int active_tasks {0}; + bool stop; -public: - ThreadPool(size_t num_threads) : stop(false) - { - for (size_t i = 0; i < num_threads; ++i) { - // Create a worker thread and add it to the pool - // Each thread will run a lambda function that waits for tasks - workers.emplace_back([this] { - while (true) { - // Lock the queue mutex and wait for a task to be available - std::unique_lock lock(queue_mutex); - // Wait until there is a task or the pool is stopped - condition.wait(lock, - [this] { return stop || !tasks.empty(); }); + public: + ThreadPool(size_t num_threads) : stop(false) + { + for (size_t i = 0; i < num_threads; ++i) { + // Create a worker thread and add it to the pool + // Each thread will run a lambda function that waits for tasks + workers.emplace_back([this] { + while (true) { + // Lock the queue mutex and wait for a task to be available + std::unique_lock lock(queue_mutex); + // Wait until there is a task or the pool is stopped + condition.wait(lock, [this] { + return stop || !tasks.empty(); + }); - // Check if we need to terminate the thread - if (stop && tasks.empty()) { return; } + // Check if we need to terminate the thread + if (stop && tasks.empty()) { return; } - // Extract the next task from the queue - auto task = std::move(tasks.front()); - tasks.pop(); - // Unlock the mutex before executing the task - lock.unlock(); - // Execute the task: - task(); - } - }); + // Extract the next task from the queue + auto task = std::move(tasks.front()); + tasks.pop(); + // Unlock the mutex before executing the task + lock.unlock(); + // Execute the task: + task(); + } + }); + } } - } - template - void enqueue(F &&func) - { - // Add a new task to the queue and notify one of the worker threads - std::unique_lock lock(queue_mutex); - tasks.emplace(std::forward(func)); - condition.notify_one(); - } + template + void enqueue(F &&func) + { + // Add a new task to the queue and notify one of the worker threads + std::unique_lock lock(queue_mutex); + tasks.emplace(std::forward(func)); + condition.notify_one(); + } - ~ThreadPool() - { - // Stop the thread pool and join all threads - std::unique_lock lock(queue_mutex); - stop = true; - lock.unlock(); - condition.notify_all(); - for (std::thread &worker : workers) { - worker.join(); + ~ThreadPool() + { + // Stop the thread pool and join all threads + std::unique_lock lock(queue_mutex); + stop = true; + lock.unlock(); + condition.notify_all(); + for (std::thread &worker : workers) { + worker.join(); + } } - } - // Wait for all tasks to complete before stopping the pool - void wait_all() - { - std::unique_lock lock(queue_mutex); - done_condition.wait( - lock, [this] { return tasks.empty() && (active_tasks == 0); }); - // lock is automatically released here - } + // Wait for all tasks to complete before stopping the pool + void wait_all() + { + std::unique_lock lock(queue_mutex); + done_condition.wait(lock, [this] { + return tasks.empty() && (active_tasks == 0); + }); + // lock is automatically released here + } - // Track the number of active tasks - void task_start() - { - std::unique_lock lock(queue_mutex); - active_tasks++; - // lock is automatically released here - } + // Track the number of active tasks + void task_start() + { + std::unique_lock lock(queue_mutex); + active_tasks++; + // lock is automatically released here + } + + // Decrement the active task count and notify if all tasks are done + void task_end() + { + std::unique_lock lock(queue_mutex); + active_tasks--; + if (tasks.empty() && active_tasks == 0) { + done_condition.notify_all(); + } + // lock is automatically released here + } + }; - // Decrement the active task count and notify if all tasks are done - void task_end() + // Wrapper for submitting tasks to the thread pool with automatic tracking + template + void submit_task(ThreadPool &pool, F &&f) { - std::unique_lock lock(queue_mutex); - active_tasks--; - if (tasks.empty() && active_tasks == 0) { done_condition.notify_all(); } - // lock is automatically released here + pool.task_start(); + pool.enqueue([f = std::forward(f), &pool]() { + try { + f(); + } catch (...) { + // Ensure task_end is called even if the task throws an exception + pool.task_end(); + throw; // Re-throw the exception + } + pool.task_end(); + }); } -}; -// Wrapper for submitting tasks to the thread pool with automatic tracking -template -void submit_task(ThreadPool &pool, F &&f) -{ - pool.task_start(); - pool.enqueue([f = std::forward(f), &pool]() { - f(); - pool.task_end(); - }); -} +} // namespace tp +} // namespace xss #endif // XSS_THREAD_POOL From 568cd7c4c578580602cabe9de7d04b12957c5029 Mon Sep 17 00:00:00 2001 From: Raghuveer Devulapalli Date: Mon, 12 May 2025 10:31:52 -0700 Subject: [PATCH 9/9] Minor comments based on review (1) Add meson flag to build with std threads to ASAN tests (2) Remove unnecessary comments --- .github/workflows/c-cpp.yml | 4 ++-- src/xss-common-qsort.h | 10 ++++------ src/xss-thread-pool.hpp | 7 ++----- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index 30822f1..2686e5e 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -165,7 +165,7 @@ jobs: CXX: clang++-18 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true -Duse_stdthreads=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir cd builddir ninja @@ -202,7 +202,7 @@ jobs: CXX: clang++-18 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true -Duse_stdthreads=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir cd builddir ninja diff --git a/src/xss-common-qsort.h b/src/xss-common-qsort.h index 7bd7c39..7c1a889 100644 --- a/src/xss-common-qsort.h +++ b/src/xss-common-qsort.h @@ -752,12 +752,7 @@ X86_SIMD_SORT_INLINE void xss_qsort(T *arr, arrsize_t arrsize, bool hasnan) #ifdef XSS_BUILD_WITH_STD_THREADS bool use_parallel = arrsize > 100000; -#else - bool use_parallel = false; -#endif - if (use_parallel) { -#ifdef XSS_BUILD_WITH_STD_THREADS // This thread limit was determined experimentally constexpr int thread_limit = 8; int thread_count = std::min( @@ -777,13 +772,16 @@ X86_SIMD_SORT_INLINE void xss_qsort(T *arr, arrsize_t arrsize, bool hasnan) pool); // Wait for all tasks to complete pool.wait_all(); -#endif } else { // For small arrays, just use the sequential version qsort_( arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); } +#else + qsort_( + arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); +#endif // XSS_BUILD_WITH_STD_THREADS replace_inf_with_nan(arr, arrsize, nan_count, descending); } diff --git a/src/xss-thread-pool.hpp b/src/xss-thread-pool.hpp index a8e10e7..67932ae 100644 --- a/src/xss-thread-pool.hpp +++ b/src/xss-thread-pool.hpp @@ -37,10 +37,10 @@ namespace tp { std::condition_variable done_condition; // Condition variable for waiting int active_tasks {0}; - bool stop; + bool stop {false}; public: - ThreadPool(size_t num_threads) : stop(false) + ThreadPool(size_t num_threads) { for (size_t i = 0; i < num_threads; ++i) { // Create a worker thread and add it to the pool @@ -97,7 +97,6 @@ namespace tp { done_condition.wait(lock, [this] { return tasks.empty() && (active_tasks == 0); }); - // lock is automatically released here } // Track the number of active tasks @@ -105,7 +104,6 @@ namespace tp { { std::unique_lock lock(queue_mutex); active_tasks++; - // lock is automatically released here } // Decrement the active task count and notify if all tasks are done @@ -116,7 +114,6 @@ namespace tp { if (tasks.empty() && active_tasks == 0) { done_condition.notify_all(); } - // lock is automatically released here } };