Skip to content

Commit ad5dd8c

Browse files
committed
Refactors the parser so it is not header-only.
1 parent 842f864 commit ad5dd8c

File tree

9 files changed

+239
-212
lines changed

9 files changed

+239
-212
lines changed

README.md

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,30 @@ auto async_main() -> net::awaitable<void>
4242
```
4343

4444
The execution of `connection::async_exec` above is composed with
45-
`connection::async_run` with the aid of the Asio awaitable operator ||
45+
`connection::async_run` with the aid of the Asio awaitable `operator ||`
4646
that ensures that one operation is cancelled as soon as the other
4747
completes, these functions play the following roles
4848

49-
* `connection::async_exec`: Execute commands (i.e. write the request and reads the response).
50-
* `connection::async_run`: Coordinate read and write operations and remains suspended until the connection is lost.
49+
* `connection::async_exec`: Execute commands by writing the request payload to the underlying stream and reading the response sent back by Redis. It can be called from multiple places in your code concurrently.
50+
* `connection::async_run`: Coordinate the low-level IO (read and write) operations and remains suspended until the connection is lost.
5151

52-
Let us dig in.
52+
When a connection is lost, the `async_exec` calls won't automatically
53+
fail, instead, they will remain suspended until they are either all
54+
canceled with a call to `connection::cancel(operation::exec)` or a new
55+
connection is established and `async_run` is called again, in which
56+
case they will be resent automatically. Users can customise this
57+
behaviour by carefully choosing the values of
58+
`aedis::resp3::request::config`. The role played by `async_run`
59+
becomes clearer with long-lived connections, which we will cover
60+
in the next section.
5361

5462
<a name="connection"></a>
5563
## Connection
5664

57-
In general we will want to reuse the same connection for multiple
58-
requests, we can do this with the example above by decoupling the
59-
HELLO command and the call to `async_run` in a separate coroutine
65+
For performance reasons we will usually want to perform multiple
66+
requests on the same connection. We can do this with the example above
67+
by decoupling the HELLO command and the call to `async_run` in a
68+
separate coroutine
6069

6170
```cpp
6271
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -67,23 +76,33 @@ auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
6776
req.push("HELLO", 3); // Upgrade to RESP3
6877

6978
// Notice we use && instead of || so async_run is not cancelled
70-
// when the response to HELLO comes.
79+
// when the HELLO response arrives. We are also ignoring the
80+
// response for simplicity.
7181
co_await (conn->async_run() && conn->async_exec(req));
7282
}
7383
```
7484
We can now let `run` run detached in the background while other
75-
coroutines perform requests on the connection
85+
coroutines perform requests on the connection, for example
7686
7787
```cpp
7888
auto async_main() -> net::awaitable<void>
7989
{
8090
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
8191
82-
// Calls async_run detached.
83-
net::co_spawn(ex, run(conn), net::detached)
92+
// Run detached.
93+
net::co_spawn(ex, run(conn), net::detached);
94+
95+
// Here we can use the connection to perform requests and pass it
96+
// around to other coroutines so they can make requests.
97+
98+
resp3::request req;
99+
req.push("PING", "Hello world");
100+
co_await conn->async_exec(req);
84101
85-
// Here we can pass conn around to other coroutines so they can make requests.
86102
...
103+
104+
// Cancels the run operation so we can exit.
105+
conn->cancel(operation::run);
87106
}
88107
```
89108

@@ -138,10 +157,12 @@ auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
138157

139158
for (;;) {
140159
co_await connect(conn, "127.0.0.1", "6379");
141-
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn))
142-
&& conn->async_exec(req));
160+
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn)) && conn->async_exec(req));
143161

162+
// Prepare the stream to a new connection.
144163
conn->reset_stream();
164+
165+
// Waits one second before trying to reconnect.
145166
timer.expires_after(std::chrono::seconds{1});
146167
co_await timer.async_wait();
147168
}
@@ -152,23 +173,23 @@ For failover with sentinels see `resolve_with_sentinel.cpp`. At
152173
this point the reasons for why `async_run` was introduced in Aedis
153174
might have become apparent to the reader
154175
155-
* Provide quick reaction to disconnections and hence faster failover.
176+
* Provide quick reaction to disconnections and hence faster failovers.
156177
* Support server pushes and requests in the same connection object, concurrently.
157-
* Separate requests, handling of server pushes and reconnection operations.
178+
* Separate requests, handling of server pushes and reconnect operations.
158179
159180
### Cancellation
160181
161182
Aedis supports both implicit and explicit cancellation of connection
162183
operations. Explicit cancellation is supported by means of the
163184
`aedis::connection::cancel` member function. Implicit cancellation,
164-
like those that may happen when using Asio awaitable operators && and
165-
|| will be discussed with more detail below.
185+
like those that may happen when using Asio awaitable operators `&&` and
186+
`||` will be discussed with more detail below.
166187
167188
```cpp
168189
co_await (conn.async_run(...) && conn.async_exec(...))
169190
```
170191

171-
* Provide a simple way to send HELLO and perform channel subscription.
192+
* Provides a simple way to send HELLO and perform channel subscription.
172193

173194
```cpp
174195
co_await (conn.async_run(...) || conn.async_exec(...))
@@ -185,7 +206,7 @@ co_await (conn.async_exec(...) || time.async_wait(...))
185206
should last.
186207
* The cancellation will be ignored if the request has already
187208
been written to the socket.
188-
* It is usually a better idea to have a healthy checker than adding
209+
* NOTE: It is usually a better idea to have a healthy checker than adding
189210
per request timeout, see subscriber.cpp for an example.
190211

191212
```cpp
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#include <aedis/resp3/detail/parser.hpp>
8+
9+
#include <aedis/error.hpp>
10+
#include <boost/assert.hpp>
11+
#include <charconv>
12+
13+
namespace aedis::resp3::detail {
14+
15+
void to_int(int_type& i, std::string_view sv, boost::system::error_code& ec)
16+
{
17+
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
18+
if (res.ec != std::errc())
19+
ec = error::not_a_number;
20+
}
21+
22+
parser::parser()
23+
{
24+
sizes_[0] = 2; // The sentinel must be more than 1.
25+
}
26+
27+
auto
28+
parser::consume(
29+
char const* data,
30+
std::size_t n,
31+
boost::system::error_code& ec) -> std::pair<node_type, std::size_t>
32+
{
33+
node_type ret;
34+
if (bulk_expected()) {
35+
n = bulk_length_ + 2;
36+
ret = {bulk_, 1, depth_, {data, bulk_length_}};
37+
bulk_ = type::invalid;
38+
--sizes_[depth_];
39+
40+
} else if (sizes_[depth_] != 0) {
41+
auto const t = to_type(*data);
42+
switch (t) {
43+
case type::streamed_string_part:
44+
{
45+
to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec);
46+
if (ec)
47+
return std::make_pair(node_type{}, 0);
48+
49+
if (bulk_length_ == 0) {
50+
ret = {type::streamed_string_part, 1, depth_, {}};
51+
sizes_[depth_] = 0; // We are done.
52+
bulk_ = type::invalid;
53+
} else {
54+
bulk_ = type::streamed_string_part;
55+
}
56+
} break;
57+
case type::blob_error:
58+
case type::verbatim_string:
59+
case type::blob_string:
60+
{
61+
if (data[1] == '?') {
62+
// NOTE: This can only be triggered with blob_string.
63+
// Trick: A streamed string is read as an aggregate
64+
// of infinite lenght. When the streaming is done
65+
// the server is supposed to send a part with length
66+
// 0.
67+
sizes_[++depth_] = (std::numeric_limits<std::size_t>::max)();
68+
ret = {type::streamed_string, 0, depth_, {}};
69+
} else {
70+
to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec);
71+
if (ec)
72+
return std::make_pair(node_type{}, 0);
73+
74+
bulk_ = t;
75+
}
76+
} break;
77+
case type::boolean:
78+
{
79+
if (n == 3) {
80+
ec = error::empty_field;
81+
return std::make_pair(node_type{}, 0);
82+
}
83+
84+
if (data[1] != 'f' && data[1] != 't') {
85+
ec = error::unexpected_bool_value;
86+
return std::make_pair(node_type{}, 0);
87+
}
88+
89+
ret = {t, 1, depth_, {data + 1, n - 3}};
90+
--sizes_[depth_];
91+
} break;
92+
case type::doublean:
93+
case type::big_number:
94+
case type::number:
95+
{
96+
if (n == 3) {
97+
ec = error::empty_field;
98+
return std::make_pair(node_type{}, 0);
99+
}
100+
101+
ret = {t, 1, depth_, {data + 1, n - 3}};
102+
--sizes_[depth_];
103+
} break;
104+
case type::simple_error:
105+
case type::simple_string:
106+
{
107+
ret = {t, 1, depth_, {&data[1], n - 3}};
108+
--sizes_[depth_];
109+
} break;
110+
case type::null:
111+
{
112+
ret = {type::null, 1, depth_, {}};
113+
--sizes_[depth_];
114+
} break;
115+
case type::push:
116+
case type::set:
117+
case type::array:
118+
case type::attribute:
119+
case type::map:
120+
{
121+
int_type l = -1;
122+
to_int(l, std::string_view{data + 1, n - 3}, ec);
123+
if (ec)
124+
return std::make_pair(node_type{}, 0);
125+
126+
ret = {t, l, depth_, {}};
127+
if (l == 0) {
128+
--sizes_[depth_];
129+
} else {
130+
if (depth_ == max_embedded_depth) {
131+
ec = error::exceeeds_max_nested_depth;
132+
return std::make_pair(node_type{}, 0);
133+
}
134+
135+
++depth_;
136+
137+
sizes_[depth_] = l * element_multiplicity(t);
138+
}
139+
} break;
140+
default:
141+
{
142+
ec = error::invalid_data_type;
143+
return std::make_pair(node_type{}, 0);
144+
}
145+
}
146+
}
147+
148+
while (sizes_[depth_] == 0) {
149+
--depth_;
150+
--sizes_[depth_];
151+
}
152+
153+
return std::make_pair(ret, n);
154+
}
155+
} // aedis::resp3::detail

0 commit comments

Comments
 (0)