Skip to content

Commit 6ce969b

Browse files
authored
Merge pull request ceph#57188 from cbodley/wip-boost-asio-yield-group
common/async: add primitives for structured concurrency with optional_yield Reviewed-by: Adam Emerson <[email protected]>
2 parents c3eda27 + 25017d6 commit 6ce969b

File tree

10 files changed

+2121
-0
lines changed

10 files changed

+2121
-0
lines changed

src/common/async/cancel_on_error.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab ft=cpp
3+
4+
/*
5+
* Ceph - scalable distributed file system
6+
*
7+
* Copyright contributors to the Ceph project
8+
*
9+
* This is free software; you can redistribute it and/or
10+
* modify it under the terms of the GNU Lesser General Public
11+
* License version 2.1, as published by the Free Software
12+
* Foundation. See file COPYING.
13+
*
14+
*/
15+
16+
#pragma once
17+
18+
#include <cstdint>
19+
20+
namespace ceph::async {
21+
22+
/// Error handling strategy for concurrent operations.
23+
enum class cancel_on_error : uint8_t {
24+
none, //< No spawned coroutines are canceled on failure.
25+
after, //< Cancel coroutines spawned after the failed coroutine.
26+
all, //< Cancel all spawned coroutines on failure.
27+
};
28+
29+
} // namespace ceph::async
Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab ft=cpp
3+
4+
/*
5+
* Ceph - scalable distributed file system
6+
*
7+
* Copyright contributors to the Ceph project
8+
*
9+
* This is free software; you can redistribute it and/or
10+
* modify it under the terms of the GNU Lesser General Public
11+
* License version 2.1, as published by the Free Software
12+
* Foundation. See file COPYING.
13+
*
14+
*/
15+
16+
#pragma once
17+
18+
#include <exception>
19+
#include <optional>
20+
#include <memory>
21+
#include <utility>
22+
#include <boost/asio/append.hpp>
23+
#include <boost/asio/associated_cancellation_slot.hpp>
24+
#include <boost/asio/async_result.hpp>
25+
#include <boost/asio/execution/context.hpp>
26+
#include <boost/asio/io_context.hpp>
27+
#include <boost/asio/query.hpp>
28+
#include <boost/asio/spawn.hpp>
29+
#include <boost/intrusive_ptr.hpp>
30+
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
31+
#include "common/async/cancel_on_error.h"
32+
#include "common/async/service.h"
33+
#include "common/async/yield_context.h"
34+
35+
namespace ceph::async::detail {
36+
37+
struct spawn_throttle_handler;
38+
39+
// Reference-counted spawn throttle interface.
40+
class spawn_throttle_impl :
41+
public boost::intrusive_ref_counter<spawn_throttle_impl,
42+
boost::thread_unsafe_counter>
43+
{
44+
public:
45+
spawn_throttle_impl(size_t limit, cancel_on_error on_error)
46+
: limit(limit), on_error(on_error),
47+
children(std::make_unique<child[]>(limit))
48+
{
49+
// initialize the free list
50+
for (size_t i = 0; i < limit; i++) {
51+
free.push_back(children[i]);
52+
}
53+
}
54+
virtual ~spawn_throttle_impl() {}
55+
56+
// factory function
57+
static auto create(optional_yield y, size_t limit, cancel_on_error on_error)
58+
-> boost::intrusive_ptr<spawn_throttle_impl>;
59+
60+
// return the completion handler for a new child. may block due to throttling
61+
// or rethrow an exception from a previously-spawned child
62+
spawn_throttle_handler get();
63+
64+
// track each spawned coroutine for cancellation. these are stored in an
65+
// array, and recycled after each use via the free list
66+
struct child : boost::intrusive::list_base_hook<> {
67+
std::optional<boost::asio::cancellation_signal> signal;
68+
};
69+
70+
using executor_type = boost::asio::any_io_executor;
71+
virtual executor_type get_executor() = 0;
72+
73+
// wait until count <= target_count
74+
virtual void wait_for(size_t target_count) = 0;
75+
76+
// cancel outstanding coroutines
77+
virtual void cancel(bool shutdown)
78+
{
79+
cancel_outstanding_from(outstanding.begin());
80+
}
81+
82+
// complete the given child coroutine
83+
virtual void on_complete(child& c, std::exception_ptr eptr)
84+
{
85+
--count;
86+
87+
// move back to the free list
88+
auto next = outstanding.erase(outstanding.iterator_to(c));
89+
c.signal.reset();
90+
free.push_back(c);
91+
92+
if (eptr && !unreported_exception) {
93+
// hold on to the first child exception until we can report it in wait()
94+
// or completion()
95+
unreported_exception = eptr;
96+
97+
// handle cancel_on_error
98+
auto cancel_from = outstanding.end();
99+
if (on_error == cancel_on_error::after) {
100+
cancel_from = next;
101+
} else if (on_error == cancel_on_error::all) {
102+
cancel_from = outstanding.begin();
103+
}
104+
cancel_outstanding_from(cancel_from);
105+
}
106+
}
107+
108+
protected:
109+
const size_t limit;
110+
const cancel_on_error on_error;
111+
size_t count = 0;
112+
113+
void report_exception()
114+
{
115+
if (unreported_exception) {
116+
std::rethrow_exception(std::exchange(unreported_exception, nullptr));
117+
}
118+
}
119+
120+
private:
121+
std::exception_ptr unreported_exception;
122+
std::unique_ptr<child[]> children;
123+
124+
using child_list = boost::intrusive::list<child,
125+
boost::intrusive::constant_time_size<false>>;
126+
child_list outstanding;
127+
child_list free;
128+
129+
void cancel_outstanding_from(child_list::iterator i)
130+
{
131+
while (i != outstanding.end()) {
132+
// increment before cancellation, which may invoke on_complete()
133+
// directly and remove the child from this list
134+
child& c = *i++;
135+
c.signal->emit(boost::asio::cancellation_type::terminal);
136+
}
137+
}
138+
};
139+
140+
// A cancellable spawn() completion handler that notifies the spawn_throttle
141+
// upon completion. This holds a reference to the implementation in order to
142+
// extend its lifetime. This is required for per-op cancellation because the
143+
// cancellation_signals must outlive these coroutine stacks.
144+
struct spawn_throttle_handler {
145+
boost::intrusive_ptr<spawn_throttle_impl> impl;
146+
spawn_throttle_impl::child& c;
147+
boost::asio::cancellation_slot slot;
148+
149+
spawn_throttle_handler(boost::intrusive_ptr<spawn_throttle_impl> impl,
150+
spawn_throttle_impl::child& c)
151+
: impl(std::move(impl)), c(c), slot(c.signal->slot())
152+
{}
153+
154+
using executor_type = spawn_throttle_impl::executor_type;
155+
executor_type get_executor() const noexcept
156+
{
157+
return impl->get_executor();
158+
}
159+
160+
using cancellation_slot_type = boost::asio::cancellation_slot;
161+
cancellation_slot_type get_cancellation_slot() const noexcept
162+
{
163+
return slot;
164+
}
165+
166+
void operator()(std::exception_ptr eptr)
167+
{
168+
impl->on_complete(c, eptr);
169+
}
170+
};
171+
172+
spawn_throttle_handler spawn_throttle_impl::get()
173+
{
174+
report_exception(); // throw unreported exception
175+
176+
if (count >= limit) {
177+
wait_for(limit - 1);
178+
}
179+
180+
++count;
181+
182+
// move a free child to the outstanding list
183+
child& c = free.front();
184+
free.pop_front();
185+
outstanding.push_back(c);
186+
187+
// spawn the coroutine with its associated cancellation signal
188+
c.signal.emplace();
189+
return {this, c};
190+
}
191+
192+
193+
// Spawn throttle implementation for use in synchronous contexts where wait()
194+
// blocks the calling thread until completion.
195+
class sync_spawn_throttle_impl final : public spawn_throttle_impl {
196+
static constexpr int concurrency = 1; // only run from a single thread
197+
public:
198+
sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error)
199+
: spawn_throttle_impl(limit, on_error),
200+
ctx(std::in_place, concurrency)
201+
{}
202+
203+
executor_type get_executor() override
204+
{
205+
return ctx->get_executor();
206+
}
207+
208+
void wait_for(size_t target_count) override
209+
{
210+
while (count > target_count) {
211+
if (ctx->stopped()) {
212+
ctx->restart();
213+
}
214+
ctx->run_one();
215+
}
216+
217+
report_exception(); // throw unreported exception
218+
}
219+
220+
void cancel(bool shutdown) override
221+
{
222+
spawn_throttle_impl::cancel(shutdown);
223+
224+
if (shutdown) {
225+
// destroy the io_context to trigger two-phase shutdown which
226+
// destroys any completion handlers with a reference to 'this'
227+
ctx.reset();
228+
count = 0;
229+
}
230+
}
231+
232+
private:
233+
std::optional<boost::asio::io_context> ctx;
234+
};
235+
236+
// Spawn throttle implementation for use in asynchronous contexts where wait()
237+
// suspends the calling stackful coroutine.
238+
class async_spawn_throttle_impl final :
239+
public spawn_throttle_impl,
240+
public service_list_base_hook
241+
{
242+
public:
243+
async_spawn_throttle_impl(boost::asio::yield_context yield,
244+
size_t limit, cancel_on_error on_error)
245+
: spawn_throttle_impl(limit, on_error),
246+
svc(boost::asio::use_service<service<async_spawn_throttle_impl>>(
247+
boost::asio::query(yield.get_executor(),
248+
boost::asio::execution::context))),
249+
yield(yield)
250+
{
251+
// register for service_shutdown() notifications
252+
svc.add(*this);
253+
}
254+
255+
~async_spawn_throttle_impl()
256+
{
257+
svc.remove(*this);
258+
}
259+
260+
executor_type get_executor() override
261+
{
262+
return yield.get_executor();
263+
}
264+
265+
void service_shutdown()
266+
{
267+
waiter.reset();
268+
}
269+
270+
private:
271+
service<async_spawn_throttle_impl>& svc;
272+
boost::asio::yield_context yield;
273+
274+
using WaitSignature = void(boost::system::error_code);
275+
struct wait_state {
276+
using Work = boost::asio::executor_work_guard<
277+
boost::asio::any_io_executor>;
278+
using Handler = typename boost::asio::async_result<
279+
boost::asio::yield_context, WaitSignature>::handler_type;
280+
281+
Work work;
282+
Handler handler;
283+
284+
explicit wait_state(Handler&& h)
285+
: work(make_work_guard(h)),
286+
handler(std::move(h))
287+
{}
288+
};
289+
std::optional<wait_state> waiter;
290+
size_t wait_for_count = 0;
291+
292+
struct op_cancellation {
293+
async_spawn_throttle_impl* self;
294+
explicit op_cancellation(async_spawn_throttle_impl* self) noexcept
295+
: self(self) {}
296+
void operator()(boost::asio::cancellation_type type) {
297+
if (type != boost::asio::cancellation_type::none) {
298+
self->cancel(false);
299+
}
300+
}
301+
};
302+
303+
void wait_for(size_t target_count) override
304+
{
305+
if (count > target_count) {
306+
wait_for_count = target_count;
307+
308+
boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
309+
[this] (auto handler) {
310+
auto slot = get_associated_cancellation_slot(handler);
311+
if (slot.is_connected()) {
312+
slot.template emplace<op_cancellation>(this);
313+
}
314+
waiter.emplace(std::move(handler));
315+
}, yield);
316+
// this is a coroutine, so the wait has completed by this point
317+
}
318+
319+
report_exception(); // throw unreported exception
320+
}
321+
322+
void wait_complete(boost::system::error_code ec)
323+
{
324+
auto w = std::move(*waiter);
325+
waiter.reset();
326+
boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec));
327+
}
328+
329+
void on_complete(child& c, std::exception_ptr eptr) override
330+
{
331+
spawn_throttle_impl::on_complete(c, eptr);
332+
333+
if (waiter && count <= wait_for_count) {
334+
wait_complete({});
335+
}
336+
}
337+
338+
void cancel(bool shutdown) override
339+
{
340+
spawn_throttle_impl::cancel(shutdown);
341+
342+
if (waiter) {
343+
wait_complete(make_error_code(boost::asio::error::operation_aborted));
344+
}
345+
}
346+
};
347+
348+
auto spawn_throttle_impl::create(optional_yield y, size_t limit,
349+
cancel_on_error on_error)
350+
-> boost::intrusive_ptr<spawn_throttle_impl>
351+
{
352+
if (y) {
353+
auto yield = y.get_yield_context();
354+
return new async_spawn_throttle_impl(yield, limit, on_error);
355+
} else {
356+
return new sync_spawn_throttle_impl(limit, on_error);
357+
}
358+
}
359+
360+
} // namespace ceph::async::detail

0 commit comments

Comments
 (0)