Skip to content

Commit fb43bf2

Browse files
committed
Simplify run_on_all API
Signed-off-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>
1 parent 513aba6 commit fb43bf2

File tree

5 files changed

+203
-392
lines changed

5 files changed

+203
-392
lines changed

.circleci/config.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ jobs:
129129
cpp-dependencies \
130130
--dir /hpx/source/libs \
131131
--ignore $(find /hpx/source/libs -type d -wholename '*/include_compatibility' | cut -d'/' -f5-) \
132-
--ignore $(find /hpx/source/libs -type d -wholename '*/include_local' | cut -d'/' -f5-) \
133132
--graph-cycles /tmp/circular_deps.dot
134133
dot /tmp/circular_deps.dot -Tsvg -o /tmp/circular_deps.svg
135134
if [[ $(wc -l /tmp/circular_deps.dot | awk '{print $1}') -gt 2 ]]; then exit 1; fi

libs/core/algorithms/examples/run_on_all.cpp

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77
#include <hpx/experimental/run_on_all.hpp>
88
#include <hpx/hpx_main.hpp>
9+
#include <hpx/modules/executors.hpp>
910
#include <hpx/modules/runtime_local.hpp>
1011
#include <hpx/modules/synchronization.hpp>
1112

12-
#include <cstddef>
1313
#include <cstdlib>
1414
#include <iostream>
1515
#include <mutex>
@@ -35,17 +35,19 @@ int main(int argc, char* argv[])
3535
<< hpx::get_num_worker_threads() << "\n";
3636
std::cout << delim;
3737

38+
// use parallel execution policy with num_threads concurrent threads to
39+
// execute the lambda
40+
auto policy = hpx::execution::experimental::with_processing_units_count(
41+
hpx::execution::par, num_threads);
42+
3843
hpx::mutex mtx;
39-
hpx::experimental::run_on_all(
40-
hpx::execution::par, // use parallel execution policy
41-
num_threads, // use num_threads concurrent threads to execute the lambda
42-
[&](std::size_t index, std::tuple<> const& reductions) {
43-
std::lock_guard l(mtx);
44-
std::cout << "Hello! I am thread " << index << " of "
45-
<< hpx::get_num_worker_threads() << "\n";
46-
std::cout << "My C++ std::thread id is "
47-
<< std::this_thread::get_id() << "\n";
48-
});
44+
hpx::experimental::run_on_all(policy, [&] {
45+
std::lock_guard l(mtx);
46+
std::cout << "Hello! I am thread " << hpx::get_worker_thread_num()
47+
<< " of " << hpx::get_num_worker_threads() << "\n";
48+
std::cout << "My C++ std::thread id is " << std::this_thread::get_id()
49+
<< "\n";
50+
});
4951

5052
std::cout << delim;
5153

Lines changed: 119 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -1,200 +1,152 @@
1+
// Copyright (c) 2025 Harith Reddy
12
// Copyright (c) 2025 Hartmut Kaiser
23
//
34
// SPDX-License-Identifier: BSL-1.0
45
// Distributed under the Boost Software License, Version 1.0. (See accompanying
56
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
67

8+
/// \file run_on_all.hpp
9+
/// \page hpx::experimental::run_on_all
10+
/// \headerfile hpx/task_block.hpp
11+
712
#pragma once
813

914
#include <hpx/config.hpp>
10-
#include <hpx/assert.hpp>
11-
#include <hpx/async_base/launch_policy.hpp>
1215
#include <hpx/concepts/concepts.hpp>
13-
#include <hpx/execution/algorithms/detail/predicates.hpp>
1416
#include <hpx/execution/executors/execution.hpp>
1517
#include <hpx/execution/executors/execution_parameters.hpp>
16-
#include <hpx/execution/executors/static_chunk_size.hpp>
1718
#include <hpx/execution_base/execution.hpp>
18-
#include <hpx/execution_base/traits/is_executor.hpp>
19-
#include <hpx/functional/detail/tag_fallback_invoke.hpp>
20-
#include <hpx/iterator_support/range.hpp>
21-
#include <hpx/iterator_support/traits/is_iterator.hpp>
22-
#include <hpx/parallel/algorithms/detail/advance_to_sentinel.hpp>
23-
#include <hpx/parallel/algorithms/detail/distance.hpp>
24-
#include <hpx/parallel/util/detail/algorithm_result.hpp>
25-
#include <hpx/parallel/util/detail/chunk_size.hpp>
26-
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp>
27-
#include <hpx/parallel/util/detail/sender_util.hpp>
28-
#include <hpx/parallel/util/loop.hpp>
29-
#include <hpx/parallel/util/partitioner.hpp>
30-
#include <hpx/parallel/util/result_types.hpp>
31-
#include <hpx/parallel/util/scan_partitioner.hpp>
32-
#include <hpx/parallel/util/transfer.hpp>
33-
#include <hpx/type_support/empty_function.hpp>
34-
#include <hpx/type_support/unused.hpp>
35-
36-
#include <algorithm>
19+
#include <hpx/executors/execution_policy.hpp>
20+
#include <hpx/executors/parallel_executor.hpp>
21+
#include <hpx/parallel/algorithms/for_loop_reduction.hpp>
22+
#include <hpx/type_support/pack.hpp>
23+
3724
#include <cstddef>
38-
#include <functional>
39-
#include <iterator>
25+
#include <memory>
26+
#include <tuple>
4027
#include <type_traits>
4128
#include <utility>
42-
#include <vector>
4329

44-
namespace hpx::parallel {
30+
namespace hpx::experimental {
4531

46-
/// \brief Run a function on all available worker threads with reduction support
47-
/// \tparam ExPolicy The execution policy type
48-
/// \tparam Reductions The reduction types
49-
/// \tparam F The function type to execute
50-
/// \tparam Ts Additional argument types
51-
/// \param policy The execution policy to use
52-
/// \param reductions The reduction helpers
53-
/// \param f The function to execute
54-
/// \param ts Additional arguments to pass to the function
55-
template <typename ExPolicy, typename... Reductions, typename F,
56-
typename... Ts>
57-
decltype(auto) run_on_all(ExPolicy&& policy, Reductions&&... reductions,
58-
F&& f, [[maybe_unused]] Ts&&... ts)
59-
{
60-
static_assert(hpx::is_execution_policy_v<ExPolicy>,
61-
"hpx::is_execution_policy_v<ExPolicy>");
62-
static_assert(std::is_invocable_v<F&&, std::size_t,
63-
std::tuple<std::decay_t<Reductions>...>, Ts&&...>,
64-
"F must be callable with (std::size_t, std::tuple<Reductions...>, "
65-
"Ts...)");
66-
67-
[[maybe_unused]] std::size_t cores =
68-
hpx::parallel::execution::detail::get_os_thread_count();
69-
70-
// Create executor with proper configuration
71-
auto exec = hpx::execution::experimental::with_processing_units_count(
72-
hpx::execution::parallel_executor(
73-
hpx::threads::thread_priority::bound,
74-
hpx::threads::thread_stacksize::default_),
75-
cores);
76-
77-
// Initialize all reductions
78-
std::tuple<std::decay_t<Reductions>...> all_reductions(
79-
HPX_FORWARD(Reductions, reductions)...);
80-
81-
// Create a lambda that captures all reductions
82-
auto task = [all_reductions = HPX_MOVE(all_reductions), &f, &ts...](
83-
std::size_t index) {
84-
f(index, all_reductions, HPX_FORWARD(Ts, ts)...);
85-
};
86-
87-
// Execute based on policy type
88-
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
89-
{
90-
auto fut = hpx::parallel::execution::bulk_async_execute(
91-
exec, task, cores, HPX_FORWARD(Ts, ts)...);
32+
/// \cond NOINTERNAL
33+
namespace detail {
9234

93-
// Create a cleanup function that will be called when all tasks complete
94-
auto cleanup = [all_reductions =
95-
HPX_MOVE(all_reductions)]() mutable {
35+
template <typename ExPolicy, typename F, typename... Reductions>
36+
decltype(auto) run_on_all(
37+
ExPolicy&& policy, F&& f, Reductions&&... reductions)
38+
{
39+
// Create executor with proper configuration
40+
auto exec =
41+
hpx::execution::experimental::with_processing_units_count(
42+
hpx::execution::parallel_executor(
43+
hpx::threads::thread_priority::bound,
44+
hpx::threads::thread_stacksize::default_),
45+
hpx::execution::experimental::processing_units_count(
46+
policy.executor()));
47+
48+
auto cores =
49+
hpx::execution::experimental::processing_units_count(exec);
50+
51+
// Execute based on policy type
52+
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
53+
{
54+
// Initialize all reductions
55+
auto all_reductions =
56+
std::make_tuple(HPX_FORWARD(Reductions, reductions)...);
57+
auto sp = std::make_shared<decltype(all_reductions)>(
58+
HPX_MOVE(all_reductions));
59+
60+
// Create a lambda that captures all reductions
61+
auto task = [sp, f = HPX_FORWARD(F, f)](std::size_t) {
62+
std::apply(
63+
[&](auto&... r) { f(r.iteration_value(0)...); }, *sp);
64+
};
65+
66+
auto fut = hpx::parallel::execution::bulk_async_execute(
67+
HPX_MOVE(exec), HPX_MOVE(task), cores);
68+
69+
// Return a future that performs cleanup after all tasks
70+
// complete
71+
return fut.then([sp = HPX_MOVE(sp)](auto&& fut_inner) mutable {
72+
std::apply(
73+
[](auto&... r) { (r.exit_iteration(0), ...); }, *sp);
74+
return fut_inner.get();
75+
});
76+
}
77+
else
78+
{
79+
// Initialize all reductions
80+
auto&& all_reductions = std::forward_as_tuple(
81+
HPX_FORWARD(Reductions, reductions)...);
82+
83+
// Create a lambda that captures all reductions
84+
auto task = [&all_reductions, &f](std::size_t) {
85+
std::apply([&](auto&... r) { f(r.iteration_value(0)...); },
86+
all_reductions);
87+
};
88+
89+
hpx::parallel::execution::bulk_sync_execute(
90+
HPX_MOVE(exec), HPX_MOVE(task), cores);
91+
92+
// Clean up reductions
9693
std::apply([](auto&... r) { (r.exit_iteration(0), ...); },
9794
all_reductions);
98-
};
99-
100-
// Return a future that performs cleanup after all tasks complete
101-
return fut.then(
102-
[cleanup = HPX_MOVE(cleanup)](auto&& fut_inner) mutable {
103-
cleanup();
104-
return HPX_MOVE(fut_inner.get());
105-
});
95+
}
10696
}
107-
else
108-
{
109-
auto result =
110-
hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
111-
exec, task, cores, HPX_FORWARD(Ts, ts)...));
112-
113-
// Clean up reductions
114-
std::apply(
115-
[](auto&... r) { (r.exit_iteration(0), ...); }, all_reductions);
116-
return result;
117-
}
118-
}
11997

120-
/// \brief Run a function on all available worker threads
121-
/// \tparam ExPolicy The execution policy type
122-
/// \tparam F The function type to execute
123-
/// \tparam Ts Additional argument types
124-
/// \param policy The execution policy to use
125-
/// \param num_tasks The number of tasks to create
126-
/// \param f The function to execute
127-
/// \param ts Additional arguments to pass to the function
128-
template <typename ExPolicy, typename F, typename... Ts>
129-
decltype(auto) run_on_all([[maybe_unused]] ExPolicy&& policy,
130-
std::size_t num_tasks, F&& f, [[maybe_unused]] Ts&&... ts)
131-
{
132-
static_assert(hpx::is_execution_policy_v<ExPolicy>,
133-
"hpx::is_execution_policy_v<ExPolicy>");
134-
static_assert(std::is_invocable_v<F&&, Ts&&...>,
135-
"F must be callable with (Ts...)");
136-
137-
// Configure executor with proper scheduling hints
138-
hpx::threads::thread_schedule_hint hint;
139-
hint.sharing_mode(
140-
hpx::threads::thread_sharing_hint::do_not_share_function);
141-
142-
auto exec = hpx::execution::experimental::with_processing_units_count(
143-
hpx::execution::parallel_executor(
144-
hpx::threads::thread_priority::bound,
145-
hpx::threads::thread_stacksize::default_, hint),
146-
num_tasks);
147-
exec.set_hierarchical_threshold(0);
148-
149-
// Execute based on policy type
150-
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
151-
{
152-
return hpx::parallel::execution::bulk_async_execute(
153-
exec, [&](auto) { f(ts...); }, num_tasks,
154-
HPX_FORWARD(Ts, ts)...);
155-
}
156-
else
98+
template <typename ExPolicy, std::size_t... Is, typename... Ts>
99+
decltype(auto) run_on_all(
100+
ExPolicy&& policy, hpx::util::index_pack<Is...>, Ts&&... ts)
157101
{
158-
return hpx::wait_all(hpx::parallel::execution::bulk_async_execute(
159-
exec, [&](auto) { f(ts...); }, num_tasks,
160-
HPX_FORWARD(Ts, ts)...));
102+
auto&& t = std::forward_as_tuple(HPX_FORWARD(Ts, ts)...);
103+
auto f = std::get<sizeof...(Ts) - 1>(t);
104+
105+
return run_on_all(
106+
HPX_FORWARD(ExPolicy, policy), HPX_MOVE(f), std::get<Is>(t)...);
161107
}
162-
}
108+
} // namespace detail
109+
/// \endcond
163110

164-
/// \brief Run a function on all available worker threads
111+
/// Run a function on all available worker threads with reduction support
112+
/// using the given execution policy
113+
///
165114
/// \tparam ExPolicy The execution policy type
166-
/// \tparam F The function type to execute
167-
/// \tparam Ts Additional argument types
168-
/// \param policy The execution policy to use
169-
/// \param f The function to execute
170-
/// \param ts Additional arguments to pass to the function
171-
template <typename ExPolicy, typename F, typename... Ts,
172-
HPX_CONCEPT_REQUIRES_(std::is_invocable_v<F&&, Ts&&...>)>
173-
decltype(auto) run_on_all(
174-
ExPolicy&& policy, F&& f, [[maybe_unused]] Ts&&... ts)
115+
/// \tparam T The first type in a list of reduction types and the
116+
/// function type to invoke (last argument)
117+
/// \tparam Ts The list of reduction types and the function type to
118+
/// invoke (last argument)
119+
/// \param policy The execution policy to use
120+
/// \param t The first in a list of reductions and the function to
121+
/// invoke (last argument)
122+
/// \param ts The list of reductions and the function to invoke (last
123+
/// argument)
124+
template <typename ExPolicy, typename T, typename... Ts,
125+
HPX_CONCEPT_REQUIRES_(hpx::is_execution_policy_v<ExPolicy>)>
126+
decltype(auto) run_on_all(ExPolicy&& policy, T&& t, Ts&&... ts)
175127
{
176-
static_assert(hpx::is_execution_policy_v<ExPolicy>,
177-
"hpx::is_execution_policy_v<ExPolicy>");
178-
179-
std::size_t cores =
180-
hpx::parallel::execution::detail::get_os_thread_count();
181-
return run_on_all(HPX_FORWARD(ExPolicy, policy), cores,
182-
HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
183-
}
184-
185-
// Overloads without execution policy (default to sequential execution)
186-
template <typename F, typename... Ts>
187-
decltype(auto) run_on_all(std::size_t num_tasks, F&& f, Ts&&... ts)
188-
{
189-
return run_on_all(hpx::execution::seq, num_tasks, HPX_FORWARD(F, f),
128+
return detail::run_on_all(HPX_FORWARD(ExPolicy, policy),
129+
hpx::util::make_index_pack_t<sizeof...(Ts)>(), HPX_FORWARD(T, t),
190130
HPX_FORWARD(Ts, ts)...);
191131
}
192132

193-
template <typename F, typename... Ts,
194-
HPX_CONCEPT_REQUIRES_(std::is_invocable_v<F&&, Ts&&...>)>
195-
decltype(auto) run_on_all(F&& f, Ts&&... ts)
133+
/// Run a function on all available worker threads with reduction support
134+
/// using the \a hpx::execution::par execution policy
135+
///
136+
/// \tparam T The first type in a list of reduction types and the
137+
/// function type to invoke (last argument)
138+
/// \tparam Ts The list of reduction types and the function type to
139+
/// invoke (last argument)
140+
/// \param t The first in a list of reductions and the function to
141+
/// invoke (last argument)
142+
/// \param ts The list of reductions and the function to invoke (last
143+
/// argument)
144+
template <typename T, typename... Ts,
145+
HPX_CONCEPT_REQUIRES_(!hpx::is_execution_policy_v<T>)>
146+
decltype(auto) run_on_all(T&& t, Ts&&... ts)
196147
{
197-
return run_on_all(
198-
hpx::execution::seq, HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...);
148+
return detail::run_on_all(hpx::execution::par,
149+
hpx::util::make_index_pack_t<sizeof...(Ts)>(), HPX_FORWARD(T, t),
150+
HPX_FORWARD(Ts, ts)...);
199151
}
200-
} // namespace hpx::parallel
152+
} // namespace hpx::experimental

0 commit comments

Comments
 (0)