Skip to content

Commit 3c62daf

Browse files
committed
common/async: add co_spawn_group template for fork-join parallelism
Signed-off-by: Casey Bodley <[email protected]>
1 parent d192ca7 commit 3c62daf

File tree

4 files changed

+798
-0
lines changed

4 files changed

+798
-0
lines changed

src/common/async/co_spawn_group.h

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* This is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License version 2.1, as published by the Free Software
9+
* Foundation. See file COPYING.
10+
*
11+
*/
12+
13+
#pragma once
14+
15+
#include <boost/asio/awaitable.hpp>
16+
#include <boost/asio/execution/executor.hpp>
17+
#include "cancel_on_error.h"
18+
#include "detail/co_spawn_group.h"
19+
20+
namespace ceph::async {
21+
22+
/// \brief Tracks a group of coroutines to await all of their completions.
23+
///
24+
/// The wait() function can be used to await the completion of all children.
25+
/// If any child coroutines exit with an exception, the first such exception
26+
/// is rethrown by wait(). The cancel_on_error option controls whether these
27+
/// exceptions trigger the cancellation of other children.
28+
///
29+
/// All child coroutines are canceled by cancel() or co_spawn_group destruction.
30+
/// This allows the parent coroutine to share memory with its child coroutines
31+
/// without fear of dangling references.
32+
///
33+
/// This class is not thread-safe, so a strand executor should be used in
34+
/// multi-threaded contexts.
35+
///
36+
/// Example:
37+
/// \code
38+
/// awaitable<void> child(task& t);
39+
///
40+
/// awaitable<void> parent(std::span<task> tasks)
41+
/// {
42+
/// // process all tasks in parallel
43+
/// auto ex = co_await boost::asio::this_coro::executor;
44+
/// auto group = co_spawn_group{ex, tasks.size()};
45+
///
46+
/// for (auto& t : tasks) {
47+
/// group.spawn(child(t));
48+
/// }
49+
/// co_await group.wait();
50+
/// }
51+
/// \endcode
52+
template <boost::asio::execution::executor Executor>
53+
class co_spawn_group {
54+
using impl_type = detail::co_spawn_group_impl<Executor>;
55+
boost::intrusive_ptr<impl_type> impl;
56+
57+
public:
58+
co_spawn_group(Executor ex, size_t limit,
59+
cancel_on_error on_error = cancel_on_error::none)
60+
: impl(new impl_type(ex, limit, on_error))
61+
{
62+
}
63+
64+
~co_spawn_group()
65+
{
66+
impl->cancel();
67+
}
68+
69+
using executor_type = Executor;
70+
executor_type get_executor() const
71+
{
72+
return impl->get_executor();
73+
}
74+
75+
/// Spawn the given coroutine \ref cr on the group's executor. Throws a
76+
/// std::length_error exception if the number of outstanding coroutines
77+
/// would exceed the group's limit.
78+
void spawn(boost::asio::awaitable<void, executor_type> cr)
79+
{
80+
impl->spawn(std::move(cr));
81+
}
82+
83+
/// Wait for all outstanding coroutines before returning. If any of the
84+
/// spawned coroutines exit with an exception, the first exception is
85+
/// rethrown.
86+
///
87+
/// After wait() completes, whether by exception or co_return, the spawn
88+
/// group can be reused to spawn and await additional coroutines.
89+
boost::asio::awaitable<void, executor_type> wait()
90+
{
91+
return impl->wait();
92+
}
93+
94+
/// Cancel all outstanding coroutines.
95+
void cancel()
96+
{
97+
impl->cancel();
98+
}
99+
};
100+
101+
} // namespace ceph::async
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* This is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License version 2.1, as published by the Free Software
9+
* Foundation. See file COPYING.
10+
*
11+
*/
12+
13+
#pragma once
14+
15+
#include <exception>
16+
#include <boost/asio/awaitable.hpp>
17+
#include <boost/asio/bind_cancellation_slot.hpp>
18+
#include <boost/asio/cancellation_signal.hpp>
19+
#include <boost/asio/co_spawn.hpp>
20+
#include <boost/asio/execution/executor.hpp>
21+
#include <boost/intrusive_ptr.hpp>
22+
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
23+
#include "common/async/cancel_on_error.h"
24+
#include "common/async/co_waiter.h"
25+
#include "common/async/service.h"
26+
#include "include/scope_guard.h"
27+
28+
namespace ceph::async::detail {
29+
30+
template <boost::asio::execution::executor Executor>
31+
class co_spawn_group_impl;
32+
33+
// A cancellable co_spawn() completion handler that notifies the co_spawn_group
34+
// upon completion. This holds a reference to the implementation in order to
35+
// extend its lifetime. This is required for per-op cancellation because the
36+
// cancellation_signals must outlive these coroutine frames.
37+
template <typename Executor>
38+
class co_spawn_group_handler {
39+
using impl_type = co_spawn_group_impl<Executor>;
40+
using size_type = typename impl_type::size_type;
41+
boost::intrusive_ptr<impl_type> impl;
42+
boost::asio::cancellation_slot slot;
43+
size_type index;
44+
public:
45+
co_spawn_group_handler(boost::intrusive_ptr<impl_type> impl,
46+
boost::asio::cancellation_slot slot, size_type index)
47+
: impl(std::move(impl)), slot(std::move(slot)), index(index)
48+
{}
49+
50+
using executor_type = typename impl_type::executor_type;
51+
executor_type get_executor() const noexcept
52+
{
53+
return impl->get_executor();
54+
}
55+
56+
using cancellation_slot_type = boost::asio::cancellation_slot;
57+
cancellation_slot_type get_cancellation_slot() const noexcept
58+
{
59+
return slot;
60+
}
61+
62+
void operator()(std::exception_ptr eptr)
63+
{
64+
impl->child_complete(index, eptr);
65+
}
66+
};
67+
68+
// Reference-counted spawn group implementation.
69+
template <boost::asio::execution::executor Executor>
70+
class co_spawn_group_impl :
71+
public boost::intrusive_ref_counter<co_spawn_group_impl<Executor>,
72+
boost::thread_unsafe_counter>,
73+
public service_list_base_hook
74+
{
75+
public:
76+
using size_type = uint16_t;
77+
78+
co_spawn_group_impl(Executor ex, size_type limit,
79+
cancel_on_error on_error)
80+
: svc(boost::asio::use_service<service<co_spawn_group_impl>>(
81+
boost::asio::query(ex, boost::asio::execution::context))),
82+
ex(ex),
83+
signals(std::make_unique<boost::asio::cancellation_signal[]>(limit)),
84+
limit(limit), on_error(on_error)
85+
{
86+
// register for service_shutdown() notifications
87+
svc.add(*this);
88+
}
89+
~co_spawn_group_impl()
90+
{
91+
svc.remove(*this);
92+
}
93+
94+
using executor_type = Executor;
95+
executor_type get_executor() const noexcept
96+
{
97+
return ex;
98+
}
99+
100+
void child_complete(size_type index, std::exception_ptr e)
101+
{
102+
if (e) {
103+
if (!eptr) {
104+
eptr = e;
105+
}
106+
if (on_error == cancel_on_error::all) {
107+
cancel_from(0);
108+
} else if (on_error == cancel_on_error::after) {
109+
cancel_from(index + 1);
110+
}
111+
}
112+
if (++completed == spawned) {
113+
complete();
114+
}
115+
}
116+
117+
void spawn(boost::asio::awaitable<void, executor_type> cr)
118+
{
119+
boost::asio::co_spawn(get_executor(), std::move(cr), completion());
120+
}
121+
122+
boost::asio::awaitable<void, executor_type> wait()
123+
{
124+
if (completed < spawned) {
125+
co_await waiter.get();
126+
}
127+
128+
// clear for reuse
129+
completed = 0;
130+
spawned = 0;
131+
132+
if (eptr) {
133+
std::rethrow_exception(std::exchange(eptr, nullptr));
134+
}
135+
}
136+
137+
void cancel()
138+
{
139+
cancel_from(0);
140+
}
141+
142+
void service_shutdown()
143+
{
144+
waiter.shutdown();
145+
}
146+
147+
private:
148+
service<co_spawn_group_impl>& svc;
149+
co_waiter<void, executor_type> waiter;
150+
executor_type ex;
151+
std::unique_ptr<boost::asio::cancellation_signal[]> signals;
152+
std::exception_ptr eptr;
153+
const size_type limit;
154+
size_type spawned = 0;
155+
size_type completed = 0;
156+
const cancel_on_error on_error;
157+
158+
void cancel_from(size_type begin)
159+
{
160+
for (size_type i = begin; i < spawned; i++) {
161+
signals[i].emit(boost::asio::cancellation_type::terminal);
162+
}
163+
}
164+
165+
void complete()
166+
{
167+
if (waiter.waiting()) {
168+
waiter.complete(nullptr);
169+
}
170+
}
171+
172+
co_spawn_group_handler<executor_type> completion()
173+
{
174+
if (spawned >= limit) {
175+
throw std::length_error("spawn group maximum size exceeded");
176+
}
177+
const size_type index = spawned++;
178+
return {boost::intrusive_ptr{this}, signals[index].slot(), index};
179+
}
180+
};
181+
182+
} // namespace ceph::async::detail

src/test/common/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,10 @@ add_ceph_unittest(unittest_async_max_concurrent_for_each)
367367
target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context)
368368

369369
if(NOT WIN32)
370+
add_executable(unittest_async_co_spawn_group test_async_co_spawn_group.cc)
371+
add_ceph_unittest(unittest_async_co_spawn_group)
372+
target_link_libraries(unittest_async_co_spawn_group ceph-common Boost::system)
373+
370374
add_executable(unittest_async_co_throttle test_async_co_throttle.cc)
371375
add_ceph_unittest(unittest_async_co_throttle)
372376
target_link_libraries(unittest_async_co_throttle ceph-common Boost::system)

0 commit comments

Comments
 (0)