Skip to content

Commit 7e5d28f

Browse files
committed
common/async: add parallel_for_each() algorithm
Signed-off-by: Casey Bodley <[email protected]>
1 parent 3c62daf commit 7e5d28f

File tree

3 files changed

+348
-0
lines changed

3 files changed

+348
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* This is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License version 2.1, as published by the Free Software
9+
* Foundation. See file COPYING.
10+
*
11+
*/
12+
13+
#pragma once
14+
15+
#include <concepts>
16+
#include <iterator>
17+
#include <ranges>
18+
#include <type_traits>
19+
#include <boost/asio/awaitable.hpp>
20+
#include <boost/asio/co_spawn.hpp>
21+
#include <boost/asio/execution/executor.hpp>
22+
#include <boost/asio/this_coro.hpp>
23+
#include "co_spawn_group.h"
24+
25+
namespace ceph::async {
26+
27+
/// Call a coroutine with each element in the given range then wait for all
28+
/// of them to complete. The first exception is rethrown to the caller. The
29+
/// cancel_on_error option controls whether these exceptions trigger the
30+
/// cancellation of other children.
31+
///
32+
/// Example:
33+
/// \code
34+
/// awaitable<void> child(task& t);
35+
///
36+
/// awaitable<void> parent(std::span<task> tasks)
37+
/// {
38+
/// co_await parallel_for_each(tasks.begin(), tasks.end(), child);
39+
/// }
40+
/// \endcode
41+
template <typename Iterator, typename Sentinel, typename VoidAwaitableFactory,
42+
typename Value = std::iter_reference_t<Iterator>,
43+
typename VoidAwaitable = std::invoke_result_t<
44+
VoidAwaitableFactory, Value>,
45+
typename AwaitableT = typename VoidAwaitable::value_type,
46+
typename AwaitableExecutor = typename VoidAwaitable::executor_type>
47+
requires (std::input_iterator<Iterator> &&
48+
std::sentinel_for<Sentinel, Iterator> &&
49+
std::same_as<AwaitableT, void> &&
50+
boost::asio::execution::executor<AwaitableExecutor>)
51+
auto parallel_for_each(Iterator begin, Sentinel end,
52+
VoidAwaitableFactory&& factory,
53+
cancel_on_error on_error = cancel_on_error::none)
54+
-> boost::asio::awaitable<void, AwaitableExecutor>
55+
{
56+
const size_t count = std::ranges::distance(begin, end);
57+
if (!count) {
58+
co_return;
59+
}
60+
auto ex = co_await boost::asio::this_coro::executor;
61+
auto group = co_spawn_group{ex, count, on_error};
62+
for (Iterator i = begin; i != end; ++i) {
63+
group.spawn(factory(*i));
64+
}
65+
co_await group.wait();
66+
}
67+
68+
/// \overload
69+
template <typename Range, typename VoidAwaitableFactory,
70+
typename Value = std::ranges::range_reference_t<Range>,
71+
typename VoidAwaitable = std::invoke_result_t<
72+
VoidAwaitableFactory, Value>,
73+
typename AwaitableT = typename VoidAwaitable::value_type,
74+
typename AwaitableExecutor = typename VoidAwaitable::executor_type>
75+
requires (std::ranges::range<Range> &&
76+
std::same_as<AwaitableT, void> &&
77+
boost::asio::execution::executor<AwaitableExecutor>)
78+
auto parallel_for_each(Range&& range, VoidAwaitableFactory&& factory,
79+
cancel_on_error on_error = cancel_on_error::none)
80+
-> boost::asio::awaitable<void, AwaitableExecutor>
81+
{
82+
return parallel_for_each(std::begin(range), std::end(range),
83+
std::move(factory), on_error);
84+
}
85+
86+
} // namespace ceph::async

src/test/common/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,10 @@ add_executable(unittest_async_yield_waiter test_async_yield_waiter.cc)
388388
add_ceph_unittest(unittest_async_yield_waiter)
389389
target_link_libraries(unittest_async_yield_waiter ceph-common Boost::system Boost::context)
390390

391+
add_executable(unittest_async_parallel_for_each test_async_parallel_for_each.cc)
392+
add_ceph_unittest(unittest_async_parallel_for_each)
393+
target_link_libraries(unittest_async_parallel_for_each ceph-common Boost::system)
394+
391395
add_executable(unittest_cdc test_cdc.cc
392396
$<TARGET_OBJECTS:unit-main>)
393397
target_link_libraries(unittest_cdc global ceph-common)
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* This is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License version 2.1, as published by the Free Software
9+
* Foundation. See file COPYING.
10+
*
11+
*/
12+
13+
#include "common/async/parallel_for_each.h"
14+
15+
#include <optional>
16+
#include <boost/asio/bind_cancellation_slot.hpp>
17+
#include <boost/asio/co_spawn.hpp>
18+
#include <boost/asio/io_context.hpp>
19+
#include <gtest/gtest.h>
20+
#include "common/async/co_waiter.h"
21+
22+
namespace ceph::async {
23+
24+
namespace asio = boost::asio;
25+
namespace errc = boost::system::errc;
26+
using boost::system::error_code;
27+
28+
using executor_type = asio::io_context::executor_type;
29+
30+
template <typename T>
31+
using awaitable = asio::awaitable<T, executor_type>;
32+
33+
using void_waiter = co_waiter<void, executor_type>;
34+
35+
template <typename T>
36+
auto capture(std::optional<T>& opt)
37+
{
38+
return [&opt] (T value) { opt = std::move(value); };
39+
}
40+
41+
template <typename T>
42+
auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
43+
{
44+
return asio::bind_cancellation_slot(signal.slot(), capture(opt));
45+
}
46+
47+
TEST(parallel_for_each, empty)
48+
{
49+
asio::io_context ctx;
50+
51+
int* end = nullptr;
52+
auto cr = [] (int i) -> awaitable<void> { co_return; };
53+
54+
std::optional<std::exception_ptr> result;
55+
asio::co_spawn(ctx, parallel_for_each(end, end, cr), capture(result));
56+
57+
ctx.poll();
58+
ASSERT_TRUE(ctx.stopped());
59+
ASSERT_TRUE(result);
60+
EXPECT_FALSE(*result);
61+
}
62+
63+
TEST(parallel_for_each, shutdown)
64+
{
65+
asio::io_context ctx;
66+
67+
void_waiter waiters[2];
68+
auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
69+
70+
asio::cancellation_signal signal;
71+
std::optional<std::exception_ptr> result;
72+
asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
73+
74+
ctx.poll();
75+
ASSERT_FALSE(ctx.stopped());
76+
EXPECT_FALSE(result);
77+
// shut down before any waiters complete
78+
}
79+
80+
TEST(parallel_for_each, cancel)
81+
{
82+
asio::io_context ctx;
83+
84+
void_waiter waiters[2];
85+
auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
86+
87+
asio::cancellation_signal signal;
88+
std::optional<std::exception_ptr> result;
89+
asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
90+
91+
ctx.poll();
92+
ASSERT_FALSE(ctx.stopped());
93+
EXPECT_FALSE(result);
94+
95+
// cancel before any waiters complete
96+
signal.emit(asio::cancellation_type::terminal);
97+
98+
ctx.poll();
99+
ASSERT_TRUE(ctx.stopped());
100+
ASSERT_TRUE(result);
101+
ASSERT_TRUE(*result);
102+
try {
103+
std::rethrow_exception(*result);
104+
} catch (const boost::system::system_error& e) {
105+
EXPECT_EQ(e.code(), asio::error::operation_aborted);
106+
} catch (const std::exception&) {
107+
EXPECT_THROW(throw, boost::system::system_error);
108+
}
109+
}
110+
111+
TEST(parallel_for_each, complete_shutdown)
112+
{
113+
asio::io_context ctx;
114+
115+
void_waiter waiters[2];
116+
auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
117+
118+
asio::cancellation_signal signal;
119+
std::optional<std::exception_ptr> result;
120+
asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
121+
122+
ctx.poll();
123+
ASSERT_FALSE(ctx.stopped());
124+
EXPECT_FALSE(result);
125+
126+
waiters[0].complete(nullptr);
127+
128+
ctx.poll();
129+
ASSERT_FALSE(ctx.stopped());
130+
EXPECT_FALSE(result);
131+
// shut down before final waiter completes
132+
}
133+
134+
TEST(parallel_for_each, complete_cancel)
135+
{
136+
asio::io_context ctx;
137+
138+
void_waiter waiters[2];
139+
auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
140+
141+
asio::cancellation_signal signal;
142+
std::optional<std::exception_ptr> result;
143+
asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
144+
145+
ctx.poll();
146+
ASSERT_FALSE(ctx.stopped());
147+
EXPECT_FALSE(result);
148+
149+
waiters[0].complete(nullptr);
150+
151+
ctx.poll();
152+
ASSERT_FALSE(ctx.stopped());
153+
EXPECT_FALSE(result);
154+
155+
// cancel before final waiter completes
156+
signal.emit(asio::cancellation_type::terminal);
157+
158+
ctx.poll();
159+
ASSERT_TRUE(ctx.stopped());
160+
ASSERT_TRUE(result);
161+
ASSERT_TRUE(*result);
162+
try {
163+
std::rethrow_exception(*result);
164+
} catch (const boost::system::system_error& e) {
165+
EXPECT_EQ(e.code(), asio::error::operation_aborted);
166+
} catch (const std::exception&) {
167+
EXPECT_THROW(throw, boost::system::system_error);
168+
}
169+
}
170+
171+
TEST(parallel_for_each, complete_complete)
172+
{
173+
asio::io_context ctx;
174+
175+
void_waiter waiters[2];
176+
auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
177+
178+
std::optional<std::exception_ptr> result;
179+
asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(result));
180+
181+
ctx.poll();
182+
ASSERT_FALSE(ctx.stopped());
183+
EXPECT_FALSE(result);
184+
185+
waiters[0].complete(nullptr);
186+
187+
ctx.poll();
188+
ASSERT_FALSE(ctx.stopped());
189+
EXPECT_FALSE(result);
190+
191+
waiters[1].complete(nullptr);
192+
193+
ctx.poll();
194+
ASSERT_TRUE(ctx.stopped());
195+
ASSERT_TRUE(result);
196+
EXPECT_FALSE(*result);
197+
}
198+
199+
struct null_sentinel {};
200+
bool operator==(const char* c, null_sentinel) { return !*c; }
201+
static_assert(std::sentinel_for<null_sentinel, const char*>);
202+
203+
TEST(parallel_for_each, sentinel)
204+
{
205+
asio::io_context ctx;
206+
207+
const char* begin = "hello";
208+
null_sentinel end;
209+
210+
size_t count = 0;
211+
auto cr = [&count] (char c) -> awaitable<void> {
212+
++count;
213+
co_return;
214+
};
215+
216+
std::optional<std::exception_ptr> result;
217+
asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
218+
219+
ctx.poll();
220+
ASSERT_TRUE(ctx.stopped());
221+
ASSERT_TRUE(result);
222+
EXPECT_FALSE(*result);
223+
EXPECT_EQ(count, 5);
224+
}
225+
226+
TEST(parallel_for_each, move_iterator)
227+
{
228+
asio::io_context ctx;
229+
230+
using value_type = std::unique_ptr<int>;
231+
value_type values[] = {
232+
std::make_unique<int>(42),
233+
std::make_unique<int>(43),
234+
};
235+
236+
auto begin = std::make_move_iterator(std::begin(values));
237+
auto end = std::make_move_iterator(std::end(values));
238+
239+
auto cr = [] (value_type v) -> awaitable<void> {
240+
if (!v) {
241+
throw std::invalid_argument("empty");
242+
}
243+
co_return;
244+
};
245+
246+
std::optional<std::exception_ptr> result;
247+
asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
248+
249+
ctx.poll();
250+
ASSERT_TRUE(ctx.stopped());
251+
ASSERT_TRUE(result);
252+
EXPECT_FALSE(*result);
253+
254+
EXPECT_FALSE(values[0]);
255+
EXPECT_FALSE(values[1]);
256+
}
257+
258+
} // namespace ceph::async

0 commit comments

Comments
 (0)