Skip to content

Commit 35d5070

Browse files
committed
Refactors reader_op to simplify sans-io.
1 parent e8b13bd commit 35d5070

File tree

4 files changed

+64
-59
lines changed

4 files changed

+64
-59
lines changed

include/boost/redis/connection.hpp

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -252,79 +252,82 @@ struct reader_op {
252252

253253
Conn* conn_;
254254
Logger logger_;
255-
std::pair<tribool, std::size_t> res_{std::make_pair(std::make_optional(false), 0)};
255+
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
256256
asio::coroutine coro{};
257257

258258
template <class Self>
259259
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
260260
{
261-
ignore_unused(n);
262-
263261
BOOST_ASIO_CORO_REENTER(coro) for (;;)
264262
{
265263
// Appends some data to the buffer if necessary.
266-
if (!res_.first.has_value() || conn_->mpx_.is_data_needed()) {
267-
if (conn_->use_ssl()) {
268-
BOOST_ASIO_CORO_YIELD
269-
async_append_some(
270-
conn_->next_layer(),
271-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
272-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
273-
std::move(self));
274-
} else {
275-
BOOST_ASIO_CORO_YIELD
276-
async_append_some(
277-
conn_->next_layer().next_layer(),
278-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
279-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
280-
std::move(self));
281-
}
282-
283-
logger_.on_read(ec, n);
284-
285-
// The connection is not viable after an error.
286-
if (ec) {
287-
logger_.trace("reader_op (1)", ec);
288-
conn_->cancel(operation::run);
289-
self.complete(ec);
290-
return;
291-
}
292-
293-
// Somebody might have canceled implicitly or explicitly
294-
// while we were suspended and after queueing so we have to
295-
// check.
296-
if (!conn_->is_open()) {
297-
logger_.trace("reader_op (2): connection is closed.");
298-
self.complete(ec);
299-
return;
300-
}
264+
if (conn_->use_ssl()) {
265+
BOOST_ASIO_CORO_YIELD
266+
async_append_some(
267+
conn_->next_layer(),
268+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
269+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
270+
std::move(self));
271+
} else {
272+
BOOST_ASIO_CORO_YIELD
273+
async_append_some(
274+
conn_->next_layer().next_layer(),
275+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
276+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
277+
std::move(self));
301278
}
302279

303-
res_ = conn_->mpx_.commit_read(ec);
280+
logger_.on_read(ec, n);
281+
282+
// The connection is not viable after an error.
304283
if (ec) {
305-
logger_.trace("reader_op (3)", ec);
284+
logger_.trace("reader_op (1)", ec);
306285
conn_->cancel(operation::run);
307286
self.complete(ec);
308287
return;
309288
}
310289

311-
if (res_.first.has_value() && res_.first.value()) {
312-
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
313-
BOOST_ASIO_CORO_YIELD
314-
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
315-
}
290+
// The connection might have been canceled while this op was
291+
// suspended or after queueing so we have to check.
292+
if (!conn_->is_open()) {
293+
logger_.trace("reader_op (2): connection is closed.");
294+
self.complete(ec);
295+
return;
296+
}
297+
298+
while (!conn_->mpx_.get_read_buffer().empty()) {
299+
res_ = conn_->mpx_.consume_next(ec);
316300

317301
if (ec) {
318-
logger_.trace("reader_op (4)", ec);
302+
logger_.trace("reader_op (3)", ec);
319303
conn_->cancel(operation::run);
320304
self.complete(ec);
321305
return;
322306
}
323307

324-
if (!conn_->is_open()) {
325-
logger_.trace("reader_op (5): connection is closed.");
326-
self.complete(asio::error::operation_aborted);
327-
return;
308+
if (!res_.first.has_value()) {
309+
// More data is needed.
310+
break;
311+
}
312+
313+
if (res_.first.value()) {
314+
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
315+
BOOST_ASIO_CORO_YIELD
316+
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
317+
}
318+
319+
if (ec) {
320+
logger_.trace("reader_op (4)", ec);
321+
conn_->cancel(operation::run);
322+
self.complete(ec);
323+
return;
324+
}
325+
326+
if (!conn_->is_open()) {
327+
logger_.trace("reader_op (5): connection is closed.");
328+
self.complete(asio::error::operation_aborted);
329+
return;
330+
}
328331
}
329332
}
330333
}

include/boost/redis/detail/multiplexer.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ struct multiplexer {
113113
// they don't have a response e.g. SUBSCRIBE.
114114
auto commit_write() -> std::size_t;
115115

116+
// If the tribool contains no value more data is needed, otherwise
117+
// if the value is true the message consumed is a push.
116118
[[nodiscard]]
117-
auto commit_read(system::error_code& ec) -> std::pair<tribool, std::size_t>;
119+
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
118120

119121
auto add(std::shared_ptr<elem> const& ptr) -> void;
120122
auto reset() -> void;
@@ -150,9 +152,9 @@ struct multiplexer {
150152
}
151153

152154
[[nodiscard]]
153-
auto is_data_needed() const noexcept -> bool
155+
auto get_read_buffer() const noexcept -> std::string const&
154156
{
155-
return std::empty(read_buffer_);
157+
return read_buffer_;
156158
}
157159

158160
// TODO: Change signature to receive an adapter instead of a

include/boost/redis/impl/multiplexer.ipp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
8181
}
8282
}
8383

84-
std::pair<tribool, std::size_t> multiplexer::commit_read(system::error_code& ec)
84+
std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec)
8585
{
8686
// We arrive here in two states:
8787
//

test/test_low_level_sync_sans_io.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
261261
mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n";
262262

263263
boost::system::error_code ec;
264-
auto const ret = mpx.commit_read(ec);
264+
auto const ret = mpx.consume_next(ec);
265265

266266
BOOST_TEST(ret.first.value());
267267
BOOST_CHECK_EQUAL(ret.second, 16u);
@@ -286,12 +286,12 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
286286
mpx.get_read_buffer() = ">2\r\n+one\r";
287287

288288
boost::system::error_code ec;
289-
auto ret = mpx.commit_read(ec);
289+
auto ret = mpx.consume_next(ec);
290290

291291
BOOST_TEST(!ret.first.has_value());
292292

293293
mpx.get_read_buffer().append("\n+two\r\n");
294-
ret = mpx.commit_read(ec);
294+
ret = mpx.consume_next(ec);
295295

296296
BOOST_TEST(ret.first.value());
297297
BOOST_CHECK_EQUAL(ret.second, 16u);
@@ -381,9 +381,9 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
381381
// Simulates a socket read by putting some data in the read buffer.
382382
mpx.get_read_buffer().append("+one\r\n");
383383

384-
// Informs the multiplexer the read operation is concluded.
384+
// Consumes the next message in the read buffer.
385385
boost::system::error_code ec;
386-
auto const ret = mpx.commit_read(ec);
386+
auto const ret = mpx.consume_next(ec);
387387

388388
// The read operation should have been successfull.
389389
BOOST_TEST(ret.first.has_value());

0 commit comments

Comments
 (0)