diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 93b7e7fd..c1afec39 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -209,52 +210,31 @@ struct connection_impl { template struct writer_op { connection_impl* conn_; - asio::coroutine coro{}; + writer_fsm fsm_; + + explicit writer_op(connection_impl& conn) noexcept + : conn_(&conn) + , fsm_(conn.mpx_, conn.logger_) + { } template - void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) + void operator()(Self& self, system::error_code ec = {}, std::size_t = {}) { - ignore_unused(n); - - BOOST_ASIO_CORO_REENTER(coro) for (;;) - { - while (conn_->mpx_.prepare_write() != 0) { - BOOST_ASIO_CORO_YIELD - asio::async_write( - conn_->stream_, - asio::buffer(conn_->mpx_.get_write_buffer()), - std::move(self)); - - conn_->logger_.on_write(ec, conn_->mpx_.get_write_buffer().size()); - - if (ec) { - conn_->logger_.trace("writer_op (1)", ec); - conn_->cancel(operation::run); - self.complete(ec); - return; - } - - conn_->mpx_.commit_write(); - - // A socket.close() may have been called while a - // successful write might had already been queued, so we - // have to check here before proceeding. - if (!conn_->is_open()) { - conn_->logger_.trace("writer_op (2): connection is closed."); - self.complete({}); + for (;;) { + auto act = fsm_.resume(ec, self.get_cancellation_state().cancelled()); + + switch (act.type()) { + case writer_action_type::done: self.complete(act.error()); return; + case writer_action_type::write: + asio::async_write( + conn_->stream_, + asio::buffer(conn_->mpx_.get_write_buffer()), + std::move(self)); return; - } - } - - BOOST_ASIO_CORO_YIELD - conn_->writer_timer_.async_wait(std::move(self)); - if (!conn_->is_open()) { - conn_->logger_.trace("writer_op (3): connection is closed."); - // Notice this is not an error of the op, stoping was - // requested from the outside, so we complete with - // success. - self.complete({}); - return; + case writer_action_type::wait: conn_->writer_timer_.async_wait(std::move(self)); return; + case writer_action_type::cancel_run: + conn_->cancel(operation::run); + continue; // This op doesn't need yielding } } } @@ -339,7 +319,7 @@ class run_op { auto writer(CompletionToken&& token) { return asio::async_compose( - writer_op{conn_}, + writer_op{*conn_}, std::forward(token), conn_->writer_timer_); } diff --git a/include/boost/redis/detail/connection_logger.hpp b/include/boost/redis/detail/connection_logger.hpp index 34d6a4b7..73a02bec 100644 --- a/include/boost/redis/detail/connection_logger.hpp +++ b/include/boost/redis/detail/connection_logger.hpp @@ -37,7 +37,6 @@ class connection_logger { void on_connect(system::error_code const& ec, asio::ip::tcp::endpoint const& ep); void on_connect(system::error_code const& ec, std::string_view unix_socket_ep); void on_ssl_handshake(system::error_code const& ec); - void on_write(system::error_code const& ec, std::size_t n); void on_fsm_resume(reader_fsm::action const& action); void on_hello(system::error_code const& ec, generic_response const& resp); void log(logger::level lvl, std::string_view msg); @@ -47,6 +46,15 @@ class connection_logger { { log(logger::level::debug, op, ec); } + + template + void log_fn(logger::level lvl, Fn fn) + { + if (logger_.lvl < lvl) + return; + fn(msg_); + logger_.fn(lvl, msg_); + } }; } // namespace boost::redis::detail diff --git a/include/boost/redis/detail/writer_fsm.hpp b/include/boost/redis/detail/writer_fsm.hpp new file mode 100644 index 00000000..0f4361ba --- /dev/null +++ b/include/boost/redis/detail/writer_fsm.hpp @@ -0,0 +1,67 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_WRITER_FSM_HPP +#define BOOST_REDIS_WRITER_FSM_HPP + +#include +#include + +// Sans-io algorithm for the writer task, as a finite state machine + +namespace boost::redis::detail { + +// Forward decls +class connection_logger; +class multiplexer; + +// What should we do next? +enum class writer_action_type +{ + done, // Call the final handler + write, // Issue a write on the stream + wait, // Wait until there is data to be written + cancel_run, // Cancel the connection's run operation +}; + +class writer_action { + writer_action_type type_; + system::error_code ec_; + +public: + writer_action(writer_action_type type) noexcept + : type_{type} + { } + + writer_action(system::error_code ec) noexcept + : type_{writer_action_type::done} + , ec_{ec} + { } + + writer_action_type type() const { return type_; } + system::error_code error() const { return ec_; } +}; + +class writer_fsm { + int resume_point_{0}; + multiplexer* mpx_; + connection_logger* logger_; + system::error_code stored_ec_; + +public: + writer_fsm(multiplexer& mpx, connection_logger& logger) noexcept + : mpx_(&mpx) + , logger_(&logger) + { } + + writer_action resume(system::error_code ec, asio::cancellation_type_t cancel_state); +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_CONNECTOR_HPP diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 54f8b33f..d285c69a 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -5,6 +5,7 @@ */ #include +#include #include #include @@ -141,22 +142,6 @@ void connection_logger::on_ssl_handshake(system::error_code const& ec) logger_.fn(logger::level::info, msg_); } -void connection_logger::on_write(system::error_code const& ec, std::size_t n) -{ - if (logger_.lvl < logger::level::info) - return; - - msg_ = "writer_op: "; - if (ec) { - format_error_code(ec, msg_); - } else { - msg_ += std::to_string(n); - msg_ += " bytes written."; - } - - logger_.fn(logger::level::info, msg_); -} - void connection_logger::on_fsm_resume(reader_fsm::action const& action) { if (logger_.lvl < logger::level::debug) diff --git a/include/boost/redis/impl/exec_fsm.ipp b/include/boost/redis/impl/exec_fsm.ipp index 25675e02..fdea6804 100644 --- a/include/boost/redis/impl/exec_fsm.ipp +++ b/include/boost/redis/impl/exec_fsm.ipp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -18,13 +19,6 @@ namespace boost::redis::detail { -inline bool is_cancellation(asio::cancellation_type_t type) -{ - return !!( - type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial | - asio::cancellation_type_t::terminal)); -} - exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t cancel_state) { switch (resume_point_) { diff --git a/include/boost/redis/impl/is_cancellation.hpp b/include/boost/redis/impl/is_cancellation.hpp new file mode 100644 index 00000000..7a5d88f2 --- /dev/null +++ b/include/boost/redis/impl/is_cancellation.hpp @@ -0,0 +1,25 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_IS_CANCELLATION_HPP +#define BOOST_REDIS_IS_CANCELLATION_HPP + +#include + +namespace boost::redis::detail { + +inline bool is_cancellation(asio::cancellation_type_t type) +{ + return !!( + type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial | + asio::cancellation_type_t::terminal)); +} + +} // namespace boost::redis::detail + +#endif diff --git a/include/boost/redis/impl/writer_fsm.ipp b/include/boost/redis/impl/writer_fsm.ipp new file mode 100644 index 00000000..0a098f5a --- /dev/null +++ b/include/boost/redis/impl/writer_fsm.ipp @@ -0,0 +1,84 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_WRITER_FSM_IPP +#define BOOST_REDIS_WRITER_FSM_IPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace boost::redis::detail { + +inline void log_write_success(connection_logger& logger, std::size_t bytes_written) +{ + logger.log_fn(logger::level::info, [bytes_written](std::string& buff) { + buff = "Writer task: "; + buff += std::to_string(bytes_written); + buff += " bytes written."; + }); +} + +writer_action writer_fsm::resume(system::error_code ec, asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + for (;;) { + // Attempt to write while we have requests ready to send + while (mpx_->prepare_write() != 0u) { + // Write + BOOST_REDIS_YIELD(resume_point_, 1, writer_action_type::write) + + // A failed write means that we should tear down the connection + if (ec) { + logger_->log(logger::level::err, "Writer task error: ", ec); + stored_ec_ = ec; + BOOST_REDIS_YIELD(resume_point_, 2, writer_action_type::cancel_run) + return stored_ec_; + } + + // Log what we wrote + log_write_success(*logger_, mpx_->get_write_buffer().size()); + + // Mark requests as written + mpx_->commit_write(); + + // Check for cancellations + if (is_cancellation(cancel_state)) { + logger_->trace("Writer task cancelled (1)."); + return system::error_code(asio::error::operation_aborted); + } + } + + // No more requests ready to be written. Wait for more + BOOST_REDIS_YIELD(resume_point_, 3, writer_action_type::wait) + + // Check for cancellations + if (is_cancellation(cancel_state)) { + logger_->trace("Writer task cancelled (2)."); + return system::error_code(asio::error::operation_aborted); + } + } + } + + // We should never reach here + BOOST_ASSERT(false); + return system::error_code(); +} + +} // namespace boost::redis::detail + +#endif diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 144b3a4a..26b6cd95 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 24cf9410..b17c4c68 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -41,6 +41,7 @@ make_test(test_exec_fsm) make_test(test_log_to_file) make_test(test_conn_logging) make_test(test_reader_fsm) +make_test(test_connection_logger) # Tests that require a real Redis server make_test(test_conn_quit) diff --git a/test/Jamfile b/test/Jamfile index f5425573..ded74dae 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -57,6 +57,7 @@ local tests = test_log_to_file test_conn_logging test_reader_fsm + test_connection_logger ; # Build and run the tests diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index e02c3bce..657b948a 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -150,7 +150,7 @@ BOOST_AUTO_TEST_CASE(correct_database) auto conn = std::make_shared(ioc); request req; - req.push("CLIENT", "LIST"); + req.push("CLIENT", "INFO"); generic_response resp; diff --git a/test/test_connection_logger.cpp b/test/test_connection_logger.cpp new file mode 100644 index 00000000..0bc9d3b9 --- /dev/null +++ b/test/test_connection_logger.cpp @@ -0,0 +1,154 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +using namespace boost::redis; +using detail::connection_logger; +using boost::system::error_code; + +namespace boost::redis { + +// Printing +std::ostream& operator<<(std::ostream& os, logger::level lvl) +{ + switch (lvl) { + case logger::level::disabled: return os << "logger::level::disabled"; + case logger::level::emerg: return os << "logger::level::emerg"; + case logger::level::alert: return os << "logger::level::alert"; + case logger::level::crit: return os << "logger::level::crit"; + case logger::level::err: return os << "logger::level::err"; + case logger::level::warning: return os << "logger::level::warning"; + case logger::level::notice: return os << "logger::level::notice"; + case logger::level::info: return os << "logger::level::info"; + case logger::level::debug: return os << "logger::level::debug"; + default: return os << ""; + } +} + +} // namespace boost::redis + +namespace { + +// Mock logger that records the last issued message and +// the number of issued messages +struct fixture { + std::size_t num_msgs{}; + logger::level msg_level{}; + std::string msg; + connection_logger logger; + + explicit fixture(logger::level lvl) + : logger({lvl, [this](logger::level lvl, std::string_view msg) { + ++this->num_msgs; + this->msg_level = lvl; + this->msg = msg; + }}) + { } +}; + +// log with only a message +void test_log_message() +{ + // Setup + fixture fix{logger::level::warning}; + + // Issuing a message with level > the one configured logs it + fix.logger.log(logger::level::alert, "some message"); + BOOST_TEST_EQ(fix.num_msgs, 1u); + BOOST_TEST_EQ(fix.msg_level, logger::level::alert); + BOOST_TEST_EQ(fix.msg, "some message"); + + // Issuing a message with level == the one configured logs it. + // Internal buffers are cleared + fix.logger.log(logger::level::warning, "other thing"); + BOOST_TEST_EQ(fix.num_msgs, 2u); + BOOST_TEST_EQ(fix.msg_level, logger::level::warning); + BOOST_TEST_EQ(fix.msg, "other thing"); + + // Issuing a message with level < the one configured does not log it. + fix.logger.log(logger::level::info, "bad"); + BOOST_TEST_EQ(fix.num_msgs, 2u); +} + +// log with a message and an error code +void test_log_message_errorcode() +{ + // Setup + fixture fix{logger::level::warning}; + + // Issuing a message with level > the one configured logs it + fix.logger.log(logger::level::alert, "Some message", error::connect_timeout); + BOOST_TEST_EQ(fix.num_msgs, 1u); + BOOST_TEST_EQ(fix.msg_level, logger::level::alert); + BOOST_TEST_EQ(fix.msg, "Some message: Connect timeout. [boost.redis:18]"); + + // Issuing a message with level == the one configured logs it. + // Internal buffers are cleared. + // Source code info is not printed + constexpr auto loc = BOOST_CURRENT_LOCATION; + fix.logger.log(logger::level::warning, "Other thing", error_code(error::empty_field, &loc)); + BOOST_TEST_EQ(fix.num_msgs, 2u); + BOOST_TEST_EQ(fix.msg_level, logger::level::warning); + BOOST_TEST_EQ(fix.msg, "Other thing: Expected field value is empty. [boost.redis:5]"); + + // Issuing a message with level < the one configured does not log it. + fix.logger.log(logger::level::info, "bad", error::expects_resp3_map); + BOOST_TEST_EQ(fix.num_msgs, 2u); +} + +// log with a function +void test_log_fn() +{ + // Setup + fixture fix{logger::level::warning}; + + // Issuing a message with level > the one configured logs it + fix.logger.log_fn(logger::level::alert, [](std::string& buff) { + buff = "Some message"; + }); + BOOST_TEST_EQ(fix.num_msgs, 1u); + BOOST_TEST_EQ(fix.msg_level, logger::level::alert); + BOOST_TEST_EQ(fix.msg, "Some message"); + + // Issuing a message with level == the one configured logs it. + // Internal buffers are cleared. + fix.logger.log_fn(logger::level::warning, [](std::string& buff) { + buff = "This is another message."; + }); + BOOST_TEST_EQ(fix.num_msgs, 2u); + BOOST_TEST_EQ(fix.msg_level, logger::level::warning); + BOOST_TEST_EQ(fix.msg, "This is another message."); + + // Issuing a message with level < the one configured does not log it. + fix.logger.log_fn(logger::level::info, [](std::string& buff) { + buff = "This message should not be logged."; + }); + BOOST_TEST_EQ(fix.num_msgs, 2u); +} + +} // namespace + +int main() +{ + test_log_message(); + test_log_message_errorcode(); + test_log_fn(); + + return boost::report_errors(); +}