Skip to content

Commit 9fe9802

Browse files
author
Dmitry Malakhov
committed
Remove thread_local Reactor and pass it around everywhere instead
1 parent 1f3064f commit 9fe9802

29 files changed

+374
-306
lines changed

.clang-tidy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Checks: >
1010
portability-*,
1111
readability-*,
1212
-readability-identifier-length,
13+
-readability-uppercase-literal-suffix,
1314
-readability-static-accessed-through-instance,
1415
-readability-make-member-function-const,
1516
-readability-avoid-return-with-void-value,
@@ -21,6 +22,7 @@ Checks: >
2122
WarningsAsErrors: "*"
2223

2324
CheckOptions:
25+
- { key: readability-function-cognitive-complexity.IgnoreMacros, value: true }
2426
- { key: readability-identifier-naming.GlobalConstantCase, value: UPPER_CASE }
2527
- { key: readability-identifier-naming.ConstexprVariableCase, value: UPPER_CASE }
2628
- { key: readability-identifier-naming.LocalConstantCase, value: lower_case }

example/SendLogsOnCrash.cpp

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
/// logs are lost by default. To prevent this we have to write custom signal handler.
77
/// And to speed this sighandler up we will use corosig-powered async io
88

9+
#include "corosig/reactor/Reactor.hpp"
10+
911
#include <boost/outcome/try.hpp>
1012
#include <corosig/Coro.hpp>
1113
#include <corosig/ErrorTypes.hpp>
@@ -37,33 +39,37 @@ constexpr std::string_view FILE2 = "file2.log";
3739

3840
std::vector<std::string> logs_buffer;
3941

40-
corosig::Fut<void, corosig::Error<corosig::AllocationError, corosig::SyscallError>>
41-
sighandler(int) noexcept {
42-
using namespace corosig;
42+
namespace sighandling {
4343

44-
auto write_to_file = [](char const *path) -> Fut<void, Error<AllocationError, SyscallError>> {
45-
using enum File::OpenFlags;
46-
BOOST_OUTCOME_CO_TRY(auto file, co_await File::open(path, CREATE | TRUNCATE | WRONLY));
47-
for (auto &log : logs_buffer) {
48-
BOOST_OUTCOME_CO_TRY(co_await file.write(log));
49-
}
50-
co_return success();
51-
};
44+
using namespace corosig;
5245

53-
auto send_via_tcp = []() -> Fut<void, Error<AllocationError, SyscallError>> {
54-
BOOST_OUTCOME_CO_TRY(auto socket, co_await TcpSocket::connect(SERVER_ADDR));
55-
for (auto &log : logs_buffer) {
56-
BOOST_OUTCOME_CO_TRY(co_await socket.write(log));
57-
}
58-
co_return success();
59-
};
46+
Fut<void, Error<AllocationError, SyscallError>> write_to_file(Reactor &r,
47+
char const *path) noexcept {
48+
using enum File::OpenFlags;
49+
BOOST_OUTCOME_CO_TRY(auto file, co_await File::open(r, path, CREATE | TRUNCATE | WRONLY));
50+
for (auto &log : logs_buffer) {
51+
BOOST_OUTCOME_CO_TRY(co_await file.write(r, log));
52+
}
53+
co_return success();
54+
};
6055

61-
BOOST_OUTCOME_CO_TRY(co_await when_all_succeed(write_to_file(FILE1.data()),
62-
write_to_file(FILE2.data()), send_via_tcp()));
56+
Fut<void, Error<AllocationError, SyscallError>> send_via_tcp(Reactor &r) {
57+
BOOST_OUTCOME_CO_TRY(auto socket, co_await TcpSocket::connect(r, SERVER_ADDR));
58+
for (auto &log : logs_buffer) {
59+
BOOST_OUTCOME_CO_TRY(co_await socket.write(r, log));
60+
}
61+
co_return success();
62+
};
6363

64+
Fut<void, Error<AllocationError, SyscallError>> sighandler(Reactor &r, int) noexcept {
65+
66+
BOOST_OUTCOME_CO_TRY(co_await when_all_succeed(r, write_to_file(r, FILE1.data()),
67+
write_to_file(r, FILE2.data()), send_via_tcp(r)));
6468
co_return success();
6569
}
6670

71+
} // namespace sighandling
72+
6773
void run_tcp_server(std::string &out) {
6874
int srv_fd = ::socket(AF_INET, SOCK_STREAM, 0);
6975
assert(srv_fd >= 0);
@@ -79,13 +85,13 @@ void run_tcp_server(std::string &out) {
7985
::listen(srv_fd, 1);
8086

8187
int client = ::accept(srv_fd, nullptr, nullptr);
82-
char buf[1024]; // NOLINT
88+
std::array<char, 1024> buf;
8389
while (true) {
84-
ssize_t n = ::read(client, buf, sizeof(buf));
90+
ssize_t n = ::read(client, buf.begin(), buf.size());
8591
if (n <= 0) {
8692
break;
8793
}
88-
out += std::string_view{buf, size_t(n)};
94+
out += std::string_view{buf.begin(), size_t(n)};
8995
}
9096
::close(client);
9197
::close(srv_fd);
@@ -97,7 +103,7 @@ int main() {
97103
try {
98104
constexpr auto REACTOR_MEMORY = 8 * 1024;
99105
for (auto signal : {SIGILL, SIGFPE, SIGTERM, SIGABRT}) {
100-
corosig::set_sighandler<REACTOR_MEMORY, sighandler>(signal);
106+
corosig::set_sighandler<REACTOR_MEMORY, sighandling::sighandler>(signal);
101107
}
102108

103109
std::string remote_server_data;
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace corosig {
99

10-
struct Alloc {
10+
struct Allocator {
1111
private:
1212
struct FreeHeader {
1313
size_t block_size = 0;
@@ -46,21 +46,21 @@ struct Alloc {
4646
public:
4747
constexpr static size_t MIN_ALIGNMENT = 8;
4848

49-
Alloc() noexcept = default;
49+
Allocator() noexcept = default;
5050

5151
template <size_t SIZE>
52-
Alloc(Memory<SIZE> &mem) noexcept : m_mem{mem.begin()}, m_mem_size{mem.size()} {
52+
Allocator(Memory<SIZE> &mem) noexcept : m_mem{mem.begin()}, m_mem_size{mem.size()} {
5353
Node *first_node = new (m_mem) Node{};
5454
first_node->data.block_size = SIZE - sizeof(Node);
5555
first_node->next = nullptr;
5656
m_free_list.insert(nullptr, first_node);
5757
}
5858

59-
Alloc(const Alloc &) = delete;
60-
Alloc(Alloc &&) noexcept;
61-
Alloc &operator=(const Alloc &) = delete;
62-
Alloc &operator=(Alloc &&) noexcept;
63-
~Alloc();
59+
Allocator(const Allocator &) = delete;
60+
Allocator(Allocator &&) noexcept;
61+
Allocator &operator=(const Allocator &) = delete;
62+
Allocator &operator=(Allocator &&) noexcept;
63+
~Allocator();
6464

6565
[[nodiscard]] size_t peak_memory() const noexcept {
6666
return m_peak;

include/corosig/Coro.hpp

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,35 +21,49 @@ struct Fut;
2121

2222
namespace detail {
2323

24+
template <typename T>
25+
concept NotReactor = !std::same_as<Reactor, T>;
26+
2427
template <typename T, typename E>
2528
struct CoroutinePromiseType : CoroListNode {
26-
CoroutinePromiseType() noexcept = default;
29+
30+
CoroutinePromiseType(Reactor &reactor, NotReactor auto const &...) noexcept
31+
// reuse m_out to pass reactor to future to avoid creating additional buffer
32+
: m_reactor{reactor} {
33+
}
34+
35+
CoroutinePromiseType(NotReactor auto const &, Reactor &reactor,
36+
NotReactor auto const &...) noexcept
37+
: CoroutinePromiseType{reactor} {
38+
}
2739

2840
CoroutinePromiseType(const CoroutinePromiseType &) = delete;
2941
CoroutinePromiseType(CoroutinePromiseType &&) = delete;
3042
CoroutinePromiseType &operator=(const CoroutinePromiseType &) = delete;
3143
CoroutinePromiseType &operator=(CoroutinePromiseType &&) = delete;
3244

33-
~CoroutinePromiseType() {
34-
assert(m_out);
35-
m_out->m_promise = nullptr;
45+
~CoroutinePromiseType() override = default;
46+
47+
static void *operator new(size_t n, Reactor &reactor, NotReactor auto const &...) noexcept {
48+
return reactor.allocate(n);
3649
}
3750

38-
template <typename... ARGS>
39-
static void *operator new(size_t n, ARGS &&...) noexcept {
40-
return Reactor::instance().allocate_frame(n);
51+
static void *operator new(size_t n, NotReactor auto const &, Reactor &reactor,
52+
NotReactor auto const &...) noexcept {
53+
return reactor.allocate(n);
4154
}
4255

43-
static void operator delete(void *frame) noexcept {
44-
return Reactor::instance().free_frame(frame);
56+
static void operator delete(void *) noexcept {
57+
// nothing to do in here since reactor is not accessible. instead, a coro frame is released when
58+
// future is destroyed
4559
}
4660

4761
void yield_to_reactor() noexcept {
48-
Reactor::instance().yield(*this);
62+
m_reactor.yield(*this);
4963
}
5064

5165
void poll_to_reactor(PollListNode &node) noexcept {
52-
Reactor::instance().poll(node);
66+
m_reactor.poll(node);
5367
}
5468

5569
[[noreturn]] static void unhandled_exception() noexcept {
@@ -64,21 +78,37 @@ struct CoroutinePromiseType : CoroListNode {
6478
}
6579

6680
auto final_suspend() noexcept {
67-
return std::suspend_never{};
81+
struct ReturnControlToCaller {
82+
static bool await_ready() noexcept {
83+
return false;
84+
}
85+
86+
static auto await_suspend(std::coroutine_handle<CoroutinePromiseType> self) noexcept {
87+
return self.promise().m_waiting_coro;
88+
}
89+
90+
static void await_resume() noexcept {
91+
}
92+
};
93+
94+
// coro frame must be destroyed in Fut dtor so we don't let it fall off the end here
95+
return ReturnControlToCaller{};
6896
}
6997

7098
template <std::convertible_to<Result<T, E>> U>
7199
void return_value(U &&value) noexcept {
72100
// NOLINTBEGIN false positives about m_out being uninitialized
73101
assert(m_out);
102+
assert(!m_out->m_value);
103+
assert(!m_waiting_coro.done());
74104
m_out->m_value.emplace(std::forward<U>(value));
75-
m_waiting_coro.resume();
76105
// NOLINTEND
77106
}
78107

79108
template <std::convertible_to<T> T2, std::convertible_to<E> E2>
80109
void return_value(Result<T2, E2> &&result) noexcept {
81110
assert(m_out);
111+
assert(!m_out->m_value);
82112
if (result.has_value()) {
83113
if constexpr (std::same_as<void, T>) {
84114
m_out->m_value.emplace(success());
@@ -88,7 +118,8 @@ struct CoroutinePromiseType : CoroListNode {
88118
} else {
89119
m_out->m_value.emplace(std::move(result.assume_error()));
90120
}
91-
m_waiting_coro.resume();
121+
122+
assert(!m_waiting_coro.done() && "Waiting coro was destroyed before child has finished");
92123
}
93124

94125
private:
@@ -99,6 +130,7 @@ struct CoroutinePromiseType : CoroListNode {
99130
friend struct Fut<T, E>;
100131

101132
std::coroutine_handle<> m_waiting_coro = std::noop_coroutine();
133+
Reactor &m_reactor;
102134
Fut<T, E> *m_out = nullptr;
103135
};
104136

@@ -110,10 +142,10 @@ struct [[nodiscard("forgot to await?")]] Fut {
110142

111143
Fut(const Fut &) = delete;
112144
Fut(Fut &&rhs) noexcept
113-
: m_promise{std::exchange(rhs.m_promise, nullptr)},
145+
: m_handle{std::exchange(rhs.m_handle, nullptr)},
114146
m_value{std::exchange(rhs.m_value, std::nullopt)} {
115-
if (m_promise) {
116-
m_promise->m_out = this;
147+
if (m_handle) {
148+
m_handle.promise().m_out = this;
117149
}
118150
}
119151

@@ -124,13 +156,21 @@ struct [[nodiscard("forgot to await?")]] Fut {
124156
return *this;
125157
};
126158

159+
~Fut() {
160+
if (m_handle != nullptr) {
161+
Reactor &reactor = m_handle.promise().m_reactor;
162+
m_handle.destroy();
163+
reactor.free(m_handle.address());
164+
}
165+
}
166+
127167
[[nodiscard]] bool has_value() const noexcept {
128168
return m_value.has_value();
129169
}
130170

131171
Result<T, extend_error<E, SyscallError>> block_on() && noexcept {
132172
while (!m_value.has_value()) {
133-
Result res = Reactor::instance().do_event_loop_iteration();
173+
Result res = m_handle.promise().m_reactor.do_event_loop_iteration();
134174
if (!res) {
135175
return failure(res.assume_error());
136176
}
@@ -157,10 +197,11 @@ struct [[nodiscard("forgot to await?")]] Fut {
157197
}
158198

159199
void await_suspend(std::coroutine_handle<> h) const noexcept {
160-
m_future.m_promise->m_waiting_coro = h;
200+
m_future.m_handle.promise().m_waiting_coro = h;
161201
}
162202

163203
Result<T, E> await_resume() const noexcept {
204+
assert(m_future.m_value.has_value());
164205
return *std::move(m_future.m_value);
165206
}
166207

@@ -172,31 +213,26 @@ struct [[nodiscard("forgot to await?")]] Fut {
172213
Fut &m_future;
173214
};
174215

175-
Awaiter operator co_await() noexcept {
216+
Awaiter operator co_await() && noexcept {
176217
return Awaiter{*this};
177218
}
178219

179220
private:
180-
Fut(promise_type &prom) noexcept : m_promise{&prom} {
221+
Fut(std::coroutine_handle<promise_type> handle) noexcept : m_handle{handle} {
181222
}
182223

183224
Fut(AllocationError e) noexcept : m_value{failure(e)} {
184225
}
185226

186227
friend promise_type;
187228

188-
promise_type *m_promise = nullptr;
189-
190-
struct CoroListNode : bi::slist_base_hook<bi::link_mode<bi::link_mode_type::auto_unlink>> {
191-
virtual std::coroutine_handle<> coro_from_this() noexcept = 0;
192-
};
193-
194-
std::optional<Result<T, E>> m_value = std::nullopt;
229+
std::coroutine_handle<promise_type> m_handle = nullptr;
230+
[[no_unique_address]] std::optional<Result<T, E>> m_value = std::nullopt;
195231
};
196232

197233
template <typename T, typename E>
198234
Fut<T, E> detail::CoroutinePromiseType<T, E>::get_return_object() noexcept {
199-
Fut<T, E> fut{*this};
235+
Fut<T, E> fut{std::coroutine_handle<CoroutinePromiseType>::from_promise(*this)};
200236
m_out = &fut;
201237
return fut;
202238
}

include/corosig/Parallel.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "corosig/Coro.hpp"
55
#include "corosig/ErrorTypes.hpp"
66
#include "corosig/Result.hpp"
7+
#include "corosig/reactor/Reactor.hpp"
78

89
#include <boost/mp11/algorithm.hpp>
910
#include <concepts>
@@ -15,7 +16,7 @@
1516
namespace corosig {
1617

1718
template <typename... R, typename... E>
18-
Fut<std::tuple<Result<R, E>...>> when_all(Fut<R, E> &&...futs) noexcept {
19+
Fut<std::tuple<Result<R, E>...>> when_all(Reactor &, Fut<R, E> &&...futs) noexcept {
1920
co_return std::tuple{co_await std::move(futs)...};
2021
}
2122

@@ -45,8 +46,8 @@ using WrapVoid = std::conditional_t<std::same_as<void, T>, std::monostate, T>;
4546

4647
template <typename... R, typename... E>
4748
Fut<std::tuple<detail::WrapVoid<R>...>, extend_error<E...>>
48-
when_all_succeed(Fut<R, E> &&...futs) noexcept {
49-
Result results_res = co_await when_all(std::move(futs)...);
49+
when_all_succeed(Reactor &r, Fut<R, E> &&...futs) noexcept {
50+
Result results_res = co_await when_all(r, std::move(futs)...);
5051
if (results_res.has_error()) {
5152
co_return failure(std::move(results_res.assume_error()));
5253
}

include/corosig/PollEvent.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct PollEvent : PollListNode {
2727
h.promise().poll_to_reactor(*this);
2828
}
2929

30-
void await_resume() const noexcept {
30+
static void await_resume() noexcept {
3131
}
3232
};
3333

0 commit comments

Comments
 (0)