1
- /* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected] )
1
+ /* Copyright (c) 2018-2024 Marcelo Zimbres Silva ([email protected] )
2
2
*
3
3
* Distributed under the Boost Software License, Version 1.0. (See
4
4
* accompanying file LICENSE.txt)
23
23
#include < boost/asio/experimental/parallel_group.hpp>
24
24
#include < boost/asio/ip/tcp.hpp>
25
25
#include < boost/asio/steady_timer.hpp>
26
+ #include < boost/asio/prepend.hpp>
26
27
#include < string>
27
28
#include < memory>
28
29
#include < chrono>
@@ -87,11 +88,52 @@ class runner_op {
87
88
{
88
89
BOOST_ASIO_CORO_REENTER (coro_)
89
90
{
91
+ BOOST_ASIO_CORO_YIELD
92
+ runner_->resv_ .async_resolve (
93
+ asio::prepend (std::move (self), std::array<std::size_t , 3 > {}));
94
+
95
+ logger_.on_resolve (ec0, runner_->resv_ .results ());
96
+
97
+ if (ec0 || redis::detail::is_cancelled (self)) {
98
+ self.complete (!!ec0 ? ec0 : asio::error::operation_aborted);
99
+ return ;
100
+ }
101
+
102
+ BOOST_ASIO_CORO_YIELD
103
+ runner_->ctor_ .async_connect (
104
+ conn_->next_layer ().next_layer (),
105
+ runner_->resv_ .results (),
106
+ asio::prepend (std::move (self), std::array<std::size_t , 3 > {}));
107
+
108
+ logger_.on_connect (ec0, runner_->ctor_ .endpoint ());
109
+
110
+ if (ec0 || redis::detail::is_cancelled (self)) {
111
+ self.complete (!!ec0 ? ec0 : asio::error::operation_aborted);
112
+ return ;
113
+ }
114
+
115
+ if (conn_->use_ssl ()) {
116
+ BOOST_ASIO_CORO_YIELD
117
+ runner_->hsher_ .async_handshake (
118
+ conn_->next_layer (),
119
+ asio::prepend (std::move (self),
120
+ std::array<std::size_t , 3 > {}));
121
+
122
+ logger_.on_ssl_handshake (ec0);
123
+ if (ec0 || redis::detail::is_cancelled (self)) {
124
+ self.complete (!!ec0 ? ec0 : asio::error::operation_aborted);
125
+ return ;
126
+ }
127
+ }
128
+
129
+ // Note: Oder is important here because async_run might
130
+ // trigger an async_write before the async_hello thereby
131
+ // causing authentication problems.
90
132
BOOST_ASIO_CORO_YIELD
91
133
asio::experimental::make_parallel_group (
92
- [this ](auto token) { return runner_->async_run_all (*conn_, logger_, token); },
134
+ [this ](auto token) { return runner_->async_hello (*conn_, logger_, token); },
93
135
[this ](auto token) { return runner_->health_checker_ .async_check_health (*conn_, logger_, token); },
94
- [this ](auto token) { return runner_-> async_hello (*conn_ , logger_, token); }
136
+ [this ](auto token) { return conn_-> async_run_lean (runner_-> cfg_ , logger_, token); }
95
137
).async_wait (
96
138
asio::experimental::wait_for_one_error (),
97
139
std::move (self));
@@ -103,59 +145,17 @@ class runner_op {
103
145
return ;
104
146
}
105
147
106
- if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout ) {
148
+ if (order[ 0 ] == 0 && !! ec0) {
107
149
self.complete (ec0);
108
150
return ;
109
151
}
110
152
111
- if (order[0 ] == 2 && !!ec2) {
112
- self.complete (ec2);
113
- return ;
114
- }
115
-
116
153
if (order[0 ] == 1 && ec1 == error::pong_timeout) {
117
154
self.complete (ec1);
118
155
return ;
119
156
}
120
157
121
- self.complete (ec0);
122
- }
123
- }
124
- };
125
-
126
- template <class Runner , class Connection , class Logger >
127
- struct run_all_op {
128
- Runner* runner_ = nullptr ;
129
- Connection* conn_ = nullptr ;
130
- Logger logger_;
131
- asio::coroutine coro_{};
132
-
133
- template <class Self >
134
- void operator ()(Self& self, system::error_code ec = {}, std::size_t = 0 )
135
- {
136
- BOOST_ASIO_CORO_REENTER (coro_)
137
- {
138
- BOOST_ASIO_CORO_YIELD
139
- runner_->resv_ .async_resolve (std::move (self));
140
- logger_.on_resolve (ec, runner_->resv_ .results ());
141
- BOOST_REDIS_CHECK_OP0 (conn_->cancel (operation::run);)
142
-
143
- BOOST_ASIO_CORO_YIELD
144
- runner_->ctor_ .async_connect (conn_->next_layer ().next_layer (), runner_->resv_ .results (), std::move (self));
145
- logger_.on_connect (ec, runner_->ctor_ .endpoint ());
146
- BOOST_REDIS_CHECK_OP0 (conn_->cancel (operation::run);)
147
-
148
- if (conn_->use_ssl ()) {
149
- BOOST_ASIO_CORO_YIELD
150
- runner_->hsher_ .async_handshake (conn_->next_layer (), std::move (self));
151
- logger_.on_ssl_handshake (ec);
152
- BOOST_REDIS_CHECK_OP0 (conn_->cancel (operation::run);)
153
- }
154
-
155
- BOOST_ASIO_CORO_YIELD
156
- conn_->async_run_lean (runner_->cfg_ , logger_, std::move (self));
157
- BOOST_REDIS_CHECK_OP0 (;)
158
- self.complete (ec);
158
+ self.complete (ec2);
159
159
}
160
160
}
161
161
};
@@ -207,19 +207,9 @@ class runner {
207
207
using health_checker_type = health_checker<Executor>;
208
208
using timer_type = typename connector_type::timer_type;
209
209
210
- template <class , class , class > friend struct run_all_op ;
211
210
template <class , class , class > friend class runner_op ;
212
211
template <class , class , class > friend struct hello_op ;
213
212
214
- template <class Connection , class Logger , class CompletionToken >
215
- auto async_run_all (Connection& conn, Logger l, CompletionToken token)
216
- {
217
- return asio::async_compose
218
- < CompletionToken
219
- , void (system::error_code)
220
- >(run_all_op<runner, Connection, Logger>{this , &conn, l}, token, conn);
221
- }
222
-
223
213
template <class Connection , class Logger , class CompletionToken >
224
214
auto async_hello (Connection& conn, Logger l, CompletionToken token)
225
215
{
0 commit comments