Skip to content

Commit 91014b1

Browse files
committed
Simplifies parse ops and fixes health-check on reconnection.
1 parent 4f6f8b4 commit 91014b1

File tree

3 files changed

+99
-27
lines changed

3 files changed

+99
-27
lines changed

include/boost/redis/detail/health_checker.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ class health_checker {
167167
>
168168
auto async_check_health(Connection& conn, CompletionToken token = CompletionToken{})
169169
{
170+
checker_has_exited_ = false;
170171
return asio::async_compose
171172
< CompletionToken
172173
, void(system::error_code)

include/boost/redis/detail/read.hpp

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
#include <boost/redis/resp3/type.hpp>
1111
#include <boost/redis/resp3/parser.hpp>
1212
#include <boost/redis/adapter/ignore.hpp>
13+
#include <boost/redis/detail/helper.hpp>
1314
#include <boost/asio/read.hpp>
1415
#include <boost/asio/compose.hpp>
1516
#include <boost/asio/coroutine.hpp>
16-
#include <boost/redis/detail/helper.hpp>
17+
#include <boost/asio/post.hpp>
1718

1819
#include <string_view>
1920
#include <limits>
@@ -27,6 +28,59 @@ std::string_view buffer_view(DynamicBuffer buf) noexcept
2728
return std::string_view{start, std::size(buf)};
2829
}
2930

31+
template <class AsyncReadStream, class DynamicBuffer>
32+
class append_some_op {
33+
private:
34+
AsyncReadStream& stream_;
35+
DynamicBuffer buf_;
36+
std::size_t size_ = 0;
37+
std::size_t tmp_ = 0;
38+
asio::coroutine coro_{};
39+
40+
public:
41+
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
42+
: stream_ {stream}
43+
, buf_ {std::move(buf)}
44+
, size_{size}
45+
{ }
46+
47+
template <class Self>
48+
void operator()( Self& self
49+
, system::error_code ec = {}
50+
, std::size_t n = 0)
51+
{
52+
BOOST_ASIO_CORO_REENTER (coro_)
53+
{
54+
tmp_ = buf_.size();
55+
buf_.grow(size_);
56+
57+
BOOST_ASIO_CORO_YIELD
58+
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
59+
if (ec) {
60+
self.complete(ec, 0);
61+
return;
62+
}
63+
64+
buf_.shrink(buf_.size() - tmp_ - n);
65+
self.complete({}, n);
66+
}
67+
}
68+
};
69+
70+
template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
71+
auto
72+
async_append_some(
73+
AsyncReadStream& stream,
74+
DynamicBuffer buffer,
75+
std::size_t size,
76+
CompletionToken&& token)
77+
{
78+
return asio::async_compose
79+
< CompletionToken
80+
, void(system::error_code, std::size_t)
81+
>(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
82+
}
83+
3084
template <
3185
class AsyncReadStream,
3286
class DynamicBuffer,
@@ -37,8 +91,8 @@ class parse_op {
3791
DynamicBuffer buf_;
3892
resp3::parser parser_;
3993
ResponseAdapter adapter_;
40-
std::size_t tmp_ = 0;
41-
resp3::parser::result res_;
94+
bool needs_rescheduling_ = true;
95+
system::error_code ec_;
4296
asio::coroutine coro_{};
4397

4498
static std::size_t const growth = 1024;
@@ -53,36 +107,29 @@ class parse_op {
53107
template <class Self>
54108
void operator()( Self& self
55109
, system::error_code ec = {}
56-
, std::size_t n = 0)
110+
, std::size_t = 0)
57111
{
58-
BOOST_ASIO_CORO_REENTER (coro_) for (;;) {
59-
60-
res_ = parser_.consume(buffer_view(buf_), ec);
61-
if (ec)
62-
return self.complete(ec, 0);
63-
64-
if (!res_.has_value()) {
65-
tmp_ = buf_.size();
66-
buf_.grow(parser_.get_suggested_buffer_growth(growth));
67-
112+
BOOST_ASIO_CORO_REENTER (coro_)
113+
{
114+
while (!resp3::parse(parser_, buffer_view(buf_), adapter_, ec)) {
115+
needs_rescheduling_ = false;
68116
BOOST_ASIO_CORO_YIELD
69-
stream_.async_read_some(
70-
buf_.data(tmp_, parser_.get_suggested_buffer_growth(growth)),
117+
async_append_some(
118+
stream_, buf_, parser_.get_suggested_buffer_growth(growth),
71119
std::move(self));
72-
BOOST_REDIS_CHECK_OP1(;);
73-
74-
buf_.shrink(buf_.size() - tmp_ - n);
75-
continue;
120+
if (ec) {
121+
self.complete(ec, 0);
122+
return;
123+
}
76124
}
77125

78-
adapter_(res_.value(), ec);
79-
if (ec)
80-
return self.complete(ec, 0);
81-
82-
if (parser_.done()) {
83-
self.complete({}, parser_.get_consumed());
84-
return;
126+
ec_ = ec;
127+
if (needs_rescheduling_) {
128+
BOOST_ASIO_CORO_YIELD
129+
asio::post(std::move(self));
85130
}
131+
132+
self.complete(ec_, parser_.get_consumed());
86133
}
87134
}
88135
};

include/boost/redis/resp3/parser.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,30 @@ class parser {
7373
auto consume(std::string_view view, system::error_code& ec) noexcept -> result;
7474
};
7575

76+
template <class Adapter>
77+
bool
78+
parse(
79+
resp3::parser& p,
80+
std::string_view const& msg,
81+
Adapter& adapter,
82+
system::error_code& ec)
83+
{
84+
while (!p.done()) {
85+
auto const res = p.consume(msg, ec);
86+
if (ec)
87+
return true;
88+
89+
if (!res)
90+
return false;
91+
92+
adapter(res.value(), ec);
93+
if (ec)
94+
return true;
95+
}
96+
97+
return true;
98+
}
99+
76100
} // boost::redis::resp3
77101

78102
#endif // BOOST_REDIS_RESP3_PARSER_HPP

0 commit comments

Comments
 (0)