Skip to content

Commit 1126ec3

Browse files
committed
crimson/common: add coroutine integration for crimson futures
Adds coroutine machinery for crimson errorated and interruptible futures. Signed-off-by: Samuel Just <[email protected]>
1 parent 11698a6 commit 1126ec3

File tree

4 files changed

+672
-0
lines changed

4 files changed

+672
-0
lines changed

src/crimson/common/coroutine.h

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2+
// vim: ts=8 sw=2 smarttab expandtab
3+
4+
#pragma once
5+
6+
#include <seastar/core/coroutine.hh>
7+
8+
#include "crimson/common/errorator.h"
9+
#include "crimson/common/interruptible_future.h"
10+
11+
12+
namespace crimson {
13+
namespace internal {
14+
15+
template <typename Interruptor, typename Errorator>
16+
struct to_future {
17+
template <typename T>
18+
using future = crimson::interruptible::interruptible_future_detail<
19+
typename Interruptor::condition,
20+
typename Errorator::template future<T>>;
21+
};
22+
23+
template <typename Errorator>
24+
struct to_future<void, Errorator> {
25+
template <typename T>
26+
using future = typename Errorator::template future<T>;
27+
};
28+
29+
30+
template <typename Interruptor>
31+
struct to_future<Interruptor, void> {
32+
template <typename T>
33+
using future = ::crimson::interruptible::interruptible_future<
34+
typename Interruptor::condition, T>;
35+
};
36+
37+
template <>
38+
struct to_future<void, void> {
39+
template <typename T>
40+
using future = seastar::future<T>;
41+
};
42+
43+
44+
template <typename Future>
45+
struct cond_checker {
46+
using ref = std::unique_ptr<cond_checker>;
47+
virtual std::optional<Future> may_interrupt() = 0;
48+
virtual ~cond_checker() = default;
49+
};
50+
51+
template <typename Interruptor>
52+
struct interrupt_cond_capture {
53+
using InterruptCond = typename Interruptor::condition;
54+
interruptible::InterruptCondRef<InterruptCond> cond;
55+
56+
template <typename Future>
57+
struct type_erased_cond_checker final : cond_checker<Future> {
58+
interruptible::InterruptCondRef<InterruptCond> cond;
59+
60+
template <typename T>
61+
type_erased_cond_checker(T &&t) : cond(std::forward<T>(t)) {}
62+
63+
std::optional<Future> may_interrupt() final {
64+
return cond->template may_interrupt<Future>();
65+
}
66+
};
67+
68+
template <typename Future>
69+
typename cond_checker<Future>::ref capture_and_get_checker() {
70+
ceph_assert(interruptible::interrupt_cond<InterruptCond>.interrupt_cond);
71+
cond = interruptible::interrupt_cond<InterruptCond>.interrupt_cond;
72+
return typename cond_checker<Future>::ref{
73+
new type_erased_cond_checker<Future>{cond}
74+
};
75+
}
76+
77+
void restore() {
78+
ceph_assert(cond);
79+
interruptible::interrupt_cond<InterruptCond>.set(cond);
80+
}
81+
82+
void reset() {
83+
interruptible::interrupt_cond<InterruptCond>.reset();
84+
}
85+
};
86+
87+
template <>
88+
struct interrupt_cond_capture<void> {
89+
template <typename Future>
90+
typename cond_checker<Future>::ref capture_and_get_checker() {
91+
return nullptr;
92+
}
93+
};
94+
95+
template <typename Interruptor>
96+
struct seastar_task_ancestor : protected seastar::task {};
97+
98+
template <>
99+
struct seastar_task_ancestor<void> : public seastar::task {};
100+
101+
template <typename Interruptor, typename Errorator, typename T>
102+
class promise_base : public seastar_task_ancestor<Interruptor> {
103+
protected:
104+
seastar::promise<T> _promise;
105+
106+
public:
107+
interrupt_cond_capture<Interruptor> cond;
108+
109+
using errorator_type = Errorator;
110+
using interruptor = Interruptor;
111+
static constexpr bool is_errorated = !std::is_void<Errorator>::value;
112+
static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
113+
114+
using _to_future = to_future<Interruptor, Errorator>;
115+
116+
template <typename U=void>
117+
using future = typename _to_future::template future<U>;
118+
119+
promise_base() = default;
120+
promise_base(promise_base&&) = delete;
121+
promise_base(const promise_base&) = delete;
122+
123+
void set_exception(std::exception_ptr&& eptr) noexcept {
124+
_promise.set_exception(std::move(eptr));
125+
}
126+
127+
void unhandled_exception() noexcept {
128+
_promise.set_exception(std::current_exception());
129+
}
130+
131+
future<T> get_return_object() noexcept {
132+
return _promise.get_future();
133+
}
134+
135+
std::suspend_never initial_suspend() noexcept { return { }; }
136+
std::suspend_never final_suspend() noexcept { return { }; }
137+
138+
void run_and_dispose() noexcept final {
139+
if constexpr (is_interruptible) {
140+
cond.restore();
141+
}
142+
auto handle = std::coroutine_handle<promise_base>::from_promise(*this);
143+
handle.resume();
144+
if constexpr (is_interruptible) {
145+
cond.reset();
146+
}
147+
}
148+
149+
seastar::task *waiting_task() noexcept override {
150+
return _promise.waiting_task();
151+
}
152+
seastar::task *get_seastar_task() { return this; }
153+
};
154+
155+
template <typename Interruptor, typename Errorator, typename T=void>
156+
class coroutine_traits {
157+
public:
158+
class promise_type final : public promise_base<Interruptor, Errorator, T> {
159+
using base = promise_base<Interruptor, Errorator, T>;
160+
public:
161+
template <typename... U>
162+
void return_value(U&&... value) {
163+
base::_promise.set_value(std::forward<U>(value)...);
164+
}
165+
};
166+
};
167+
168+
169+
template <typename Interruptor, typename Errorator>
170+
class coroutine_traits<Interruptor, Errorator> {
171+
public:
172+
class promise_type final : public promise_base<Interruptor, Errorator, void> {
173+
using base = promise_base<Interruptor, Errorator, void>;
174+
public:
175+
void return_void() noexcept {
176+
base::_promise.set_value();
177+
}
178+
};
179+
};
180+
181+
template <typename Interruptor, typename Errorator,
182+
bool CheckPreempt, typename T=void>
183+
struct awaiter {
184+
static constexpr bool is_errorated = !std::is_void<Errorator>::value;
185+
static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
186+
187+
template <typename U=void>
188+
using future = typename to_future<Interruptor, Errorator>::template future<U>;
189+
190+
future<T> _future;
191+
192+
typename cond_checker<future<T>>::ref checker;
193+
public:
194+
explicit awaiter(future<T>&& f) noexcept : _future(std::move(f)) { }
195+
196+
awaiter(const awaiter&) = delete;
197+
awaiter(awaiter&&) = delete;
198+
199+
bool await_ready() const noexcept {
200+
return _future.available() && (!CheckPreempt || !seastar::need_preempt());
201+
}
202+
203+
template <typename U>
204+
void await_suspend(std::coroutine_handle<U> hndl) noexcept {
205+
if constexpr (is_errorated) {
206+
using dest_errorator_t = typename U::errorator_type;
207+
static_assert(dest_errorator_t::template contains_once_v<Errorator>,
208+
"conversion is possible to more-or-eq errorated future!");
209+
}
210+
211+
checker =
212+
hndl.promise().cond.template capture_and_get_checker<future<T>>();
213+
if (!CheckPreempt || !_future.available()) {
214+
_future.set_coroutine(*hndl.promise().get_seastar_task());
215+
} else {
216+
::seastar::schedule(hndl.promise().get_seastar_task());
217+
}
218+
}
219+
220+
T await_resume() {
221+
if (auto maybe_fut = checker ? checker->may_interrupt() : std::nullopt) {
222+
// silence warning that we are discarding an exceptional future
223+
if (_future.failed()) _future.get_exception();
224+
if constexpr (is_errorated) {
225+
return maybe_fut->unsafe_get0();
226+
} else {
227+
return maybe_fut->get0();
228+
}
229+
} else {
230+
if constexpr (is_errorated) {
231+
return _future.unsafe_get0();
232+
} else {
233+
return _future.get0();
234+
}
235+
}
236+
}
237+
};
238+
239+
}
240+
}
241+
242+
template <template <typename> typename Container, typename T>
243+
auto operator co_await(
244+
Container<crimson::errorated_future_marker<T>> f) noexcept {
245+
using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
246+
return crimson::internal::awaiter<void, Errorator, true, T>(std::move(f));
247+
}
248+
249+
template <typename InterruptCond, typename T>
250+
auto operator co_await(
251+
crimson::interruptible::interruptible_future_detail<
252+
InterruptCond, seastar::future<T>
253+
> f) noexcept {
254+
return crimson::internal::awaiter<
255+
crimson::interruptible::interruptor<InterruptCond>, void, true, T>(
256+
std::move(f));
257+
}
258+
259+
template <template <typename> typename Container,
260+
typename InterruptCond, typename T>
261+
auto operator co_await(
262+
crimson::interruptible::interruptible_future_detail<
263+
InterruptCond, Container<crimson::errorated_future_marker<T>>
264+
> f) noexcept {
265+
using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
266+
return crimson::internal::awaiter<
267+
crimson::interruptible::interruptor<InterruptCond>,
268+
typename Errorator::base_ertr, true, T>(
269+
std::move(f));
270+
}
271+
272+
namespace std {
273+
274+
template <template <typename> typename Container,
275+
typename T, typename... Args>
276+
class coroutine_traits<Container<crimson::errorated_future_marker<T>>, Args...> :
277+
public crimson::internal::coroutine_traits<
278+
void,
279+
typename seastar::futurize<
280+
Container<crimson::errorated_future_marker<T>>
281+
>::errorator_type,
282+
T> {};
283+
284+
template <typename InterruptCond,
285+
typename T, typename... Args>
286+
class coroutine_traits<
287+
crimson::interruptible::interruptible_future_detail<
288+
InterruptCond, seastar::future<T>
289+
>, Args...> : public crimson::internal::coroutine_traits<
290+
crimson::interruptible::interruptor<InterruptCond>,
291+
void,
292+
T> {};
293+
294+
template <template <typename> typename Container,
295+
typename InterruptCond,
296+
typename T, typename... Args>
297+
class coroutine_traits<
298+
crimson::interruptible::interruptible_future_detail<
299+
InterruptCond, Container<crimson::errorated_future_marker<T>>
300+
>, Args...> :
301+
public crimson::internal::coroutine_traits<
302+
crimson::interruptible::interruptor<InterruptCond>,
303+
typename seastar::futurize<
304+
crimson::interruptible::interruptible_future_detail<
305+
InterruptCond,
306+
Container<crimson::errorated_future_marker<T>>
307+
>
308+
>::errorator_type::base_ertr,
309+
T> {};
310+
}

src/test/crimson/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ target_link_libraries(
111111
add_ceph_unittest(unittest-seastar-errorator
112112
--memory 256M --smp 1)
113113

114+
add_executable(unittest-crimson-coroutine
115+
test_crimson_coroutine.cc)
116+
target_link_libraries(
117+
unittest-crimson-coroutine
118+
crimson::gtest)
119+
add_ceph_unittest(unittest-crimson-coroutine
120+
--memory 256M --smp 1)
121+
114122
add_executable(unittest-crimson-scrub
115123
test_crimson_scrub.cc
116124
${PROJECT_SOURCE_DIR}/src/crimson/osd/scrub/scrub_machine.cc

src/test/crimson/gtest_seastar.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#pragma once
55

6+
#include "crimson/common/errorator.h"
67
#include "gtest/gtest.h"
78

89
#include "seastar_runner.h"
@@ -15,6 +16,15 @@ struct seastar_test_suite_t : public ::testing::Test {
1516
return seastar_env.run(std::forward<Func>(func));
1617
}
1718

19+
template <typename Func>
20+
void run_ertr(Func &&func) {
21+
return run(
22+
[func=std::forward<Func>(func)]() mutable {
23+
return std::invoke(std::move(func)).handle_error(
24+
crimson::ct_error::assert_all("error"));
25+
});
26+
}
27+
1828
template <typename Func>
1929
void run_async(Func &&func) {
2030
run(
@@ -23,6 +33,23 @@ struct seastar_test_suite_t : public ::testing::Test {
2333
});
2434
}
2535

36+
template <typename F>
37+
auto scl(F &&f) {
38+
return [fptr = std::make_unique<F>(std::forward<F>(f))]() mutable {
39+
return std::invoke(*fptr).finally([fptr=std::move(fptr)] {});
40+
};
41+
}
42+
43+
auto run_scl(auto &&f) {
44+
return run([this, f=std::forward<decltype(f)>(f)]() mutable {
45+
return std::invoke(scl(std::move(f)));
46+
});
47+
}
48+
49+
auto run_ertr_scl(auto &&f) {
50+
return run_ertr(scl(std::forward<decltype(f)>(f)));
51+
}
52+
2653
virtual seastar::future<> set_up_fut() { return seastar::now(); }
2754
void SetUp() final {
2855
return run([this] { return set_up_fut(); });

0 commit comments

Comments
 (0)