Skip to content

Commit 48c3f37

Browse files
committed
Test improvements and bugfix in send-retry.
1 parent 3c63911 commit 48c3f37

File tree

14 files changed

+269
-147
lines changed

14 files changed

+269
-147
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ endif()
217217

218218
add_executable(test_conn_reconnect tests/conn_reconnect.cpp)
219219
target_compile_features(test_conn_reconnect PUBLIC cxx_std_20)
220+
target_link_libraries(test_conn_reconnect common)
220221
add_test(test_conn_reconnect test_conn_reconnect)
221222
if (MSVC)
222223
target_compile_options(test_conn_reconnect PRIVATE /bigobj)
@@ -250,6 +251,7 @@ endif()
250251

251252
add_executable(test_conn_exec_cancel tests/conn_exec_cancel.cpp)
252253
target_compile_features(test_conn_exec_cancel PUBLIC cxx_std_20)
254+
target_link_libraries(test_conn_exec_cancel common)
253255
add_test(test_conn_exec_cancel test_conn_exec_cancel)
254256
if (MSVC)
255257
target_compile_options(test_conn_exec_cancel PRIVATE /bigobj)
@@ -258,6 +260,7 @@ endif()
258260

259261
add_executable(test_conn_echo_stress tests/conn_echo_stress.cpp)
260262
target_compile_features(test_conn_echo_stress PUBLIC cxx_std_20)
263+
target_link_libraries(test_conn_echo_stress common)
261264
add_test(test_conn_echo_stress test_conn_echo_stress)
262265
if (MSVC)
263266
target_compile_options(test_conn_echo_stress PRIVATE /bigobj)

examples/common/common.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,23 @@ connect(
7272
throw std::runtime_error("Connect timeout");
7373
}
7474

75+
auto run(net::awaitable<void> op) -> int
76+
{
77+
try {
78+
net::io_context ioc;
79+
net::co_spawn(ioc, std::move(op), [](std::exception_ptr p) {
80+
if (p)
81+
std::rethrow_exception(p);
82+
});
83+
ioc.run();
84+
85+
return 0;
86+
87+
} catch (std::exception const& e) {
88+
std::cerr << "Error: " << e.what() << std::endl;
89+
}
90+
91+
return 1;
92+
}
93+
7594
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

examples/common/common.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,7 @@ connect(
2828

2929
auto healthy_checker(std::shared_ptr<connection> conn) -> boost::asio::awaitable<void>;
3030

31+
auto run(boost::asio::awaitable<void> op) -> int;
32+
3133
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
3234
#endif // AEDIS_EXAMPLES_COMMON_HPP

examples/common/main.cpp

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,27 @@
44
* accompanying file LICENSE.txt)
55
*/
66

7-
#include <iostream>
87
#include <boost/asio.hpp>
8+
99
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
1010

11-
namespace net = boost::asio;
12-
extern net::awaitable<void> async_main();
11+
#include "common.hpp"
12+
13+
extern boost::asio::awaitable<void> async_main();
1314

14-
// The main function used in our examples.
1515
auto main() -> int
1616
{
17-
try {
18-
net::io_context ioc;
19-
net::co_spawn(ioc, async_main(), net::detached);
20-
net::co_spawn(ioc, async_main(), [](std::exception_ptr p) {
21-
if (p)
22-
std::rethrow_exception(p);
23-
});
24-
ioc.run();
25-
} catch (std::exception const& e) {
26-
std::cerr << "Error: " << e.what() << std::endl;
27-
return 1;
28-
}
17+
return run(async_main());
2918
}
3019

3120
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)
32-
auto main() -> int {std::cout << "Requires coroutine support." << std::endl; return 0;}
21+
22+
#include <iostream>
23+
24+
auto main() -> int
25+
{
26+
std::cout << "Requires coroutine support." << std::endl;
27+
return 0;
28+
}
29+
3330
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

include/aedis/detail/connection_base.hpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,19 @@ class connection_base {
104104
return ret;
105105
}
106106

107+
// Remove requests that have the flag cancel_if_not_sent_when_connection_lost set
107108
auto cancel_on_conn_lost() -> std::size_t
108109
{
110+
// Must return false if the request should be removed.
109111
auto cond = [](auto const& ptr)
110112
{
111113
BOOST_ASSERT(ptr != nullptr);
112114

113-
if (ptr->get_request().get_config().cancel_on_connection_lost)
114-
return false;
115-
116-
return !(!ptr->get_request().get_config().retry && ptr->is_written());
115+
if (ptr->is_written()) {
116+
return ptr->get_request().get_config().retry_on_connection_lost;
117+
} else {
118+
return !ptr->get_request().get_config().cancel_on_connection_lost;
119+
}
117120
};
118121

119122
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
@@ -128,6 +131,7 @@ class connection_base {
128131
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
129132
return ptr->reset_status();
130133
});
134+
131135
return ret;
132136
}
133137

include/aedis/detail/connection_ops.hpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,7 @@ struct exec_op {
161161
BOOST_ASSERT(conn->cmds_ != 0);
162162
yield
163163
conn->async_exec_read(adapter, conn->reqs_.front()->get_number_of_commands(), std::move(self));
164-
if (is_cancelled(self)) {
165-
conn->remove_request(info);
166-
return self.complete(boost::asio::error::operation_aborted, {});
167-
}
168-
169-
if (ec) {
170-
return self.complete(ec, {});
171-
}
164+
AEDIS_CHECK_OP1(;);
172165

173166
read_size = n;
174167

include/aedis/resp3/request.hpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -169,29 +169,30 @@ class request {
169169
public:
170170
/// Request configuration options.
171171
struct config {
172-
/** \brief If true the request will complete with error if the
173-
* connection is lost while the request is pending. The default
174-
* behaviour is not to close requests.
172+
/** \brief Setting it to true will cause
173+
* `aedis::connection::async_exec` to complete with error if the
174+
* connection is lost. Affects only requests that haven't been
175+
* sent yet.
175176
*/
176177
bool cancel_on_connection_lost = false;
177178

178-
/** \brief If true the request will be coalesced with other requests,
179-
* see https://redis.io/topics/pipelining. Otherwise the
180-
* request is sent individually.
179+
/** \brief If true the request will be coalesced with other
180+
* requests, see https://redis.io/topics/pipelining. Otherwise
181+
* the request is sent individually.
181182
*/
182183
bool coalesce = true;
183184

184185
/** \brief If true, the request will complete with error if the
185-
* call happens before the connection with Redis was stablished.
186+
* call happens before the connection with Redis was established.
186187
*/
187188
bool cancel_if_not_connected = false;
188189

189-
/** \brief If true, the implementation will resend this
190-
* request if it remains unresponded when
191-
* `aedis::connection::async_run` completes. Has effect only if
192-
* cancel_on_connection_lost is true.
190+
/** \brief If true `aedis::connection::async_exec` will not
191+
* cancel this request if the connection is lost. Affects only
192+
* requests that have been written to the socket but remained
193+
* unresponded when `aedis::connection::async_run` completed.
193194
*/
194-
bool retry = true;
195+
bool retry_on_connection_lost = true;
195196

196197
/** \brief If this request has a HELLO command and this flag is
197198
* true, the `aedis::connection` will move it to the front of

tests/common.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@ resolve(
1515
net::ip::tcp::resolver resv{ioc};
1616
return resv.resolve(host, port);
1717
}
18+
19+
#ifdef BOOST_ASIO_HAS_CO_AWAIT
20+
inline
21+
auto redir(boost::system::error_code& ec)
22+
{ return net::redirect_error(net::use_awaitable, ec); }
23+
#endif // BOOST_ASIO_HAS_CO_AWAIT

tests/conn_echo_stress.cpp

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,20 @@
88
#include <boost/asio.hpp>
99
#ifdef BOOST_ASIO_HAS_CO_AWAIT
1010
#include <boost/system/errc.hpp>
11-
1211
#define BOOST_TEST_MODULE low level
1312
#include <boost/test/included/unit_test.hpp>
14-
1513
#include <aedis.hpp>
1614
#include <aedis/src.hpp>
17-
1815
#include "common.hpp"
16+
#include "../examples/common/common.hpp"
1917

2018
namespace net = boost::asio;
19+
namespace resp3 = aedis::resp3;
2120
using error_code = boost::system::error_code;
22-
23-
using aedis::resp3::request;
2421
using aedis::operation;
2522
using aedis::adapt;
26-
using connection = net::use_awaitable_t<>::as_default_on_t<aedis::connection>;
27-
28-
#include <boost/asio/experimental/awaitable_operators.hpp>
29-
using namespace net::experimental::awaitable_operators;
3023

31-
net::awaitable<void> push_consumer(std::shared_ptr<connection> conn, int expected)
24+
auto push_consumer(std::shared_ptr<connection> conn, int expected) -> net::awaitable<void>
3225
{
3326
int c = 0;
3427
for (;;) {
@@ -37,7 +30,7 @@ net::awaitable<void> push_consumer(std::shared_ptr<connection> conn, int expecte
3730
break;
3831
}
3932

40-
request req;
33+
resp3::request req;
4134
req.push("HELLO", 3);
4235
req.push("QUIT");
4336
co_await conn->async_exec(req, adapt());
@@ -47,7 +40,7 @@ auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> ne
4740
{
4841
auto ex = co_await net::this_coro::executor;
4942

50-
request req;
43+
resp3::request req;
5144
std::tuple<aedis::ignore, std::string> resp;
5245

5346
for (auto i = 0; i < n; ++i) {
@@ -57,7 +50,7 @@ auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> ne
5750
req.push("PING", msg);
5851
req.push("SUBSCRIBE", "channel");
5952
boost::system::error_code ec;
60-
co_await conn->async_exec(req, adapt(resp), net::redirect_error(net::use_awaitable, ec));
53+
co_await conn->async_exec(req, adapt(resp), redir(ec));
6154
BOOST_TEST(!ec);
6255
BOOST_CHECK_EQUAL(msg, std::get<1>(resp));
6356
req.clear();
@@ -79,16 +72,13 @@ auto async_echo_stress() -> net::awaitable<void>
7972
for (int i = 0; i < sessions; ++i)
8073
net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached);
8174

82-
auto const addrs = resolve();
83-
co_await net::async_connect(conn->next_layer(), addrs);
75+
co_await connect(conn, "127.0.0.1", "6379");
8476
co_await conn->async_run();
8577
}
8678

8779
BOOST_AUTO_TEST_CASE(echo_stress)
8880
{
89-
net::io_context ioc;
90-
net::co_spawn(ioc, async_echo_stress(), net::detached);
91-
ioc.run();
81+
run(async_echo_stress());
9282
}
9383

9484
#else

tests/conn_exec.cpp

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,23 @@ BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
123123
ioc.run();
124124
}
125125

126-
BOOST_AUTO_TEST_CASE(request_retry)
126+
// TODO: This test is broken.
127+
BOOST_AUTO_TEST_CASE(request_retry_false)
127128
{
129+
resp3::request req0;
130+
req0.get_config().coalesce = false;
131+
req0.get_config().cancel_on_connection_lost = true;
132+
req0.push("HELLO", 3);
133+
128134
resp3::request req1;
135+
req1.get_config().coalesce = true;
129136
req1.get_config().cancel_on_connection_lost = true;
130-
req1.push("HELLO", 3);
131-
req1.push("CLIENT", "PAUSE", 7000);
137+
req1.push("BLPOP", "any", 0);
132138

133139
resp3::request req2;
140+
req2.get_config().coalesce = true;
134141
req2.get_config().cancel_on_connection_lost = false;
135-
req2.get_config().retry = false;
142+
req2.get_config().retry_on_connection_lost = false;
136143
req2.push("PING");
137144

138145
net::io_context ioc;
@@ -150,10 +157,14 @@ BOOST_AUTO_TEST_CASE(request_retry)
150157
auto const endpoints = resolve();
151158
net::connect(conn.next_layer(), endpoints);
152159

153-
conn.async_exec(req1, adapt(), [](auto ec, auto){
160+
conn.async_exec(req0, adapt(), [](auto ec, auto){
154161
BOOST_TEST(!ec);
155162
});
156163

164+
conn.async_exec(req1, adapt(), [](auto ec, auto){
165+
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
166+
});
167+
157168
conn.async_exec(req2, adapt(), [](auto ec, auto){
158169
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
159170
});
@@ -164,3 +175,70 @@ BOOST_AUTO_TEST_CASE(request_retry)
164175

165176
ioc.run();
166177
}
178+
179+
BOOST_AUTO_TEST_CASE(request_retry_true)
180+
{
181+
resp3::request req0;
182+
req0.get_config().coalesce = false;
183+
req0.get_config().cancel_on_connection_lost = true;
184+
req0.push("HELLO", 3);
185+
186+
resp3::request req1;
187+
req1.get_config().coalesce = true;
188+
req1.get_config().cancel_on_connection_lost = true;
189+
req1.push("BLPOP", "any", 0);
190+
191+
resp3::request req2;
192+
req2.get_config().coalesce = true;
193+
req2.get_config().cancel_on_connection_lost = false;
194+
req2.get_config().retry_on_connection_lost = true;
195+
req2.push("PING");
196+
197+
resp3::request req3;
198+
req3.get_config().coalesce = true;
199+
req3.get_config().cancel_on_connection_lost = true;
200+
req3.get_config().retry_on_connection_lost = false;
201+
req3.push("QUIT");
202+
203+
net::io_context ioc;
204+
connection conn{ioc};
205+
206+
net::steady_timer st{ioc};
207+
st.expires_after(std::chrono::seconds{1});
208+
st.async_wait([&](auto){
209+
// Cancels the request before receiving the response. This
210+
// should cause the second request to complete with error
211+
// although it has cancel_on_connection_lost = false.
212+
conn.cancel(aedis::operation::run);
213+
});
214+
215+
auto const endpoints = resolve();
216+
net::connect(conn.next_layer(), endpoints);
217+
218+
conn.async_exec(req0, adapt(), [](auto ec, auto){
219+
BOOST_TEST(!ec);
220+
});
221+
222+
conn.async_exec(req1, adapt(), [](auto ec, auto){
223+
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
224+
});
225+
226+
conn.async_exec(req2, adapt(), [&](auto ec, auto){
227+
BOOST_TEST(!ec);
228+
conn.async_exec(req3, adapt(), [&](auto ec, auto){
229+
BOOST_TEST(!ec);
230+
});
231+
});
232+
233+
conn.async_run([&](auto ec){
234+
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
235+
conn.reset_stream();
236+
net::connect(conn.next_layer(), endpoints);
237+
conn.async_run([&](auto ec){
238+
std::cout << ec.message() << std::endl;
239+
BOOST_TEST(!ec);
240+
});
241+
});
242+
243+
ioc.run();
244+
}

0 commit comments

Comments
 (0)