|
7 | 7 | #include <boost/redis/connection.hpp>
|
8 | 8 | #include <boost/asio/co_spawn.hpp>
|
9 | 9 | #include <boost/asio/detached.hpp>
|
| 10 | +#include <boost/asio/deferred.hpp> |
10 | 11 | #include <boost/system/errc.hpp>
|
11 | 12 | #define BOOST_TEST_MODULE echo-stress
|
12 | 13 | #include <boost/test/included/unit_test.hpp>
|
@@ -38,44 +39,75 @@ auto push_consumer(std::shared_ptr<connection> conn, int expected) -> net::await
|
38 | 39 | conn->cancel();
|
39 | 40 | }
|
40 | 41 |
|
41 |
| -auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> net::awaitable<void> |
| 42 | +auto |
| 43 | +echo_session( |
| 44 | + std::shared_ptr<connection> conn, |
| 45 | + std::shared_ptr<request> pubs, |
| 46 | + std::string id, |
| 47 | + int n) -> net::awaitable<void> |
42 | 48 | {
|
43 | 49 | auto ex = co_await net::this_coro::executor;
|
44 | 50 |
|
45 | 51 | request req;
|
46 |
| - response<ignore_t, std::string> resp; |
| 52 | + response<ignore_t, std::string, ignore_t> resp; |
47 | 53 |
|
48 | 54 | for (auto i = 0; i < n; ++i) {
|
49 | 55 | auto const msg = id + "/" + std::to_string(i);
|
50 | 56 | //std::cout << msg << std::endl;
|
51 |
| - req.push("HELLO", 3); |
| 57 | + req.push("HELLO", 3); // Just to mess around. |
52 | 58 | req.push("PING", msg);
|
53 |
| - req.push("SUBSCRIBE", "channel"); |
| 59 | + req.push("PING", "lsls"); // TODO: Change to HELLO after fixing issue 105. |
54 | 60 | boost::system::error_code ec;
|
55 | 61 | co_await conn->async_exec(req, resp, redir(ec));
|
56 |
| - BOOST_CHECK_EQUAL(ec, boost::system::error_code{}); |
57 |
| - BOOST_CHECK_EQUAL(msg, std::get<1>(resp).value()); |
| 62 | + |
| 63 | + BOOST_REQUIRE_EQUAL(ec, boost::system::error_code{}); |
| 64 | + BOOST_REQUIRE_EQUAL(msg, std::get<1>(resp).value()); |
58 | 65 | req.clear();
|
59 | 66 | std::get<1>(resp).value().clear();
|
| 67 | + |
| 68 | + co_await conn->async_exec(*pubs, ignore, net::deferred); |
60 | 69 | }
|
61 | 70 | }
|
62 | 71 |
|
63 | 72 | auto async_echo_stress() -> net::awaitable<void>
|
64 | 73 | {
|
65 | 74 | auto ex = co_await net::this_coro::executor;
|
66 | 75 | auto conn = std::make_shared<connection>(ex);
|
| 76 | + config cfg; |
| 77 | + cfg.health_check_interval = std::chrono::seconds::zero(); |
| 78 | + run(conn, cfg, |
| 79 | + boost::asio::error::operation_aborted, |
| 80 | + boost::redis::operation::receive, |
| 81 | + boost::redis::logger::level::crit); |
| 82 | + |
| 83 | + request req; |
| 84 | + req.push("SUBSCRIBE", "channel"); |
| 85 | + co_await conn->async_exec(req, ignore, net::deferred); |
67 | 86 |
|
| 87 | + // Number of coroutines that will send pings sharing the same |
| 88 | + // connection to redis. |
68 | 89 | int const sessions = 500;
|
| 90 | + |
| 91 | + // The number of pings that will be sent by each session. |
69 | 92 | int const msgs = 1000;
|
70 |
| - int total = sessions * msgs; |
71 | 93 |
|
72 |
| - net::co_spawn(ex, push_consumer(conn, total), net::detached); |
| 94 | + // The number of publishes that will be sent by each session with |
| 95 | + // each message. |
| 96 | + int const n_pubs = 10; |
73 | 97 |
|
74 |
| - for (int i = 0; i < sessions; ++i) |
75 |
| - net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached); |
| 98 | + // This is the total number of pushes we will receive. |
| 99 | + int total_pushes = sessions * msgs * n_pubs + 1; |
76 | 100 |
|
| 101 | + auto pubs = std::make_shared<request>(); |
| 102 | + for (int i = 0; i < n_pubs; ++i) |
| 103 | + pubs->push("PUBLISH", "channel", "payload"); |
77 | 104 |
|
78 |
| - run(conn); |
| 105 | + // Op that will consume the pushes counting down until all expected |
| 106 | + // pushes have been received. |
| 107 | + net::co_spawn(ex, push_consumer(conn, total_pushes), net::detached); |
| 108 | + |
| 109 | + for (int i = 0; i < sessions; ++i) |
| 110 | + net::co_spawn(ex, echo_session(conn, pubs, std::to_string(i), msgs), net::detached); |
79 | 111 | }
|
80 | 112 |
|
81 | 113 | BOOST_AUTO_TEST_CASE(echo_stress)
|
|
0 commit comments