Skip to content

Commit a39d130

Browse files
authored
Adds a redis_stream class that encapsulates connection, reading and writing (#266)
Deprecates get_ssl_context for connection and basic_connection Deprecates reset_stream for connection and basic_connection
1 parent 6d5b550 commit a39d130

File tree

5 files changed

+247
-543
lines changed

5 files changed

+247
-543
lines changed

include/boost/redis/connection.hpp

Lines changed: 45 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@
1010
#include <boost/redis/adapter/adapt.hpp>
1111
#include <boost/redis/adapter/any_adapter.hpp>
1212
#include <boost/redis/config.hpp>
13-
#include <boost/redis/detail/connector.hpp>
1413
#include <boost/redis/detail/health_checker.hpp>
1514
#include <boost/redis/detail/helper.hpp>
1615
#include <boost/redis/detail/multiplexer.hpp>
17-
#include <boost/redis/detail/resolver.hpp>
16+
#include <boost/redis/detail/redis_stream.hpp>
1817
#include <boost/redis/detail/resp3_handshaker.hpp>
1918
#include <boost/redis/error.hpp>
2019
#include <boost/redis/logger.hpp>
@@ -48,9 +47,8 @@
4847
#include <array>
4948
#include <chrono>
5049
#include <cstddef>
51-
#include <deque>
52-
#include <functional>
5350
#include <memory>
51+
#include <utility>
5452

5553
namespace boost::redis {
5654
namespace detail {
@@ -192,19 +190,11 @@ struct writer_op {
192190
BOOST_ASIO_CORO_REENTER(coro) for (;;)
193191
{
194192
while (conn_->mpx_.prepare_write() != 0) {
195-
if (conn_->use_ssl()) {
196-
BOOST_ASIO_CORO_YIELD
197-
asio::async_write(
198-
conn_->next_layer(),
199-
asio::buffer(conn_->mpx_.get_write_buffer()),
200-
std::move(self));
201-
} else {
202-
BOOST_ASIO_CORO_YIELD
203-
asio::async_write(
204-
conn_->next_layer().next_layer(),
205-
asio::buffer(conn_->mpx_.get_write_buffer()),
206-
std::move(self));
207-
}
193+
BOOST_ASIO_CORO_YIELD
194+
asio::async_write(
195+
conn_->stream_,
196+
asio::buffer(conn_->mpx_.get_write_buffer()),
197+
std::move(self));
208198

209199
logger_.on_write(ec, conn_->mpx_.get_write_buffer());
210200

@@ -262,21 +252,12 @@ struct reader_op {
262252
BOOST_ASIO_CORO_REENTER(coro) for (;;)
263253
{
264254
// Appends some data to the buffer if necessary.
265-
if (conn_->use_ssl()) {
266-
BOOST_ASIO_CORO_YIELD
267-
async_append_some(
268-
conn_->next_layer(),
269-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
270-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
271-
std::move(self));
272-
} else {
273-
BOOST_ASIO_CORO_YIELD
274-
async_append_some(
275-
conn_->next_layer().next_layer(),
276-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
277-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
278-
std::move(self));
279-
}
255+
BOOST_ASIO_CORO_YIELD
256+
async_append_some(
257+
conn_->stream_,
258+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
259+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
260+
std::move(self));
280261

281262
logger_.on_read(ec, n);
282263

@@ -350,6 +331,12 @@ class run_op {
350331
, logger_{l}
351332
{ }
352333

334+
template <class Self>
335+
void operator()(Self& self, system::error_code ec)
336+
{
337+
(*this)(self, order_t{}, ec);
338+
}
339+
353340
template <class Self>
354341
void operator()(
355342
Self& self,
@@ -362,45 +349,16 @@ class run_op {
362349
{
363350
BOOST_ASIO_CORO_REENTER(coro_) for (;;)
364351
{
352+
// Try to connect
365353
BOOST_ASIO_CORO_YIELD
366-
conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t{}));
367-
368-
logger_.on_resolve(ec0, conn_->resv_.results());
354+
conn_->stream_.async_connect(&conn_->cfg_, logger_, std::move(self));
369355

356+
// If we failed, try again
370357
if (ec0) {
371358
self.complete(ec0);
372359
return;
373360
}
374361

375-
BOOST_ASIO_CORO_YIELD
376-
conn_->ctor_.async_connect(
377-
conn_->next_layer().next_layer(),
378-
conn_->resv_.results(),
379-
asio::prepend(std::move(self), order_t{}));
380-
381-
logger_.on_connect(ec0, conn_->ctor_.endpoint());
382-
383-
if (ec0) {
384-
self.complete(ec0);
385-
return;
386-
}
387-
388-
if (conn_->use_ssl()) {
389-
BOOST_ASIO_CORO_YIELD
390-
conn_->next_layer().async_handshake(
391-
asio::ssl::stream_base::client,
392-
asio::prepend(
393-
asio::cancel_after(conn_->cfg_.ssl_handshake_timeout, std::move(self)),
394-
order_t{}));
395-
396-
logger_.on_ssl_handshake(ec0);
397-
398-
if (ec0) {
399-
self.complete(ec0);
400-
return;
401-
}
402-
}
403-
404362
conn_->mpx_.reset();
405363

406364
// Note: Order is important here because the writer might
@@ -459,8 +417,6 @@ class run_op {
459417
self.complete(asio::error::operation_aborted);
460418
return;
461419
}
462-
463-
conn_->reset_stream();
464420
}
465421
}
466422
};
@@ -506,12 +462,10 @@ class basic_connection {
506462
explicit basic_connection(
507463
executor_type ex,
508464
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client})
509-
: ctx_{std::move(ctx)}
510-
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
465+
: stream_{ex, std::move(ctx)}
511466
, writer_timer_{ex}
512467
, reconnect_timer_{ex}
513468
, receive_channel_{ex, 256}
514-
, resv_{ex}
515469
, health_checker_{ex}
516470
{
517471
set_receive_response(ignore);
@@ -568,8 +522,6 @@ class basic_connection {
568522
auto async_run(config const& cfg = {}, Logger l = Logger{}, CompletionToken&& token = {})
569523
{
570524
cfg_ = cfg;
571-
resv_.set_config(cfg);
572-
ctor_.set_config(cfg);
573525
health_checker_.set_config(cfg);
574526
handshaker_.set_config(cfg);
575527
l.set_prefix(cfg.log_prefix);
@@ -711,7 +663,7 @@ class basic_connection {
711663
void cancel(operation op = operation::all)
712664
{
713665
switch (op) {
714-
case operation::resolve: resv_.cancel(); break;
666+
case operation::resolve: stream_.cancel_resolve(); break;
715667
case operation::exec: mpx_.cancel_waiting(); break;
716668
case operation::reconnection:
717669
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
@@ -720,7 +672,7 @@ class basic_connection {
720672
case operation::receive: receive_channel_.cancel(); break;
721673
case operation::health_check: health_checker_.cancel(); break;
722674
case operation::all:
723-
resv_.cancel();
675+
stream_.cancel_resolve();
724676
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
725677
health_checker_.cancel();
726678
cancel_run(); // run
@@ -740,25 +692,28 @@ class basic_connection {
740692
}
741693

742694
/// Returns the ssl context.
743-
auto const& get_ssl_context() const noexcept { return ctx_; }
695+
BOOST_DEPRECATED(
696+
"ssl::context has no const methods, so this function should not be called. Set up any "
697+
"required TLS configuration before passing the ssl::context to the connection's constructor.")
698+
auto const& get_ssl_context() const noexcept { return stream_.get_ssl_context(); }
744699

745700
/// Resets the underlying stream.
746-
void reset_stream()
747-
{
748-
stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
749-
}
701+
BOOST_DEPRECATED(
702+
"This function is no longer necessary and is currently a no-op. connection resets the stream "
703+
"internally as required. This function will be removed in subsequent releases")
704+
void reset_stream() { }
750705

751706
/// Returns a reference to the next layer.
752707
BOOST_DEPRECATED(
753708
"Accessing the underlying stream is deprecated and will be removed in the next release. Use "
754709
"the other member functions to interact with the connection.")
755-
auto& next_layer() noexcept { return *stream_; }
710+
auto& next_layer() noexcept { return stream_.next_layer(); }
756711

757712
/// Returns a const reference to the next layer.
758713
BOOST_DEPRECATED(
759714
"Accessing the underlying stream is deprecated and will be removed in the next release. Use "
760715
"the other member functions to interact with the connection.")
761-
auto const& next_layer() const noexcept { return *stream_; }
716+
auto const& next_layer() const noexcept { return stream_.next_layer(); }
762717

763718
/// Sets the response object of `async_receive` operations.
764719
template <class Response>
@@ -778,15 +733,14 @@ class basic_connection {
778733
using receive_channel_type = asio::experimental::channel<
779734
executor_type,
780735
void(system::error_code, std::size_t)>;
781-
using resolver_type = detail::resolver<Executor>;
782736
using health_checker_type = detail::health_checker<Executor>;
783737
using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
784738

785739
auto use_ssl() const noexcept { return cfg_.use_ssl; }
786740

787741
void cancel_run()
788742
{
789-
close();
743+
stream_.close();
790744
writer_timer_.cancel();
791745
receive_channel_.cancel();
792746
mpx_.cancel_on_conn_lost();
@@ -815,30 +769,18 @@ class basic_connection {
815769
writer_timer_);
816770
}
817771

818-
void close()
819-
{
820-
if (stream_->next_layer().is_open()) {
821-
system::error_code ec;
822-
stream_->next_layer().close(ec);
823-
}
824-
}
825-
826-
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
827-
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
772+
auto is_open() const noexcept { return stream_.is_open(); }
828773

829774
[[nodiscard]] bool trigger_write() const noexcept { return is_open() && !mpx_.is_writing(); }
830775

831-
asio::ssl::context ctx_;
832-
std::unique_ptr<next_layer_type> stream_;
776+
detail::redis_stream<Executor> stream_;
833777

834778
// Notice we use a timer to simulate a condition-variable. It is
835779
// also more suitable than a channel and the notify operation does
836780
// not suspend.
837781
timer_type writer_timer_;
838782
timer_type reconnect_timer_; // to wait the reconnection period
839783
receive_channel_type receive_channel_;
840-
resolver_type resv_;
841-
detail::connector ctor_;
842784
health_checker_type health_checker_;
843785
resp3_handshaker_type handshaker_;
844786

@@ -937,7 +879,10 @@ class connection {
937879
auto const& next_layer() const noexcept { return impl_.next_layer(); }
938880

939881
/// Calls `boost::redis::basic_connection::reset_stream`.
940-
void reset_stream() { impl_.reset_stream(); }
882+
BOOST_DEPRECATED(
883+
"This function is no longer necessary and is currently a no-op. connection resets the stream "
884+
"internally as required. This function will be removed in subsequent releases")
885+
void reset_stream() { }
941886

942887
/// Sets the response object of `async_receive` operations.
943888
template <class Response>
@@ -950,6 +895,9 @@ class connection {
950895
usage get_usage() const noexcept { return impl_.get_usage(); }
951896

952897
/// Returns the ssl context.
898+
BOOST_DEPRECATED(
899+
"ssl::context has no const methods, so this function should not be called. Set up any "
900+
"required TLS configuration before passing the ssl::context to the connection's constructor.")
953901
auto const& get_ssl_context() const noexcept { return impl_.get_ssl_context(); }
954902

955903
private:

include/boost/redis/detail/connector.hpp

Lines changed: 0 additions & 85 deletions
This file was deleted.

0 commit comments

Comments
 (0)