Skip to content

Commit 89a42db

Browse files
authored
async_exec now uses a sans-io strategy (#250)
async_exec now supports partial and total cancellation when the request is pending Added docs on async_exec's cancellation support Added detail::exec_fsm and macros for coroutine emulation Requests are now removed from the multiplexer when an error is encountered
1 parent a39d130 commit 89a42db

File tree

12 files changed

+693
-61
lines changed

12 files changed

+693
-61
lines changed

include/boost/redis/connection.hpp

Lines changed: 38 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <boost/redis/adapter/adapt.hpp>
1111
#include <boost/redis/adapter/any_adapter.hpp>
1212
#include <boost/redis/config.hpp>
13+
#include <boost/redis/detail/exec_fsm.hpp>
1314
#include <boost/redis/detail/health_checker.hpp>
1415
#include <boost/redis/detail/helper.hpp>
1516
#include <boost/redis/detail/multiplexer.hpp>
@@ -24,7 +25,6 @@
2425

2526
#include <boost/asio/any_completion_handler.hpp>
2627
#include <boost/asio/any_io_executor.hpp>
27-
#include <boost/asio/associated_immediate_executor.hpp>
2828
#include <boost/asio/basic_stream_socket.hpp>
2929
#include <boost/asio/bind_executor.hpp>
3030
#include <boost/asio/buffer.hpp>
@@ -33,6 +33,7 @@
3333
#include <boost/asio/deferred.hpp>
3434
#include <boost/asio/experimental/channel.hpp>
3535
#include <boost/asio/experimental/parallel_group.hpp>
36+
#include <boost/asio/immediate.hpp>
3637
#include <boost/asio/io_context.hpp>
3738
#include <boost/asio/ip/tcp.hpp>
3839
#include <boost/asio/prepend.hpp>
@@ -110,68 +111,41 @@ using exec_notifier_type = asio::experimental::channel<
110111

111112
template <class Conn>
112113
struct exec_op {
113-
using req_info_type = typename multiplexer::elem;
114114
using executor_type = typename Conn::executor_type;
115115

116116
Conn* conn_ = nullptr;
117117
std::shared_ptr<exec_notifier_type<executor_type>> notifier_ = nullptr;
118-
std::shared_ptr<req_info_type> info_ = nullptr;
119-
asio::coroutine coro_{};
118+
detail::exec_fsm fsm_;
120119

121120
template <class Self>
122121
void operator()(Self& self, system::error_code = {}, std::size_t = 0)
123122
{
124-
BOOST_ASIO_CORO_REENTER(coro_)
125-
{
126-
// Check whether the user wants to wait for the connection to
127-
// be stablished.
128-
if (info_->get_request().get_config().cancel_if_not_connected && !conn_->is_open()) {
129-
BOOST_ASIO_CORO_YIELD
130-
asio::dispatch(
131-
asio::get_associated_immediate_executor(self, self.get_io_executor()),
132-
std::move(self));
133-
return self.complete(error::not_connected, 0);
134-
}
135-
136-
conn_->mpx_.add(info_);
137-
if (conn_->trigger_write()) {
138-
conn_->writer_timer_.cancel();
139-
}
140-
141-
EXEC_OP_WAIT:
142-
BOOST_ASIO_CORO_YIELD
143-
notifier_->async_receive(std::move(self));
144-
145-
if (info_->get_error()) {
146-
self.complete(info_->get_error(), 0);
147-
return;
148-
}
149-
150-
if (is_cancelled(self)) {
151-
if (!conn_->mpx_.remove(info_)) {
152-
using c_t = asio::cancellation_type;
153-
auto const c = self.get_cancellation_state().cancelled();
154-
if ((c & c_t::terminal) != c_t::none) {
155-
// Cancellation requires closing the connection
156-
// otherwise it stays in inconsistent state.
157-
conn_->cancel(operation::run);
158-
return self.complete(asio::error::operation_aborted, 0);
159-
} else {
160-
// Can't implement other cancelation types, ignoring.
161-
self.get_cancellation_state().clear();
162-
163-
// TODO: Find out a better way to ignore
164-
// cancelation.
165-
goto EXEC_OP_WAIT;
166-
}
167-
} else {
168-
// Cancelation honored.
169-
self.complete(asio::error::operation_aborted, 0);
123+
while (true) {
124+
// Invoke the state machine
125+
auto act = fsm_.resume(conn_->is_open(), self.get_cancellation_state().cancelled());
126+
127+
// Do what the FSM said
128+
switch (act.type()) {
129+
case detail::exec_action_type::setup_cancellation:
130+
self.reset_cancellation_state(asio::enable_total_cancellation());
131+
continue; // this action does not require yielding
132+
case detail::exec_action_type::immediate:
133+
asio::async_immediate(self.get_io_executor(), std::move(self));
134+
return;
135+
case detail::exec_action_type::notify_writer:
136+
conn_->writer_timer_.cancel();
137+
continue; // this action does not require yielding
138+
case detail::exec_action_type::wait_for_response:
139+
notifier_->async_receive(std::move(self));
140+
return;
141+
case detail::exec_action_type::cancel_run:
142+
conn_->cancel(operation::run);
143+
continue; // this action does not require yielding
144+
case detail::exec_action_type::done:
145+
notifier_.reset();
146+
self.complete(act.error(), act.bytes_read());
170147
return;
171-
}
172148
}
173-
174-
self.complete(info_->get_error(), info_->get_read_size());
175149
}
176150
}
177151
};
@@ -612,6 +586,16 @@ class basic_connection {
612586
*
613587
* Where the second parameter is the size of the response received
614588
* in bytes.
589+
*
590+
* @par Per-operation cancellation
591+
* This operation supports per-operation cancellation. The following cancellation types
592+
* are supported:
593+
*
594+
* - `asio::cancellation_type_t::terminal`. Always supported. May cause the current
595+
* `async_run` operation to be cancelled.
596+
* - `asio::cancellation_type_t::partial` and `asio::cancellation_type_t::total`.
597+
* Supported only if the request hasn't been written to the network yet.
598+
*
615599
*/
616600
template <
617601
class Response = ignore_t,
@@ -644,7 +628,7 @@ class basic_connection {
644628
});
645629

646630
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
647-
detail::exec_op<this_type>{this, notifier, info},
631+
detail::exec_op<this_type>{this, notifier, detail::exec_fsm(mpx_, std::move(info))},
648632
token,
649633
writer_timer_);
650634
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//
2+
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3+
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
4+
//
5+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7+
//
8+
9+
#ifndef BOOST_REDIS_DETAIL_COROUTINE_HPP
10+
#define BOOST_REDIS_DETAIL_COROUTINE_HPP
11+
12+
// asio::coroutine uses __COUNTER__ internally, which can trigger
13+
// ODR violations if we use them in header-only code. These manifest as
14+
// extremely hard-to-debug bugs only present in release builds.
15+
// Use this instead when doing coroutines in non-template code.
16+
// Adapted from Boost.MySQL.
17+
18+
// Coroutine state is represented as an integer (resume_point_var).
19+
// Every yield gets assigned a unique value (resume_point_id).
20+
// Yielding sets the next resume point, returns, and sets a case label for re-entering.
21+
// Coroutines need to switch on resume_point_var to re-enter.
22+
23+
// Enclosing this in a scope allows placing the macro inside a brace-less for/while loop
24+
// The empty scope after the case label is required because labels can't be at the end of a compound statement
25+
#define BOOST_REDIS_YIELD(resume_point_var, resume_point_id, ...) \
26+
{ \
27+
resume_point_var = resume_point_id; \
28+
return __VA_ARGS__; \
29+
case resume_point_id: \
30+
{ \
31+
} \
32+
}
33+
34+
#define BOOST_REDIS_CORO_INITIAL case 0:
35+
36+
#endif
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
//
2+
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3+
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
4+
//
5+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7+
//
8+
9+
#ifndef BOOST_REDIS_EXEC_FSM_HPP
10+
#define BOOST_REDIS_EXEC_FSM_HPP
11+
12+
#include <boost/redis/detail/multiplexer.hpp>
13+
14+
#include <boost/asio/cancellation_type.hpp>
15+
#include <boost/system/error_code.hpp>
16+
17+
#include <cstddef>
18+
#include <memory>
19+
20+
// Sans-io algorithm for async_exec, as a finite state machine
21+
22+
namespace boost::redis::detail {
23+
24+
// What should we do next?
25+
enum class exec_action_type
26+
{
27+
setup_cancellation, // Set up the cancellation types supported by the composed operation
28+
immediate, // Invoke asio::async_immediate to avoid re-entrancy problems
29+
done, // Call the final handler
30+
notify_writer, // Notify the writer task
31+
wait_for_response, // Wait to be notified
32+
cancel_run, // Cancel the connection's run operation
33+
};
34+
35+
class exec_action {
36+
exec_action_type type_;
37+
system::error_code ec_;
38+
std::size_t bytes_read_;
39+
40+
public:
41+
exec_action(exec_action_type type) noexcept
42+
: type_{type}
43+
{ }
44+
45+
exec_action(system::error_code ec, std::size_t bytes_read = 0u) noexcept
46+
: type_{exec_action_type::done}
47+
, ec_{ec}
48+
, bytes_read_{bytes_read}
49+
{ }
50+
51+
exec_action_type type() const { return type_; }
52+
system::error_code error() const { return ec_; }
53+
std::size_t bytes_read() const { return bytes_read_; }
54+
};
55+
56+
class exec_fsm {
57+
int resume_point_{0};
58+
multiplexer* mpx_{nullptr};
59+
std::shared_ptr<multiplexer::elem> elem_;
60+
61+
public:
62+
exec_fsm(multiplexer& mpx, std::shared_ptr<multiplexer::elem> elem) noexcept
63+
: mpx_(&mpx)
64+
, elem_(std::move(elem))
65+
{ }
66+
67+
exec_action resume(bool connection_is_open, asio::cancellation_type_t cancel_state);
68+
};
69+
70+
} // namespace boost::redis::detail
71+
72+
#endif // BOOST_REDIS_CONNECTOR_HPP

include/boost/redis/detail/multiplexer.hpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ struct multiplexer {
4343

4444
void set_done_callback(std::function<void()> f) noexcept { done_ = std::move(f); };
4545

46-
auto notify_done() noexcept { done_(); }
46+
auto notify_done() noexcept -> void
47+
{
48+
status_ = status::done;
49+
done_();
50+
}
4751

4852
auto notify_error(system::error_code ec) noexcept -> void;
4953

@@ -65,6 +69,12 @@ struct multiplexer {
6569
return status_ == status::staged;
6670
}
6771

72+
[[nodiscard]]
73+
bool is_done() const noexcept
74+
{
75+
return status_ == status::done;
76+
}
77+
6878
void mark_written() noexcept { status_ = status::written; }
6979

7080
void mark_staged() noexcept { status_ = status::staged; }
@@ -86,9 +96,10 @@ struct multiplexer {
8696
private:
8797
enum class status
8898
{
89-
waiting,
90-
staged,
91-
written
99+
waiting, // the request hasn't been written yet
100+
staged, // we've issued the write for this request, but it hasn't finished yet
101+
written, // the request has been written successfully
102+
done, // the request has completed and the done callback has been invoked
92103
};
93104

94105
request const* req_;

include/boost/redis/impl/exec_fsm.ipp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
//
2+
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3+
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
4+
//
5+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7+
//
8+
9+
#ifndef BOOST_REDIS_EXEC_FSM_IPP
10+
#define BOOST_REDIS_EXEC_FSM_IPP
11+
12+
#include <boost/redis/detail/coroutine.hpp>
13+
#include <boost/redis/detail/exec_fsm.hpp>
14+
#include <boost/redis/request.hpp>
15+
16+
#include <boost/asio/error.hpp>
17+
#include <boost/assert.hpp>
18+
19+
namespace boost::redis::detail {
20+
21+
inline bool is_cancellation(asio::cancellation_type_t type)
22+
{
23+
return !!(
24+
type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial |
25+
asio::cancellation_type_t::terminal));
26+
}
27+
28+
} // namespace boost::redis::detail
29+
30+
boost::redis::detail::exec_action boost::redis::detail::exec_fsm::resume(
31+
bool connection_is_open,
32+
asio::cancellation_type_t cancel_state)
33+
{
34+
switch (resume_point_) {
35+
BOOST_REDIS_CORO_INITIAL
36+
37+
// Check whether the user wants to wait for the connection to
38+
// be established.
39+
if (elem_->get_request().get_config().cancel_if_not_connected && !connection_is_open) {
40+
BOOST_REDIS_YIELD(resume_point_, 1, exec_action_type::immediate)
41+
elem_.reset(); // Deallocate memory before finalizing
42+
return system::error_code(error::not_connected);
43+
}
44+
45+
// No more immediate errors. Set up the supported cancellation types.
46+
// This is required to get partial and total cancellations.
47+
// This is a potentially allocating operation, so do it as late as we can.
48+
BOOST_REDIS_YIELD(resume_point_, 2, exec_action_type::setup_cancellation)
49+
50+
// Add the request to the multiplexer
51+
mpx_->add(elem_);
52+
53+
// Notify the writer task that there is work to do. If the task is not
54+
// listening (e.g. it's already writing or the connection is not healthy),
55+
// this is a no-op. Since this is sync, no cancellation can happen here.
56+
BOOST_REDIS_YIELD(resume_point_, 3, exec_action_type::notify_writer)
57+
58+
while (true) {
59+
// Wait until we get notified. This will return once the request completes,
60+
// or upon any kind of cancellation
61+
BOOST_REDIS_YIELD(resume_point_, 4, exec_action_type::wait_for_response)
62+
63+
// If the request has completed (with error or not), we're done
64+
if (elem_->is_done()) {
65+
exec_action act{elem_->get_error(), elem_->get_read_size()};
66+
elem_.reset(); // Deallocate memory before finalizing
67+
return act;
68+
}
69+
70+
// If we're cancelled, try to remove the request from the queue. This will only
71+
// succeed if the request is waiting (wasn't written yet)
72+
if (is_cancellation(cancel_state) && mpx_->remove(elem_)) {
73+
elem_.reset(); // Deallocate memory before finalizing
74+
return exec_action{asio::error::operation_aborted};
75+
}
76+
77+
// If we hit a terminal cancellation, tear down the connection.
78+
// Otherwise, go back to waiting.
79+
// TODO: we could likely do better here and mark the request as cancelled, removing
80+
// the done callback and the adapter. But this requires further exploration
81+
if (!!(cancel_state & asio::cancellation_type_t::terminal)) {
82+
BOOST_REDIS_YIELD(resume_point_, 5, exec_action_type::cancel_run)
83+
elem_.reset(); // Deallocate memory before finalizing
84+
return exec_action{asio::error::operation_aborted};
85+
}
86+
}
87+
}
88+
89+
// We should never get here
90+
BOOST_ASSERT(false);
91+
return exec_action{system::error_code()};
92+
}
93+
94+
#endif

0 commit comments

Comments
 (0)