@@ -113,7 +113,7 @@ struct exec_op {
113
113
asio::coroutine coro{};
114
114
115
115
template <class Self >
116
- void operator ()(Self& self , system::error_code ec = {})
116
+ void operator ()(Self& self , system::error_code ec = {}, std:: size_t = 0 )
117
117
{
118
118
BOOST_ASIO_CORO_REENTER (coro)
119
119
{
@@ -130,7 +130,6 @@ struct exec_op {
130
130
EXEC_OP_WAIT:
131
131
BOOST_ASIO_CORO_YIELD
132
132
info_->async_wait (std::move (self));
133
- BOOST_ASSERT (ec == asio::error::operation_aborted);
134
133
135
134
if (info_->ec_ ) {
136
135
self.complete (info_->ec_ , 0 );
@@ -140,18 +139,18 @@ struct exec_op {
140
139
if (info_->stop_requested ()) {
141
140
// Don't have to call remove_request as it has already
142
141
// been by cancel(exec).
143
- return self.complete (ec , 0 );
142
+ return self.complete (asio::error::operation_aborted , 0 );
144
143
}
145
144
146
145
if (is_cancelled (self)) {
147
- if (info_->is_written ()) {
146
+ if (! info_->is_waiting ()) {
148
147
using c_t = asio::cancellation_type;
149
148
auto const c = self.get_cancellation_state ().cancelled ();
150
149
if ((c & c_t ::terminal) != c_t ::none) {
151
150
// Cancellation requires closing the connection
152
151
// otherwise it stays in inconsistent state.
153
152
conn_->cancel (operation::run);
154
- return self.complete (ec , 0 );
153
+ return self.complete (asio::error::operation_aborted , 0 );
155
154
} else {
156
155
// Can't implement other cancelation types, ignoring.
157
156
self.get_cancellation_state ().clear ();
@@ -163,7 +162,7 @@ struct exec_op {
163
162
} else {
164
163
// Cancelation can be honored.
165
164
conn_->remove_request (info_);
166
- self.complete (ec , 0 );
165
+ self.complete (asio::error::operation_aborted , 0 );
167
166
return ;
168
167
}
169
168
}
@@ -516,6 +515,7 @@ class connection_base {
516
515
using runner_type = runner<executor_type>;
517
516
using adapter_type = std::function<void (std::size_t , resp3::basic_node<std::string_view> const &, system::error_code&)>;
518
517
using receiver_adapter_type = std::function<void (resp3::basic_node<std::string_view> const &, system::error_code&)>;
518
+ using exec_notifier_type = receive_channel_type;
519
519
520
520
auto use_ssl () const noexcept
521
521
{ return runner_.get_config ().use_ssl ;}
@@ -527,10 +527,10 @@ class connection_base {
527
527
{
528
528
BOOST_ASSERT (ptr != nullptr );
529
529
530
- if (ptr->is_written ()) {
531
- return !ptr->req_ ->get_config ().cancel_if_unresponded ;
532
- } else {
530
+ if (ptr->is_waiting ()) {
533
531
return !ptr->req_ ->get_config ().cancel_on_connection_lost ;
532
+ } else {
533
+ return !ptr->req_ ->get_config ().cancel_if_unresponded ;
534
534
}
535
535
};
536
536
@@ -544,7 +544,7 @@ class connection_base {
544
544
545
545
reqs_.erase (point, std::end (reqs_));
546
546
std::for_each (std::begin (reqs_), std::end (reqs_), [](auto const & ptr) {
547
- return ptr->reset_status ();
547
+ return ptr->mark_waiting ();
548
548
});
549
549
550
550
return ret;
@@ -555,7 +555,7 @@ class connection_base {
555
555
auto f = [](auto const & ptr)
556
556
{
557
557
BOOST_ASSERT (ptr != nullptr );
558
- return ptr->is_written ();
558
+ return ! ptr->is_waiting ();
559
559
};
560
560
561
561
auto point = std::stable_partition (std::begin (reqs_), std::end (reqs_), f);
@@ -615,25 +615,15 @@ class connection_base {
615
615
using node_type = resp3::basic_node<std::string_view>;
616
616
using wrapped_adapter_type = std::function<void (node_type const &, system::error_code&)>;
617
617
618
- enum class action
619
- {
620
- stop,
621
- proceed,
622
- none,
623
- };
624
-
625
618
explicit req_info (request const & req, adapter_type adapter, executor_type ex)
626
- : timer_{ex}
627
- , action_{action::none}
619
+ : notifier_{ex, 1 }
628
620
, req_{&req}
629
621
, adapter_{}
630
622
, expected_responses_{req.get_expected_responses ()}
631
- , status_{status::none }
623
+ , status_{status::waiting }
632
624
, ec_{{}}
633
625
, read_size_{0 }
634
626
{
635
- timer_.expires_at ((std::chrono::steady_clock::time_point::max)());
636
-
637
627
adapter_ = [this , adapter](node_type const & nd, system::error_code& ec)
638
628
{
639
629
auto const i = req_->get_expected_responses () - expected_responses_;
@@ -643,18 +633,16 @@ class connection_base {
643
633
644
634
auto proceed ()
645
635
{
646
- timer_.cancel ();
647
- action_ = action::proceed;
636
+ notifier_.try_send (std::error_code{}, 0 );
648
637
}
649
638
650
639
void stop ()
651
640
{
652
- timer_.cancel ();
653
- action_ = action::stop;
641
+ notifier_.close ();
654
642
}
655
643
656
- [[nodiscard]] auto is_waiting_write () const noexcept
657
- { return ! is_written () && ! is_staged () ; }
644
+ [[nodiscard]] auto is_waiting () const noexcept
645
+ { return status_ == status::waiting ; }
658
646
659
647
[[nodiscard]] auto is_written () const noexcept
660
648
{ return status_ == status::written; }
@@ -668,27 +656,26 @@ class connection_base {
668
656
void mark_staged () noexcept
669
657
{ status_ = status::staged; }
670
658
671
- void reset_status () noexcept
672
- { status_ = status::none ; }
659
+ void mark_waiting () noexcept
660
+ { status_ = status::waiting ; }
673
661
674
662
[[nodiscard]] auto stop_requested () const noexcept
675
- { return action_ == action::stop ;}
663
+ { return !notifier_. is_open () ;}
676
664
677
665
template <class CompletionToken >
678
666
auto async_wait (CompletionToken token)
679
667
{
680
- return timer_. async_wait (std::move (token));
668
+ return notifier_. async_receive (std::move (token));
681
669
}
682
670
683
671
// private:
684
672
enum class status
685
- { none
673
+ { waiting
686
674
, staged
687
675
, written
688
676
};
689
677
690
- timer_type timer_;
691
- action action_;
678
+ exec_notifier_type notifier_;
692
679
request const * req_;
693
680
wrapped_adapter_type adapter_;
694
681
@@ -716,7 +703,7 @@ class connection_base {
716
703
void cancel_push_requests ()
717
704
{
718
705
auto point = std::stable_partition (std::begin (reqs_), std::end (reqs_), [](auto const & ptr) {
719
- return !(ptr->is_staged () && ptr->req_ ->get_expected_responses () == 0 );
706
+ return !(ptr->is_staged () && ptr->req_ ->get_expected_responses () == 0 );
720
707
});
721
708
722
709
std::for_each (point, std::end (reqs_), [](auto const & ptr) {
@@ -737,7 +724,7 @@ class connection_base {
737
724
738
725
if (info->req_ ->has_hello_priority ()) {
739
726
auto rend = std::partition_point (std::rbegin (reqs_), std::rend (reqs_), [](auto const & e) {
740
- return e->is_waiting_write ();
727
+ return e->is_waiting ();
741
728
});
742
729
743
730
std::rotate (std::rbegin (reqs_), std::rbegin (reqs_) + 1 , rend);
@@ -781,7 +768,7 @@ class connection_base {
781
768
// Coalesces the requests and marks them staged. After a
782
769
// successful write staged requests will be marked as written.
783
770
auto const point = std::partition_point (std::cbegin (reqs_), std::cend (reqs_), [](auto const & ri) {
784
- return !ri->is_waiting_write ();
771
+ return !ri->is_waiting ();
785
772
});
786
773
787
774
std::for_each (point, std::cend (reqs_), [this ](auto const & ri) {
@@ -798,7 +785,14 @@ class connection_base {
798
785
799
786
bool is_waiting_response () const noexcept
800
787
{
801
- return !std::empty (reqs_) && reqs_.front ()->is_written ();
788
+ if (std::empty (reqs_))
789
+ return false ;
790
+
791
+ // Under load and on low-latency networks we might start
792
+ // receiving responses before the write operation completed and
793
+ // the request is still maked as staged and not written. See
794
+ // https://github.com/boostorg/redis/issues/170
795
+ return !reqs_.front ()->is_waiting ();
802
796
}
803
797
804
798
void close ()
@@ -814,36 +808,39 @@ class connection_base {
814
808
815
809
auto is_next_push ()
816
810
{
817
- // We handle unsolicited events in the following way
818
- //
819
- // 1. Its resp3 type is a push.
820
- //
821
- // 2. A non-push type is received with an empty requests
822
- // queue. I have noticed this is possible (e.g. -MISCONF).
823
- // I expect them to have type push so we can distinguish
824
- // them from responses to commands, but it is a
825
- // simple-error. If we are lucky enough to receive them
826
- // when the command queue is empty we can treat them as
827
- // server pushes, otherwise it is impossible to handle
828
- // them properly
829
- //
830
- // 3. The request does not expect any response but we got
831
- // one. This may happen if for example, subscribe with
832
- // wrong syntax.
833
- //
834
- // Useful links:
811
+ BOOST_ASSERT (!read_buffer_.empty ());
812
+
813
+ // Useful links to understand the heuristics below.
835
814
//
836
815
// - https://github.com/redis/redis/issues/11784
837
816
// - https://github.com/redis/redis/issues/6426
838
- //
839
-
840
- BOOST_ASSERT (!read_buffer_.empty ());
841
-
842
- return
843
- (resp3::to_type (read_buffer_.front ()) == resp3::type::push)
844
- || reqs_.empty ()
845
- || (!reqs_.empty () && reqs_.front ()->expected_responses_ == 0 )
846
- || !is_waiting_response (); // Added to deal with MONITOR.
817
+ // - https://github.com/boostorg/redis/issues/170
818
+
819
+ // The message's resp3 type is a push.
820
+ if (resp3::to_type (read_buffer_.front ()) == resp3::type::push)
821
+ return true ;
822
+
823
+ // This is non-push type and the requests queue is empty. I have
824
+ // noticed this is possible, for example with -MISCONF. I don't
825
+ // know why they are not sent with a push type so we can
826
+ // distinguish them from responses to commands. If we are lucky
827
+ // enough to receive them when the command queue is empty they
828
+ // can be treated as server pushes, otherwise it is impossible
829
+ // to handle them properly
830
+ if (reqs_.empty ())
831
+ return true ;
832
+
833
+ // The request does not expect any response but we got one. This
834
+ // may happen if for example, subscribe with wrong syntax.
835
+ if (reqs_.front ()->expected_responses_ == 0 )
836
+ return true ;
837
+
838
+ // Added to deal with MONITOR and also to fix PR170 which
839
+ // happens under load and on low-latency networks, where we
840
+ // might start receiving responses before the write operation
841
+ // completed and the request is still maked as staged and not
842
+ // written.
843
+ return reqs_.front ()->is_waiting ();
847
844
}
848
845
849
846
auto get_suggested_buffer_growth () const noexcept
0 commit comments