Skip to content

Commit be0f6e0

Browse files
committed
Merge remote-tracking branch 'origin/main' into release-0.31.X
2 parents db16b16 + c855c01 commit be0f6e0

File tree

6 files changed

+242
-7
lines changed

6 files changed

+242
-7
lines changed

.gitlab/scripts/run_performance_benchmarks.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pika_targets=(
3737
"task_yield_test"
3838
"task_yield_test"
3939
"condition_variable_overhead_test"
40+
"async_rw_mutex_scheduling_test"
4041
)
4142
pika_test_options=(
4243
"--pika:ini=pika.thread_queue.init_threads_count=100 \
@@ -107,6 +108,12 @@ pika_test_options=(
107108
--pika:threads=2
108109
--perftest-json"
109110

111+
"--num-iterations=1000
112+
--num-rw-accesses=5
113+
--num-ro-accesses=5
114+
--repetitions=100
115+
--pika:threads=4
116+
--perftest-json"
110117
)
111118

112119
index=0

libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <pika/functional/unique_function.hpp>
1818

1919
#include <atomic>
20+
#include <cstddef>
2021
#include <exception>
2122
#include <memory>
2223
#include <mutex>
@@ -72,7 +73,15 @@ namespace pika::execution::experimental {
7273
// NOLINTNEXTLINE(bugprone-unchecked-optional-access)
7374
next_state->set_value(std::move(*value));
7475

75-
for (auto& continuation : continuations) { continuation(next_state); }
76+
if (!continuations.empty())
77+
{
78+
auto const size = continuations.size();
79+
for (std::size_t i = 0; i < size - 1; ++i) { continuations[i](next_state); }
80+
81+
// Move shared state into the last continuation to ensure that the
82+
// continuations release the last reference and not this destructor.
83+
continuations[size - 1](std::move(next_state));
84+
}
7685
}
7786
}
7887

@@ -131,7 +140,15 @@ namespace pika::execution::experimental {
131140
// If there is no next state the continuations must be empty.
132141
PIKA_ASSERT(next_state || continuations.empty());
133142

134-
for (auto& continuation : continuations) { continuation(next_state); }
143+
if (!continuations.empty())
144+
{
145+
auto const size = continuations.size();
146+
for (std::size_t i = 0; i < size - 1; ++i) { continuations[i](next_state); }
147+
148+
// Move shared state into the last continuation to ensure that the continuations
149+
// release the last reference and not this destructor.
150+
continuations[size - 1](std::move(next_state));
151+
}
135152
}
136153

137154
void set_next_state(std::shared_ptr<async_rw_mutex_shared_state> state)

libs/pika/synchronization/tests/performance/CMakeLists.txt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,19 @@
44
# Distributed under the Boost Software License, Version 1.0. (See accompanying
55
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
66

7-
set(benchmarks)
7+
set(benchmarks async_rw_mutex_scheduling)
88

99
foreach(benchmark ${benchmarks})
10-
1110
set(sources ${benchmark}.cpp)
1211

1312
source_group("Source Files" FILES ${sources})
1413

15-
# add benchmark executable
1614
pika_add_executable(
1715
${benchmark}_test INTERNAL_FLAGS
1816
SOURCES ${sources}
1917
EXCLUDE_FROM_ALL ${${benchmark}_FLAGS}
2018
FOLDER "Benchmarks/Modules/Synchronization"
2119
)
2220

23-
# add a custom target for this benchmark
2421
pika_add_performance_test("modules.synchronization" ${benchmark} ${${benchmark}_PARAMETERS})
25-
2622
endforeach()
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright (c) 2024 ETH Zurich
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
// This test measures the performance of accessing values through async_rw_mutex. Accesses are
8+
// scheduled on new tasks to test the performance with concurrency. This means that the benchmark
9+
// includes the overhead of creating new tasks, but it represents a more realistic scenario.
10+
11+
#include <pika/config.hpp>
#include <pika/async_rw_mutex.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <pika/runtime.hpp>
#include <pika/testing/performance.hpp>

#include <fmt/format.h>
#include <fmt/ostream.h>
#include <fmt/printf.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <utility>
27+
28+
using pika::program_options::bool_switch;
29+
using pika::program_options::options_description;
30+
using pika::program_options::value;
31+
using pika::program_options::variables_map;
32+
33+
using pika::chrono::detail::high_resolution_timer;
34+
35+
namespace ex = pika::execution::experimental;
36+
namespace tt = pika::this_thread::experimental;
37+
38+
template <typename T>
39+
double test_async_rw_mutex(
40+
std::uint64_t num_iterations, std::uint64_t num_rw_accesses, std::uint64_t num_ro_accesses)
41+
{
42+
pika::chrono::detail::high_resolution_timer timer;
43+
44+
{
45+
ex::async_rw_mutex<T> m;
46+
ex::thread_pool_scheduler sched;
47+
48+
for (std::uint64_t i = 0; i < num_iterations; ++i)
49+
{
50+
for (std::uint64_t j = 0; j < num_rw_accesses; ++j)
51+
{
52+
ex::start_detached(m.readwrite() | ex::continues_on(sched));
53+
}
54+
55+
for (std::uint64_t j = 0; j < num_ro_accesses; ++j)
56+
{
57+
ex::start_detached(m.read() | ex::continues_on(sched));
58+
}
59+
}
60+
61+
tt::sync_wait(m.readwrite());
62+
}
63+
64+
return timer.elapsed();
65+
}
66+
67+
int pika_main(variables_map& vm)
68+
{
69+
auto const num_iterations = vm["num-iterations"].as<std::uint64_t>();
70+
auto const num_rw_accesses = vm["num-rw-accesses"].as<std::uint64_t>();
71+
auto const num_ro_accesses = vm["num-ro-accesses"].as<std::uint64_t>();
72+
auto const repetitions = vm["repetitions"].as<std::uint64_t>();
73+
auto const perftest_json = vm["perftest-json"].as<bool>();
74+
75+
double time_avg_s = 0.0;
76+
double time_min_s = std::numeric_limits<double>::max();
77+
double time_max_s = std::numeric_limits<double>::min();
78+
79+
for (std::uint64_t i = 0; i < repetitions; ++i)
80+
{
81+
double time_s = test_async_rw_mutex<void>(num_iterations, num_rw_accesses, num_ro_accesses);
82+
83+
time_avg_s += time_s;
84+
time_max_s = (std::max)(time_max_s, time_s);
85+
time_min_s = (std::min)(time_min_s, time_s);
86+
}
87+
88+
time_avg_s /= repetitions;
89+
90+
double const time_avg_us = time_avg_s * 1e6 / num_iterations;
91+
double const time_min_us = time_min_s * 1e6 / num_iterations;
92+
double const time_max_us = time_max_s * 1e6 / num_iterations;
93+
94+
if (perftest_json)
95+
{
96+
pika::util::detail::json_perf_times t;
97+
t.add(fmt::format("async_rw_mutex - {} threads - {}:{}", pika::get_num_worker_threads(),
98+
num_rw_accesses, num_ro_accesses),
99+
time_avg_us);
100+
std::cout << t;
101+
}
102+
else
103+
{
104+
fmt::print(
105+
"repetitions,iterations,rw_accesses,ro_accesses,time_avg_us,time_min_us,time_max_us\n");
106+
fmt::print("{},{},{},{},{},{},{}\n", repetitions, num_iterations, num_rw_accesses,
107+
num_ro_accesses, time_avg_us, time_min_us, time_max_us);
108+
}
109+
110+
pika::finalize();
111+
return EXIT_SUCCESS;
112+
}
113+
114+
///////////////////////////////////////////////////////////////////////////////
115+
int main(int argc, char* argv[])
116+
{
117+
options_description cmdline("usage: " PIKA_APPLICATION_STRING " [options]");
118+
// clang-format off
119+
cmdline.add_options()
120+
("num-iterations", value<std::uint64_t>()->default_value(100), "number of times to cycle through read-write and read-only accesses in one test")
121+
("num-rw-accesses", value<std::uint64_t>()->default_value(5), "number of consecutive read-write accesses")
122+
("num-ro-accesses", value<std::uint64_t>()->default_value(5), "number of consecutive read-only accesses")
123+
("repetitions", value<std::uint64_t>()->default_value(1), "number of repetitions of the full benchmark")
124+
("perftest-json", bool_switch(), "print final task size in json format for use with performance CI.")
125+
// clang-format on
126+
;
127+
128+
pika::init_params init_args;
129+
init_args.desc_cmdline = cmdline;
130+
return pika::init(pika_main, argc, argv, init_args);
131+
}

libs/pika/synchronization/tests/unit/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
set(tests
88
async_rw_mutex
9+
async_rw_mutex_yielding
910
barrier
1011
binary_semaphore
1112
condition_variable
@@ -19,6 +20,7 @@ set(tests
1920
)
2021

2122
set(async_rw_mutex_PARAMETERS THREADS 4)
23+
set(async_rw_mutex_yielding_PARAMETERS THREADS 4)
2224
set(barrier_cpp20_PARAMETERS THREADS 4)
2325
set(binary_semaphore_cpp20_PARAMETERS THREADS 4)
2426

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright (c) 2024 ETH Zurich
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
// This test checks for a desirable property in async_rw_mutex: if a previous access is guaranteed
8+
// to have completed, e.g. via sync_wait, the next access is guaranteed to start inline. This makes
9+
// it slightly easier to reason about whether waiting for a sender from async_rw_mutex may yield or
10+
// not.
11+
//
12+
// Note that while we test the property here, we don't guarantee that it won't change in the future. We simply
13+
// want to preserve the property as long as it's reasonable with the current implementation.
14+
15+
#include <pika/async_rw_mutex.hpp>
16+
#include <pika/execution.hpp>
17+
#include <pika/init.hpp>
18+
#include <pika/modules/threading_base.hpp>
19+
#include <pika/testing.hpp>
20+
21+
#include <cstddef>
22+
#include <cstdlib>
23+
24+
namespace ex = pika::execution::experimental;
25+
namespace tt = pika::this_thread::experimental;
26+
27+
template <typename M>
28+
void test(M&& m)
29+
{
30+
ex::thread_pool_scheduler sched{};
31+
32+
// We first access the mutex in a way such that the wrapper will be released in another task.
33+
ex::start_detached(m.readwrite() | ex::continues_on(sched) | ex::then([](auto&&) {}));
34+
35+
// Then we access the mutex again, but block to wait for the result. We discard the result so
36+
// the wrapper is released immediately.
37+
{
38+
[[maybe_unused]] auto wrapper = tt::sync_wait(m.readwrite());
39+
}
40+
41+
// Finally, since we blockingly waited for the result above, we expect the below sync_wait to
42+
// never cause the task yield, or change worker thread. To achieve this, the async_rw_mutex
43+
// implementation must guarantee that in a situation like this, the wrapper returned by
44+
// sync_wait holds the last reference to the shared state of that particular access. This would
45+
// not happen if e.g. the async_rw_mutex_shared_state destructor release the next shared state
46+
// only once all the continuations have been triggered.
47+
//
48+
// We check that neither the thread phase (how many invocations of the tasks, or in other words:
49+
// did the task yield?) nor worker thread change across the sync_wait. The thread phase is a
50+
// more reliable check, but is not always available. The worker thread can change if the task
51+
// yields whenever work stealing is enabled, but is much lower probability.
52+
auto phase_before = pika::threads::detail::get_self_id_data()->get_thread_phase();
53+
auto thread_before = pika::get_worker_thread_num();
54+
55+
{
56+
[[maybe_unused]] auto wrapper = tt::sync_wait(m.read());
57+
}
58+
59+
auto phase_after = pika::threads::detail::get_self_id_data()->get_thread_phase();
60+
auto thread_after = pika::get_worker_thread_num();
61+
62+
PIKA_TEST_EQ(phase_before, phase_after);
63+
PIKA_TEST_EQ(thread_before, thread_after);
64+
}
65+
66+
int pika_main()
67+
{
68+
pika::scoped_finalize sf{};
69+
70+
// This whole test fails only with low probability, so repeat it some reasonable number of
71+
// times. 100 does not guarantee failure in a single run, but hopefully across multiple CI
72+
// configurations at least one run will fail.
73+
for (std::size_t iteration = 0; iteration < 100; ++iteration)
74+
{
75+
test(ex::async_rw_mutex<int>{42});
76+
test(ex::async_rw_mutex<void>{});
77+
}
78+
79+
return EXIT_SUCCESS;
80+
}
81+
82+
// Program entry point: starts the pika runtime with pika_main and uses the value returned by
// pika::init as the exit code.
int main(int argc, char* argv[])
{
    int const exit_code = pika::init(pika_main, argc, argv);
    return exit_code;
}

0 commit comments

Comments
 (0)