Skip to content

Commit 9217fcc

Browse files
committed
common/async: add co_throttle for bounded concurrency with c++20 coroutines
Signed-off-by: Casey Bodley <[email protected]>
1 parent 26ee069 commit 9217fcc

File tree

4 files changed

+890
-0
lines changed

4 files changed

+890
-0
lines changed

src/common/async/co_throttle.h

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* Copyright (C) 2023 Red Hat <[email protected]>
7+
*
8+
* This is free software; you can redistribute it and/or
9+
* modify it under the terms of the GNU Lesser General Public
10+
* License version 2.1, as published by the Free Software
11+
* Foundation. See file COPYING.
12+
*
13+
*/
14+
15+
#pragma once
16+
17+
#include <cstdint>
18+
#include <limits>
19+
#include <boost/intrusive_ptr.hpp>
20+
#include "common/async/cancel_on_error.h"
21+
#include "common/async/detail/co_throttle_impl.h"
22+
23+
namespace ceph::async {
24+
25+
/// A coroutine throttle that allows a parent coroutine to spawn and manage
26+
/// multiple child coroutines, while enforcing an upper bound on concurrency.
27+
///
28+
/// Child coroutines must be of type awaitable<void>. Exceptions thrown by
29+
/// children are rethrown to the parent on its next call to spawn() or wait().
30+
/// The cancel_on_error option controls whether these exceptions errors trigger
31+
/// the cancellation of other children.
32+
///
33+
/// All child coroutines are canceled by cancel() or co_throttle destruction.
34+
/// This allows the parent coroutine to share memory with its child coroutines
35+
/// without fear of dangling references.
36+
///
37+
/// This class is not thread-safe, so a strand executor should be used in
38+
/// multi-threaded contexts.
39+
///
40+
/// Example:
41+
/// \code
42+
/// awaitable<void> child(task& t);
43+
///
44+
/// awaitable<void> parent(std::span<task> tasks)
45+
/// {
46+
/// // process all tasks, up to 10 at a time
47+
/// auto ex = co_await boost::asio::this_coro::executor;
48+
/// auto throttle = co_throttle{ex, 10};
49+
///
50+
/// for (auto& t : tasks) {
51+
/// co_await throttle.spawn(child(t));
52+
/// }
53+
/// co_await throttle.wait();
54+
/// }
55+
/// \endcode
56+
template <boost::asio::execution::executor Executor>
57+
class co_throttle {
58+
using impl_type = detail::co_throttle_impl<Executor>;
59+
boost::intrusive_ptr<impl_type> impl;
60+
61+
public:
62+
using executor_type = Executor;
63+
executor_type get_executor() const noexcept { return impl->get_executor(); }
64+
65+
static constexpr size_t max_limit = std::numeric_limits<size_t>::max();
66+
67+
co_throttle(const executor_type& ex, size_t limit,
68+
cancel_on_error on_error = cancel_on_error::none)
69+
: impl(new impl_type(ex, limit, on_error))
70+
{
71+
}
72+
73+
~co_throttle()
74+
{
75+
cancel();
76+
}
77+
78+
co_throttle(const co_throttle&) = delete;
79+
co_throttle& operator=(const co_throttle&) = delete;
80+
81+
/// Try to spawn the given coroutine \ref cr. If this would exceed the
82+
/// concurrency limit, wait for another coroutine to complete first. This
83+
/// default limit can be overridden with the optional \ref smaller_limit
84+
/// argument.
85+
///
86+
/// If any spawned coroutines exit with an exception, the first exception is
87+
/// rethrown by the next call to spawn() or wait(). If spawn() has an
88+
/// exception to rethrow, it will spawn \cr first only in the case of
89+
/// cancel_on_error::none. New coroutines can be spawned by later calls to
90+
/// spawn() regardless of cancel_on_error.
91+
auto spawn(boost::asio::awaitable<void, executor_type> cr,
92+
size_t smaller_limit = max_limit)
93+
-> boost::asio::awaitable<void, executor_type>
94+
{
95+
return impl->spawn(std::move(cr), smaller_limit);
96+
}
97+
98+
/// Wait for all associated coroutines to complete. If any of these coroutines
99+
/// exit with an exception, the first of those exceptions is rethrown.
100+
auto wait()
101+
-> boost::asio::awaitable<void, executor_type>
102+
{
103+
return impl->wait();
104+
}
105+
106+
/// Cancel all associated coroutines.
107+
void cancel()
108+
{
109+
impl->cancel();
110+
}
111+
};
112+
113+
} // namespace ceph::async
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
/*
4+
* Ceph - scalable distributed file system
5+
*
6+
* Copyright (C) 2023 Red Hat <[email protected]>
7+
*
8+
* This is free software; you can redistribute it and/or
9+
* modify it under the terms of the GNU Lesser General Public
10+
* License version 2.1, as published by the Free Software
11+
* Foundation. See file COPYING.
12+
*
13+
*/
14+
15+
#pragma once
16+
17+
#include <memory>
18+
#include <optional>
19+
#include <boost/asio/append.hpp>
20+
#include <boost/asio/bind_cancellation_slot.hpp>
21+
#include <boost/asio/co_spawn.hpp>
22+
#include <boost/asio/execution/executor.hpp>
23+
#include <boost/intrusive/list.hpp>
24+
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
25+
#include "common/async/cancel_on_error.h"
26+
#include "common/async/co_waiter.h"
27+
#include "common/async/service.h"
28+
#include "include/ceph_assert.h"
29+
30+
namespace ceph::async::detail {
31+
32+
// Coroutine throttle implementation. This is reference-counted so the
33+
// co_spawn() completion handlers can extend the implementation's lifetime.
34+
// This is required for per-op cancellation because the cancellation_signals
35+
// must outlive their coroutine frames.
36+
template <boost::asio::execution::executor Executor>
37+
class co_throttle_impl :
38+
public boost::intrusive_ref_counter<co_throttle_impl<Executor>,
39+
boost::thread_unsafe_counter>,
40+
public service_list_base_hook
41+
{
42+
public:
43+
using executor_type = Executor;
44+
executor_type get_executor() const { return ex; }
45+
46+
co_throttle_impl(const executor_type& ex, size_t limit,
47+
cancel_on_error on_error)
48+
: svc(boost::asio::use_service<service<co_throttle_impl>>(
49+
boost::asio::query(ex, boost::asio::execution::context))),
50+
ex(ex), limit(limit), on_error(on_error),
51+
children(new child[limit])
52+
{
53+
// register for service_shutdown() notifications
54+
svc.add(*this);
55+
56+
// initialize the free list
57+
for (size_t i = 0; i < limit; i++) {
58+
free.push_back(children[i]);
59+
}
60+
}
61+
~co_throttle_impl()
62+
{
63+
svc.remove(*this);
64+
}
65+
66+
auto spawn(boost::asio::awaitable<void, executor_type> cr,
67+
size_t smaller_limit)
68+
-> boost::asio::awaitable<void, executor_type>
69+
{
70+
if (unreported_exception && on_error != cancel_on_error::none) {
71+
std::rethrow_exception(std::exchange(unreported_exception, nullptr));
72+
}
73+
74+
const size_t current_limit = std::min(smaller_limit, limit);
75+
if (count >= current_limit) {
76+
co_await wait_for(current_limit - 1);
77+
if (unreported_exception && on_error != cancel_on_error::none) {
78+
std::rethrow_exception(std::exchange(unreported_exception, nullptr));
79+
}
80+
}
81+
82+
++count;
83+
84+
// move a free child to the outstanding list
85+
ceph_assert(!free.empty());
86+
child& c = free.front();
87+
free.pop_front();
88+
outstanding.push_back(c);
89+
90+
// spawn the coroutine with its associated cancellation signal
91+
c.signal.emplace();
92+
c.canceled = false;
93+
94+
boost::asio::co_spawn(get_executor(), std::move(cr),
95+
boost::asio::bind_cancellation_slot(c.signal->slot(),
96+
child_completion{this, c}));
97+
98+
if (unreported_exception) {
99+
std::rethrow_exception(std::exchange(unreported_exception, nullptr));
100+
}
101+
}
102+
103+
auto wait()
104+
-> boost::asio::awaitable<void, executor_type>
105+
{
106+
if (count > 0) {
107+
co_await wait_for(0);
108+
}
109+
if (unreported_exception) {
110+
std::rethrow_exception(std::exchange(unreported_exception, nullptr));
111+
}
112+
}
113+
114+
void cancel()
115+
{
116+
while (!outstanding.empty()) {
117+
child& c = outstanding.front();
118+
outstanding.pop_front();
119+
120+
c.canceled = true;
121+
c.signal->emit(boost::asio::cancellation_type::terminal);
122+
}
123+
}
124+
125+
void service_shutdown()
126+
{
127+
waiter.shutdown();
128+
}
129+
130+
private:
131+
service<co_throttle_impl>& svc;
132+
executor_type ex;
133+
const size_t limit;
134+
const cancel_on_error on_error;
135+
136+
size_t count = 0;
137+
size_t wait_for_count = 0;
138+
139+
std::exception_ptr unreported_exception;
140+
141+
// track each spawned coroutine for cancellation. these are stored in an
142+
// array, and recycled after each use via the free list
143+
struct child : boost::intrusive::list_base_hook<> {
144+
std::optional<boost::asio::cancellation_signal> signal;
145+
bool canceled = false;
146+
};
147+
std::unique_ptr<child[]> children;
148+
149+
using child_list = boost::intrusive::list<child,
150+
boost::intrusive::constant_time_size<false>>;
151+
child_list outstanding;
152+
child_list free;
153+
154+
co_waiter<void, executor_type> waiter;
155+
156+
// return an awaitable that completes once count <= target_count
157+
auto wait_for(size_t target_count)
158+
-> boost::asio::awaitable<void, executor_type>
159+
{
160+
wait_for_count = target_count;
161+
return waiter.get();
162+
}
163+
164+
void on_complete(child& c, std::exception_ptr eptr)
165+
{
166+
--count;
167+
168+
if (c.canceled) {
169+
// if the child was canceled, it was already removed from outstanding
170+
ceph_assert(!c.is_linked());
171+
c.canceled = false;
172+
c.signal.reset();
173+
free.push_back(c);
174+
} else {
175+
// move back to the free list
176+
ceph_assert(c.is_linked());
177+
auto next = outstanding.erase(outstanding.iterator_to(c));
178+
c.signal.reset();
179+
free.push_back(c);
180+
181+
if (eptr) {
182+
if (eptr && !unreported_exception) {
183+
unreported_exception = eptr;
184+
}
185+
186+
// handle cancel_on_error. cancellation signals may recurse into
187+
// on_complete(), so move the entries into a separate list first
188+
child_list to_cancel;
189+
if (on_error == cancel_on_error::after) {
190+
to_cancel.splice(to_cancel.end(), outstanding,
191+
next, outstanding.end());
192+
} else if (on_error == cancel_on_error::all) {
193+
to_cancel = std::move(outstanding);
194+
}
195+
196+
for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) {
197+
child& c = *i;
198+
i = to_cancel.erase(i);
199+
200+
c.canceled = true;
201+
c.signal->emit(boost::asio::cancellation_type::terminal);
202+
}
203+
}
204+
}
205+
206+
// maybe wake the waiter
207+
if (waiter.waiting() && count <= wait_for_count) {
208+
waiter.complete(nullptr);
209+
}
210+
}
211+
212+
struct child_completion {
213+
boost::intrusive_ptr<co_throttle_impl> impl;
214+
child& c;
215+
216+
void operator()(std::exception_ptr eptr) {
217+
impl->on_complete(c, eptr);
218+
}
219+
};
220+
};
221+
222+
} // namespace ceph::async::detail

src/test/common/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,17 @@ add_executable(unittest_async_completion test_async_completion.cc)
361361
add_ceph_unittest(unittest_async_completion)
362362
target_link_libraries(unittest_async_completion ceph-common Boost::system)
363363

364+
364365
add_executable(unittest_async_max_concurrent_for_each test_async_max_concurrent_for_each.cc)
365366
add_ceph_unittest(unittest_async_max_concurrent_for_each)
366367
target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context)
367368

369+
if(NOT WIN32)
370+
add_executable(unittest_async_co_throttle test_async_co_throttle.cc)
371+
add_ceph_unittest(unittest_async_co_throttle)
372+
target_link_libraries(unittest_async_co_throttle ceph-common Boost::system)
373+
endif(NOT WIN32)
374+
368375
add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
369376
add_ceph_unittest(unittest_async_shared_mutex)
370377
target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)

0 commit comments

Comments
 (0)