Skip to content

Commit 5ac4f7e

Browse files
committed
Removes dependency on asio::promise as it does not compile on windows.
1 parent 7a08588 commit 5ac4f7e

File tree

9 files changed

+498
-66
lines changed

9 files changed

+498
-66
lines changed

examples/cpp20_subscriber.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
#include <boost/redis/experimental/connector.hpp>
88
#include <boost/redis/logger.hpp>
99
#include <boost/asio/awaitable.hpp>
10-
#include <boost/asio/detached.hpp>
1110
#include <boost/asio/use_awaitable.hpp>
1211
#include <boost/asio/co_spawn.hpp>
12+
#include <boost/asio/detached.hpp>
1313
#include <boost/asio/consign.hpp>
1414
#include <iostream>
1515

include/boost/redis/check_health.hpp

Lines changed: 128 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,53 +11,129 @@
1111
#include <boost/redis/request.hpp>
1212
#include <boost/redis/response.hpp>
1313
#include <boost/redis/operation.hpp>
14-
#include <boost/redis/detail/read_ops.hpp>
15-
#include <boost/asio/experimental/promise.hpp>
16-
#include <boost/asio/experimental/use_promise.hpp>
14+
#include <boost/redis/detail/helper.hpp>
1715
#include <boost/asio/steady_timer.hpp>
1816
#include <boost/asio/compose.hpp>
1917
#include <boost/asio/consign.hpp>
18+
#include <boost/asio/coroutine.hpp>
19+
#include <boost/asio/experimental/parallel_group.hpp>
2020
#include <memory>
2121
#include <chrono>
22-
#include <optional>
2322

2423
namespace boost::redis {
2524
namespace detail {
2625

2726
template <class HealthChecker, class Connection>
28-
class check_health_op {
27+
class ping_op {
2928
public:
30-
HealthChecker* checker = nullptr;
31-
Connection* conn = nullptr;
29+
HealthChecker* checker_ = nullptr;
30+
Connection* conn_ = nullptr;
3231
asio::coroutine coro_{};
3332

3433
template <class Self>
3534
void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
3635
{
3736
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
3837
{
39-
checker->prom_.emplace(conn->async_exec(checker->req_, checker->resp_, asio::experimental::use_promise));
38+
if (checker_->checker_has_exited_) {
39+
self.complete({});
40+
return;
41+
}
4042

41-
checker->timer_.expires_after(checker->timeout_);
4243
BOOST_ASIO_CORO_YIELD
43-
checker->timer_.async_wait(std::move(self));
44-
if (ec || is_cancelled(self) || checker->resp_.value().empty()) {
45-
conn->cancel(operation::run);
46-
BOOST_ASIO_CORO_YIELD
47-
std::move(*checker->prom_)(std::move(self));
44+
conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
45+
BOOST_REDIS_CHECK_OP0(checker_->wait_timer_.cancel();)
46+
47+
// Wait before pinging again.
48+
checker_->ping_timer_.expires_after(checker_->ping_interval_);
49+
BOOST_ASIO_CORO_YIELD
50+
checker_->ping_timer_.async_wait(std::move(self));
51+
BOOST_REDIS_CHECK_OP0(;)
52+
}
53+
}
54+
};
55+
56+
template <class HealthChecker, class Connection>
57+
class check_timeout_op {
58+
public:
59+
HealthChecker* checker_ = nullptr;
60+
Connection* conn_ = nullptr;
61+
asio::coroutine coro_{};
62+
63+
template <class Self>
64+
void operator()(Self& self, system::error_code ec = {})
65+
{
66+
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
67+
{
68+
checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
69+
BOOST_ASIO_CORO_YIELD
70+
checker_->wait_timer_.async_wait(std::move(self));
71+
BOOST_REDIS_CHECK_OP0(;)
72+
73+
if (!checker_->resp_.has_value()) {
4874
self.complete({});
4975
return;
5076
}
5177

52-
checker->reset();
78+
if (checker_->resp_.value().empty()) {
79+
checker_->ping_timer_.cancel();
80+
conn_->cancel(operation::run);
81+
checker_->checker_has_exited_ = true;
82+
self.complete(error::pong_timeout);
83+
return;
84+
}
85+
86+
checker_->resp_.value().clear();
87+
88+
if (checker_->resp_.has_value()) {
89+
checker_->resp_.value().clear();
90+
}
91+
}
92+
}
93+
};
94+
95+
template <class HealthChecker, class Connection>
96+
class check_health_op {
97+
public:
98+
HealthChecker* checker_ = nullptr;
99+
Connection* conn_ = nullptr;
100+
asio::coroutine coro_{};
101+
102+
template <class Self>
103+
void
104+
operator()(
105+
Self& self,
106+
std::array<std::size_t, 2> order = {},
107+
system::error_code ec1 = {},
108+
system::error_code ec2 = {})
109+
{
110+
BOOST_ASIO_CORO_REENTER (coro_)
111+
{
112+
BOOST_ASIO_CORO_YIELD
113+
asio::experimental::make_parallel_group(
114+
[this](auto token) { return checker_->async_ping(*conn_, token); },
115+
[this](auto token) { return checker_->async_check_timeout(*conn_, token);}
116+
).async_wait(
117+
asio::experimental::wait_for_one(),
118+
std::move(self));
119+
120+
if (is_cancelled(self)) {
121+
self.complete(asio::error::operation_aborted);
122+
return;
123+
}
124+
125+
switch (order[0]) {
126+
case 0: self.complete(ec1); return;
127+
case 1: self.complete(ec2); return;
128+
default: BOOST_ASSERT(false);
129+
}
53130
}
54131
}
55132
};
56133

57134
template <class Executor>
58135
class health_checker {
59136
private:
60-
using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), Executor>;
61137
using timer_type =
62138
asio::basic_waitable_timer<
63139
std::chrono::steady_clock,
@@ -68,9 +144,10 @@ class health_checker {
68144
health_checker(
69145
Executor ex,
70146
std::string const& msg,
71-
std::chrono::steady_clock::duration interval)
72-
: timer_{ex}
73-
, timeout_{interval}
147+
std::chrono::steady_clock::duration ping_interval)
148+
: ping_timer_{ex}
149+
, wait_timer_{ex}
150+
, ping_interval_{ping_interval}
74151
{
75152
req_.push("PING", msg);
76153
}
@@ -87,26 +164,41 @@ class health_checker {
87164
>(check_health_op<health_checker, Connection>{this, &conn}, token, conn);
88165
}
89166

90-
void reset()
167+
void cancel()
168+
{
169+
ping_timer_.cancel();
170+
wait_timer_.cancel();
171+
}
172+
173+
private:
174+
template <class Connection, class CompletionToken>
175+
auto async_ping(Connection& conn, CompletionToken token)
91176
{
92-
resp_.value().clear();
93-
prom_.reset();
177+
return asio::async_compose
178+
< CompletionToken
179+
, void(system::error_code)
180+
>(ping_op<health_checker, Connection>{this, &conn}, token, conn, ping_timer_);
94181
}
95182

96-
void cancel()
183+
template <class Connection, class CompletionToken>
184+
auto async_check_timeout(Connection& conn, CompletionToken token)
97185
{
98-
timer_.cancel();
99-
if (prom_)
100-
prom_.cancel();
186+
return asio::async_compose
187+
< CompletionToken
188+
, void(system::error_code)
189+
>(check_timeout_op<health_checker, Connection>{this, &conn}, token, conn, wait_timer_);
101190
}
102191

103-
private:
192+
template <class, class> friend class ping_op;
193+
template <class, class> friend class check_timeout_op;
104194
template <class, class> friend class check_health_op;
105-
timer_type timer_;
106-
std::optional<promise_type> prom_;
195+
196+
timer_type ping_timer_;
197+
timer_type wait_timer_;
107198
redis::request req_;
108199
redis::generic_response resp_;
109-
std::chrono::steady_clock::duration timeout_;
200+
std::chrono::steady_clock::duration ping_interval_;
201+
bool checker_has_exited_ = false;
110202
};
111203

112204
} // detail
@@ -120,14 +212,17 @@ class health_checker {
120212
*
121213
* @param conn A connection to the Redis server.
122214
* @param msg The message to be sent with the [PING](https://redis.io/commands/ping/) command. Seting a proper and unique id will help users identify which connections are active.
123-
* @param interval Ping interval.
215+
* @param ping_interval Ping ping_interval.
124216
* @param token The completion token
125217
*
126218
* The completion token must have the following signature
127219
*
128220
* @code
129221
* void f(system::error_code);
130222
* @endcode
223+
*
224+
* Completion occurs when a pong response is not receive within two
225+
* times the ping interval.
131226
*/
132227
template <
133228
class Connection,
@@ -137,12 +232,12 @@ auto
137232
async_check_health(
138233
Connection& conn,
139234
std::string const& msg = "Boost.Redis",
140-
std::chrono::steady_clock::duration interval = std::chrono::seconds{2},
235+
std::chrono::steady_clock::duration ping_interval = std::chrono::seconds{2},
141236
CompletionToken token = CompletionToken{})
142237
{
143238
using executor_type = typename Connection::executor_type;
144239
using health_checker_type = detail::health_checker<executor_type>;
145-
auto checker = std::make_shared<health_checker_type>(conn.get_executor(), msg, interval);
240+
auto checker = std::make_shared<health_checker_type>(conn.get_executor(), msg, ping_interval);
146241
return checker->async_check_health(conn, asio::consign(std::move(token), checker));
147242
}
148243

include/boost/redis/connection.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class basic_connection :
5151
basic_connection(executor_type ex)
5252
: base_type{ex}
5353
, stream_{ex}
54+
, reconnect_{true}
5455
{}
5556

5657
/// Contructs from a context.
@@ -227,6 +228,22 @@ class basic_connection :
227228
void reserve(std::size_t read, std::size_t write)
228229
{ base_type::reserve(read, write); }
229230

231+
/** @brief Enable reconnection
232+
*
233+
* This property plays any role only when used with
234+
* `boost::redis::async_run`.
235+
*/
236+
void enable_reconnection() noexcept {reconnect_ = true;}
237+
238+
/** @brief Disable reconnection
239+
*
240+
* This property plays any role only when used with
241+
* `boost::redis::async_run`.
242+
*/
243+
void disable_reconnection() noexcept {reconnect_ = false;}
244+
245+
bool reconnect() const noexcept {return reconnect_;}
246+
230247
private:
231248
using this_type = basic_connection<next_layer_type>;
232249

@@ -244,6 +261,7 @@ class basic_connection :
244261
auto lowest_layer() noexcept -> auto& { return stream_.lowest_layer(); }
245262

246263
Socket stream_;
264+
bool reconnect_;
247265
};
248266

249267
/** \brief A connection that uses a asio::ip::tcp::socket.

include/boost/redis/error.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ enum class error
6969

7070
/// Connect timeout
7171
connect_timeout,
72+
73+
/// Connect timeout
74+
pong_timeout,
7275
};
7376

7477
/** \internal

0 commit comments

Comments
 (0)