Skip to content

Commit 15afc8b

Browse files
authored
Merge pull request STEllAR-GROUP#6685 from harith-hacky03/feature/run_on_all
Improve run_on_all implementation and tests
2 parents 2d5dd14 + fb43bf2 commit 15afc8b

File tree

3 files changed

+268
-79
lines changed

3 files changed

+268
-79
lines changed

libs/core/algorithms/examples/run_on_all.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <hpx/experimental/run_on_all.hpp>
88
#include <hpx/hpx_main.hpp>
9+
#include <hpx/modules/executors.hpp>
910
#include <hpx/modules/runtime_local.hpp>
1011
#include <hpx/modules/synchronization.hpp>
1112

@@ -34,16 +35,19 @@ int main(int argc, char* argv[])
3435
<< hpx::get_num_worker_threads() << "\n";
3536
std::cout << delim;
3637

38+
// use parallel execution policy with num_threads concurrent threads to
39+
// execute the lambda
40+
auto policy = hpx::execution::experimental::with_processing_units_count(
41+
hpx::execution::par, num_threads);
42+
3743
hpx::mutex mtx;
38-
hpx::experimental::run_on_all(
39-
num_threads, // use num_threads concurrent threads to execute the lambda
40-
[&] {
41-
std::lock_guard l(mtx);
42-
std::cout << "Hello! I am thread " << hpx::get_worker_thread_num()
43-
<< " of " << hpx::get_num_worker_threads() << "\n";
44-
std::cout << "My C++ std::thread id is "
45-
<< std::this_thread::get_id() << "\n";
46-
});
44+
hpx::experimental::run_on_all(policy, [&] {
45+
std::lock_guard l(mtx);
46+
std::cout << "Hello! I am thread " << hpx::get_worker_thread_num()
47+
<< " of " << hpx::get_num_worker_threads() << "\n";
48+
std::cout << "My C++ std::thread id is " << std::this_thread::get_id()
49+
<< "\n";
50+
});
4751

4852
std::cout << delim;
4953

Lines changed: 122 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// Copyright (c) 2025 Harith Reddy
12
// Copyright (c) 2025 Hartmut Kaiser
23
//
34
// SPDX-License-Identifier: BSL-1.0
@@ -6,83 +7,146 @@
67

78
/// \file run_on_all.hpp
89
/// \page hpx::experimental::run_on_all
9-
/// \headerfile hpx/experimental/run_on_all.hpp
10+
/// \headerfile hpx/task_block.hpp
1011

1112
#pragma once
1213

1314
#include <hpx/config.hpp>
14-
#include <hpx/async_combinators/wait_all.hpp>
1515
#include <hpx/concepts/concepts.hpp>
16-
#include <hpx/execution/detail/execution_parameter_callbacks.hpp>
17-
#include <hpx/execution/execution.hpp>
16+
#include <hpx/execution/executors/execution.hpp>
17+
#include <hpx/execution/executors/execution_parameters.hpp>
1818
#include <hpx/execution_base/execution.hpp>
19+
#include <hpx/executors/execution_policy.hpp>
1920
#include <hpx/executors/parallel_executor.hpp>
20-
#include <hpx/functional/experimental/scope_exit.hpp>
2121
#include <hpx/parallel/algorithms/for_loop_reduction.hpp>
22+
#include <hpx/type_support/pack.hpp>
2223

2324
#include <cstddef>
25+
#include <memory>
26+
#include <tuple>
2427
#include <type_traits>
28+
#include <utility>
2529

2630
namespace hpx::experimental {
2731

28-
template <typename T, typename Op, typename F, typename... Ts>
29-
void run_on_all(std::size_t num_tasks,
30-
hpx::parallel::detail::reduction_helper<T, Op>&& r, F&& f, Ts&&... ts)
31-
{
32-
// force using index_queue scheduler with given amount of threads
33-
hpx::threads::thread_schedule_hint hint;
34-
hint.sharing_mode(
35-
hpx::threads::thread_sharing_hint::do_not_share_function);
36-
auto exec = hpx::execution::experimental::with_processing_units_count(
37-
hpx::execution::parallel_executor(
38-
hpx::threads::thread_priority::bound,
39-
hpx::threads::thread_stacksize::default_, hint),
40-
num_tasks);
41-
exec.set_hierarchical_threshold(0);
42-
43-
r.init_iteration(0, 0);
44-
auto on_exit =
45-
hpx::experimental::scope_exit([&] { r.exit_iteration(0); });
46-
47-
hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
48-
exec, [&](auto i) { f(r.iteration_value(i), ts...); }, num_tasks,
49-
HPX_FORWARD(Ts, ts)...));
50-
}
32+
/// \cond NOINTERNAL
33+
namespace detail {
5134

52-
template <typename T, typename Op, typename F, typename... Ts>
53-
void run_on_all(
54-
hpx::parallel::detail::reduction_helper<T, Op>&& r, F&& f, Ts&&... ts)
55-
{
56-
std::size_t cores =
57-
hpx::parallel::execution::detail::get_os_thread_count();
58-
run_on_all(
59-
cores, HPX_MOVE(r), HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
60-
}
35+
template <typename ExPolicy, typename F, typename... Reductions>
36+
decltype(auto) run_on_all(
37+
ExPolicy&& policy, F&& f, Reductions&&... reductions)
38+
{
39+
// Create executor with proper configuration
40+
auto exec =
41+
hpx::execution::experimental::with_processing_units_count(
42+
hpx::execution::parallel_executor(
43+
hpx::threads::thread_priority::bound,
44+
hpx::threads::thread_stacksize::default_),
45+
hpx::execution::experimental::processing_units_count(
46+
policy.executor()));
47+
48+
auto cores =
49+
hpx::execution::experimental::processing_units_count(exec);
50+
51+
// Execute based on policy type
52+
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
53+
{
54+
// Initialize all reductions
55+
auto all_reductions =
56+
std::make_tuple(HPX_FORWARD(Reductions, reductions)...);
57+
auto sp = std::make_shared<decltype(all_reductions)>(
58+
HPX_MOVE(all_reductions));
59+
60+
// Create a lambda that captures all reductions
61+
auto task = [sp, f = HPX_FORWARD(F, f)](std::size_t) {
62+
std::apply(
63+
[&](auto&... r) { f(r.iteration_value(0)...); }, *sp);
64+
};
65+
66+
auto fut = hpx::parallel::execution::bulk_async_execute(
67+
HPX_MOVE(exec), HPX_MOVE(task), cores);
68+
69+
// Return a future that performs cleanup after all tasks
70+
// complete
71+
return fut.then([sp = HPX_MOVE(sp)](auto&& fut_inner) mutable {
72+
std::apply(
73+
[](auto&... r) { (r.exit_iteration(0), ...); }, *sp);
74+
return fut_inner.get();
75+
});
76+
}
77+
else
78+
{
79+
// Initialize all reductions
80+
auto&& all_reductions = std::forward_as_tuple(
81+
HPX_FORWARD(Reductions, reductions)...);
82+
83+
// Create a lambda that captures all reductions
84+
auto task = [&all_reductions, &f](std::size_t) {
85+
std::apply([&](auto&... r) { f(r.iteration_value(0)...); },
86+
all_reductions);
87+
};
88+
89+
hpx::parallel::execution::bulk_sync_execute(
90+
HPX_MOVE(exec), HPX_MOVE(task), cores);
91+
92+
// Clean up reductions
93+
std::apply([](auto&... r) { (r.exit_iteration(0), ...); },
94+
all_reductions);
95+
}
96+
}
97+
98+
template <typename ExPolicy, std::size_t... Is, typename... Ts>
99+
decltype(auto) run_on_all(
100+
ExPolicy&& policy, hpx::util::index_pack<Is...>, Ts&&... ts)
101+
{
102+
auto&& t = std::forward_as_tuple(HPX_FORWARD(Ts, ts)...);
103+
auto f = std::get<sizeof...(Ts) - 1>(t);
104+
105+
return run_on_all(
106+
HPX_FORWARD(ExPolicy, policy), HPX_MOVE(f), std::get<Is>(t)...);
107+
}
108+
} // namespace detail
109+
/// \endcond
61110

62-
template <typename F, typename... Ts>
63-
void run_on_all(std::size_t num_tasks, F&& f, Ts&&... ts)
111+
/// Run a function on all available worker threads with reduction support
112+
/// using the given execution policy
113+
///
114+
/// \tparam ExPolicy The execution policy type
115+
/// \tparam T The first type in a list of reduction types and the
116+
/// function type to invoke (last argument)
117+
/// \tparam Ts The list of reduction types and the function type to
118+
/// invoke (last argument)
119+
/// \param policy The execution policy to use
120+
/// \param t The first in a list of reductions and the function to
121+
/// invoke (last argument)
122+
/// \param ts The list of reductions and the function to invoke (last
123+
/// argument)
124+
template <typename ExPolicy, typename T, typename... Ts,
125+
HPX_CONCEPT_REQUIRES_(hpx::is_execution_policy_v<ExPolicy>)>
126+
decltype(auto) run_on_all(ExPolicy&& policy, T&& t, Ts&&... ts)
64127
{
65-
// force using index_queue scheduler with given amount of threads
66-
hpx::threads::thread_schedule_hint hint;
67-
hint.sharing_mode(
68-
hpx::threads::thread_sharing_hint::do_not_share_function);
69-
auto exec = hpx::execution::experimental::with_processing_units_count(
70-
hpx::execution::parallel_executor(
71-
hpx::threads::thread_priority::bound,
72-
hpx::threads::thread_stacksize::default_, hint),
73-
num_tasks);
74-
exec.set_hierarchical_threshold(0);
75-
76-
hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
77-
exec, [&](auto) { f(ts...); }, num_tasks, HPX_FORWARD(Ts, ts)...));
128+
return detail::run_on_all(HPX_FORWARD(ExPolicy, policy),
129+
hpx::util::make_index_pack_t<sizeof...(Ts)>(), HPX_FORWARD(T, t),
130+
HPX_FORWARD(Ts, ts)...);
78131
}
79132

80-
template <typename F, typename... Ts,
81-
HPX_CONCEPT_REQUIRES_(std::is_invocable_v<F&&, Ts&&...>)>
82-
void run_on_all(F&& f, Ts&&... ts)
133+
/// Run a function on all available worker threads with reduction support
134+
/// using the \a hpx::execution::par execution policy
135+
///
136+
/// \tparam T The first type in a list of reduction types and the
137+
/// function type to invoke (last argument)
138+
/// \tparam Ts The list of reduction types and the function type to
139+
/// invoke (last argument)
140+
/// \param t The first in a list of reductions and the function to
141+
/// invoke (last argument)
142+
/// \param ts The list of reductions and the function to invoke (last
143+
/// argument)
144+
template <typename T, typename... Ts,
145+
HPX_CONCEPT_REQUIRES_(!hpx::is_execution_policy_v<T>)>
146+
decltype(auto) run_on_all(T&& t, Ts&&... ts)
83147
{
84-
std::size_t cores =
85-
hpx::parallel::execution::detail::get_os_thread_count();
86-
run_on_all(cores, HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
148+
return detail::run_on_all(hpx::execution::par,
149+
hpx::util::make_index_pack_t<sizeof...(Ts)>(), HPX_FORWARD(T, t),
150+
HPX_FORWARD(Ts, ts)...);
87151
}
88152
} // namespace hpx::experimental

0 commit comments

Comments
 (0)