Skip to content

Commit 3c02a76

Browse files
committed
Replaces connection channel with a timer.
1 parent 538ab8f commit 3c02a76

File tree

3 files changed

+37
-27
lines changed

3 files changed

+37
-27
lines changed

include/boost/redis/connection_base.hpp

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include <boost/system.hpp>
2121
#include <boost/asio/basic_stream_socket.hpp>
2222
#include <boost/asio/bind_executor.hpp>
23-
#include <boost/asio/experimental/channel.hpp>
2423
#include <boost/asio/experimental/parallel_group.hpp>
2524
#include <boost/asio/ip/tcp.hpp>
2625
#include <boost/asio/steady_timer.hpp>
@@ -52,10 +51,10 @@ struct wait_receive_op {
5251
{
5352
BOOST_ASIO_CORO_REENTER (coro)
5453
{
55-
conn_->channel_.cancel();
54+
conn_->read_op_timer_.cancel();
5655

5756
BOOST_ASIO_CORO_YIELD
58-
conn_->channel_.async_send(system::error_code{}, 0, std::move(self));
57+
conn_->read_op_timer_.async_wait(std::move(self));
5958
if (!conn_->is_open() || is_cancelled(self)) {
6059
self.complete(!!ec ? ec : asio::error::operation_aborted);
6160
return;
@@ -143,7 +142,11 @@ class read_next_op {
143142

144143
++index_;
145144

146-
BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
145+
if (ec || redis::detail::is_cancelled(self)) {
146+
conn_->cancel(operation::run);
147+
self.complete(!!ec ? ec : asio::error::operation_aborted, {});
148+
return;
149+
}
147150

148151
read_size_ += n;
149152

@@ -171,9 +174,9 @@ struct receive_op {
171174
{
172175
BOOST_ASIO_CORO_REENTER (coro)
173176
{
174-
if (conn_->wait_read_op_notification_) {
177+
if (!conn_->is_next_push()) {
175178
BOOST_ASIO_CORO_YIELD
176-
conn_->channel_.async_receive(std::move(self));
179+
conn_->read_op_timer_.async_wait(std::move(self));
177180
if (!conn_->is_open() || is_cancelled(self)) {
178181
self.complete(!!ec ? ec : asio::error::operation_aborted, 0);
179182
return;
@@ -194,9 +197,9 @@ struct receive_op {
194197

195198
read_size = n;
196199

197-
conn_->wait_read_op_notification_ = !conn_->is_next_maybe_push();
198-
if (conn_->wait_read_op_notification_)
199-
conn_->channel_.cancel();
200+
if (!conn_->is_next_push()) {
201+
conn_->read_op_timer_.cancel();
202+
}
200203

201204
self.complete({}, read_size);
202205
return;
@@ -320,7 +323,6 @@ struct run_op {
320323
{
321324
conn->write_buffer_.clear();
322325
conn->read_buffer_.clear();
323-
conn->wait_read_op_notification_ = true;
324326

325327
BOOST_ASIO_CORO_YIELD
326328
asio::experimental::make_parallel_group(
@@ -496,11 +498,12 @@ class connection_base {
496498
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
497499
, writer_timer_{ex}
498500
, read_timer_{ex}
499-
, channel_{ex}
501+
, read_op_timer_{ex}
500502
, runner_{ex, {}}
501503
{
502504
writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
503505
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
506+
read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max());
504507
}
505508

506509
/// Contructs from an execution context.
@@ -628,7 +631,7 @@ class connection_base {
628631
return asio::async_compose
629632
< CompletionToken
630633
, void(system::error_code, std::size_t)
631-
>(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, channel_);
634+
>(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, read_op_timer_);
632635
}
633636

634637
/** @brief Starts underlying connection operations.
@@ -695,7 +698,6 @@ class connection_base {
695698
using clock_type = std::chrono::steady_clock;
696699
using clock_traits_type = asio::wait_traits<clock_type>;
697700
using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
698-
using channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
699701
using runner_type = redis::detail::runner<executor_type>;
700702

701703
auto use_ssl() const noexcept
@@ -767,7 +769,7 @@ class connection_base {
767769
} break;
768770
case operation::receive:
769771
{
770-
channel_.cancel();
772+
read_op_timer_.cancel();
771773
} break;
772774
default: /* ignore */;
773775
}
@@ -891,7 +893,7 @@ class connection_base {
891893
return asio::async_compose
892894
< CompletionToken
893895
, void(system::error_code)
894-
>(redis::detail::wait_receive_op<this_type>{this}, token, channel_);
896+
>(redis::detail::wait_receive_op<this_type>{this}, token, read_op_timer_);
895897
}
896898

897899
void cancel_push_requests()
@@ -997,7 +999,7 @@ class connection_base {
997999
stream_->next_layer().close();
9981000
}
9991001

1000-
bool is_next_maybe_push() const noexcept
1002+
bool is_next_push() const noexcept
10011003
{
10021004
return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
10031005
}
@@ -1013,16 +1015,13 @@ class connection_base {
10131015
// not suspend.
10141016
timer_type writer_timer_;
10151017
timer_type read_timer_;
1016-
channel_type channel_;
1018+
timer_type read_op_timer_;
10171019
runner_type runner_;
10181020

10191021
std::string read_buffer_;
10201022
std::string write_buffer_;
10211023
reqs_type reqs_;
10221024
std::size_t max_read_size_ = (std::numeric_limits<std::size_t>::max)();
1023-
1024-
// Flag that optimizes reading pushes.
1025-
bool wait_read_op_notification_ = true;
10261025
};
10271026

10281027
} // boost::redis

tests/test_conn_exec_error.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,25 @@ BOOST_AUTO_TEST_CASE(error_in_transaction)
200200
ioc.run();
201201
}
202202

203-
// This test is important because a subscriber has no response on
204-
// success, but on error, for example when using a wrong syntax, the
205-
// server will send a simple error response the client is not
206-
// expecting.
203+
// This test is important because a SUBSCRIBE command has no response
204+
// on success, but does on error. for example when using a wrong
205+
// syntax, the server will send a simple error response the client is
206+
// not expecting.
207+
//
208+
// Sending the subscribe after the ping command below is just a
209+
// convenience to avoid have it merged in a pipeline making things
210+
// even more complex. For example, without a ping, we might get the
211+
// sequence HELLO + SUBSCRIBE + PING where the hello and ping are
212+
// automatically sent by the implementation. In this case, if the
213+
// subscribe synthax is wrong, redis will send a response, which does
214+
// not exist on success. That response will be interprested as the
215+
// response to the PING command that comes thereafter and won't be
216+
// forwarded to the receive_op, resulting in a difficult to handle
217+
// error.
207218
BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
208219
{
209220
request req1;
210-
req1.push("HELLO", 3);
221+
req1.push("PING");
211222

212223
request req2;
213224
req2.push("SUBSCRIBE"); // Wrong command synthax.

tests/test_conn_push.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ push_consumer1(std::shared_ptr<connection> conn, bool& push_received)
152152

153153
{
154154
auto [ec, ev] = co_await conn->async_receive(ignore, as_tuple(net::use_awaitable));
155-
BOOST_CHECK_EQUAL(ec, net::experimental::channel_errc::channel_cancelled);
155+
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
156156
}
157157

158158
push_received = true;
@@ -196,7 +196,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
196196
});
197197

198198
conn->async_exec(req, ignore, [](auto ec, auto){
199-
BOOST_CHECK_EQUAL(ec, net::experimental::error::channel_errors::channel_cancelled);
199+
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
200200
});
201201

202202
run(conn);

0 commit comments

Comments
 (0)