Skip to content

Commit 1b60eeb

Browse files
committed
Makes the connection full-duplex.
1 parent b93f361 commit 1b60eeb

File tree

4 files changed

+57
-39
lines changed

4 files changed

+57
-39
lines changed

include/boost/redis/detail/connection_base.hpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ class connection_base {
190190
// Notice this must come before the for-each below.
191191
cancel_push_requests();
192192

193+
// There is small optimization possible here: traverse only the
194+
// partition of unwritten requests instead of them all.
193195
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
194196
if (ptr->is_staged())
195197
ptr->mark_written();
@@ -227,6 +229,9 @@ class connection_base {
227229
action_ = action::stop;
228230
}
229231

232+
[[nodiscard]] auto is_waiting_write() const noexcept
233+
{ return !is_written() && !is_staged(); }
234+
230235
[[nodiscard]] auto is_written() const noexcept
231236
{ return status_ == status::written; }
232237

@@ -308,19 +313,24 @@ class connection_base {
308313
reqs_.erase(point, std::end(reqs_));
309314
}
310315

316+
[[nodiscard]] bool is_writing() const noexcept
317+
{
318+
return !write_buffer_.empty();
319+
}
320+
311321
void add_request_info(std::shared_ptr<req_info> const& info)
312322
{
313323
reqs_.push_back(info);
314324

315325
if (info->get_request().has_hello_priority()) {
316326
auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
317-
return !e->is_written() && !e->is_staged();
327+
return e->is_waiting_write();
318328
});
319329

320330
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
321331
}
322332

323-
if (derived().is_open() && !is_waiting_response() && write_buffer_.empty())
333+
if (derived().is_open() && !is_writing())
324334
writer_timer_.cancel();
325335
}
326336

@@ -360,22 +370,29 @@ class connection_base {
360370
ri.mark_staged();
361371
}
362372

363-
void coalesce_requests()
373+
[[nodiscard]] bool coalesce_requests()
364374
{
365-
// Coalesce the requests and marks them staged. After a
375+
// Coalesces the requests and marks them staged. After a
366376
// successful write staged requests will be marked as written.
367-
BOOST_ASSERT(write_buffer_.empty());
368-
BOOST_ASSERT(!reqs_.empty());
377+
std::size_t pos = 0;
378+
for (; pos < std::size(reqs_); ++pos)
379+
if (reqs_.at(pos)->is_waiting_write())
380+
break;
381+
382+
if (pos == std::size(reqs_))
383+
return false;
369384

370-
stage_request(*reqs_.at(0));
385+
stage_request(*reqs_.at(pos));
371386

372-
for (std::size_t i = 1; i < std::size(reqs_); ++i) {
387+
for (std::size_t i = pos + 1; i < std::size(reqs_); ++i) {
373388
if (!reqs_.at(i - 1)->get_request().get_config().coalesce ||
374389
!reqs_.at(i - 0)->get_request().get_config().coalesce) {
375390
break;
376391
}
377392
stage_request(*reqs_.at(i));
378393
}
394+
395+
return true;
379396
}
380397

381398
bool is_waiting_response() const noexcept

include/boost/redis/detail/connection_ops.hpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,11 @@ struct exec_op {
235235
BOOST_ASSERT(!conn->reqs_.empty());
236236
conn->reqs_.pop_front();
237237

238-
if (!conn->is_waiting_response()) {
239-
conn->read_timer_.cancel_one();
240-
if (!conn->reqs_.empty())
241-
conn->writer_timer_.cancel_one();
242-
} else {
238+
if (conn->is_waiting_response()) {
243239
BOOST_ASSERT(!conn->reqs_.empty());
244240
conn->reqs_.front()->proceed();
241+
} else {
242+
conn->read_timer_.cancel_one();
245243
}
246244

247245
self.complete({}, read_size);
@@ -301,8 +299,7 @@ struct writer_op {
301299

302300
BOOST_ASIO_CORO_REENTER (coro) for (;;)
303301
{
304-
while (!conn->reqs_.empty() && !conn->is_waiting_response() && conn->write_buffer_.empty()) {
305-
conn->coalesce_requests();
302+
while (conn->coalesce_requests()) {
306303
BOOST_ASIO_CORO_YIELD
307304
asio::async_write(conn->next_layer(), asio::buffer(conn->write_buffer_), std::move(self));
308305
AEDIS_CHECK_OP0(conn->cancel(operation::run););

tests/conn_exec_cancel.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,11 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
5858
req1.push("BLPOP", "any", 3);
5959

6060
// Should not be canceled.
61-
conn->async_exec(req1, gresp, [](auto ec, auto){
61+
bool seen = false;
62+
conn->async_exec(req1, gresp, [&](auto ec, auto) mutable{
6263
std::cout << "async_exec (1): " << ec.message() << std::endl;
6364
BOOST_TEST(!ec);
64-
});
65-
66-
request req2;
67-
req2.get_config().coalesce = false;
68-
req2.push("PING", "second");
69-
70-
// Should be canceled.
71-
conn->async_exec(req2, gresp, [](auto ec, auto){
72-
std::cout << "async_exec (2): " << ec.message() << std::endl;
73-
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted);
65+
seen = true;
7466
});
7567

7668
// Will complete while BLPOP is pending.
@@ -88,6 +80,7 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
8880
co_await conn->async_exec(req3, gresp, net::redirect_error(net::use_awaitable, ec1));
8981

9082
BOOST_TEST(!ec1);
83+
BOOST_TEST(seen);
9184
}
9285

9386
auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>

tests/conn_quit.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,32 @@ BOOST_AUTO_TEST_CASE(test_quit_no_coalesce)
4343
req2.get_config().coalesce = false;
4444
req2.push("QUIT");
4545

46-
conn.async_exec(req1, ignore, [](auto ec, auto){
46+
request req3;
47+
req3.get_config().cancel_if_not_connected = true;
48+
req3.push("PING");
49+
50+
auto c3 = [](auto ec, auto)
51+
{
52+
std::cout << "3--> " << ec.message() << std::endl;
53+
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
54+
};
55+
56+
auto c2 = [&](auto ec, auto)
57+
{
58+
std::cout << "2--> " << ec.message() << std::endl;
4759
BOOST_TEST(!ec);
48-
});
49-
conn.async_exec(req2, ignore, [](auto ec, auto) {
60+
conn.async_exec(req3, ignore, c3);
61+
};
62+
63+
auto c1 = [&](auto ec, auto)
64+
{
65+
std::cout << "1--> " << ec.message() << std::endl;
5066
BOOST_TEST(!ec);
51-
});
52-
conn.async_exec(req1, ignore, [](auto ec, auto){
53-
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
54-
});
55-
conn.async_exec(req1, ignore, [](auto ec, auto){
56-
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
57-
});
58-
conn.async_exec(req1, ignore, [](auto ec, auto){
59-
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
60-
});
67+
68+
conn.async_exec(req2, ignore, c2);
69+
};
70+
71+
conn.async_exec(req1, ignore, c1);
6172

6273
conn.async_run([&](auto ec){
6374
BOOST_TEST(!ec);

0 commit comments

Comments
 (0)