Skip to content

Commit bf95eec

Browse files
author
Dmitry Malakhov
committed
Remove thread_local Reactor and pass it around everywhere instead
1 parent 1f3064f commit bf95eec

29 files changed

+378
-309
lines changed

.clang-tidy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Checks: >
1010
portability-*,
1111
readability-*,
1212
-readability-identifier-length,
13+
-readability-uppercase-literal-suffix,
1314
-readability-static-accessed-through-instance,
1415
-readability-make-member-function-const,
1516
-readability-avoid-return-with-void-value,
@@ -21,6 +22,7 @@ Checks: >
2122
WarningsAsErrors: "*"
2223

2324
CheckOptions:
25+
- { key: readability-function-cognitive-complexity.IgnoreMacros, value: true }
2426
- { key: readability-identifier-naming.GlobalConstantCase, value: UPPER_CASE }
2527
- { key: readability-identifier-naming.ConstexprVariableCase, value: UPPER_CASE }
2628
- { key: readability-identifier-naming.LocalConstantCase, value: lower_case }

example/SendLogsOnCrash.cpp

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
/// Use case: there is an application, producing logs. Logs are initially stored
22
/// in some in-memory buffer. Periodically, a flush operation is triggered and all
33
/// logs are written to various outputs: to remote udp server and to file. But what if
4-
/// an app receives malicious signal (such as SIGTERM when std::terminate is called or
4+
/// an app receives malicious signal (such as SIGABRT when std::terminate is called or
55
/// SIGFPE if current c++ implementation raises it on zero division) then in-buffer
66
/// logs are lost by default. To prevent this we have to write custom signal handler.
77
/// And to speed this sighandler up we will use corosig-powered async io
88

9+
#include "corosig/reactor/Reactor.hpp"
10+
911
#include <boost/outcome/try.hpp>
1012
#include <corosig/Coro.hpp>
1113
#include <corosig/ErrorTypes.hpp>
@@ -37,33 +39,37 @@ constexpr std::string_view FILE2 = "file2.log";
3739

3840
std::vector<std::string> logs_buffer;
3941

40-
corosig::Fut<void, corosig::Error<corosig::AllocationError, corosig::SyscallError>>
41-
sighandler(int) noexcept {
42-
using namespace corosig;
42+
namespace sighandling {
4343

44-
auto write_to_file = [](char const *path) -> Fut<void, Error<AllocationError, SyscallError>> {
45-
using enum File::OpenFlags;
46-
BOOST_OUTCOME_CO_TRY(auto file, co_await File::open(path, CREATE | TRUNCATE | WRONLY));
47-
for (auto &log : logs_buffer) {
48-
BOOST_OUTCOME_CO_TRY(co_await file.write(log));
49-
}
50-
co_return success();
51-
};
44+
using namespace corosig;
5245

53-
auto send_via_tcp = []() -> Fut<void, Error<AllocationError, SyscallError>> {
54-
BOOST_OUTCOME_CO_TRY(auto socket, co_await TcpSocket::connect(SERVER_ADDR));
55-
for (auto &log : logs_buffer) {
56-
BOOST_OUTCOME_CO_TRY(co_await socket.write(log));
57-
}
58-
co_return success();
59-
};
46+
Fut<void, Error<AllocationError, SyscallError>> write_to_file(Reactor &r,
47+
char const *path) noexcept {
48+
using enum File::OpenFlags;
49+
BOOST_OUTCOME_CO_TRY(auto file, co_await File::open(r, path, CREATE | TRUNCATE | WRONLY));
50+
for (auto &log : logs_buffer) {
51+
BOOST_OUTCOME_CO_TRY(co_await file.write(r, log));
52+
}
53+
co_return success();
54+
};
6055

61-
BOOST_OUTCOME_CO_TRY(co_await when_all_succeed(write_to_file(FILE1.data()),
62-
write_to_file(FILE2.data()), send_via_tcp()));
56+
Fut<void, Error<AllocationError, SyscallError>> send_via_tcp(Reactor &r) {
57+
BOOST_OUTCOME_CO_TRY(auto socket, co_await TcpSocket::connect(r, SERVER_ADDR));
58+
for (auto &log : logs_buffer) {
59+
BOOST_OUTCOME_CO_TRY(co_await socket.write(r, log));
60+
}
61+
co_return success();
62+
};
6363

64+
Fut<void, Error<AllocationError, SyscallError>> sighandler(Reactor &r, int) noexcept {
65+
66+
BOOST_OUTCOME_CO_TRY(co_await when_all_succeed(r, write_to_file(r, FILE1.data()),
67+
write_to_file(r, FILE2.data()), send_via_tcp(r)));
6468
co_return success();
6569
}
6670

71+
} // namespace sighandling
72+
6773
void run_tcp_server(std::string &out) {
6874
int srv_fd = ::socket(AF_INET, SOCK_STREAM, 0);
6975
assert(srv_fd >= 0);
@@ -79,13 +85,13 @@ void run_tcp_server(std::string &out) {
7985
::listen(srv_fd, 1);
8086

8187
int client = ::accept(srv_fd, nullptr, nullptr);
82-
char buf[1024]; // NOLINT
88+
std::array<char, 1024> buf;
8389
while (true) {
84-
ssize_t n = ::read(client, buf, sizeof(buf));
90+
ssize_t n = ::read(client, buf.begin(), buf.size());
8591
if (n <= 0) {
8692
break;
8793
}
88-
out += std::string_view{buf, size_t(n)};
94+
out += std::string_view{buf.begin(), size_t(n)};
8995
}
9096
::close(client);
9197
::close(srv_fd);
@@ -97,7 +103,7 @@ int main() {
97103
try {
98104
constexpr auto REACTOR_MEMORY = 8 * 1024;
99105
for (auto signal : {SIGILL, SIGFPE, SIGTERM, SIGABRT}) {
100-
corosig::set_sighandler<REACTOR_MEMORY, sighandler>(signal);
106+
corosig::set_sighandler<REACTOR_MEMORY, sighandling::sighandler>(signal);
101107
}
102108

103109
std::string remote_server_data;
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace corosig {
99

10-
struct Alloc {
10+
struct Allocator {
1111
private:
1212
struct FreeHeader {
1313
size_t block_size = 0;
@@ -46,21 +46,21 @@ struct Alloc {
4646
public:
4747
constexpr static size_t MIN_ALIGNMENT = 8;
4848

49-
Alloc() noexcept = default;
49+
Allocator() noexcept = default;
5050

5151
template <size_t SIZE>
52-
Alloc(Memory<SIZE> &mem) noexcept : m_mem{mem.begin()}, m_mem_size{mem.size()} {
52+
Allocator(Memory<SIZE> &mem) noexcept : m_mem{mem.begin()}, m_mem_size{mem.size()} {
5353
Node *first_node = new (m_mem) Node{};
5454
first_node->data.block_size = SIZE - sizeof(Node);
5555
first_node->next = nullptr;
5656
m_free_list.insert(nullptr, first_node);
5757
}
5858

59-
Alloc(const Alloc &) = delete;
60-
Alloc(Alloc &&) noexcept;
61-
Alloc &operator=(const Alloc &) = delete;
62-
Alloc &operator=(Alloc &&) noexcept;
63-
~Alloc();
59+
Allocator(const Allocator &) = delete;
60+
Allocator(Allocator &&) noexcept;
61+
Allocator &operator=(const Allocator &) = delete;
62+
Allocator &operator=(Allocator &&) noexcept;
63+
~Allocator();
6464

6565
[[nodiscard]] size_t peak_memory() const noexcept {
6666
return m_peak;

include/corosig/Coro.hpp

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <concepts>
1111
#include <coroutine>
1212
#include <cstddef>
13-
#include <exception>
1413
#include <optional>
1514
#include <utility>
1615

@@ -21,39 +20,53 @@ struct Fut;
2120

2221
namespace detail {
2322

23+
template <typename T>
24+
concept NotReactor = !std::same_as<Reactor, T>;
25+
2426
template <typename T, typename E>
2527
struct CoroutinePromiseType : CoroListNode {
26-
CoroutinePromiseType() noexcept = default;
28+
29+
CoroutinePromiseType(Reactor &reactor, NotReactor auto const &...) noexcept
30+
// reuse m_out to pass reactor to future to avoid creating additional buffer
31+
: m_reactor{reactor} {
32+
}
33+
34+
CoroutinePromiseType(NotReactor auto const &, Reactor &reactor,
35+
NotReactor auto const &...) noexcept
36+
: CoroutinePromiseType{reactor} {
37+
}
2738

2839
CoroutinePromiseType(const CoroutinePromiseType &) = delete;
2940
CoroutinePromiseType(CoroutinePromiseType &&) = delete;
3041
CoroutinePromiseType &operator=(const CoroutinePromiseType &) = delete;
3142
CoroutinePromiseType &operator=(CoroutinePromiseType &&) = delete;
3243

33-
~CoroutinePromiseType() {
34-
assert(m_out);
35-
m_out->m_promise = nullptr;
44+
~CoroutinePromiseType() override = default;
45+
46+
static void *operator new(size_t n, Reactor &reactor, NotReactor auto const &...) noexcept {
47+
return reactor.allocate(n);
3648
}
3749

38-
template <typename... ARGS>
39-
static void *operator new(size_t n, ARGS &&...) noexcept {
40-
return Reactor::instance().allocate_frame(n);
50+
static void *operator new(size_t n, NotReactor auto const &, Reactor &reactor,
51+
NotReactor auto const &...) noexcept {
52+
return reactor.allocate(n);
4153
}
4254

43-
static void operator delete(void *frame) noexcept {
44-
return Reactor::instance().free_frame(frame);
55+
static void operator delete(void *) noexcept {
56+
// nothing to do in here since reactor is not accessible. instead, a coro frame is released when
57+
// future is destroyed
4558
}
4659

4760
void yield_to_reactor() noexcept {
48-
Reactor::instance().yield(*this);
61+
m_reactor.yield(*this);
4962
}
5063

5164
void poll_to_reactor(PollListNode &node) noexcept {
52-
Reactor::instance().poll(node);
65+
m_reactor.poll(node);
5366
}
5467

5568
[[noreturn]] static void unhandled_exception() noexcept {
56-
std::terminate();
69+
std::abort();
5770
}
5871

5972
Fut<T, E> get_return_object() noexcept;
@@ -64,21 +77,37 @@ struct CoroutinePromiseType : CoroListNode {
6477
}
6578

6679
auto final_suspend() noexcept {
67-
return std::suspend_never{};
80+
struct ReturnControlToCaller {
81+
static bool await_ready() noexcept {
82+
return false;
83+
}
84+
85+
static auto await_suspend(std::coroutine_handle<CoroutinePromiseType> self) noexcept {
86+
return self.promise().m_waiting_coro;
87+
}
88+
89+
static void await_resume() noexcept {
90+
}
91+
};
92+
93+
// coro frame must be destroyed in Fut dtor so we don't let it fall off the end here
94+
return ReturnControlToCaller{};
6895
}
6996

7097
template <std::convertible_to<Result<T, E>> U>
7198
void return_value(U &&value) noexcept {
7299
// NOLINTBEGIN false positives about m_out being uninitialized
73100
assert(m_out);
101+
assert(!m_out->m_value);
102+
assert(!m_waiting_coro.done());
74103
m_out->m_value.emplace(std::forward<U>(value));
75-
m_waiting_coro.resume();
76104
// NOLINTEND
77105
}
78106

79107
template <std::convertible_to<T> T2, std::convertible_to<E> E2>
80108
void return_value(Result<T2, E2> &&result) noexcept {
81109
assert(m_out);
110+
assert(!m_out->m_value);
82111
if (result.has_value()) {
83112
if constexpr (std::same_as<void, T>) {
84113
m_out->m_value.emplace(success());
@@ -88,7 +117,8 @@ struct CoroutinePromiseType : CoroListNode {
88117
} else {
89118
m_out->m_value.emplace(std::move(result.assume_error()));
90119
}
91-
m_waiting_coro.resume();
120+
121+
assert(!m_waiting_coro.done() && "Waiting coro was destroyed before child has finished");
92122
}
93123

94124
private:
@@ -99,6 +129,7 @@ struct CoroutinePromiseType : CoroListNode {
99129
friend struct Fut<T, E>;
100130

101131
std::coroutine_handle<> m_waiting_coro = std::noop_coroutine();
132+
Reactor &m_reactor;
102133
Fut<T, E> *m_out = nullptr;
103134
};
104135

@@ -110,10 +141,10 @@ struct [[nodiscard("forgot to await?")]] Fut {
110141

111142
Fut(const Fut &) = delete;
112143
Fut(Fut &&rhs) noexcept
113-
: m_promise{std::exchange(rhs.m_promise, nullptr)},
144+
: m_handle{std::exchange(rhs.m_handle, nullptr)},
114145
m_value{std::exchange(rhs.m_value, std::nullopt)} {
115-
if (m_promise) {
116-
m_promise->m_out = this;
146+
if (m_handle) {
147+
m_handle.promise().m_out = this;
117148
}
118149
}
119150

@@ -124,13 +155,21 @@ struct [[nodiscard("forgot to await?")]] Fut {
124155
return *this;
125156
};
126157

158+
~Fut() {
159+
if (m_handle != nullptr) {
160+
Reactor &reactor = m_handle.promise().m_reactor;
161+
m_handle.destroy();
162+
reactor.free(m_handle.address());
163+
}
164+
}
165+
127166
[[nodiscard]] bool has_value() const noexcept {
128167
return m_value.has_value();
129168
}
130169

131170
Result<T, extend_error<E, SyscallError>> block_on() && noexcept {
132171
while (!m_value.has_value()) {
133-
Result res = Reactor::instance().do_event_loop_iteration();
172+
Result res = m_handle.promise().m_reactor.do_event_loop_iteration();
134173
if (!res) {
135174
return failure(res.assume_error());
136175
}
@@ -157,10 +196,11 @@ struct [[nodiscard("forgot to await?")]] Fut {
157196
}
158197

159198
void await_suspend(std::coroutine_handle<> h) const noexcept {
160-
m_future.m_promise->m_waiting_coro = h;
199+
m_future.m_handle.promise().m_waiting_coro = h;
161200
}
162201

163202
Result<T, E> await_resume() const noexcept {
203+
assert(m_future.m_value.has_value());
164204
return *std::move(m_future.m_value);
165205
}
166206

@@ -172,31 +212,26 @@ struct [[nodiscard("forgot to await?")]] Fut {
172212
Fut &m_future;
173213
};
174214

175-
Awaiter operator co_await() noexcept {
215+
Awaiter operator co_await() && noexcept {
176216
return Awaiter{*this};
177217
}
178218

179219
private:
180-
Fut(promise_type &prom) noexcept : m_promise{&prom} {
220+
Fut(std::coroutine_handle<promise_type> handle) noexcept : m_handle{handle} {
181221
}
182222

183223
Fut(AllocationError e) noexcept : m_value{failure(e)} {
184224
}
185225

186226
friend promise_type;
187227

188-
promise_type *m_promise = nullptr;
189-
190-
struct CoroListNode : bi::slist_base_hook<bi::link_mode<bi::link_mode_type::auto_unlink>> {
191-
virtual std::coroutine_handle<> coro_from_this() noexcept = 0;
192-
};
193-
194-
std::optional<Result<T, E>> m_value = std::nullopt;
228+
std::coroutine_handle<promise_type> m_handle = nullptr;
229+
[[no_unique_address]] std::optional<Result<T, E>> m_value = std::nullopt;
195230
};
196231

197232
template <typename T, typename E>
198233
Fut<T, E> detail::CoroutinePromiseType<T, E>::get_return_object() noexcept {
199-
Fut<T, E> fut{*this};
234+
Fut<T, E> fut{std::coroutine_handle<CoroutinePromiseType>::from_promise(*this)};
200235
m_out = &fut;
201236
return fut;
202237
}

include/corosig/Parallel.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "corosig/Coro.hpp"
55
#include "corosig/ErrorTypes.hpp"
66
#include "corosig/Result.hpp"
7+
#include "corosig/reactor/Reactor.hpp"
78

89
#include <boost/mp11/algorithm.hpp>
910
#include <concepts>
@@ -15,7 +16,7 @@
1516
namespace corosig {
1617

1718
template <typename... R, typename... E>
18-
Fut<std::tuple<Result<R, E>...>> when_all(Fut<R, E> &&...futs) noexcept {
19+
Fut<std::tuple<Result<R, E>...>> when_all(Reactor &, Fut<R, E> &&...futs) noexcept {
1920
co_return std::tuple{co_await std::move(futs)...};
2021
}
2122

@@ -45,8 +46,8 @@ using WrapVoid = std::conditional_t<std::same_as<void, T>, std::monostate, T>;
4546

4647
template <typename... R, typename... E>
4748
Fut<std::tuple<detail::WrapVoid<R>...>, extend_error<E...>>
48-
when_all_succeed(Fut<R, E> &&...futs) noexcept {
49-
Result results_res = co_await when_all(std::move(futs)...);
49+
when_all_succeed(Reactor &r, Fut<R, E> &&...futs) noexcept {
50+
Result results_res = co_await when_all(r, std::move(futs)...);
5051
if (results_res.has_error()) {
5152
co_return failure(std::move(results_res.assume_error()));
5253
}

0 commit comments

Comments
 (0)