Skip to content

Commit 479068e

Browse files
authored
Merge pull request #258 from boostorg/refactoring_clean_code
Refactors reader_op to simplify sans-io.
2 parents b58e4f9 + 35d5070 commit 479068e

File tree

4 files changed

+64
-59
lines changed

4 files changed

+64
-59
lines changed

include/boost/redis/connection.hpp

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -253,79 +253,82 @@ struct reader_op {
253253

254254
Conn* conn_;
255255
Logger logger_;
256-
std::pair<tribool, std::size_t> res_{std::make_pair(std::make_optional(false), 0)};
256+
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
257257
asio::coroutine coro{};
258258

259259
template <class Self>
260260
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
261261
{
262-
ignore_unused(n);
263-
264262
BOOST_ASIO_CORO_REENTER(coro) for (;;)
265263
{
266264
// Appends some data to the buffer if necessary.
267-
if (!res_.first.has_value() || conn_->mpx_.is_data_needed()) {
268-
if (conn_->use_ssl()) {
269-
BOOST_ASIO_CORO_YIELD
270-
async_append_some(
271-
conn_->next_layer(),
272-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
273-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
274-
std::move(self));
275-
} else {
276-
BOOST_ASIO_CORO_YIELD
277-
async_append_some(
278-
conn_->next_layer().next_layer(),
279-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
280-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
281-
std::move(self));
282-
}
283-
284-
logger_.on_read(ec, n);
285-
286-
// The connection is not viable after an error.
287-
if (ec) {
288-
logger_.trace("reader_op (1)", ec);
289-
conn_->cancel(operation::run);
290-
self.complete(ec);
291-
return;
292-
}
293-
294-
// Somebody might have canceled implicitly or explicitly
295-
// while we were suspended and after queueing so we have to
296-
// check.
297-
if (!conn_->is_open()) {
298-
logger_.trace("reader_op (2): connection is closed.");
299-
self.complete(ec);
300-
return;
301-
}
265+
if (conn_->use_ssl()) {
266+
BOOST_ASIO_CORO_YIELD
267+
async_append_some(
268+
conn_->next_layer(),
269+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
270+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
271+
std::move(self));
272+
} else {
273+
BOOST_ASIO_CORO_YIELD
274+
async_append_some(
275+
conn_->next_layer().next_layer(),
276+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
277+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
278+
std::move(self));
302279
}
303280

304-
res_ = conn_->mpx_.commit_read(ec);
281+
logger_.on_read(ec, n);
282+
283+
// The connection is not viable after an error.
305284
if (ec) {
306-
logger_.trace("reader_op (3)", ec);
285+
logger_.trace("reader_op (1)", ec);
307286
conn_->cancel(operation::run);
308287
self.complete(ec);
309288
return;
310289
}
311290

312-
if (res_.first.has_value() && res_.first.value()) {
313-
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
314-
BOOST_ASIO_CORO_YIELD
315-
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
316-
}
291+
// The connection might have been canceled while this op was
292+
// suspended or after queueing so we have to check.
293+
if (!conn_->is_open()) {
294+
logger_.trace("reader_op (2): connection is closed.");
295+
self.complete(ec);
296+
return;
297+
}
298+
299+
while (!conn_->mpx_.get_read_buffer().empty()) {
300+
res_ = conn_->mpx_.consume_next(ec);
317301

318302
if (ec) {
319-
logger_.trace("reader_op (4)", ec);
303+
logger_.trace("reader_op (3)", ec);
320304
conn_->cancel(operation::run);
321305
self.complete(ec);
322306
return;
323307
}
324308

325-
if (!conn_->is_open()) {
326-
logger_.trace("reader_op (5): connection is closed.");
327-
self.complete(asio::error::operation_aborted);
328-
return;
309+
if (!res_.first.has_value()) {
310+
// More data is needed.
311+
break;
312+
}
313+
314+
if (res_.first.value()) {
315+
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
316+
BOOST_ASIO_CORO_YIELD
317+
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
318+
}
319+
320+
if (ec) {
321+
logger_.trace("reader_op (4)", ec);
322+
conn_->cancel(operation::run);
323+
self.complete(ec);
324+
return;
325+
}
326+
327+
if (!conn_->is_open()) {
328+
logger_.trace("reader_op (5): connection is closed.");
329+
self.complete(asio::error::operation_aborted);
330+
return;
331+
}
329332
}
330333
}
331334
}

include/boost/redis/detail/multiplexer.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ struct multiplexer {
113113
// they don't have a response e.g. SUBSCRIBE.
114114
auto commit_write() -> std::size_t;
115115

116+
// If the tribool contains no value more data is needed, otherwise
117+
// if the value is true the message consumed is a push.
116118
[[nodiscard]]
117-
auto commit_read(system::error_code& ec) -> std::pair<tribool, std::size_t>;
119+
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
118120

119121
auto add(std::shared_ptr<elem> const& ptr) -> void;
120122
auto reset() -> void;
@@ -150,9 +152,9 @@ struct multiplexer {
150152
}
151153

152154
[[nodiscard]]
153-
auto is_data_needed() const noexcept -> bool
155+
auto get_read_buffer() const noexcept -> std::string const&
154156
{
155-
return std::empty(read_buffer_);
157+
return read_buffer_;
156158
}
157159

158160
// TODO: Change signature to receive an adapter instead of a

include/boost/redis/impl/multiplexer.ipp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
8181
}
8282
}
8383

84-
std::pair<tribool, std::size_t> multiplexer::commit_read(system::error_code& ec)
84+
std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec)
8585
{
8686
// We arrive here in two states:
8787
//

test/test_low_level_sync_sans_io.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
261261
mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n";
262262

263263
boost::system::error_code ec;
264-
auto const ret = mpx.commit_read(ec);
264+
auto const ret = mpx.consume_next(ec);
265265

266266
BOOST_TEST(ret.first.value());
267267
BOOST_CHECK_EQUAL(ret.second, 16u);
@@ -286,12 +286,12 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
286286
mpx.get_read_buffer() = ">2\r\n+one\r";
287287

288288
boost::system::error_code ec;
289-
auto ret = mpx.commit_read(ec);
289+
auto ret = mpx.consume_next(ec);
290290

291291
BOOST_TEST(!ret.first.has_value());
292292

293293
mpx.get_read_buffer().append("\n+two\r\n");
294-
ret = mpx.commit_read(ec);
294+
ret = mpx.consume_next(ec);
295295

296296
BOOST_TEST(ret.first.value());
297297
BOOST_CHECK_EQUAL(ret.second, 16u);
@@ -381,9 +381,9 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
381381
// Simulates a socket read by putting some data in the read buffer.
382382
mpx.get_read_buffer().append("+one\r\n");
383383

384-
// Informs the multiplexer the read operation is concluded.
384+
// Consumes the next message in the read buffer.
385385
boost::system::error_code ec;
386-
auto const ret = mpx.commit_read(ec);
386+
auto const ret = mpx.consume_next(ec);
387387

388388
// The read operation should have been successfull.
389389
BOOST_TEST(ret.first.has_value());

0 commit comments

Comments
 (0)