Skip to content

Commit 22405e6

Browse files
committed
Accepts as valid responses to staged requests.
Before these changes the request had to be marked as written in order to interpret incoming responses as belonging to that request. On fast networks however, like on localhost and underload the responses might arrive before the write operation completed.
1 parent 723e727 commit 22405e6

File tree

7 files changed

+128
-87
lines changed

7 files changed

+128
-87
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,15 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
676676

677677
## Changelog
678678

679+
### Boost 1.85
680+
681+
* Fixes [issue 170](https://github.com/boostorg/redis/issues/170).
682+
Under load and on low-latency networks it is possible to start
683+
receiving responses before the write operation completed and while
684+
the request is still marked as staged and not written. This messes
685+
up with the heuristics that classifies responses as unsolicied or
686+
not.
687+
679688
### Boost 1.84 (First release in Boost)
680689

681690
* Deprecates the `async_receive` overload that takes a response. Users

include/boost/redis/detail/connection_base.hpp

Lines changed: 64 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ struct exec_op {
113113
asio::coroutine coro{};
114114

115115
template <class Self>
116-
void operator()(Self& self , system::error_code ec = {})
116+
void operator()(Self& self , system::error_code ec = {}, std::size_t = 0)
117117
{
118118
BOOST_ASIO_CORO_REENTER (coro)
119119
{
@@ -130,7 +130,6 @@ struct exec_op {
130130
EXEC_OP_WAIT:
131131
BOOST_ASIO_CORO_YIELD
132132
info_->async_wait(std::move(self));
133-
BOOST_ASSERT(ec == asio::error::operation_aborted);
134133

135134
if (info_->ec_) {
136135
self.complete(info_->ec_, 0);
@@ -140,18 +139,18 @@ struct exec_op {
140139
if (info_->stop_requested()) {
141140
// Don't have to call remove_request as it has already
142141
// been by cancel(exec).
143-
return self.complete(ec, 0);
142+
return self.complete(asio::error::operation_aborted, 0);
144143
}
145144

146145
if (is_cancelled(self)) {
147-
if (info_->is_written()) {
146+
if (!info_->is_waiting()) {
148147
using c_t = asio::cancellation_type;
149148
auto const c = self.get_cancellation_state().cancelled();
150149
if ((c & c_t::terminal) != c_t::none) {
151150
// Cancellation requires closing the connection
152151
// otherwise it stays in inconsistent state.
153152
conn_->cancel(operation::run);
154-
return self.complete(ec, 0);
153+
return self.complete(asio::error::operation_aborted, 0);
155154
} else {
156155
// Can't implement other cancelation types, ignoring.
157156
self.get_cancellation_state().clear();
@@ -163,7 +162,7 @@ struct exec_op {
163162
} else {
164163
// Cancelation can be honored.
165164
conn_->remove_request(info_);
166-
self.complete(ec, 0);
165+
self.complete(asio::error::operation_aborted, 0);
167166
return;
168167
}
169168
}
@@ -516,6 +515,7 @@ class connection_base {
516515
using runner_type = runner<executor_type>;
517516
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
518517
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
518+
using exec_notifier_type = receive_channel_type;
519519

520520
auto use_ssl() const noexcept
521521
{ return runner_.get_config().use_ssl;}
@@ -527,10 +527,10 @@ class connection_base {
527527
{
528528
BOOST_ASSERT(ptr != nullptr);
529529

530-
if (ptr->is_written()) {
531-
return !ptr->req_->get_config().cancel_if_unresponded;
532-
} else {
530+
if (ptr->is_waiting()) {
533531
return !ptr->req_->get_config().cancel_on_connection_lost;
532+
} else {
533+
return !ptr->req_->get_config().cancel_if_unresponded;
534534
}
535535
};
536536

@@ -544,7 +544,7 @@ class connection_base {
544544

545545
reqs_.erase(point, std::end(reqs_));
546546
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
547-
return ptr->reset_status();
547+
return ptr->mark_waiting();
548548
});
549549

550550
return ret;
@@ -555,7 +555,7 @@ class connection_base {
555555
auto f = [](auto const& ptr)
556556
{
557557
BOOST_ASSERT(ptr != nullptr);
558-
return ptr->is_written();
558+
return !ptr->is_waiting();
559559
};
560560

561561
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
@@ -615,25 +615,15 @@ class connection_base {
615615
using node_type = resp3::basic_node<std::string_view>;
616616
using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
617617

618-
enum class action
619-
{
620-
stop,
621-
proceed,
622-
none,
623-
};
624-
625618
explicit req_info(request const& req, adapter_type adapter, executor_type ex)
626-
: timer_{ex}
627-
, action_{action::none}
619+
: notifier_{ex, 1}
628620
, req_{&req}
629621
, adapter_{}
630622
, expected_responses_{req.get_expected_responses()}
631-
, status_{status::none}
623+
, status_{status::waiting}
632624
, ec_{{}}
633625
, read_size_{0}
634626
{
635-
timer_.expires_at((std::chrono::steady_clock::time_point::max)());
636-
637627
adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
638628
{
639629
auto const i = req_->get_expected_responses() - expected_responses_;
@@ -643,18 +633,16 @@ class connection_base {
643633

644634
auto proceed()
645635
{
646-
timer_.cancel();
647-
action_ = action::proceed;
636+
notifier_.try_send(std::error_code{}, 0);
648637
}
649638

650639
void stop()
651640
{
652-
timer_.cancel();
653-
action_ = action::stop;
641+
notifier_.close();
654642
}
655643

656-
[[nodiscard]] auto is_waiting_write() const noexcept
657-
{ return !is_written() && !is_staged(); }
644+
[[nodiscard]] auto is_waiting() const noexcept
645+
{ return status_ == status::waiting; }
658646

659647
[[nodiscard]] auto is_written() const noexcept
660648
{ return status_ == status::written; }
@@ -668,27 +656,26 @@ class connection_base {
668656
void mark_staged() noexcept
669657
{ status_ = status::staged; }
670658

671-
void reset_status() noexcept
672-
{ status_ = status::none; }
659+
void mark_waiting() noexcept
660+
{ status_ = status::waiting; }
673661

674662
[[nodiscard]] auto stop_requested() const noexcept
675-
{ return action_ == action::stop;}
663+
{ return !notifier_.is_open();}
676664

677665
template <class CompletionToken>
678666
auto async_wait(CompletionToken token)
679667
{
680-
return timer_.async_wait(std::move(token));
668+
return notifier_.async_receive(std::move(token));
681669
}
682670

683671
//private:
684672
enum class status
685-
{ none
673+
{ waiting
686674
, staged
687675
, written
688676
};
689677

690-
timer_type timer_;
691-
action action_;
678+
exec_notifier_type notifier_;
692679
request const* req_;
693680
wrapped_adapter_type adapter_;
694681

@@ -716,7 +703,7 @@ class connection_base {
716703
void cancel_push_requests()
717704
{
718705
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
719-
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
706+
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
720707
});
721708

722709
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
@@ -737,7 +724,7 @@ class connection_base {
737724

738725
if (info->req_->has_hello_priority()) {
739726
auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
740-
return e->is_waiting_write();
727+
return e->is_waiting();
741728
});
742729

743730
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
@@ -781,7 +768,7 @@ class connection_base {
781768
// Coalesces the requests and marks them staged. After a
782769
// successful write staged requests will be marked as written.
783770
auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
784-
return !ri->is_waiting_write();
771+
return !ri->is_waiting();
785772
});
786773

787774
std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
@@ -798,7 +785,14 @@ class connection_base {
798785

799786
bool is_waiting_response() const noexcept
800787
{
801-
return !std::empty(reqs_) && reqs_.front()->is_written();
788+
if (std::empty(reqs_))
789+
return false;
790+
791+
// Under load and on low-latency networks we might start
792+
// receiving responses before the write operation completed and
793+
// the request is still maked as staged and not written. See
794+
// https://github.com/boostorg/redis/issues/170
795+
return !reqs_.front()->is_waiting();
802796
}
803797

804798
void close()
@@ -814,36 +808,39 @@ class connection_base {
814808

815809
auto is_next_push()
816810
{
817-
// We handle unsolicited events in the following way
818-
//
819-
// 1. Its resp3 type is a push.
820-
//
821-
// 2. A non-push type is received with an empty requests
822-
// queue. I have noticed this is possible (e.g. -MISCONF).
823-
// I expect them to have type push so we can distinguish
824-
// them from responses to commands, but it is a
825-
// simple-error. If we are lucky enough to receive them
826-
// when the command queue is empty we can treat them as
827-
// server pushes, otherwise it is impossible to handle
828-
// them properly
829-
//
830-
// 3. The request does not expect any response but we got
831-
// one. This may happen if for example, subscribe with
832-
// wrong syntax.
833-
//
834-
// Useful links:
811+
BOOST_ASSERT(!read_buffer_.empty());
812+
813+
// Useful links to understand the heuristics below.
835814
//
836815
// - https://github.com/redis/redis/issues/11784
837816
// - https://github.com/redis/redis/issues/6426
838-
//
839-
840-
BOOST_ASSERT(!read_buffer_.empty());
841-
842-
return
843-
(resp3::to_type(read_buffer_.front()) == resp3::type::push)
844-
|| reqs_.empty()
845-
|| (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
846-
|| !is_waiting_response(); // Added to deal with MONITOR.
817+
// - https://github.com/boostorg/redis/issues/170
818+
819+
// The message's resp3 type is a push.
820+
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
821+
return true;
822+
823+
// This is non-push type and the requests queue is empty. I have
824+
// noticed this is possible, for example with -MISCONF. I don't
825+
// know why they are not sent with a push type so we can
826+
// distinguish them from responses to commands. If we are lucky
827+
// enough to receive them when the command queue is empty they
828+
// can be treated as server pushes, otherwise it is impossible
829+
// to handle them properly
830+
if (reqs_.empty())
831+
return true;
832+
833+
// The request does not expect any response but we got one. This
834+
// may happen if for example, subscribe with wrong syntax.
835+
if (reqs_.front()->expected_responses_ == 0)
836+
return true;
837+
838+
// Added to deal with MONITOR and also to fix PR170 which
839+
// happens under load and on low-latency networks, where we
840+
// might start receiving responses before the write operation
841+
// completed and the request is still maked as staged and not
842+
// written.
843+
return reqs_.front()->is_waiting();
847844
}
848845

849846
auto get_suggested_buffer_growth() const noexcept

include/boost/redis/request.hpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,31 +47,31 @@ class request {
4747
public:
4848
/// Request configuration options.
4949
struct config {
50-
/** \brief If `true`
51-
* `boost::redis::connection::async_exec` will complete with error if the
52-
* connection is lost. Affects only requests that haven't been
53-
* sent yet.
50+
/** \brief If `true` calls to `connection::async_exec` will
51+
* complete with error if the connection is lost while the
52+
* request hasn't been sent yet.
5453
*/
5554
bool cancel_on_connection_lost = true;
5655

57-
/** \brief If `true` the request will complete with
58-
* boost::redis::error::not_connected if `async_exec` is called before
59-
* the connection with Redis was established.
56+
/** \brief If `true` `connection::async_exec` will complete with
57+
* `boost::redis::error::not_connected` if the call happens
58+
* before the connection with Redis was established.
6059
*/
6160
bool cancel_if_not_connected = false;
6261

63-
/** \brief If `false` `boost::redis::connection::async_exec` will not
62+
/** \brief If `false` `connection::async_exec` will not
6463
* automatically cancel this request if the connection is lost.
6564
* Affects only requests that have been written to the socket
66-
* but remained unresponded when `boost::redis::connection::async_run`
67-
* completed.
65+
* but remained unresponded when
66+
* `boost::redis::connection::async_run` completed.
6867
*/
6968
bool cancel_if_unresponded = true;
7069

71-
/** \brief If this request has a `HELLO` command and this flag is
72-
* `true`, the `boost::redis::connection` will move it to the front of
73-
* the queue of awaiting requests. This makes it possible to
74-
* send `HELLO` and authenticate before other commands are sent.
70+
/** \brief If this request has a `HELLO` command and this flag
71+
* is `true`, the `boost::redis::connection` will move it to the
72+
* front of the queue of awaiting requests. This makes it
73+
* possible to send `HELLO` and authenticate before other
74+
* commands are sent.
7575
*/
7676
bool hello_with_priority = true;
7777
};

0 commit comments

Comments
 (0)