Skip to content

Commit 7a08588

Browse files
committed
Progresses with the subscriber.
1 parent e7ff1ce commit 7a08588

17 files changed

+169
-79
lines changed

examples/cpp17_intro.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <iostream>
88
#include <boost/redis/run.hpp>
99
#include <boost/redis/address.hpp>
10+
#include <boost/redis/logger.hpp>
1011
#include <boost/redis/src.hpp>
1112

1213
namespace net = boost::asio;
@@ -15,6 +16,7 @@ using boost::redis::request;
1516
using boost::redis::response;
1617
using boost::redis::ignore_t;
1718
using boost::redis::async_run;
19+
using boost::redis::logger;
1820
using boost::redis::address;
1921
using namespace std::chrono_literals;
2022

@@ -39,7 +41,7 @@ auto main(int argc, char * argv[]) -> int
3941
net::io_context ioc;
4042
connection conn{ioc};
4143

42-
async_run(conn, addr, 10s, 10s, [&](auto){
44+
async_run(conn, addr, 10s, 10s, logger{}, [&](auto){
4345
conn.cancel();
4446
});
4547

examples/cpp17_intro_sync.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <boost/redis/request.hpp>
99
#include <boost/redis/check_health.hpp>
1010
#include <boost/redis/run.hpp>
11+
#include <boost/redis/logger.hpp>
1112
#include <boost/asio/deferred.hpp>
1213
#include <boost/asio/use_future.hpp>
1314
#include <tuple>
@@ -27,6 +28,7 @@ using boost::redis::response;
2728
using boost::redis::ignore_t;
2829
using boost::redis::async_run;
2930
using boost::redis::address;
31+
using boost::redis::logger;
3032
using boost::redis::async_check_health;
3133
using namespace std::chrono_literals;
3234

@@ -56,7 +58,7 @@ auto main(int argc, char * argv[]) -> int
5658
// Starts a thread that will can io_context::run on which the
5759
// connection will run.
5860
std::thread t{[&ioc, conn, addr]() {
59-
async_run(*conn, addr, 10s, 10s, [conn](auto){
61+
async_run(*conn, addr, 10s, 10s, logger{}, [conn](auto){
6062
conn->cancel();
6163
});
6264

examples/cpp20_subscriber.cpp

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,25 @@
44
* accompanying file LICENSE.txt)
55
*/
66

7-
#include <boost/redis/run.hpp>
8-
#include <boost/redis/check_health.hpp>
7+
#include <boost/redis/experimental/connector.hpp>
8+
#include <boost/redis/logger.hpp>
99
#include <boost/asio/awaitable.hpp>
10+
#include <boost/asio/detached.hpp>
1011
#include <boost/asio/use_awaitable.hpp>
12+
#include <boost/asio/co_spawn.hpp>
13+
#include <boost/asio/consign.hpp>
1114
#include <iostream>
1215

1316
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
14-
#include <boost/asio/experimental/awaitable_operators.hpp>
1517

1618
namespace net = boost::asio;
17-
using namespace net::experimental::awaitable_operators;
18-
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
19-
using boost::redis::request;
20-
using boost::redis::async_run;
21-
using boost::redis::generic_response;
22-
using boost::redis::async_check_health;
23-
using boost::redis::address;
24-
using connection = net::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
19+
namespace redis = boost::redis;
20+
using redis::generic_response;
21+
using redis::address;
22+
using redis::logger;
23+
using redis::experimental::async_connect;
24+
using redis::experimental::connect_config;
25+
using connection = net::use_awaitable_t<>::as_default_on_t<redis::connection>;
2526

2627
/* This example will subscribe and read pushes indefinitely.
2728
*
@@ -39,12 +40,16 @@ using connection = net::use_awaitable_t<>::as_default_on_t<boost::redis::connect
3940
* > CLIENT kill TYPE pubsub
4041
*/
4142

42-
// Receives pushes.
43+
// Receives server pushes.
4344
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
4445
{
4546
for (generic_response resp;;) {
4647
co_await conn->async_receive(resp);
47-
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " << resp.value().at(3).value << std::endl;
48+
std::cout
49+
<< resp.value().at(1).value
50+
<< " " << resp.value().at(2).value
51+
<< " " << resp.value().at(3).value
52+
<< std::endl;
4853
resp.value().clear();
4954
}
5055
}
@@ -53,19 +58,10 @@ auto co_main(address const& addr) -> net::awaitable<void>
5358
{
5459
auto ex = co_await net::this_coro::executor;
5560
auto conn = std::make_shared<connection>(ex);
56-
steady_timer timer{ex};
57-
58-
request req;
59-
req.push("HELLO", 3);
60-
req.push("SUBSCRIBE", "channel");
61-
62-
// The loop will reconnect on connection lost. To exit type Ctrl-C twice.
63-
for (;;) {
64-
co_await ((async_run(*conn, addr) || async_check_health(*conn) || receiver(conn)) && conn->async_exec(req));
65-
conn->reset_stream();
66-
timer.expires_after(std::chrono::seconds{1});
67-
co_await timer.async_wait();
68-
}
61+
connect_config cfg;
62+
cfg.addr = addr;
63+
net::co_spawn(ex, receiver(conn), net::detached);
64+
redis::experimental::async_connect(*conn, cfg, logger{}, net::consign(net::detached, conn));
6965
}
7066

7167
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

include/boost/redis/check_health.hpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class check_health_op {
3838
{
3939
checker->prom_.emplace(conn->async_exec(checker->req_, checker->resp_, asio::experimental::use_promise));
4040

41-
checker->timer_.expires_after(checker->interval_);
41+
checker->timer_.expires_after(checker->timeout_);
4242
BOOST_ASIO_CORO_YIELD
4343
checker->timer_.async_wait(std::move(self));
4444
if (ec || is_cancelled(self) || checker->resp_.value().empty()) {
@@ -70,7 +70,7 @@ class health_checker {
7070
std::string const& msg,
7171
std::chrono::steady_clock::duration interval)
7272
: timer_{ex}
73-
, interval_{interval}
73+
, timeout_{interval}
7474
{
7575
req_.push("PING", msg);
7676
}
@@ -93,13 +93,20 @@ class health_checker {
9393
prom_.reset();
9494
}
9595

96+
void cancel()
97+
{
98+
timer_.cancel();
99+
if (prom_)
100+
prom_.cancel();
101+
}
102+
96103
private:
97104
template <class, class> friend class check_health_op;
98105
timer_type timer_;
99106
std::optional<promise_type> prom_;
100107
redis::request req_;
101108
redis::generic_response resp_;
102-
std::chrono::steady_clock::duration interval_;
109+
std::chrono::steady_clock::duration timeout_;
103110
};
104111

105112
} // detail

include/boost/redis/detail/runner.hpp

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ struct resolve_op {
4848
asio::experimental::wait_for_one(),
4949
std::move(self));
5050

51+
runner->logger_.on_resolve(ec1, res);
52+
5153
if (is_cancelled(self)) {
5254
self.complete(asio::error::operation_aborted);
5355
return;
@@ -86,10 +88,10 @@ struct connect_op {
8688

8789
template <class Self>
8890
void operator()( Self& self
89-
, std::array<std::size_t, 2> order = {}
90-
, system::error_code ec1 = {}
91-
, asio::ip::tcp::endpoint const& = {}
92-
, system::error_code ec2 = {})
91+
, std::array<std::size_t, 2> const& order = {}
92+
, system::error_code const& ec1 = {}
93+
, asio::ip::tcp::endpoint const& ep= {}
94+
, system::error_code const& ec2 = {})
9395
{
9496
BOOST_ASIO_CORO_REENTER (coro)
9597
{
@@ -105,6 +107,8 @@ struct connect_op {
105107
asio::experimental::wait_for_one(),
106108
std::move(self));
107109

110+
runner->logger_.on_connect(ec1, ep);
111+
108112
if (is_cancelled(self)) {
109113
self.complete(asio::error::operation_aborted);
110114
return;
@@ -160,10 +164,21 @@ struct runner_op {
160164
}
161165
};
162166

163-
template <class Executor>
167+
template <class Executor, class Logger>
164168
class runner {
165169
public:
166-
runner(Executor ex, address addr): resv_{ex}, timer_{ex}, addr_{addr} {}
170+
using timer_type =
171+
asio::basic_waitable_timer<
172+
std::chrono::steady_clock,
173+
asio::wait_traits<std::chrono::steady_clock>,
174+
Executor>;
175+
176+
runner(Executor ex, address addr, Logger l = Logger{})
177+
: resv_{ex}
178+
, timer_{ex}
179+
, addr_{addr}
180+
, logger_{l}
181+
{}
167182

168183
template <class CompletionToken>
169184
auto async_resolve(CompletionToken&& token)
@@ -197,14 +212,14 @@ class runner {
197212
>(runner_op<runner, Connection>{this, &conn, resolve_timeout, connect_timeout}, token, resv_);
198213
}
199214

215+
void cancel()
216+
{
217+
resv_.cancel();
218+
timer_.cancel();
219+
}
220+
200221
private:
201222
using resolver_type = asio::ip::basic_resolver<asio::ip::tcp, Executor>;
202-
using timer_type =
203-
asio::basic_waitable_timer<
204-
std::chrono::steady_clock,
205-
asio::wait_traits<std::chrono::steady_clock>,
206-
Executor>;
207-
208223

209224
template <class, class> friend struct runner_op;
210225
template <class, class> friend struct connect_op;
@@ -214,6 +229,7 @@ class runner {
214229
timer_type timer_;
215230
address addr_;
216231
asio::ip::tcp::resolver::results_type endpoints_;
232+
Logger logger_;
217233
};
218234

219235
} // boost::redis::detail

include/boost/redis/logger.hpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#ifndef BOOST_REDIS_LOGGER_HPP
8+
#define BOOST_REDIS_LOGGER_HPP
9+
10+
#include <boost/asio/ip/tcp.hpp>
11+
#include <boost/system/error_code.hpp>
12+
#include <iostream>
13+
14+
namespace boost::redis {
15+
16+
// TODO: Move to ipp file.
17+
// TODO: Implement filter.
18+
class logger {
19+
public:
20+
void on_resolve(system::error_code const& ec, asio::ip::tcp::resolver::results_type const& res)
21+
{
22+
// TODO: Print the endpoints
23+
std::clog << "on_resolve: " << ec.message() << std::endl;
24+
}
25+
26+
void on_connect(system::error_code const& ec, asio::ip::tcp::endpoint const& ep)
27+
{
28+
// TODO: Print the endpoint
29+
std::clog << "on_connect: " << ec.message() << std::endl;
30+
}
31+
32+
void on_connection_lost()
33+
{
34+
std::clog << "on_connection_lost: " << std::endl;
35+
}
36+
37+
void on_hello(system::error_code const& ec)
38+
{
39+
std::clog << "on_hello: " << ec.message() << std::endl;
40+
}
41+
};
42+
43+
} // boost::redis
44+
45+
#endif // BOOST_REDIS_LOGGER_HPP

include/boost/redis/run.hpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
#ifndef BOOST_REDIS_RUN_HPP
88
#define BOOST_REDIS_RUN_HPP
99

10-
// Has to included before promise.hpp to build on msvc.
1110
#include <boost/redis/detail/runner.hpp>
1211
#include <boost/redis/connection.hpp>
1312
#include <boost/redis/address.hpp>
13+
#include <boost/redis/logger.hpp>
1414
#include <boost/asio/compose.hpp>
1515
#include <boost/asio/consign.hpp>
1616
#include <memory>
1717
#include <chrono>
1818

19-
namespace boost::redis {
19+
namespace boost::redis
20+
{
2021

2122
/** @brief Call async_run on the connection.
2223
* @ingroup high-level-api
@@ -35,6 +36,7 @@ namespace boost::redis {
3536
*/
3637
template <
3738
class Socket,
39+
class Logger = logger,
3840
class CompletionToken = asio::default_completion_token_t<typename Socket::executor_type>
3941
>
4042
auto
@@ -43,11 +45,12 @@ async_run(
4345
address addr = address{"127.0.0.1", "6379"},
4446
std::chrono::steady_clock::duration resolve_timeout = std::chrono::seconds{10},
4547
std::chrono::steady_clock::duration connect_timeout = std::chrono::seconds{10},
48+
Logger l = Logger{},
4649
CompletionToken token = CompletionToken{})
4750
{
4851
using executor_type = typename Socket::executor_type;
49-
using runner_type = detail::runner<executor_type>;
50-
auto runner = std::make_shared<runner_type>(conn.get_executor(), addr);
52+
using runner_type = detail::runner<executor_type, Logger>;
53+
auto runner = std::make_shared<runner_type>(conn.get_executor(), addr, l);
5154

5255
return
5356
runner->async_run(

tests/conn_check_health.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
#include <boost/redis/run.hpp>
8+
#include <boost/redis/logger.hpp>
89
#include <boost/redis/check_health.hpp>
910
#include <boost/system/errc.hpp>
1011
#define BOOST_TEST_MODULE check-health
@@ -22,6 +23,7 @@ using boost::redis::operation;
2223
using boost::redis::generic_response;
2324
using boost::redis::async_check_health;
2425
using boost::redis::async_run;
26+
using boost::redis::logger;
2527
using boost::redis::address;
2628
using namespace std::chrono_literals;
2729

@@ -104,12 +106,12 @@ BOOST_AUTO_TEST_CASE(check_health)
104106
generic_response resp;
105107
push_callback{&conn, &conn2, &resp, &req2}(); // Starts reading pushes.
106108

107-
async_run(conn, address{}, 10s, 10s, [](auto ec){
109+
async_run(conn, address{}, 10s, 10s, logger{}, [](auto ec){
108110
std::cout << "B" << std::endl;
109111
BOOST_TEST(!!ec);
110112
});
111113

112-
async_run(conn2, address{}, 10s, 10s, [](auto ec){
114+
async_run(conn2, address{}, 10s, 10s, logger{}, [](auto ec){
113115
std::cout << "C" << std::endl;
114116
BOOST_TEST(!!ec);
115117
});

0 commit comments

Comments
 (0)