|
16 | 16 | #include <boost/asio/experimental/use_promise.hpp>
|
17 | 17 | #include <boost/asio/steady_timer.hpp>
|
18 | 18 | #include <boost/asio/compose.hpp>
|
| 19 | +#include <boost/asio/consign.hpp> |
19 | 20 | #include <memory>
|
20 | 21 | #include <chrono>
|
21 | 22 | #include <optional>
|
22 | 23 |
|
23 | 24 | namespace boost::redis {
|
24 | 25 | namespace detail {
|
25 | 26 |
|
26 |
| -template <class Connection> |
| 27 | +template <class HealthChecker, class Connection> |
27 | 28 | class check_health_op {
|
28 |
| -private: |
29 |
| - using executor_type = typename Connection::executor_type; |
30 |
| - |
31 |
| - struct state { |
32 |
| - using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), executor_type>; |
33 |
| - using timer_type = |
34 |
| - asio::basic_waitable_timer< |
35 |
| - std::chrono::steady_clock, |
36 |
| - asio::wait_traits<std::chrono::steady_clock>, |
37 |
| - executor_type>; |
38 |
| - |
39 |
| - |
40 |
| - timer_type timer_; |
41 |
| - redis::request req_; |
42 |
| - redis::generic_response resp_; |
43 |
| - std::optional<promise_type> prom_; |
44 |
| - std::chrono::steady_clock::duration interval_; |
45 |
| - |
46 |
| - state( |
47 |
| - executor_type ex, |
48 |
| - std::string const& msg, |
49 |
| - std::chrono::steady_clock::duration interval) |
50 |
| - : timer_{ex} |
51 |
| - , interval_{interval} |
52 |
| - { |
53 |
| - req_.push("PING", msg); |
54 |
| - } |
55 |
| - |
56 |
| - void reset() |
57 |
| - { |
58 |
| - resp_.value().clear(); |
59 |
| - prom_.reset(); |
60 |
| - } |
61 |
| - }; |
62 |
| - |
63 |
| - Connection* conn_ = nullptr; |
64 |
| - std::shared_ptr<state> state_ = nullptr; |
65 |
| - asio::coroutine coro_{}; |
66 |
| - |
67 | 29 | public:
|
68 |
| - check_health_op( |
69 |
| - Connection& conn, |
70 |
| - std::string const& msg, |
71 |
| - std::chrono::steady_clock::duration interval) |
72 |
| - : conn_{&conn} |
73 |
| - , state_{std::make_shared<state>(conn.get_executor(), msg, interval)} |
74 |
| - { } |
| 30 | + HealthChecker* checker = nullptr; |
| 31 | + Connection* conn = nullptr; |
| 32 | + asio::coroutine coro_{}; |
75 | 33 |
|
76 | 34 | template <class Self>
|
77 | 35 | void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
|
78 | 36 | {
|
79 | 37 | BOOST_ASIO_CORO_REENTER (coro_) for (;;)
|
80 | 38 | {
|
81 |
| - state_->prom_.emplace(conn_->async_exec(state_->req_, state_->resp_, asio::experimental::use_promise)); |
| 39 | + checker->prom_.emplace(conn->async_exec(checker->req_, checker->resp_, asio::experimental::use_promise)); |
82 | 40 |
|
83 |
| - state_->timer_.expires_after(state_->interval_); |
| 41 | + checker->timer_.expires_after(checker->interval_); |
84 | 42 | BOOST_ASIO_CORO_YIELD
|
85 |
| - state_->timer_.async_wait(std::move(self)); |
86 |
| - if (ec || is_cancelled(self) || state_->resp_.value().empty()) { |
87 |
| - conn_->cancel(operation::run); |
| 43 | + checker->timer_.async_wait(std::move(self)); |
| 44 | + if (ec || is_cancelled(self) || checker->resp_.value().empty()) { |
| 45 | + conn->cancel(operation::run); |
88 | 46 | BOOST_ASIO_CORO_YIELD
|
89 |
| - std::move(*state_->prom_)(std::move(self)); |
| 47 | + std::move(*checker->prom_)(std::move(self)); |
90 | 48 | self.complete({});
|
91 | 49 | return;
|
92 | 50 | }
|
93 | 51 |
|
94 |
| - state_->reset(); |
| 52 | + checker->reset(); |
95 | 53 | }
|
96 | 54 | }
|
97 | 55 | };
|
98 | 56 |
|
| 57 | +template <class Executor> |
| 58 | +class health_checker { |
| 59 | +private: |
| 60 | + using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), Executor>; |
| 61 | + using timer_type = |
| 62 | + asio::basic_waitable_timer< |
| 63 | + std::chrono::steady_clock, |
| 64 | + asio::wait_traits<std::chrono::steady_clock>, |
| 65 | + Executor>; |
| 66 | + |
| 67 | +public: |
| 68 | + health_checker( |
| 69 | + Executor ex, |
| 70 | + std::string const& msg, |
| 71 | + std::chrono::steady_clock::duration interval) |
| 72 | + : timer_{ex} |
| 73 | + , interval_{interval} |
| 74 | + { |
| 75 | + req_.push("PING", msg); |
| 76 | + } |
| 77 | + |
| 78 | + template < |
| 79 | + class Connection, |
| 80 | + class CompletionToken = asio::default_completion_token_t<Executor> |
| 81 | + > |
| 82 | + auto async_check_health(Connection& conn, CompletionToken token = CompletionToken{}) |
| 83 | + { |
| 84 | + return asio::async_compose |
| 85 | + < CompletionToken |
| 86 | + , void(system::error_code) |
| 87 | + >(check_health_op<health_checker, Connection>{this, &conn}, token, conn); |
| 88 | + } |
| 89 | + |
| 90 | + void reset() |
| 91 | + { |
| 92 | + resp_.value().clear(); |
| 93 | + prom_.reset(); |
| 94 | + } |
| 95 | + |
| 96 | +private: |
| 97 | + template <class, class> friend class check_health_op; |
| 98 | + timer_type timer_; |
| 99 | + std::optional<promise_type> prom_; |
| 100 | + redis::request req_; |
| 101 | + redis::generic_response resp_; |
| 102 | + std::chrono::steady_clock::duration interval_; |
| 103 | +}; |
| 104 | + |
99 | 105 | } // detail
|
100 | 106 |
|
101 | 107 | /** @brief Checks Redis health asynchronously
|
@@ -127,10 +133,10 @@ async_check_health(
|
127 | 133 | std::chrono::steady_clock::duration interval = std::chrono::seconds{2},
|
128 | 134 | CompletionToken token = CompletionToken{})
|
129 | 135 | {
|
130 |
| - return asio::async_compose |
131 |
| - < CompletionToken |
132 |
| - , void(system::error_code) |
133 |
| - >(detail::check_health_op<Connection>{conn, msg, interval}, token, conn); |
| 136 | + using executor_type = typename Connection::executor_type; |
| 137 | + using health_checker_type = detail::health_checker<executor_type>; |
| 138 | + auto checker = std::make_shared<health_checker_type>(conn.get_executor(), msg, interval); |
| 139 | + return checker->async_check_health(conn, asio::consign(std::move(token), checker)); |
134 | 140 | }
|
135 | 141 |
|
136 | 142 | } // boost::redis
|
|
0 commit comments