Skip to content

Commit 1612e26

Browse files
authored
Merge pull request #25 from basiliscos/issue-19
Add stream example
2 parents a3f62b3 + 9d862ff commit 1612e26

File tree

3 files changed

+132
-0
lines changed

3 files changed

+132
-0
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,7 @@ add_executable(t-21-coroutine t/21-coroutine.cpp)
141141
target_link_libraries(t-21-coroutine ${LINK_DEPENDENCIES})
142142
add_test("t-21-coroutine" t-21-coroutine)
143143

144+
add_executable(t-23-stream t/23-stream.cpp)
145+
target_link_libraries(t-23-stream ${LINK_DEPENDENCIES})
146+
add_test("t-23-stream" t-23-stream)
147+

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,38 @@ boost::asio::spawn(
383383
});
384384
```
385385
386+
## Steams
387+
388+
There is no specific support for streams (appeared in redis 5.0) in bredis,
389+
they are just usual `XADD`, `XRANGE` etc. commands and corresponding replies.
390+
391+
```cpp
392+
...
393+
Buffer rx_buff;
394+
c.write(r::single_command_t{ "XADD", "mystream", "*", "cpu-temp", "23.4", "load", "2.3" });
395+
auto parse_result1 = c.read(rx_buff);
396+
auto extract1 = boost::apply_visitor(Extractor(), parse_result1.result);
397+
auto id1 = boost::get<r::extracts::string_t>(extract1);
398+
399+
c.write(r::single_command_t{ "XADD", "mystream", "*", "cpu-temp", "23.2", "load", "2.1" });
400+
auto parse_result2 = c.read(rx_buff);
401+
auto extract2 = boost::apply_visitor(Extractor(), parse_result2.result);
402+
auto id2 = boost::get<r::extracts::string_t>(extract2);
403+
rx_buff.consume(parse_result2.consumed);
404+
405+
c.write(r::single_command_t{ "XRANGE" , "mystream", id1.str, id2.str});
406+
auto parse_result3 = c.read(rx_buff);
407+
auto extract3 = boost::apply_visitor(Extractor(), parse_result3.result);
408+
rx_buff.consume(parse_result3.consumed);
409+
410+
auto& outer_arr = boost::get<r::extracts::array_holder_t>(extract3);
411+
auto& inner_arr1 = boost::get<r::extracts::array_holder_t>(outer_arr.elements[0]);
412+
auto& inner_arr2 = boost::get<r::extracts::array_holder_t>(outer_arr.elements[1]);
413+
...
414+
415+
```
416+
417+
386418
## Inspecting network traffic
387419

388420
See `t/SocketWithLogging.hpp` for an example. The main idea is quite simple:

t/23-stream.cpp

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#include <boost/asio.hpp>
2+
#include <future>
3+
4+
#include "EmptyPort.hpp"
5+
#include "TestServer.hpp"
6+
#include "catch.hpp"
7+
8+
#include "bredis/Connection.hpp"
9+
#include "bredis/MarkerHelpers.hpp"
10+
11+
#include "bredis/Extract.hpp"
12+
13+
14+
#include "SocketWithLogging.hpp"
15+
16+
namespace r = bredis;
17+
namespace asio = boost::asio;
18+
namespace ep = empty_port;
19+
namespace ts = test_server;
20+
21+
TEST_CASE("stream", "[connection]") {
22+
using socket_t = asio::ip::tcp::socket;
23+
#ifdef BREDIS_DEBUG
24+
using next_layer_t = r::test::SocketWithLogging<socket_t>;
25+
#else
26+
using next_layer_t = socket_t;
27+
#endif
28+
using Buffer = boost::asio::streambuf;
29+
using Iterator = typename r::to_iterator<Buffer>::iterator_t;
30+
using Extractor = r::extractor<Iterator>;
31+
32+
std::chrono::milliseconds sleep_delay(1);
33+
34+
uint16_t port = ep::get_random<ep::Kind::TCP>();
35+
auto port_str = boost::lexical_cast<std::string>(port);
36+
auto server = ts::make_server({"redis-server", "--port", port_str});
37+
ep::wait_port<ep::Kind::TCP>(port);
38+
asio::io_service io_service;
39+
40+
asio::ip::tcp::endpoint end_point(
41+
asio::ip::address::from_string("127.0.0.1"), port);
42+
socket_t socket(io_service, end_point.protocol());
43+
socket.connect(end_point);
44+
45+
r::Connection<next_layer_t> c(std::move(socket));
46+
47+
Buffer rx_buff;
48+
c.write(r::single_command_t{ "INFO" });
49+
auto parse_result0 = c.read(rx_buff);
50+
auto extract0 = boost::apply_visitor(Extractor(), parse_result0.result);
51+
auto info = boost::get<r::extracts::string_t>(extract0);
52+
rx_buff.consume(parse_result0.consumed);
53+
auto it_begin = info.str.begin();
54+
auto it_end = info.str.end();
55+
std::string version_str = "redis_version:5.";
56+
if (info.str.find("redis_version:5.") == std::string::npos) {
57+
/* not supported by earlier redis versions */
58+
return;
59+
}
60+
61+
62+
c.write(r::single_command_t{ "XADD", "mystream", "*", "cpu-temp", "23.4", "load", "2.3" });
63+
auto parse_result1 = c.read(rx_buff);
64+
auto extract1 = boost::apply_visitor(Extractor(), parse_result1.result);
65+
auto id1 = boost::get<r::extracts::string_t>(extract1);
66+
rx_buff.consume(parse_result1.consumed);
67+
68+
c.write(r::single_command_t{ "XADD", "mystream", "*", "cpu-temp", "23.2", "load", "2.1" });
69+
auto parse_result2 = c.read(rx_buff);
70+
auto extract2 = boost::apply_visitor(Extractor(), parse_result2.result);
71+
auto id2 = boost::get<r::extracts::string_t>(extract2);
72+
rx_buff.consume(parse_result2.consumed);
73+
74+
c.write(r::single_command_t{ "XRANGE" , "mystream", id1.str, id2.str});
75+
auto parse_result3 = c.read(rx_buff);
76+
auto extract3 = boost::apply_visitor(Extractor(), parse_result3.result);
77+
rx_buff.consume(parse_result3.consumed);
78+
79+
auto& outer_arr = boost::get<r::extracts::array_holder_t>(extract3);
80+
auto& inner_arr1 = boost::get<r::extracts::array_holder_t>(outer_arr.elements[0]);
81+
auto& inner_arr2 = boost::get<r::extracts::array_holder_t>(outer_arr.elements[1]);
82+
83+
REQUIRE(boost::get<r::extracts::string_t>(inner_arr1.elements[0]).str == id1.str);
84+
auto& arr1 = boost::get<r::extracts::array_holder_t>(inner_arr1.elements[1]);
85+
REQUIRE(boost::get<r::extracts::string_t>(arr1.elements[0]).str == "cpu-temp");
86+
REQUIRE(boost::get<r::extracts::string_t>(arr1.elements[1]).str == "23.4");
87+
REQUIRE(boost::get<r::extracts::string_t>(arr1.elements[2]).str == "load");
88+
REQUIRE(boost::get<r::extracts::string_t>(arr1.elements[3]).str == "2.3");
89+
90+
REQUIRE(boost::get<r::extracts::string_t>(inner_arr2.elements[0]).str == id2.str);
91+
auto& arr2 = boost::get<r::extracts::array_holder_t>(inner_arr2.elements[1]);
92+
REQUIRE(boost::get<r::extracts::string_t>(arr2.elements[0]).str == "cpu-temp");
93+
REQUIRE(boost::get<r::extracts::string_t>(arr2.elements[1]).str == "23.2");
94+
REQUIRE(boost::get<r::extracts::string_t>(arr2.elements[2]).str == "load");
95+
REQUIRE(boost::get<r::extracts::string_t>(arr2.elements[3]).str == "2.1");
96+
};

0 commit comments

Comments
 (0)