@@ -43,7 +43,7 @@ namespace detail {
43
43
44
44
template <class Conn >
45
45
struct wait_receive_op {
46
- Conn* conn ;
46
+ Conn* conn_ ;
47
47
asio::coroutine coro{};
48
48
49
49
template <class Self >
@@ -52,14 +52,14 @@ struct wait_receive_op {
52
52
{
53
53
BOOST_ASIO_CORO_REENTER (coro)
54
54
{
55
- BOOST_ASIO_CORO_YIELD
56
- conn->channel_ .async_send (system::error_code{}, 0 , std::move (self));
57
- BOOST_REDIS_CHECK_OP0 (;);
55
+ conn_->channel_ .cancel ();
58
56
59
57
BOOST_ASIO_CORO_YIELD
60
- conn->channel_ .async_send (system::error_code{}, 0 , std::move (self));
61
- BOOST_REDIS_CHECK_OP0 (;);
62
-
58
+ conn_->channel_ .async_send (system::error_code{}, 0 , std::move (self));
59
+ if (!conn_->is_open () || is_cancelled (self)) {
60
+ self.complete (!!ec ? ec : asio::error::operation_aborted);
61
+ return ;
62
+ }
63
63
self.complete ({});
64
64
}
65
65
}
@@ -158,7 +158,7 @@ class read_next_op {
158
158
159
159
template <class Conn , class Adapter >
160
160
struct receive_op {
161
- Conn* conn ;
161
+ Conn* conn_ ;
162
162
Adapter adapter;
163
163
std::size_t read_size = 0 ;
164
164
asio::coroutine coro{};
@@ -171,27 +171,32 @@ struct receive_op {
171
171
{
172
172
BOOST_ASIO_CORO_REENTER (coro)
173
173
{
174
- BOOST_ASIO_CORO_YIELD
175
- conn->channel_ .async_receive (std::move (self));
176
- BOOST_REDIS_CHECK_OP1 (;);
174
+ if (conn_->wait_read_op_notification_ ) {
175
+ BOOST_ASIO_CORO_YIELD
176
+ conn_->channel_ .async_receive (std::move (self));
177
+ if (!conn_->is_open () || is_cancelled (self)) {
178
+ self.complete (!!ec ? ec : asio::error::operation_aborted, 0 );
179
+ return ;
180
+ }
181
+ }
177
182
178
- if (conn ->use_ssl ())
179
- BOOST_ASIO_CORO_YIELD redis::detail::async_read (conn ->next_layer (), conn ->make_dynamic_buffer (), adapter, std::move (self));
183
+ if (conn_ ->use_ssl ())
184
+ BOOST_ASIO_CORO_YIELD redis::detail::async_read (conn_ ->next_layer (), conn_ ->make_dynamic_buffer (), adapter, std::move (self));
180
185
else
181
- BOOST_ASIO_CORO_YIELD redis::detail::async_read (conn ->next_layer ().next_layer (), conn ->make_dynamic_buffer (), adapter, std::move (self));
186
+ BOOST_ASIO_CORO_YIELD redis::detail::async_read (conn_ ->next_layer ().next_layer (), conn_ ->make_dynamic_buffer (), adapter, std::move (self));
182
187
183
188
if (ec || is_cancelled (self)) {
184
- conn ->cancel (operation::run);
185
- conn ->cancel (operation::receive);
189
+ conn_ ->cancel (operation::run);
190
+ conn_ ->cancel (operation::receive);
186
191
self.complete (!!ec ? ec : asio::error::operation_aborted, {});
187
192
return ;
188
193
}
189
194
190
195
read_size = n;
191
196
192
- BOOST_ASIO_CORO_YIELD
193
- conn-> channel_ . async_receive ( std::move (self));
194
- BOOST_REDIS_CHECK_OP1 (; );
197
+ conn_-> wait_read_op_notification_ = !conn_-> is_next_maybe_push ();
198
+ if (conn_-> wait_read_op_notification_ )
199
+ conn_-> channel_ . cancel ( );
195
200
196
201
self.complete ({}, read_size);
197
202
return ;
@@ -315,6 +320,7 @@ struct run_op {
315
320
{
316
321
conn->write_buffer_ .clear ();
317
322
conn->read_buffer_ .clear ();
323
+ conn->wait_read_op_notification_ = true ;
318
324
319
325
BOOST_ASIO_CORO_YIELD
320
326
asio::experimental::make_parallel_group (
@@ -991,6 +997,11 @@ class connection_base {
991
997
stream_->next_layer ().close ();
992
998
}
993
999
1000
+ bool is_next_maybe_push () const noexcept
1001
+ {
1002
+ return !read_buffer_.empty () && (resp3::to_type (read_buffer_.front ()) == resp3::type::push);
1003
+ }
1004
+
994
1005
auto is_open () const noexcept { return stream_->next_layer ().is_open (); }
995
1006
auto & lowest_layer () noexcept { return stream_->lowest_layer (); }
996
1007
@@ -1009,6 +1020,9 @@ class connection_base {
1009
1020
std::string write_buffer_;
1010
1021
reqs_type reqs_;
1011
1022
std::size_t max_read_size_ = (std::numeric_limits<std::size_t >::max)();
1023
+
1024
+ // Flag that optimizes reading pushes.
1025
+ bool wait_read_op_notification_ = true ;
1012
1026
};
1013
1027
1014
1028
} // boost::redis
0 commit comments