Skip to content

Commit e5d33fa

Browse files
authored
Merge pull request ceph#58348 from cbodley/wip-async-co-algorithms
common/async: add primitives for structured concurrency with c++20 coroutines Reviewed-by: Adam Emerson <[email protected]>
2 parents f84a159 + 7e5d28f commit e5d33fa

12 files changed

+2363
-3
lines changed

src/common/async/co_spawn_group.h

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* This is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License version 2.1, as published by the Free Software
9+
* Foundation. See file COPYING.
10+
*
11+
*/
12+
13+
#pragma once
14+
15+
#include <boost/asio/awaitable.hpp>
16+
#include <boost/asio/execution/executor.hpp>
17+
#include "cancel_on_error.h"
18+
#include "detail/co_spawn_group.h"
19+
20+
namespace ceph::async {
21+
22+
/// \brief Tracks a group of coroutines to await all of their completions.
23+
///
24+
/// The wait() function can be used to await the completion of all children.
25+
/// If any child coroutines exit with an exception, the first such exception
26+
/// is rethrown by wait(). The cancel_on_error option controls whether these
27+
/// exceptions trigger the cancellation of other children.
28+
///
29+
/// All child coroutines are canceled by cancel() or co_spawn_group destruction.
30+
/// This allows the parent coroutine to share memory with its child coroutines
31+
/// without fear of dangling references.
32+
///
33+
/// This class is not thread-safe, so a strand executor should be used in
34+
/// multi-threaded contexts.
35+
///
36+
/// Example:
37+
/// \code
38+
/// awaitable<void> child(task& t);
39+
///
40+
/// awaitable<void> parent(std::span<task> tasks)
41+
/// {
42+
/// // process all tasks in parallel
43+
/// auto ex = co_await boost::asio::this_coro::executor;
44+
/// auto group = co_spawn_group{ex, tasks.size()};
45+
///
46+
/// for (auto& t : tasks) {
47+
/// group.spawn(child(t));
48+
/// }
49+
/// co_await group.wait();
50+
/// }
51+
/// \endcode
52+
template <boost::asio::execution::executor Executor>
53+
class co_spawn_group {
54+
using impl_type = detail::co_spawn_group_impl<Executor>;
55+
boost::intrusive_ptr<impl_type> impl;
56+
57+
public:
58+
co_spawn_group(Executor ex, size_t limit,
59+
cancel_on_error on_error = cancel_on_error::none)
60+
: impl(new impl_type(ex, limit, on_error))
61+
{
62+
}
63+
64+
~co_spawn_group()
65+
{
66+
impl->cancel();
67+
}
68+
69+
using executor_type = Executor;
70+
executor_type get_executor() const
71+
{
72+
return impl->get_executor();
73+
}
74+
75+
/// Spawn the given coroutine \ref cr on the group's executor. Throws a
76+
/// std::length_error exception if the number of outstanding coroutines
77+
/// would exceed the group's limit.
78+
void spawn(boost::asio::awaitable<void, executor_type> cr)
79+
{
80+
impl->spawn(std::move(cr));
81+
}
82+
83+
/// Wait for all outstanding coroutines before returning. If any of the
84+
/// spawned coroutines exit with an exception, the first exception is
85+
/// rethrown.
86+
///
87+
/// After wait() completes, whether by exception or co_return, the spawn
88+
/// group can be reused to spawn and await additional coroutines.
89+
boost::asio::awaitable<void, executor_type> wait()
90+
{
91+
return impl->wait();
92+
}
93+
94+
/// Cancel all outstanding coroutines.
95+
void cancel()
96+
{
97+
impl->cancel();
98+
}
99+
};
100+
101+
} // namespace ceph::async

src/common/async/co_throttle.h

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* Copyright (C) 2023 Red Hat <[email protected]>
7+
*
8+
* This is free software; you can redistribute it and/or
9+
* modify it under the terms of the GNU Lesser General Public
10+
* License version 2.1, as published by the Free Software
11+
* Foundation. See file COPYING.
12+
*
13+
*/
14+
15+
#pragma once
16+
17+
#include <cstdint>
18+
#include <limits>
19+
#include <boost/intrusive_ptr.hpp>
20+
#include "common/async/cancel_on_error.h"
21+
#include "common/async/detail/co_throttle_impl.h"
22+
23+
namespace ceph::async {
24+
25+
/// A coroutine throttle that allows a parent coroutine to spawn and manage
26+
/// multiple child coroutines, while enforcing an upper bound on concurrency.
27+
///
28+
/// Child coroutines must be of type awaitable<void>. Exceptions thrown by
29+
/// children are rethrown to the parent on its next call to spawn() or wait().
30+
/// The cancel_on_error option controls whether these exceptions errors trigger
31+
/// the cancellation of other children.
32+
///
33+
/// All child coroutines are canceled by cancel() or co_throttle destruction.
34+
/// This allows the parent coroutine to share memory with its child coroutines
35+
/// without fear of dangling references.
36+
///
37+
/// This class is not thread-safe, so a strand executor should be used in
38+
/// multi-threaded contexts.
39+
///
40+
/// Example:
41+
/// \code
42+
/// awaitable<void> child(task& t);
43+
///
44+
/// awaitable<void> parent(std::span<task> tasks)
45+
/// {
46+
/// // process all tasks, up to 10 at a time
47+
/// auto ex = co_await boost::asio::this_coro::executor;
48+
/// auto throttle = co_throttle{ex, 10};
49+
///
50+
/// for (auto& t : tasks) {
51+
/// co_await throttle.spawn(child(t));
52+
/// }
53+
/// co_await throttle.wait();
54+
/// }
55+
/// \endcode
56+
template <boost::asio::execution::executor Executor>
57+
class co_throttle {
58+
using impl_type = detail::co_throttle_impl<Executor>;
59+
boost::intrusive_ptr<impl_type> impl;
60+
61+
public:
62+
using executor_type = Executor;
63+
executor_type get_executor() const noexcept { return impl->get_executor(); }
64+
65+
static constexpr size_t max_limit = std::numeric_limits<size_t>::max();
66+
67+
co_throttle(const executor_type& ex, size_t limit,
68+
cancel_on_error on_error = cancel_on_error::none)
69+
: impl(new impl_type(ex, limit, on_error))
70+
{
71+
}
72+
73+
~co_throttle()
74+
{
75+
cancel();
76+
}
77+
78+
co_throttle(const co_throttle&) = delete;
79+
co_throttle& operator=(const co_throttle&) = delete;
80+
81+
/// Try to spawn the given coroutine \ref cr. If this would exceed the
82+
/// concurrency limit, wait for another coroutine to complete first. This
83+
/// default limit can be overridden with the optional \ref smaller_limit
84+
/// argument.
85+
///
86+
/// If any spawned coroutines exit with an exception, the first exception is
87+
/// rethrown by the next call to spawn() or wait(). If spawn() has an
88+
/// exception to rethrow, it will spawn \cr first only in the case of
89+
/// cancel_on_error::none. New coroutines can be spawned by later calls to
90+
/// spawn() regardless of cancel_on_error.
91+
auto spawn(boost::asio::awaitable<void, executor_type> cr,
92+
size_t smaller_limit = max_limit)
93+
-> boost::asio::awaitable<void, executor_type>
94+
{
95+
return impl->spawn(std::move(cr), smaller_limit);
96+
}
97+
98+
/// Wait for all associated coroutines to complete. If any of these coroutines
99+
/// exit with an exception, the first of those exceptions is rethrown.
100+
auto wait()
101+
-> boost::asio::awaitable<void, executor_type>
102+
{
103+
return impl->wait();
104+
}
105+
106+
/// Cancel all associated coroutines.
107+
void cancel()
108+
{
109+
impl->cancel();
110+
}
111+
};
112+
113+
} // namespace ceph::async

src/common/async/co_waiter.h

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* This is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License version 2.1, as published by the Free Software
9+
* Foundation. See file COPYING.
10+
*
11+
*/
12+
13+
#pragma once
14+
15+
#include <exception>
16+
#include <optional>
17+
#include <boost/asio/append.hpp>
18+
#include <boost/asio/async_result.hpp>
19+
#include <boost/asio/dispatch.hpp>
20+
#include <boost/asio/execution/executor.hpp>
21+
#include <boost/asio/use_awaitable.hpp>
22+
#include "include/ceph_assert.h"
23+
24+
namespace ceph::async {
25+
26+
/// Captures an awaitable handler for deferred completion or cancellation.
27+
template <typename Ret, boost::asio::execution::executor Executor>
28+
class co_waiter {
29+
using signature = void(std::exception_ptr, Ret);
30+
using token_type = boost::asio::use_awaitable_t<Executor>;
31+
using handler_type = typename boost::asio::async_result<
32+
token_type, signature>::handler_type;
33+
std::optional<handler_type> handler;
34+
35+
struct op_cancellation {
36+
co_waiter* self;
37+
op_cancellation(co_waiter* self) : self(self) {}
38+
void operator()(boost::asio::cancellation_type_t type) {
39+
if (type != boost::asio::cancellation_type::none) {
40+
self->cancel();
41+
}
42+
}
43+
};
44+
public:
45+
co_waiter() = default;
46+
47+
// copy and move are disabled because the cancellation handler captures 'this'
48+
co_waiter(const co_waiter&) = delete;
49+
co_waiter& operator=(const co_waiter&) = delete;
50+
51+
/// Returns true if there's a handler awaiting completion.
52+
bool waiting() const { return handler.has_value(); }
53+
54+
/// Returns an awaitable that blocks until complete() or cancel().
55+
boost::asio::awaitable<Ret, Executor> get()
56+
{
57+
ceph_assert(!handler);
58+
token_type token;
59+
return boost::asio::async_initiate<token_type, signature>(
60+
[this] (handler_type h) {
61+
auto slot = boost::asio::get_associated_cancellation_slot(h);
62+
if (slot.is_connected()) {
63+
slot.template emplace<op_cancellation>(this);
64+
}
65+
handler.emplace(std::move(h));
66+
}, token);
67+
}
68+
69+
/// Schedule the completion handler with the given arguments.
70+
void complete(std::exception_ptr eptr, Ret value)
71+
{
72+
ceph_assert(handler);
73+
auto h = boost::asio::append(std::move(*handler), eptr, std::move(value));
74+
handler.reset();
75+
boost::asio::dispatch(std::move(h));
76+
}
77+
78+
/// Cancel the coroutine with an operation_aborted exception.
79+
void cancel()
80+
{
81+
if (handler) {
82+
auto eptr = std::make_exception_ptr(
83+
boost::system::system_error(
84+
boost::asio::error::operation_aborted));
85+
complete(eptr, Ret{});
86+
}
87+
}
88+
89+
/// Destroy the completion handler.
90+
void shutdown()
91+
{
92+
handler.reset();
93+
}
94+
};
95+
96+
// specialization for Ret=void
97+
template <boost::asio::execution::executor Executor>
98+
class co_waiter<void, Executor> {
99+
using signature = void(std::exception_ptr);
100+
using token_type = boost::asio::use_awaitable_t<Executor>;
101+
using handler_type = typename boost::asio::async_result<
102+
token_type, signature>::handler_type;
103+
std::optional<handler_type> handler;
104+
105+
struct op_cancellation {
106+
co_waiter* self;
107+
op_cancellation(co_waiter* self) : self(self) {}
108+
void operator()(boost::asio::cancellation_type_t type) {
109+
if (type != boost::asio::cancellation_type::none) {
110+
self->cancel();
111+
}
112+
}
113+
};
114+
public:
115+
co_waiter() = default;
116+
117+
// copy and move are disabled because the cancellation handler captures 'this'
118+
co_waiter(const co_waiter&) = delete;
119+
co_waiter& operator=(const co_waiter&) = delete;
120+
121+
/// Returns true if there's a handler awaiting completion.
122+
bool waiting() const { return handler.has_value(); }
123+
124+
/// Returns an awaitable that blocks until complete() or cancel().
125+
boost::asio::awaitable<void, Executor> get()
126+
{
127+
ceph_assert(!handler);
128+
token_type token;
129+
return boost::asio::async_initiate<token_type, signature>(
130+
[this] (handler_type h) {
131+
auto slot = boost::asio::get_associated_cancellation_slot(h);
132+
if (slot.is_connected()) {
133+
slot.template emplace<op_cancellation>(this);
134+
}
135+
handler.emplace(std::move(h));
136+
}, token);
137+
}
138+
139+
/// Schedule the completion handler with the given arguments.
140+
void complete(std::exception_ptr eptr)
141+
{
142+
ceph_assert(handler);
143+
auto h = boost::asio::append(std::move(*handler), eptr);
144+
handler.reset();
145+
boost::asio::dispatch(std::move(h));
146+
}
147+
148+
/// Cancel the coroutine with an operation_aborted exception.
149+
void cancel()
150+
{
151+
if (handler) {
152+
auto eptr = std::make_exception_ptr(
153+
boost::system::system_error(
154+
boost::asio::error::operation_aborted));
155+
complete(eptr);
156+
}
157+
}
158+
159+
/// Destroy the completion handler.
160+
void shutdown()
161+
{
162+
handler.reset();
163+
}
164+
};
165+
166+
} // namespace ceph::async

0 commit comments

Comments
 (0)