Skip to content

Commit 607a9e9

Browse files
brammzimbres
authored andcommitted
Added example cpp20_streams, which reproduces an assertion.
1 parent 2d53bb7 commit 607a9e9

File tree

3 files changed

+101
-1
lines changed

3 files changed

+101
-1
lines changed

CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ if (MSVC)
9292
target_compile_definitions(cpp20_intro PRIVATE _WIN32_WINNT=0x0601)
9393
endif()
9494

95+
add_executable(cpp20_streams examples/cpp20_streams.cpp)
96+
target_link_libraries(cpp20_streams common)
97+
target_compile_features(cpp20_streams PUBLIC cxx_std_20)
98+
add_test(cpp20_streams cpp20_streams)
99+
if (MSVC)
100+
target_compile_options(cpp20_streams PRIVATE /bigobj)
101+
target_compile_definitions(cpp20_streams PRIVATE _WIN32_WINNT=0x0601)
102+
endif()
103+
95104
add_executable(cpp17_intro examples/cpp17_intro.cpp)
96105
target_compile_features(cpp17_intro PUBLIC cxx_std_17)
97106
add_test(cpp17_intro cpp17_intro)

examples/cpp20_streams.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#include <boost/redis/connection.hpp>
8+
#include <boost/asio/deferred.hpp>
9+
#include <boost/asio/co_spawn.hpp>
10+
#include <boost/asio/detached.hpp>
11+
#include <boost/asio/consign.hpp>
12+
#include <boost/asio/signal_set.hpp>
13+
#include <boost/asio/awaitable.hpp>
14+
#include <iostream>
15+
16+
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
17+
18+
namespace net = boost::asio;
19+
using boost::redis::config;
20+
using boost::redis::generic_response;
21+
using boost::redis::operation;
22+
using boost::redis::request;
23+
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
24+
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
25+
26+
auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
27+
{
28+
std::string redisStreamKey_;
29+
request req;
30+
generic_response resp;
31+
32+
std::string stream_id{"$"};
33+
std::string const field = "myfield";
34+
35+
for (;;) {
36+
req.push("XREAD", "BLOCK", "0", "STREAMS", "test-topic", stream_id);
37+
co_await conn->async_exec(req, resp);
38+
39+
// std::cout << "Response: ";
40+
// for (int i = 0; i < resp->value().size(); ++i) {
41+
// std::cout << resp->value().at(i).value << ", ";
42+
// }
43+
// std::cout << std::endl;
44+
45+
// The following approach was taken in order to be able to
46+
// deal with the responses, as generated by redis in the case
47+
// that there are multiple stream 'records' within a single
48+
// generic_response. The nesting and number of values in
49+
// resp.value() are different, depending on the contents
50+
// of the stream in redis. Uncomment the above commented-out
51+
// code for examples while running the XADD command.
52+
53+
std::size_t item_index = 0;
54+
while (item_index < std::size(resp.value())) {
55+
auto const& val = resp.value().at(item_index).value;
56+
57+
if (field.compare(val) == 0) {
58+
// We've hit a myfield field.
59+
// The streamId is located at item_index - 2
60+
// The payload is located at item_index + 1
61+
stream_id = resp.value().at(item_index - 2).value;
62+
std::cout
63+
<< "StreamId: " << stream_id << ", "
64+
<< "MyField: " << resp.value().at(item_index + 1).value
65+
<< std::endl;
66+
++item_index; // We can increase so we don't read this again
67+
}
68+
69+
++item_index;
70+
}
71+
72+
req.clear();
73+
resp.value().clear();
74+
}
75+
76+
}
77+
78+
// Run this in another terminal:
79+
// redis-cli -r 100000 -i 0.0001 XADD "test-topic" "*" "myfield" "myfieldvalue1"
80+
auto co_main(config cfg) -> net::awaitable<void>
81+
{
82+
auto ex = co_await net::this_coro::executor;
83+
auto conn = std::make_shared<connection>(ex);
84+
net::co_spawn(ex, stream_reader(conn), net::detached);
85+
conn->async_run(cfg, {}, net::consign(net::detached, conn));
86+
87+
signal_set sig_set(ex, SIGINT, SIGTERM);
88+
co_await sig_set.async_wait();
89+
conn->cancel();
90+
}
91+
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

examples/cpp20_subscriber.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
2020

2121
namespace net = boost::asio;
22+
using namespace std::chrono_literals;
2223
using boost::redis::request;
2324
using boost::redis::generic_response;
2425
using boost::redis::logger;
2526
using boost::redis::config;
2627
using boost::system::error_code;
2728
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
2829
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
29-
using namespace std::chrono_literals;
3030

3131
/* This example will subscribe and read pushes indefinitely.
3232
*

0 commit comments

Comments
 (0)