Skip to content

Commit bed336a

Browse files
authored
Merge pull request ceph#55846 from athanatos/sjust/for-review/wip-crimson-coroutine-support
crimson: add coroutine support for errorated and interruptible futures Reviewed-by: Matan Breizman <[email protected]> Reviewed-by: Yingxin Cheng <[email protected]> Reviewed-by: Chunmei Liu <[email protected]>
2 parents 96b9d3a + 1126ec3 commit bed336a

File tree

8 files changed

+742
-67
lines changed

8 files changed

+742
-67
lines changed

src/crimson/common/coroutine.h

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2+
// vim: ts=8 sw=2 smarttab expandtab
3+
4+
#pragma once
5+
6+
#include <seastar/core/coroutine.hh>
7+
8+
#include "crimson/common/errorator.h"
9+
#include "crimson/common/interruptible_future.h"
10+
11+
12+
namespace crimson {
13+
namespace internal {
14+
15+
template <typename Interruptor, typename Errorator>
16+
struct to_future {
17+
template <typename T>
18+
using future = crimson::interruptible::interruptible_future_detail<
19+
typename Interruptor::condition,
20+
typename Errorator::template future<T>>;
21+
};
22+
23+
template <typename Errorator>
24+
struct to_future<void, Errorator> {
25+
template <typename T>
26+
using future = typename Errorator::template future<T>;
27+
};
28+
29+
30+
template <typename Interruptor>
31+
struct to_future<Interruptor, void> {
32+
template <typename T>
33+
using future = ::crimson::interruptible::interruptible_future<
34+
typename Interruptor::condition, T>;
35+
};
36+
37+
template <>
38+
struct to_future<void, void> {
39+
template <typename T>
40+
using future = seastar::future<T>;
41+
};
42+
43+
44+
template <typename Future>
45+
struct cond_checker {
46+
using ref = std::unique_ptr<cond_checker>;
47+
virtual std::optional<Future> may_interrupt() = 0;
48+
virtual ~cond_checker() = default;
49+
};
50+
51+
template <typename Interruptor>
52+
struct interrupt_cond_capture {
53+
using InterruptCond = typename Interruptor::condition;
54+
interruptible::InterruptCondRef<InterruptCond> cond;
55+
56+
template <typename Future>
57+
struct type_erased_cond_checker final : cond_checker<Future> {
58+
interruptible::InterruptCondRef<InterruptCond> cond;
59+
60+
template <typename T>
61+
type_erased_cond_checker(T &&t) : cond(std::forward<T>(t)) {}
62+
63+
std::optional<Future> may_interrupt() final {
64+
return cond->template may_interrupt<Future>();
65+
}
66+
};
67+
68+
template <typename Future>
69+
typename cond_checker<Future>::ref capture_and_get_checker() {
70+
ceph_assert(interruptible::interrupt_cond<InterruptCond>.interrupt_cond);
71+
cond = interruptible::interrupt_cond<InterruptCond>.interrupt_cond;
72+
return typename cond_checker<Future>::ref{
73+
new type_erased_cond_checker<Future>{cond}
74+
};
75+
}
76+
77+
void restore() {
78+
ceph_assert(cond);
79+
interruptible::interrupt_cond<InterruptCond>.set(cond);
80+
}
81+
82+
void reset() {
83+
interruptible::interrupt_cond<InterruptCond>.reset();
84+
}
85+
};
86+
87+
template <>
88+
struct interrupt_cond_capture<void> {
89+
template <typename Future>
90+
typename cond_checker<Future>::ref capture_and_get_checker() {
91+
return nullptr;
92+
}
93+
};
94+
95+
template <typename Interruptor>
96+
struct seastar_task_ancestor : protected seastar::task {};
97+
98+
template <>
99+
struct seastar_task_ancestor<void> : public seastar::task {};
100+
101+
template <typename Interruptor, typename Errorator, typename T>
102+
class promise_base : public seastar_task_ancestor<Interruptor> {
103+
protected:
104+
seastar::promise<T> _promise;
105+
106+
public:
107+
interrupt_cond_capture<Interruptor> cond;
108+
109+
using errorator_type = Errorator;
110+
using interruptor = Interruptor;
111+
static constexpr bool is_errorated = !std::is_void<Errorator>::value;
112+
static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
113+
114+
using _to_future = to_future<Interruptor, Errorator>;
115+
116+
template <typename U=void>
117+
using future = typename _to_future::template future<U>;
118+
119+
promise_base() = default;
120+
promise_base(promise_base&&) = delete;
121+
promise_base(const promise_base&) = delete;
122+
123+
void set_exception(std::exception_ptr&& eptr) noexcept {
124+
_promise.set_exception(std::move(eptr));
125+
}
126+
127+
void unhandled_exception() noexcept {
128+
_promise.set_exception(std::current_exception());
129+
}
130+
131+
future<T> get_return_object() noexcept {
132+
return _promise.get_future();
133+
}
134+
135+
std::suspend_never initial_suspend() noexcept { return { }; }
136+
std::suspend_never final_suspend() noexcept { return { }; }
137+
138+
void run_and_dispose() noexcept final {
139+
if constexpr (is_interruptible) {
140+
cond.restore();
141+
}
142+
auto handle = std::coroutine_handle<promise_base>::from_promise(*this);
143+
handle.resume();
144+
if constexpr (is_interruptible) {
145+
cond.reset();
146+
}
147+
}
148+
149+
seastar::task *waiting_task() noexcept override {
150+
return _promise.waiting_task();
151+
}
152+
seastar::task *get_seastar_task() { return this; }
153+
};
154+
155+
template <typename Interruptor, typename Errorator, typename T=void>
156+
class coroutine_traits {
157+
public:
158+
class promise_type final : public promise_base<Interruptor, Errorator, T> {
159+
using base = promise_base<Interruptor, Errorator, T>;
160+
public:
161+
template <typename... U>
162+
void return_value(U&&... value) {
163+
base::_promise.set_value(std::forward<U>(value)...);
164+
}
165+
};
166+
};
167+
168+
169+
template <typename Interruptor, typename Errorator>
170+
class coroutine_traits<Interruptor, Errorator> {
171+
public:
172+
class promise_type final : public promise_base<Interruptor, Errorator, void> {
173+
using base = promise_base<Interruptor, Errorator, void>;
174+
public:
175+
void return_void() noexcept {
176+
base::_promise.set_value();
177+
}
178+
};
179+
};
180+
181+
template <typename Interruptor, typename Errorator,
182+
bool CheckPreempt, typename T=void>
183+
struct awaiter {
184+
static constexpr bool is_errorated = !std::is_void<Errorator>::value;
185+
static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
186+
187+
template <typename U=void>
188+
using future = typename to_future<Interruptor, Errorator>::template future<U>;
189+
190+
future<T> _future;
191+
192+
typename cond_checker<future<T>>::ref checker;
193+
public:
194+
explicit awaiter(future<T>&& f) noexcept : _future(std::move(f)) { }
195+
196+
awaiter(const awaiter&) = delete;
197+
awaiter(awaiter&&) = delete;
198+
199+
bool await_ready() const noexcept {
200+
return _future.available() && (!CheckPreempt || !seastar::need_preempt());
201+
}
202+
203+
template <typename U>
204+
void await_suspend(std::coroutine_handle<U> hndl) noexcept {
205+
if constexpr (is_errorated) {
206+
using dest_errorator_t = typename U::errorator_type;
207+
static_assert(dest_errorator_t::template contains_once_v<Errorator>,
208+
"conversion is possible to more-or-eq errorated future!");
209+
}
210+
211+
checker =
212+
hndl.promise().cond.template capture_and_get_checker<future<T>>();
213+
if (!CheckPreempt || !_future.available()) {
214+
_future.set_coroutine(*hndl.promise().get_seastar_task());
215+
} else {
216+
::seastar::schedule(hndl.promise().get_seastar_task());
217+
}
218+
}
219+
220+
T await_resume() {
221+
if (auto maybe_fut = checker ? checker->may_interrupt() : std::nullopt) {
222+
// silence warning that we are discarding an exceptional future
223+
if (_future.failed()) _future.get_exception();
224+
if constexpr (is_errorated) {
225+
return maybe_fut->unsafe_get0();
226+
} else {
227+
return maybe_fut->get0();
228+
}
229+
} else {
230+
if constexpr (is_errorated) {
231+
return _future.unsafe_get0();
232+
} else {
233+
return _future.get0();
234+
}
235+
}
236+
}
237+
};
238+
239+
}
240+
}
241+
242+
template <template <typename> typename Container, typename T>
243+
auto operator co_await(
244+
Container<crimson::errorated_future_marker<T>> f) noexcept {
245+
using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
246+
return crimson::internal::awaiter<void, Errorator, true, T>(std::move(f));
247+
}
248+
249+
template <typename InterruptCond, typename T>
250+
auto operator co_await(
251+
crimson::interruptible::interruptible_future_detail<
252+
InterruptCond, seastar::future<T>
253+
> f) noexcept {
254+
return crimson::internal::awaiter<
255+
crimson::interruptible::interruptor<InterruptCond>, void, true, T>(
256+
std::move(f));
257+
}
258+
259+
template <template <typename> typename Container,
260+
typename InterruptCond, typename T>
261+
auto operator co_await(
262+
crimson::interruptible::interruptible_future_detail<
263+
InterruptCond, Container<crimson::errorated_future_marker<T>>
264+
> f) noexcept {
265+
using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
266+
return crimson::internal::awaiter<
267+
crimson::interruptible::interruptor<InterruptCond>,
268+
typename Errorator::base_ertr, true, T>(
269+
std::move(f));
270+
}
271+
272+
namespace std {
273+
274+
template <template <typename> typename Container,
275+
typename T, typename... Args>
276+
class coroutine_traits<Container<crimson::errorated_future_marker<T>>, Args...> :
277+
public crimson::internal::coroutine_traits<
278+
void,
279+
typename seastar::futurize<
280+
Container<crimson::errorated_future_marker<T>>
281+
>::errorator_type,
282+
T> {};
283+
284+
template <typename InterruptCond,
285+
typename T, typename... Args>
286+
class coroutine_traits<
287+
crimson::interruptible::interruptible_future_detail<
288+
InterruptCond, seastar::future<T>
289+
>, Args...> : public crimson::internal::coroutine_traits<
290+
crimson::interruptible::interruptor<InterruptCond>,
291+
void,
292+
T> {};
293+
294+
template <template <typename> typename Container,
295+
typename InterruptCond,
296+
typename T, typename... Args>
297+
class coroutine_traits<
298+
crimson::interruptible::interruptible_future_detail<
299+
InterruptCond, Container<crimson::errorated_future_marker<T>>
300+
>, Args...> :
301+
public crimson::internal::coroutine_traits<
302+
crimson::interruptible::interruptor<InterruptCond>,
303+
typename seastar::futurize<
304+
crimson::interruptible::interruptible_future_detail<
305+
InterruptCond,
306+
Container<crimson::errorated_future_marker<T>>
307+
>
308+
>::errorator_type::base_ertr,
309+
T> {};
310+
}

src/crimson/common/errorator.h

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -354,17 +354,10 @@ class maybe_handle_error_t {
354354
// to throwing an exception by the handler.
355355
std::invoke(std::forward<ErrorVisitorT>(errfunc),
356356
ErrorT::error_t::from_exception_ptr(std::move(ep)));
357-
} else if constexpr (seastar::Future<decltype(result)>) {
358-
// result is seastar::future but return_t is e.g. int. If so,
359-
// the else clause cannot be used as seastar::future lacks
360-
// errorator_type member.
361-
result = seastar::make_ready_future<return_t>(
362-
std::invoke(std::forward<ErrorVisitorT>(errfunc),
363-
ErrorT::error_t::from_exception_ptr(std::move(ep))));
364357
} else {
365-
result = FuturatorT::type::errorator_type::template make_ready_future<return_t>(
366-
std::invoke(std::forward<ErrorVisitorT>(errfunc),
367-
ErrorT::error_t::from_exception_ptr(std::move(ep))));
358+
result = FuturatorT::invoke(
359+
std::forward<ErrorVisitorT>(errfunc),
360+
ErrorT::error_t::from_exception_ptr(std::move(ep)));
368361
}
369362
}
370363
}
@@ -529,9 +522,9 @@ struct errorator {
529522
}
530523

531524
protected:
532-
using base_t::get_exception;
533525
friend class ::transaction_manager_test_t;
534526
public:
527+
using base_t::get_exception;
535528
using errorator_type = ::crimson::errorator<AllowedErrors...>;
536529
using promise_type = seastar::promise<ValueT>;
537530

@@ -614,6 +607,10 @@ struct errorator {
614607
"ErrorT is not enlisted in errorator");
615608
}
616609

610+
void set_coroutine(seastar::task& coroutine) noexcept {
611+
base_t::set_coroutine(coroutine);
612+
}
613+
617614
template <class ValueFuncT, class ErrorVisitorT>
618615
auto safe_then(ValueFuncT&& valfunc, ErrorVisitorT&& errfunc) {
619616
static_assert((... && std::is_invocable_v<ErrorVisitorT,

0 commit comments

Comments
 (0)