Skip to content

Commit 6748f76

Browse files
authored
Merge pull request #153 from boostorg/152-enable-reading-server-pushes-in-batches
152 enable reading server pushes in batches
2 parents 44a608c + 2a4936a commit 6748f76

17 files changed

+362
-126
lines changed

README.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -674,22 +674,34 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
674674

675675
## Changelog
676676

677-
### develop (incorporates changes to conform the boost review and more)
677+
### develop
678678

679679
* Deprecates the `async_receive` overload that takes a response. Users
680-
should now first call `set_receive_response` to avoid contantly seting
681-
the same response.
680+
should now first call `set_receive_response` to avoid constantly and
681+
unnecessarily setting the same response.
682682

683683
* Uses `std::function` to type erase the response adapter. This change
684684
should not influence users in any way but allowed important
685-
simplification in the connections internals. This resulted in big
686-
performance improvement where one of my benchmark programs passed
687-
from 190k/s to 473k/s.
685+
simplification in the connections internals. This resulted in
686+
massive performance improvement.
688687

689688
* The connection has a new member `get_usage()` that returns the
690-
connection usage information, such as number of bytes writen,
689+
connection usage information, such as number of bytes written,
691690
received etc.
692691

692+
* There are massive performance improvements in the consuming of
693+
server pushes which are now communicated with an `asio::channel` and
694+
therefore can be buffered which avoids blocking the socket read-loop.
695+
Batch reads are also supported by means of `channel.try_send` and
696+
buffered messages can be consumed synchronously with
697+
`connection::receive`. The function `boost::redis::cancel_one` has
698+
been added to simplify processing multiple server pushes contained
699+
in the same `generic_response`. *IMPORTANT*: These changes may
700+
result in more than one push in the response when
701+
`connection::async_receive` resumes. The user must therefore be
702+
careful when calling `resp.clear()`: either ensure that all message
703+
have been processed or just use `consume_one`.
704+
693705
### v1.4.2 (incorporates changes to conform the boost review and more)
694706

695707
* Adds `boost::redis::config::database_index` to make it possible to

examples/cpp20_subscriber.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ namespace asio = boost::asio;
2222
using namespace std::chrono_literals;
2323
using boost::redis::request;
2424
using boost::redis::generic_response;
25+
using boost::redis::consume_one;
2526
using boost::redis::logger;
2627
using boost::redis::config;
2728
using boost::redis::ignore;
29+
using boost::redis::error;
2830
using boost::system::error_code;
2931
using boost::redis::connection;
3032
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
@@ -58,20 +60,28 @@ receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
5860
// Loop while reconnection is enabled
5961
while (conn->will_reconnect()) {
6062

61-
// Reconnect to channels.
63+
// Reconnect to the channels.
6264
co_await conn->async_exec(req, ignore, asio::deferred);
6365

6466
// Loop reading Redis pushs messages.
6567
for (error_code ec;;) {
66-
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
68+
// First tries to read any buffered pushes.
69+
conn->receive(ec);
70+
if (ec == error::sync_receive_push_failed) {
71+
ec = {};
72+
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
73+
}
74+
6775
if (ec)
6876
break; // Connection lost, break so we can reconnect to channels.
77+
6978
std::cout
7079
<< resp.value().at(1).value
7180
<< " " << resp.value().at(2).value
7281
<< " " << resp.value().at(3).value
7382
<< std::endl;
74-
resp.value().clear();
83+
84+
consume_one(resp);
7585
}
7686
}
7787
}

include/boost/redis/adapter/detail/adapters.hpp

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ class general_aggregate {
9292

9393
public:
9494
explicit general_aggregate(Result* c = nullptr): result_(c) {}
95-
void operator()(resp3::basic_node<std::string_view> const& nd, system::error_code&)
95+
template <class String>
96+
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
9697
{
9798
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
9899
switch (nd.data_type) {
@@ -114,7 +115,8 @@ class general_simple {
114115
public:
115116
explicit general_simple(Node* t = nullptr) : result_(t) {}
116117

117-
void operator()(resp3::basic_node<std::string_view> const& nd, system::error_code&)
118+
template <class String>
119+
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
118120
{
119121
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
120122
switch (nd.data_type) {
@@ -136,11 +138,8 @@ class simple_impl {
136138
public:
137139
void on_value_available(Result&) {}
138140

139-
void
140-
operator()(
141-
Result& result,
142-
resp3::basic_node<std::string_view> const& n,
143-
system::error_code& ec)
141+
template <class String>
142+
void operator()(Result& result, resp3::basic_node<String> const& n, system::error_code& ec)
144143
{
145144
if (is_aggregate(n.data_type)) {
146145
ec = redis::error::expects_resp3_simple_type;
@@ -160,11 +159,8 @@ class set_impl {
160159
void on_value_available(Result& result)
161160
{ hint_ = std::end(result); }
162161

163-
void
164-
operator()(
165-
Result& result,
166-
resp3::basic_node<std::string_view> const& nd,
167-
system::error_code& ec)
162+
template <class String>
163+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
168164
{
169165
if (is_aggregate(nd.data_type)) {
170166
if (nd.data_type != resp3::type::set)
@@ -195,11 +191,8 @@ class map_impl {
195191
void on_value_available(Result& result)
196192
{ current_ = std::end(result); }
197193

198-
void
199-
operator()(
200-
Result& result,
201-
resp3::basic_node<std::string_view> const& nd,
202-
system::error_code& ec)
194+
template <class String>
195+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
203196
{
204197
if (is_aggregate(nd.data_type)) {
205198
if (element_multiplicity(nd.data_type) != 2)
@@ -233,11 +226,8 @@ class vector_impl {
233226
public:
234227
void on_value_available(Result& ) { }
235228

236-
void
237-
operator()(
238-
Result& result,
239-
resp3::basic_node<std::string_view> const& nd,
240-
system::error_code& ec)
229+
template <class String>
230+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
241231
{
242232
if (is_aggregate(nd.data_type)) {
243233
auto const m = element_multiplicity(nd.data_type);
@@ -257,11 +247,8 @@ class array_impl {
257247
public:
258248
void on_value_available(Result& ) { }
259249

260-
void
261-
operator()(
262-
Result& result,
263-
resp3::basic_node<std::string_view> const& nd,
264-
system::error_code& ec)
250+
template <class String>
251+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
265252
{
266253
if (is_aggregate(nd.data_type)) {
267254
if (i_ != -1) {
@@ -292,11 +279,8 @@ struct list_impl {
292279

293280
void on_value_available(Result& ) { }
294281

295-
void
296-
operator()(
297-
Result& result,
298-
resp3::basic_node<std::string_view> const& nd,
299-
system::error_code& ec)
282+
template <class String>
283+
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
300284
{
301285
if (!is_aggregate(nd.data_type)) {
302286
BOOST_ASSERT(nd.aggregate_size == 1);
@@ -365,7 +349,8 @@ class wrapper<result<Result>> {
365349
response_type* result_;
366350
typename impl_map<Result>::type impl_;
367351

368-
bool set_if_resp3_error(resp3::basic_node<std::string_view> const& nd) noexcept
352+
template <class String>
353+
bool set_if_resp3_error(resp3::basic_node<String> const& nd) noexcept
369354
{
370355
switch (nd.data_type) {
371356
case resp3::type::null:
@@ -387,10 +372,8 @@ class wrapper<result<Result>> {
387372
}
388373
}
389374

390-
void
391-
operator()(
392-
resp3::basic_node<std::string_view> const& nd,
393-
system::error_code& ec)
375+
template <class String>
376+
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
394377
{
395378
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
396379

@@ -414,7 +397,8 @@ class wrapper<result<std::optional<T>>> {
414397
response_type* result_;
415398
typename impl_map<T>::type impl_{};
416399

417-
bool set_if_resp3_error(resp3::basic_node<std::string_view> const& nd) noexcept
400+
template <class String>
401+
bool set_if_resp3_error(resp3::basic_node<String> const& nd) noexcept
418402
{
419403
switch (nd.data_type) {
420404
case resp3::type::blob_error:
@@ -429,9 +413,10 @@ class wrapper<result<std::optional<T>>> {
429413
public:
430414
explicit wrapper(response_type* o = nullptr) : result_(o) {}
431415

416+
template <class String>
432417
void
433418
operator()(
434-
resp3::basic_node<std::string_view> const& nd,
419+
resp3::basic_node<String> const& nd,
435420
system::error_code& ec)
436421
{
437422
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");

include/boost/redis/adapter/detail/response_traits.hpp

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ namespace boost::redis::adapter::detail
2323

2424
class ignore_adapter {
2525
public:
26-
void
27-
operator()(std::size_t, resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
26+
template <class String>
27+
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
2828
{
2929
switch (nd.data_type) {
3030
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
@@ -59,11 +59,8 @@ class static_adapter {
5959
auto get_supported_response_size() const noexcept
6060
{ return size;}
6161

62-
void
63-
operator()(
64-
std::size_t i,
65-
resp3::basic_node<std::string_view> const& nd,
66-
system::error_code& ec)
62+
template <class String>
63+
void operator()(std::size_t i, resp3::basic_node<String> const& nd, system::error_code& ec)
6764
{
6865
using std::visit;
6966
// I am usure whether this should be an error or an assertion.
@@ -88,11 +85,8 @@ class vector_adapter {
8885
get_supported_response_size() const noexcept
8986
{ return static_cast<std::size_t>(-1);}
9087

91-
void
92-
operator()(
93-
std::size_t,
94-
resp3::basic_node<std::string_view> const& nd,
95-
system::error_code& ec)
88+
template <class String>
89+
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
9690
{
9791
adapter_(nd, ec);
9892
}
@@ -142,7 +136,8 @@ class wrapper {
142136
public:
143137
explicit wrapper(Adapter adapter) : adapter_{adapter} {}
144138

145-
void operator()(resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
139+
template <class String>
140+
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
146141
{ return adapter_(0, nd, ec); }
147142

148143
[[nodiscard]]

include/boost/redis/adapter/detail/result_traits.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ class static_aggregate_adapter<result<Tuple>> {
116116
}
117117
}
118118

119-
void count(resp3::basic_node<std::string_view> const& nd)
119+
template <class String>
120+
void count(resp3::basic_node<String> const& nd)
120121
{
121122
if (nd.depth == 1) {
122123
if (is_aggregate(nd.data_type))
@@ -131,7 +132,8 @@ class static_aggregate_adapter<result<Tuple>> {
131132
++i_;
132133
}
133134

134-
void operator()(resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
135+
template <class String>
136+
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
135137
{
136138
using std::visit;
137139

include/boost/redis/connection.hpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,23 @@ class basic_connection {
188188
auto async_receive(CompletionToken token = CompletionToken{})
189189
{ return impl_.async_receive(std::move(token)); }
190190

191+
192+
/** @brief Receives server pushes synchronously without blocking.
193+
*
194+
* Receives a server push synchronously by calling `try_receive` on
195+
* the underlying channel. If the operation fails because
196+
* `try_receive` returns `false`, `ec` will be set to
197+
* `boost::redis::error::sync_receive_push_failed`.
198+
*
199+
* @param ec Contains the error if any occurred.
200+
*
201+
* @returns The number of bytes read from the socket.
202+
*/
203+
std::size_t receive(system::error_code& ec)
204+
{
205+
return impl_.receive(ec);
206+
}
207+
191208
template <
192209
class Response = ignore_t,
193210
class CompletionToken = asio::default_completion_token_t<executor_type>
@@ -367,6 +384,12 @@ class connection {
367384
auto async_receive(CompletionToken token)
368385
{ return impl_.async_receive(std::move(token)); }
369386

387+
/// Calls `boost::redis::basic_connection::receive`.
388+
std::size_t receive(system::error_code& ec)
389+
{
390+
return impl_.receive(ec);
391+
}
392+
370393
/// Calls `boost::redis::basic_connection::async_exec`.
371394
template <class Response, class CompletionToken>
372395
auto async_exec(request const& req, Response& resp, CompletionToken token)

include/boost/redis/detail/connection_base.hpp

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,10 @@ struct reader_op {
348348
}
349349

350350
if (res_.first == parse_result::push) {
351-
BOOST_ASIO_CORO_YIELD
352-
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
351+
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
352+
BOOST_ASIO_CORO_YIELD
353+
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
354+
}
353355

354356
if (ec) {
355357
logger_.trace("reader-op: error. Exiting ...");
@@ -398,7 +400,7 @@ class connection_base {
398400
: ctx_{method}
399401
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
400402
, writer_timer_{ex}
401-
, receive_channel_{ex}
403+
, receive_channel_{ex, 256}
402404
, runner_{ex, {}}
403405
, dbuf_{read_buffer_, max_read_size}
404406
{
@@ -470,6 +472,26 @@ class connection_base {
470472
auto async_receive(CompletionToken token)
471473
{ return receive_channel_.async_receive(std::move(token)); }
472474

475+
std::size_t receive(system::error_code& ec)
476+
{
477+
std::size_t size = 0;
478+
479+
auto f = [&](system::error_code const& ec2, std::size_t n)
480+
{
481+
ec = ec2;
482+
size = n;
483+
};
484+
485+
auto const res = receive_channel_.try_receive(f);
486+
if (ec)
487+
return 0;
488+
489+
if (!res)
490+
ec = error::sync_receive_push_failed;
491+
492+
return size;
493+
}
494+
473495
template <class Logger, class CompletionToken>
474496
auto async_run(config const& cfg, Logger l, CompletionToken token)
475497
{

0 commit comments

Comments
 (0)