Skip to content

Commit 2a4936a

Browse files
committed
Implements batch reads for server pushes.
1 parent 4547e1a commit 2a4936a

16 files changed

+323
-118
lines changed

README.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -674,22 +674,34 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
674674

675675
## Changelog
676676

677-
### develop (incorporates changes to conform the boost review and more)
677+
### develop
678678

679679
* Deprecates the `async_receive` overload that takes a response. Users
680-
should now first call `set_receive_response` to avoid contantly seting
681-
the same response.
680+
should now first call `set_receive_response` to avoid constantly and
681+
unnecessarily setting the same response.
682682

683683
* Uses `std::function` to type erase the response adapter. This change
684684
should not influence users in any way but allowed important
685-
simplification in the connections internals. This resulted in big
686-
performance improvement where one of my benchmark programs passed
687-
from 190k/s to 473k/s.
685+
simplification in the connections internals. This resulted in
686+
massive performance improvement.
688687

689688
* The connection has a new member `get_usage()` that returns the
690-
connection usage information, such as number of bytes writen,
689+
connection usage information, such as number of bytes written,
691690
received etc.
692691

692+
* There are massive performance improvements in the consuming of
693+
server pushes which are now communicated with an `asio::channel` and
694+
therefore can be buffered which avoids blocking the socket read-loop.
695+
Batch reads are also supported by means of `channel.try_send` and
696+
buffered messages can be consumed synchronously with
697+
`connection::receive`. The function `boost::redis::cancel_one` has
698+
been added to simplify processing multiple server pushes contained
699+
in the same `generic_response`. *IMPORTANT*: These changes may
700+
result in more than one push in the response when
701+
`connection::async_receive` resumes. The user must therefore be
702+
careful when calling `resp.clear()`: either ensure that all message
703+
have been processed or just use `consume_one`.
704+
693705
### v1.4.2 (incorporates changes to conform the boost review and more)
694706

695707
* Adds `boost::redis::config::database_index` to make it possible to

examples/cpp20_subscriber.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ namespace asio = boost::asio;
2222
using namespace std::chrono_literals;
2323
using boost::redis::request;
2424
using boost::redis::generic_response;
25+
using boost::redis::consume_one;
2526
using boost::redis::logger;
2627
using boost::redis::config;
2728
using boost::redis::ignore;
29+
using boost::redis::error;
2830
using boost::system::error_code;
2931
using boost::redis::connection;
3032
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
@@ -58,20 +60,28 @@ receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
5860
// Loop while reconnection is enabled
5961
while (conn->will_reconnect()) {
6062

61-
// Reconnect to channels.
63+
// Reconnect to the channels.
6264
co_await conn->async_exec(req, ignore, asio::deferred);
6365

6466
// Loop reading Redis pushs messages.
6567
for (error_code ec;;) {
66-
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
68+
// First tries to read any buffered pushes.
69+
conn->receive(ec);
70+
if (ec == error::sync_receive_push_failed) {
71+
ec = {};
72+
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
73+
}
74+
6775
if (ec)
6876
break; // Connection lost, break so we can reconnect to channels.
77+
6978
std::cout
7079
<< resp.value().at(1).value
7180
<< " " << resp.value().at(2).value
7281
<< " " << resp.value().at(3).value
7382
<< std::endl;
74-
resp.value().clear();
83+
84+
consume_one(resp);
7585
}
7686
}
7787
}

include/boost/redis/adapter/detail/adapters.hpp

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,7 @@ class simple_impl {
139139
void on_value_available(Result&) {}
140140

141141
template <class String>
142-
void
143-
operator()(
144-
Result& result,
145-
resp3::basic_node<String> const& n,
146-
system::error_code& ec)
142+
void operator()(Result& result, resp3::basic_node<String> const& n, system::error_code& ec)
147143
{
148144
if (is_aggregate(n.data_type)) {
149145
ec = redis::error::expects_resp3_simple_type;
@@ -164,11 +160,7 @@ class set_impl {
164160
{ hint_ = std::end(result); }
165161

166162
template <class String>
167-
void
168-
operator()(
169-
Result& result,
170-
resp3::basic_node<String> const& nd,
171-
system::error_code& ec)
163+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
172164
{
173165
if (is_aggregate(nd.data_type)) {
174166
if (nd.data_type != resp3::type::set)
@@ -200,11 +192,7 @@ class map_impl {
200192
{ current_ = std::end(result); }
201193

202194
template <class String>
203-
void
204-
operator()(
205-
Result& result,
206-
resp3::basic_node<String> const& nd,
207-
system::error_code& ec)
195+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
208196
{
209197
if (is_aggregate(nd.data_type)) {
210198
if (element_multiplicity(nd.data_type) != 2)
@@ -239,11 +227,7 @@ class vector_impl {
239227
void on_value_available(Result& ) { }
240228

241229
template <class String>
242-
void
243-
operator()(
244-
Result& result,
245-
resp3::basic_node<String> const& nd,
246-
system::error_code& ec)
230+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
247231
{
248232
if (is_aggregate(nd.data_type)) {
249233
auto const m = element_multiplicity(nd.data_type);
@@ -264,11 +248,7 @@ class array_impl {
264248
void on_value_available(Result& ) { }
265249

266250
template <class String>
267-
void
268-
operator()(
269-
Result& result,
270-
resp3::basic_node<String> const& nd,
271-
system::error_code& ec)
251+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
272252
{
273253
if (is_aggregate(nd.data_type)) {
274254
if (i_ != -1) {
@@ -300,11 +280,7 @@ struct list_impl {
300280
void on_value_available(Result& ) { }
301281

302282
template <class String>
303-
void
304-
operator()(
305-
Result& result,
306-
resp3::basic_node<String> const& nd,
307-
system::error_code& ec)
283+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
308284
{
309285
if (!is_aggregate(nd.data_type)) {
310286
BOOST_ASSERT(nd.aggregate_size == 1);
@@ -397,10 +373,7 @@ class wrapper<result<Result>> {
397373
}
398374

399375
template <class String>
400-
void
401-
operator()(
402-
resp3::basic_node<String> const& nd,
403-
system::error_code& ec)
376+
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
404377
{
405378
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
406379

include/boost/redis/adapter/detail/response_traits.hpp

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ namespace boost::redis::adapter::detail
2424
class ignore_adapter {
2525
public:
2626
template <class String>
27-
void
28-
operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
27+
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
2928
{
3029
switch (nd.data_type) {
3130
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
@@ -61,11 +60,7 @@ class static_adapter {
6160
{ return size;}
6261

6362
template <class String>
64-
void
65-
operator()(
66-
std::size_t i,
67-
resp3::basic_node<String> const& nd,
68-
system::error_code& ec)
63+
void operator()(std::size_t i, resp3::basic_node<String> const& nd, system::error_code& ec)
6964
{
7065
using std::visit;
7166
// I am usure whether this should be an error or an assertion.
@@ -91,11 +86,7 @@ class vector_adapter {
9186
{ return static_cast<std::size_t>(-1);}
9287

9388
template <class String>
94-
void
95-
operator()(
96-
std::size_t,
97-
resp3::basic_node<String> const& nd,
98-
system::error_code& ec)
89+
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
9990
{
10091
adapter_(nd, ec);
10192
}

include/boost/redis/connection.hpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,23 @@ class basic_connection {
188188
auto async_receive(CompletionToken token = CompletionToken{})
189189
{ return impl_.async_receive(std::move(token)); }
190190

191+
192+
/** @brief Receives server pushes synchronously without blocking.
193+
*
194+
* Receives a server push synchronously by calling `try_receive` on
195+
* the underlying channel. If the operation fails because
196+
* `try_receive` returns `false`, `ec` will be set to
197+
* `boost::redis::error::sync_receive_push_failed`.
198+
*
199+
* @param ec Contains the error if any occurred.
200+
*
201+
* @returns The number of bytes read from the socket.
202+
*/
203+
std::size_t receive(system::error_code& ec)
204+
{
205+
return impl_.receive(ec);
206+
}
207+
191208
template <
192209
class Response = ignore_t,
193210
class CompletionToken = asio::default_completion_token_t<executor_type>
@@ -367,6 +384,12 @@ class connection {
367384
auto async_receive(CompletionToken token)
368385
{ return impl_.async_receive(std::move(token)); }
369386

387+
/// Calls `boost::redis::basic_connection::receive`.
388+
std::size_t receive(system::error_code& ec)
389+
{
390+
return impl_.receive(ec);
391+
}
392+
370393
/// Calls `boost::redis::basic_connection::async_exec`.
371394
template <class Response, class CompletionToken>
372395
auto async_exec(request const& req, Response& resp, CompletionToken token)

include/boost/redis/detail/connection_base.hpp

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,10 @@ struct reader_op {
348348
}
349349

350350
if (res_.first == parse_result::push) {
351-
BOOST_ASIO_CORO_YIELD
352-
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
351+
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
352+
BOOST_ASIO_CORO_YIELD
353+
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
354+
}
353355

354356
if (ec) {
355357
logger_.trace("reader-op: error. Exiting ...");
@@ -398,7 +400,7 @@ class connection_base {
398400
: ctx_{method}
399401
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
400402
, writer_timer_{ex}
401-
, receive_channel_{ex}
403+
, receive_channel_{ex, 256}
402404
, runner_{ex, {}}
403405
, dbuf_{read_buffer_, max_read_size}
404406
{
@@ -470,6 +472,26 @@ class connection_base {
470472
auto async_receive(CompletionToken token)
471473
{ return receive_channel_.async_receive(std::move(token)); }
472474

475+
std::size_t receive(system::error_code& ec)
476+
{
477+
std::size_t size = 0;
478+
479+
auto f = [&](system::error_code const& ec2, std::size_t n)
480+
{
481+
ec = ec2;
482+
size = n;
483+
};
484+
485+
auto const res = receive_channel_.try_receive(f);
486+
if (ec)
487+
return 0;
488+
489+
if (!res)
490+
ec = error::sync_receive_push_failed;
491+
492+
return size;
493+
}
494+
473495
template <class Logger, class CompletionToken>
474496
auto async_run(config const& cfg, Logger l, CompletionToken token)
475497
{

include/boost/redis/error.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ enum class error
7575

7676
/// SSL handshake timeout
7777
ssl_handshake_timeout,
78+
79+
/// Can't receive push synchronously without blocking
80+
sync_receive_push_failed,
81+
82+
/// Incompatible node depth.
83+
incompatible_node_depth,
7884
};
7985

8086
/** \internal

include/boost/redis/impl/error.ipp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ struct error_category_impl : system::error_category {
4141
case error::resolve_timeout: return "Resolve timeout.";
4242
case error::connect_timeout: return "Connect timeout.";
4343
case error::pong_timeout: return "Pong timeout.";
44+
case error::ssl_handshake_timeout: return "SSL handshake timeout.";
45+
case error::sync_receive_push_failed: return "Can't receive server push synchronously without blocking.";
46+
case error::incompatible_node_depth: return "Incompatible node depth.";
4447
default: BOOST_ASSERT(false); return "Boost.Redis error.";
4548
}
4649
}

include/boost/redis/impl/response.ipp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#include <boost/redis/response.hpp>
8+
#include <boost/redis/error.hpp>
9+
#include <boost/assert.hpp>
10+
11+
namespace boost::redis
12+
{
13+
14+
void consume_one(generic_response& r, system::error_code& ec)
15+
{
16+
if (r.has_error())
17+
return; // Nothing to consume.
18+
19+
if (std::empty(r.value()))
20+
return; // Nothing to consume.
21+
22+
auto const depth = r.value().front().depth;
23+
24+
// To simplify we will refuse to consume any data-type that is not
25+
// a root node. I think there is no use for that and it is complex
26+
// since it requires updating parent nodes.
27+
if (depth != 0) {
28+
ec = error::incompatible_node_depth;
29+
return;
30+
}
31+
32+
auto f = [depth](auto const& e)
33+
{ return e.depth == depth; };
34+
35+
auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f);
36+
37+
r.value().erase(std::cbegin(r.value()), match);
38+
}
39+
40+
void consume_one(generic_response& r)
41+
{
42+
system::error_code ec;
43+
consume_one(r, ec);
44+
if (ec)
45+
throw system::system_error(ec);
46+
}
47+
48+
} // boost::redis::resp3

0 commit comments

Comments
 (0)