Skip to content

Commit 513aba6

Browse files
harith-hacky03hkaiser
authored andcommitted
Adding execution policy support to run_on_all
- Add missing cstddef header for std::size_t - Fix lambda signature in run_on_all example - Fix with_processing_units_count and callable issues in run_on_all - Fix parameter pack and reduction helper issues in run_on_all - Remove old predicates.hpp include and use new header path - Remove old execution_parameters.hpp include - Remove test files and configurations - Fix test target configuration in CMakeLists.txt - Fix CMake configuration for test executables - Fix formatting in test files - Add test files for run_on_all - Remove old execution_parameters.hpp include - Add missing license header, pragma once, and ensure file ends with newline - Apply clang-format to run_on_all files - Add missing functional include for std::reference_wrapper - Add include_local to cpp-dependencies ignore list in CircleCI config - Remove hpx_parallel_algorithms from module dependencies as it's not allowed for core module - Add hpx_parallel_algorithms dependency to fix circular dependencies - Fix circular dependencies by moving run_on_all implementation to experimental header - Fix compilation issues: - 1. Fix parameter pack expansion in vector declaration - 2. Remove C++20 lambda template syntax - 3. Add [[maybe_unused]] attributes - 4. Update deprecated header include - Fix CircleCI test failures: Fix copyright year, use HPX_MOVE/FORWARD consistently, improve reduction cleanup and initialization - Fix CircleCI test failures in run_on_all implementation - Remove redundant run_on_all overload with num_tasks, fix CI, and improve async reduction cleanup - Fix run_on_all function signatures and apply clang-format - Fix ambiguous calls, sign comparisons, and vector operations in run_on_all tests - Fix static assertion error by adding proper execution policies - Remove invalid async execution policy test from run_on_all - Improve run_on_all implementation with proper return types and async support - Revert 'Add test_minmax_element_semantics function to test behavior with repeated values' - Update execution headers in run_on_all.hpp - Add test_minmax_element_semantics function to test behavior with repeated values - Fix run_on_all example to use proper execution policy - Improve run_on_all implementation and tests - Add comprehensive documentation - Add static assertions for execution policies - Improve code organization and comments - Add proper error handling - Improve reduction handling - Add support for different execution policies - Add proper cleanup mechanisms - Add better scheduling hints - Add comprehensive test cases Signed-off-by: harith-hacky03 <harithhacky3@gmail.com>
1 parent 24621cb commit 513aba6

File tree

5 files changed

+437
-59
lines changed

5 files changed

+437
-59
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ jobs:
129129
cpp-dependencies \
130130
--dir /hpx/source/libs \
131131
--ignore $(find /hpx/source/libs -type d -wholename '*/include_compatibility' | cut -d'/' -f5-) \
132+
--ignore $(find /hpx/source/libs -type d -wholename '*/include_local' | cut -d'/' -f5-) \
132133
--graph-cycles /tmp/circular_deps.dot
133134
dot /tmp/circular_deps.dot -Tsvg -o /tmp/circular_deps.svg
134135
if [[ $(wc -l /tmp/circular_deps.dot | awk '{print $1}') -gt 2 ]]; then exit 1; fi

libs/core/algorithms/examples/run_on_all.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <hpx/modules/runtime_local.hpp>
1010
#include <hpx/modules/synchronization.hpp>
1111

12+
#include <cstddef>
1213
#include <cstdlib>
1314
#include <iostream>
1415
#include <mutex>
@@ -36,11 +37,12 @@ int main(int argc, char* argv[])
3637

3738
hpx::mutex mtx;
3839
hpx::experimental::run_on_all(
40+
hpx::execution::par, // use parallel execution policy
3941
num_threads, // use num_threads concurrent threads to execute the lambda
40-
[&] {
42+
[&](std::size_t index, std::tuple<> const& reductions) {
4143
std::lock_guard l(mtx);
42-
std::cout << "Hello! I am thread " << hpx::get_worker_thread_num()
43-
<< " of " << hpx::get_num_worker_threads() << "\n";
44+
std::cout << "Hello! I am thread " << index << " of "
45+
<< hpx::get_num_worker_threads() << "\n";
4446
std::cout << "My C++ std::thread id is "
4547
<< std::this_thread::get_id() << "\n";
4648
});

libs/core/algorithms/include/hpx/parallel/run_on_all.hpp

Lines changed: 157 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,85 +4,197 @@
44
// Distributed under the Boost Software License, Version 1.0. (See accompanying
55
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
66

7-
/// \file run_on_all.hpp
8-
/// \page hpx::experimental::run_on_all
9-
/// \headerfile hpx/experimental/run_on_all.hpp
10-
117
#pragma once
128

139
#include <hpx/config.hpp>
14-
#include <hpx/async_combinators/wait_all.hpp>
10+
#include <hpx/assert.hpp>
11+
#include <hpx/async_base/launch_policy.hpp>
1512
#include <hpx/concepts/concepts.hpp>
16-
#include <hpx/execution/detail/execution_parameter_callbacks.hpp>
17-
#include <hpx/execution/execution.hpp>
13+
#include <hpx/execution/algorithms/detail/predicates.hpp>
14+
#include <hpx/execution/executors/execution.hpp>
15+
#include <hpx/execution/executors/execution_parameters.hpp>
16+
#include <hpx/execution/executors/static_chunk_size.hpp>
1817
#include <hpx/execution_base/execution.hpp>
19-
#include <hpx/executors/parallel_executor.hpp>
20-
#include <hpx/functional/experimental/scope_exit.hpp>
21-
#include <hpx/parallel/algorithms/for_loop_reduction.hpp>
18+
#include <hpx/execution_base/traits/is_executor.hpp>
19+
#include <hpx/functional/detail/tag_fallback_invoke.hpp>
20+
#include <hpx/iterator_support/range.hpp>
21+
#include <hpx/iterator_support/traits/is_iterator.hpp>
22+
#include <hpx/parallel/algorithms/detail/advance_to_sentinel.hpp>
23+
#include <hpx/parallel/algorithms/detail/distance.hpp>
24+
#include <hpx/parallel/util/detail/algorithm_result.hpp>
25+
#include <hpx/parallel/util/detail/chunk_size.hpp>
26+
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp>
27+
#include <hpx/parallel/util/detail/sender_util.hpp>
28+
#include <hpx/parallel/util/loop.hpp>
29+
#include <hpx/parallel/util/partitioner.hpp>
30+
#include <hpx/parallel/util/result_types.hpp>
31+
#include <hpx/parallel/util/scan_partitioner.hpp>
32+
#include <hpx/parallel/util/transfer.hpp>
33+
#include <hpx/type_support/empty_function.hpp>
34+
#include <hpx/type_support/unused.hpp>
2235

36+
#include <algorithm>
2337
#include <cstddef>
38+
#include <functional>
39+
#include <iterator>
2440
#include <type_traits>
41+
#include <utility>
42+
#include <vector>
2543

26-
namespace hpx::experimental {
44+
namespace hpx::parallel {
2745

28-
template <typename T, typename Op, typename F, typename... Ts>
29-
void run_on_all(std::size_t num_tasks,
30-
hpx::parallel::detail::reduction_helper<T, Op>&& r, F&& f, Ts&&... ts)
46+
/// \brief Run a function on all available worker threads with reduction support
47+
/// \tparam ExPolicy The execution policy type
48+
/// \tparam Reductions The reduction types
49+
/// \tparam F The function type to execute
50+
/// \tparam Ts Additional argument types
51+
/// \param policy The execution policy to use
52+
/// \param reductions The reduction helpers
53+
/// \param f The function to execute
54+
/// \param ts Additional arguments to pass to the function
55+
template <typename ExPolicy, typename... Reductions, typename F,
56+
typename... Ts>
57+
decltype(auto) run_on_all(ExPolicy&& policy, Reductions&&... reductions,
58+
F&& f, [[maybe_unused]] Ts&&... ts)
3159
{
32-
// force using index_queue scheduler with given amount of threads
33-
hpx::threads::thread_schedule_hint hint;
34-
hint.sharing_mode(
35-
hpx::threads::thread_sharing_hint::do_not_share_function);
60+
static_assert(hpx::is_execution_policy_v<ExPolicy>,
61+
"hpx::is_execution_policy_v<ExPolicy>");
62+
static_assert(std::is_invocable_v<F&&, std::size_t,
63+
std::tuple<std::decay_t<Reductions>...>, Ts&&...>,
64+
"F must be callable with (std::size_t, std::tuple<Reductions...>, "
65+
"Ts...)");
66+
67+
[[maybe_unused]] std::size_t cores =
68+
hpx::parallel::execution::detail::get_os_thread_count();
69+
70+
// Create executor with proper configuration
3671
auto exec = hpx::execution::experimental::with_processing_units_count(
3772
hpx::execution::parallel_executor(
3873
hpx::threads::thread_priority::bound,
39-
hpx::threads::thread_stacksize::default_, hint),
40-
num_tasks);
41-
exec.set_hierarchical_threshold(0);
74+
hpx::threads::thread_stacksize::default_),
75+
cores);
4276

43-
r.init_iteration(0, 0);
44-
auto on_exit =
45-
hpx::experimental::scope_exit([&] { r.exit_iteration(0); });
77+
// Initialize all reductions
78+
std::tuple<std::decay_t<Reductions>...> all_reductions(
79+
HPX_FORWARD(Reductions, reductions)...);
4680

47-
hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
48-
exec, [&](auto i) { f(r.iteration_value(i), ts...); }, num_tasks,
49-
HPX_FORWARD(Ts, ts)...));
50-
}
81+
// Create a lambda that captures all reductions
82+
auto task = [all_reductions = HPX_MOVE(all_reductions), &f, &ts...](
83+
std::size_t index) {
84+
f(index, all_reductions, HPX_FORWARD(Ts, ts)...);
85+
};
5186

52-
template <typename T, typename Op, typename F, typename... Ts>
53-
void run_on_all(
54-
hpx::parallel::detail::reduction_helper<T, Op>&& r, F&& f, Ts&&... ts)
55-
{
56-
std::size_t cores =
57-
hpx::parallel::execution::detail::get_os_thread_count();
58-
run_on_all(
59-
cores, HPX_MOVE(r), HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
87+
// Execute based on policy type
88+
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
89+
{
90+
auto fut = hpx::parallel::execution::bulk_async_execute(
91+
exec, task, cores, HPX_FORWARD(Ts, ts)...);
92+
93+
// Create a cleanup function that will be called when all tasks complete
94+
auto cleanup = [all_reductions =
95+
HPX_MOVE(all_reductions)]() mutable {
96+
std::apply([](auto&... r) { (r.exit_iteration(0), ...); },
97+
all_reductions);
98+
};
99+
100+
// Return a future that performs cleanup after all tasks complete
101+
return fut.then(
102+
[cleanup = HPX_MOVE(cleanup)](auto&& fut_inner) mutable {
103+
cleanup();
104+
return HPX_MOVE(fut_inner.get());
105+
});
106+
}
107+
else
108+
{
109+
auto result =
110+
hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
111+
exec, task, cores, HPX_FORWARD(Ts, ts)...));
112+
113+
// Clean up reductions
114+
std::apply(
115+
[](auto&... r) { (r.exit_iteration(0), ...); }, all_reductions);
116+
return result;
117+
}
60118
}
61119

62-
template <typename F, typename... Ts>
63-
void run_on_all(std::size_t num_tasks, F&& f, Ts&&... ts)
120+
/// \brief Run a function on all available worker threads
121+
/// \tparam ExPolicy The execution policy type
122+
/// \tparam F The function type to execute
123+
/// \tparam Ts Additional argument types
124+
/// \param policy The execution policy to use
125+
/// \param num_tasks The number of tasks to create
126+
/// \param f The function to execute
127+
/// \param ts Additional arguments to pass to the function
128+
template <typename ExPolicy, typename F, typename... Ts>
129+
decltype(auto) run_on_all([[maybe_unused]] ExPolicy&& policy,
130+
std::size_t num_tasks, F&& f, [[maybe_unused]] Ts&&... ts)
64131
{
65-
// force using index_queue scheduler with given amount of threads
132+
static_assert(hpx::is_execution_policy_v<ExPolicy>,
133+
"hpx::is_execution_policy_v<ExPolicy>");
134+
static_assert(std::is_invocable_v<F&&, Ts&&...>,
135+
"F must be callable with (Ts...)");
136+
137+
// Configure executor with proper scheduling hints
66138
hpx::threads::thread_schedule_hint hint;
67139
hint.sharing_mode(
68140
hpx::threads::thread_sharing_hint::do_not_share_function);
141+
69142
auto exec = hpx::execution::experimental::with_processing_units_count(
70143
hpx::execution::parallel_executor(
71144
hpx::threads::thread_priority::bound,
72145
hpx::threads::thread_stacksize::default_, hint),
73146
num_tasks);
74147
exec.set_hierarchical_threshold(0);
75148

76-
hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
77-
exec, [&](auto) { f(ts...); }, num_tasks, HPX_FORWARD(Ts, ts)...));
149+
// Execute based on policy type
150+
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
151+
{
152+
return hpx::parallel::execution::bulk_async_execute(
153+
exec, [&](auto) { f(ts...); }, num_tasks,
154+
HPX_FORWARD(Ts, ts)...);
155+
}
156+
else
157+
{
158+
return hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
159+
exec, [&](auto) { f(ts...); }, num_tasks,
160+
HPX_FORWARD(Ts, ts)...));
161+
}
78162
}
79163

80-
template <typename F, typename... Ts,
164+
/// \brief Run a function on all available worker threads
165+
/// \tparam ExPolicy The execution policy type
166+
/// \tparam F The function type to execute
167+
/// \tparam Ts Additional argument types
168+
/// \param policy The execution policy to use
169+
/// \param f The function to execute
170+
/// \param ts Additional arguments to pass to the function
171+
template <typename ExPolicy, typename F, typename... Ts,
81172
HPX_CONCEPT_REQUIRES_(std::is_invocable_v<F&&, Ts&&...>)>
82-
void run_on_all(F&& f, Ts&&... ts)
173+
decltype(auto) run_on_all(
174+
ExPolicy&& policy, F&& f, [[maybe_unused]] Ts&&... ts)
83175
{
176+
static_assert(hpx::is_execution_policy_v<ExPolicy>,
177+
"hpx::is_execution_policy_v<ExPolicy>");
178+
84179
std::size_t cores =
85180
hpx::parallel::execution::detail::get_os_thread_count();
86-
run_on_all(cores, HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
181+
return run_on_all(HPX_FORWARD(ExPolicy, policy), cores,
182+
HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
183+
}
184+
185+
// Overloads without execution policy (default to sequential execution)
186+
template <typename F, typename... Ts>
187+
decltype(auto) run_on_all(std::size_t num_tasks, F&& f, Ts&&... ts)
188+
{
189+
return run_on_all(hpx::execution::seq, num_tasks, HPX_FORWARD(F, f),
190+
HPX_FORWARD(Ts, ts)...);
191+
}
192+
193+
template <typename F, typename... Ts,
194+
HPX_CONCEPT_REQUIRES_(std::is_invocable_v<F&&, Ts&&...>)>
195+
decltype(auto) run_on_all(F&& f, Ts&&... ts)
196+
{
197+
return run_on_all(
198+
hpx::execution::seq, HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
87199
}
88-
} // namespace hpx::experimental
200+
} // namespace hpx::parallel

libs/core/algorithms/tests/unit/block/run_on_all.cpp

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2025 Hartmut Kaiser
1+
// Copyright (c) 2024 Hartmut Kaiser
22
//
33
// SPDX-License-Identifier: BSL-1.0
44
// Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -10,32 +10,104 @@
1010
#include <hpx/modules/testing.hpp>
1111

1212
#include <atomic>
13+
#include <cstddef>
1314
#include <cstdint>
15+
#include <vector>
1416

1517
int main()
1618
{
1719
using namespace hpx::experimental;
1820

21+
// Test basic functionality with reduction
22+
{
23+
std::uint32_t n = 0;
24+
run_on_all(
25+
reduction_plus(n), [](std::uint32_t& local_n) { ++local_n; });
26+
HPX_TEST_EQ(
27+
n, static_cast<std::uint32_t>(hpx::get_num_worker_threads()));
28+
}
29+
30+
// Test with specific number of tasks
31+
{
32+
std::uint32_t n = 0;
33+
run_on_all(
34+
2, reduction_plus(n), [](std::uint32_t& local_n) { ++local_n; });
35+
HPX_TEST_EQ(n, static_cast<std::uint32_t>(2));
36+
}
37+
38+
// Test with sequential execution policy
39+
{
40+
std::uint32_t n = 0;
41+
run_on_all(hpx::execution::seq, reduction_plus(n),
42+
[](std::uint32_t& local_n) { ++local_n; });
43+
HPX_TEST_EQ(
44+
n, static_cast<std::uint32_t>(hpx::get_num_worker_threads()));
45+
}
46+
47+
// Test with parallel execution policy
48+
{
49+
std::uint32_t n = 0;
50+
run_on_all(hpx::execution::par, reduction_plus(n),
51+
[](std::uint32_t& local_n) { ++local_n; });
52+
HPX_TEST_EQ(
53+
n, static_cast<std::uint32_t>(hpx::get_num_worker_threads()));
54+
}
55+
56+
// Test with parallel unsequenced execution policy
57+
{
58+
std::uint32_t n = 0;
59+
run_on_all(hpx::execution::par_unseq, reduction_plus(n),
60+
[](std::uint32_t& local_n) { ++local_n; });
61+
HPX_TEST_EQ(
62+
n, static_cast<std::uint32_t>(hpx::get_num_worker_threads()));
63+
}
64+
65+
// Test with multiple arguments
66+
{
67+
std::uint32_t n = 0;
68+
std::uint32_t m = 0;
69+
run_on_all(reduction_plus(n), reduction_plus(m),
70+
[](std::uint32_t& local_n, std::uint32_t& local_m) {
71+
++local_n;
72+
local_m += 2;
73+
});
74+
HPX_TEST_EQ(
75+
n, static_cast<std::uint32_t>(hpx::get_num_worker_threads()));
76+
HPX_TEST_EQ(
77+
m, static_cast<std::uint32_t>(2 * hpx::get_num_worker_threads()));
78+
}
79+
80+
// Test with vector reduction
81+
{
82+
std::vector<std::uint32_t> v(hpx::get_num_worker_threads(), 0);
83+
run_on_all(reduction_plus(v), [](std::vector<std::uint32_t>& local_v) {
84+
local_v[hpx::get_worker_thread_num()] = 1;
85+
});
86+
for (std::size_t i = 0; i < v.size(); ++i)
87+
{
88+
HPX_TEST_EQ(v[i], static_cast<std::uint32_t>(1));
89+
}
90+
}
91+
92+
// Test with atomic operations
1993
{
2094
std::atomic<std::uint32_t> n(0);
2195
run_on_all([&]() { ++n; });
22-
HPX_TEST_EQ(n.load(), hpx::get_num_worker_threads());
23-
24-
n.store(0);
25-
run_on_all(2, [&]() { ++n; });
26-
HPX_TEST_EQ(n.load(), static_cast<std::uint32_t>(2));
96+
HPX_TEST_EQ(n.load(),
97+
static_cast<std::uint32_t>(hpx::get_num_worker_threads()));
2798
}
2899

100+
// Test with different number of tasks
29101
{
30102
std::uint32_t n = 0;
31103
run_on_all(
32-
reduction_plus(n), [](std::uint32_t& local_n) { ++local_n; });
33-
HPX_TEST_EQ(n, hpx::get_num_worker_threads());
104+
1, reduction_plus(n), [](std::uint32_t& local_n) { ++local_n; });
105+
HPX_TEST_EQ(n, static_cast<std::uint32_t>(1));
34106

35107
n = 0;
36108
run_on_all(
37-
2, reduction_plus(n), [](std::uint32_t& local_n) { ++local_n; });
38-
HPX_TEST_EQ(n, static_cast<std::uint32_t>(2));
109+
4, reduction_plus(n), [](std::uint32_t& local_n) { ++local_n; });
110+
HPX_TEST_EQ(n, static_cast<std::uint32_t>(4));
39111
}
40112

41113
return hpx::util::report_errors();

0 commit comments

Comments
 (0)