7
7
#ifndef BOOST_REDIS_CONNECTION_HPP
8
8
#define BOOST_REDIS_CONNECTION_HPP
9
9
10
- #include < boost/redis/connection_base.hpp>
10
+ #include < boost/redis/detail/ connection_base.hpp>
11
11
#include < boost/redis/logger.hpp>
12
12
#include < boost/redis/config.hpp>
13
13
#include < boost/asio/io_context.hpp>
@@ -34,7 +34,7 @@ struct reconnection_op {
34
34
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
35
35
{
36
36
BOOST_ASIO_CORO_YIELD
37
- conn_->async_run_one (conn_->cfg_ , logger_, std::move (self));
37
+ conn_->impl_ . async_run (conn_->cfg_ , logger_, std::move (self));
38
38
conn_->cancel (operation::receive);
39
39
logger_.on_connection_lost (ec);
40
40
if (!conn_->will_reconnect () || is_cancelled (self)) {
@@ -68,14 +68,15 @@ struct reconnection_op {
68
68
*
69
69
*/
70
70
template <class Executor >
71
- class basic_connection : public connection_base <Executor> {
71
+ class basic_connection {
72
72
public:
73
- using base_type = connection_base<Executor>;
74
- using this_type = basic_connection<Executor>;
75
-
76
73
// / Executor type.
77
74
using executor_type = Executor;
78
75
76
+ // / Returns the underlying executor.
77
+ executor_type get_executor () noexcept
78
+ { return impl_.get_executor (); }
79
+
79
80
// / Rebinds the socket type to another executor.
80
81
template <class Executor1 >
81
82
struct rebind_executor
@@ -87,7 +88,7 @@ class basic_connection : public connection_base<Executor> {
87
88
// / Contructs from an executor.
88
89
explicit
89
90
basic_connection (executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client)
90
- : base_type {ex, method}
91
+ : impl_ {ex, method}
91
92
, timer_{ex}
92
93
{ }
93
94
@@ -97,12 +98,28 @@ class basic_connection : public connection_base<Executor> {
97
98
: basic_connection(ioc.get_executor(), method)
98
99
{ }
99
100
100
- /* * @brief High-level connection to Redis.
101
+ /* * @brief Starts underlying connection operations.
102
+ *
103
+ * This member function provides the following functionality
104
+ *
105
+ * 1. Resolve the address passed on `boost::redis::config::addr`.
106
+ * 2. Connect to one of the results obtained in the resolve operation.
107
+ * 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
108
+ * 4. Start a health-check operation where ping commands are sent
109
+ * at intervals specified in
110
+ * `boost::redis::config::health_check_interval`. The message passed to
111
+ * `PING` will be `boost::redis::config::health_check_id`. Passing a
112
+ * timeout with value zero will disable health-checks. If the Redis
113
+ * server does not respond to a health-check within two times the value
114
+ * specified here, it will be considered unresponsive and the connection
115
+ * will be closed and a new connection will be stablished.
116
+ * 5. Starts read and write operations with the Redis
117
+ * server. More specifically it will trigger the write of all
118
+ * requests i.e. calls to `async_exec` that happened prior to this
119
+ * call.
101
120
*
102
- * This connection class adds reconnection functionality to
103
- * `boost::redis::connection_base::async_run_one`. When a
104
- * connection is lost for any reason, a new one is stablished
105
- * automatically. To disable reconnection call
121
+ * When a connection is lost for any reason, a new one is
122
+ * stablished automatically. To disable reconnection call
106
123
* `boost::redis::connection::cancel(operation::reconnection)`.
107
124
*
108
125
* @param cfg Configuration paramters.
@@ -115,11 +132,6 @@ class basic_connection : public connection_base<Executor> {
115
132
* void f(system::error_code);
116
133
* @endcode
117
134
*
118
- * @remarks
119
- *
120
- * * This function will complete only if reconnection was disabled
121
- * and the connection is lost.
122
- *
123
135
* For example on how to call this function refer to
124
136
* cpp20_intro.cpp or any other example.
125
137
*/
@@ -132,6 +144,8 @@ class basic_connection : public connection_base<Executor> {
132
144
Logger l = Logger{},
133
145
CompletionToken token = CompletionToken{})
134
146
{
147
+ using this_type = basic_connection<executor_type>;
148
+
135
149
cfg_ = cfg;
136
150
l.set_prefix (cfg_.log_prefix );
137
151
return asio::async_compose
@@ -140,6 +154,77 @@ class basic_connection : public connection_base<Executor> {
140
154
>(detail::reconnection_op<this_type, Logger>{this , l}, token, timer_);
141
155
}
142
156
157
+ /* * @brief Receives server side pushes asynchronously.
158
+ *
159
+ * When pushes arrive and there is no `async_receive` operation in
160
+ * progress, pushed data, requests, and responses will be paused
161
+ * until `async_receive` is called again. Apps will usually want
162
+ * to call `async_receive` in a loop.
163
+ *
164
+ * To cancel an ongoing receive operation apps should call
165
+ * `connection::cancel(operation::receive)`.
166
+ *
167
+ * @param response Response object.
168
+ * @param token Completion token.
169
+ *
170
+ * For an example see cpp20_subscriber.cpp. The completion token must
171
+ * have the following signature
172
+ *
173
+ * @code
174
+ * void f(system::error_code, std::size_t);
175
+ * @endcode
176
+ *
177
+ * Where the second parameter is the size of the push received in
178
+ * bytes.
179
+ */
180
+ template <
181
+ class Response = ignore_t ,
182
+ class CompletionToken = asio::default_completion_token_t <executor_type>
183
+ >
184
+ auto
185
+ async_receive (
186
+ Response& response,
187
+ CompletionToken token = CompletionToken{})
188
+ {
189
+ return impl_.async_receive (response, token);
190
+ }
191
+
192
+ /* * @brief Executes commands on the Redis server asynchronously.
193
+ *
194
+ * This function sends a request to the Redis server and waits for
195
+ * the responses to each individual command in the request. If the
196
+ * request contains only commands that don't expect a response,
197
+ * the completion occurs after it has been written to the
198
+ * underlying stream. Multiple concurrent calls to this function
199
+ * will be automatically queued by the implementation.
200
+ *
201
+ * @param req Request.
202
+ * @param resp Response.
203
+ * @param token Completion token.
204
+ *
205
+ * For an example see cpp20_echo_server.cpp. The completion token must
206
+ * have the following signature
207
+ *
208
+ * @code
209
+ * void f(system::error_code, std::size_t);
210
+ * @endcode
211
+ *
212
+ * Where the second parameter is the size of the response received
213
+ * in bytes.
214
+ */
215
+ template <
216
+ class Response = ignore_t ,
217
+ class CompletionToken = asio::default_completion_token_t <executor_type>
218
+ >
219
+ auto
220
+ async_exec (
221
+ request const & req,
222
+ Response& resp = ignore,
223
+ CompletionToken token = CompletionToken{})
224
+ {
225
+ return impl_.async_exec (req, resp, token);
226
+ }
227
+
143
228
/* * @brief Cancel operations.
144
229
*
145
230
* @li `operation::exec`: Cancels operations started with
@@ -152,7 +237,7 @@ class basic_connection : public connection_base<Executor> {
152
237
* @param op: The operation to be cancelled.
153
238
* @returns The number of operations that have been canceled.
154
239
*/
155
- void cancel (operation op = operation::all) override
240
+ void cancel (operation op = operation::all)
156
241
{
157
242
switch (op) {
158
243
case operation::reconnection:
@@ -163,16 +248,51 @@ class basic_connection : public connection_base<Executor> {
163
248
default : /* ignore */ ;
164
249
}
165
250
166
- base_type:: cancel (op);
251
+ impl_. cancel (op);
167
252
}
168
253
169
254
// / Returns true if the connection was canceled.
170
255
bool will_reconnect () const noexcept
171
256
{ return cfg_.reconnect_wait_interval != std::chrono::seconds::zero ();}
172
257
173
- private:
174
- config cfg_;
258
+ /* * @brief Reserve memory on the read and write internal buffers.
259
+ *
260
+ * This function will call `std::string::reserve` on the
261
+ * underlying buffers.
262
+ *
263
+ * @param read The new capacity of the read buffer.
264
+ * @param write The new capacity of the write buffer.
265
+ */
266
+ void reserve (std::size_t read, std::size_t write)
267
+ {
268
+ impl_.reserve (read, write);
269
+ }
270
+
271
+ // / Sets the maximum size of the read buffer.
272
+ void set_max_buffer_read_size (std::size_t max_read_size) noexcept
273
+ { impl_.set_max_buffer_read_size (max_read_size); }
175
274
275
+ // / Returns the ssl context.
276
+ auto const & get_ssl_context () const noexcept
277
+ { return impl_.get_ssl_context ();}
278
+
279
+ // / Returns the ssl context.
280
+ auto & get_ssl_context () noexcept
281
+ { return impl_.get_ssl_context ();}
282
+
283
+ // / Resets the underlying stream.
284
+ void reset_stream ()
285
+ { impl_.reset_stream (); }
286
+
287
+ // / Returns a reference to the next layer.
288
+ auto & next_layer () noexcept
289
+ { return impl_.next_layer (); }
290
+
291
+ // / Returns a const reference to the next layer.
292
+ auto const & next_layer () const noexcept
293
+ { return impl_.next_layer (); }
294
+
295
+ private:
176
296
using timer_type =
177
297
asio::basic_waitable_timer<
178
298
std::chrono::steady_clock,
@@ -181,11 +301,19 @@ class basic_connection : public connection_base<Executor> {
181
301
182
302
template <class , class > friend struct detail ::reconnection_op;
183
303
304
+ config cfg_;
305
+ detail::connection_base<executor_type> impl_;
184
306
timer_type timer_;
185
307
};
186
308
187
- /* * \brief A connection that uses the asio::any_io_executor .
309
+ /* * \brief A basic_connection that type erases the executor .
188
310
* \ingroup high-level-api
311
+ *
312
+ * This connection type uses the asio::any_io_executor and
313
+ * asio::any_completion_token to reduce compilation times.
314
+ *
315
+ * For documentaiton of each member function see
316
+ * `boost::redis::basic_connection`.
189
317
*/
190
318
class connection {
191
319
public:
@@ -198,8 +326,11 @@ class connection {
198
326
// / Contructs from a context.
199
327
explicit connection (asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client);
200
328
201
- executor_type get_executor () noexcept { return impl_.get_executor (); }
329
+ // / Returns the underlying executor.
330
+ executor_type get_executor () noexcept
331
+ { return impl_.get_executor (); }
202
332
333
+ // / Calls `boost::redis::basic_connection::async_run`.
203
334
template <class CompletionToken >
204
335
auto async_run (config const & cfg, logger l, CompletionToken token)
205
336
{
@@ -211,31 +342,38 @@ class connection {
211
342
}, token, this , &cfg, l);
212
343
}
213
344
345
+ // / Calls `boost::redis::basic_connection::async_receive`.
214
346
template <class Response , class CompletionToken >
215
347
auto async_receive (Response& response, CompletionToken token)
216
348
{
217
349
return impl_.async_receive (response, std::move (token));
218
350
}
219
351
352
+ // / Calls `boost::redis::basic_connection::async_exec`.
220
353
template <class Response , class CompletionToken >
221
354
auto async_exec (request const & req, Response& resp, CompletionToken token)
222
355
{
223
356
return impl_.async_exec (req, resp, std::move (token));
224
357
}
225
358
359
+ // / Calls `boost::redis::basic_connection::cancel`.
226
360
void cancel (operation op = operation::all);
227
361
228
- // / Returns true if the connection was canceled .
362
+ // / Calls `boost::redis::basic_connection::will_reconnect` .
229
363
bool will_reconnect () const noexcept
230
364
{ return impl_.will_reconnect ();}
231
365
232
- // / Returns a reference to the next layer.
233
- auto & next_layer () noexcept { return impl_.next_layer (); }
366
+ // / Calls `boost::redis::basic_connection::next_layer`.
367
+ auto & next_layer () noexcept
368
+ { return impl_.next_layer (); }
234
369
235
- // / Returns a const reference to the next layer.
236
- auto const & next_layer () const noexcept { return impl_.next_layer (); }
370
+ // / Calls `boost::redis::basic_connection::next_layer`.
371
+ auto const & next_layer () const noexcept
372
+ { return impl_.next_layer (); }
237
373
238
- void reset_stream () { impl_.reset_stream ();}
374
+ // / Calls `boost::redis::basic_connection::reset_stream`.
375
+ void reset_stream ()
376
+ { impl_.reset_stream ();}
239
377
240
378
private:
241
379
void
0 commit comments