diff --git a/example/cpp20_chat_room.cpp b/example/cpp20_chat_room.cpp index 17dfd0f5..3e3a0d17 100644 --- a/example/cpp20_chat_room.cpp +++ b/example/cpp20_chat_room.cpp @@ -31,7 +31,7 @@ using boost::asio::redirect_error; using boost::asio::use_awaitable; using boost::redis::config; using boost::redis::connection; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::request; using boost::system::error_code; @@ -45,7 +45,7 @@ auto receiver(std::shared_ptr conn) -> awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + generic_flat_response resp; conn->set_receive_response(resp); while (conn->will_reconnect()) { diff --git a/example/cpp20_streams.cpp b/example/cpp20_streams.cpp index 1cef143f..14c14087 100644 --- a/example/cpp20_streams.cpp +++ b/example/cpp20_streams.cpp @@ -23,7 +23,7 @@ namespace net = boost::asio; using boost::redis::config; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::operation; using boost::redis::request; using boost::redis::connection; @@ -33,7 +33,7 @@ auto stream_reader(std::shared_ptr conn) -> net::awaitable { std::string redisStreamKey_; request req; - generic_response resp; + generic_flat_response resp; std::string stream_id{"$"}; std::string const field = "myfield"; @@ -51,7 +51,7 @@ auto stream_reader(std::shared_ptr conn) -> net::awaitable // The following approach was taken in order to be able to // deal with the responses, as generated by redis in the case // that there are multiple stream 'records' within a single - // generic_response. The nesting and number of values in + // generic_flat_response. The nesting and number of values in // resp.value() are different, depending on the contents // of the stream in redis. Uncomment the above commented-out // code for examples while running the XADD command. diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index d2ec462f..40017835 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -22,7 +22,7 @@ namespace asio = boost::asio; using namespace std::chrono_literals; using boost::redis::request; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::consume_one; using boost::redis::logger; using boost::redis::config; @@ -54,7 +54,7 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + generic_flat_response resp; conn->set_receive_response(resp); // Loop while reconnection is enabled diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 9c05b23d..ea0d655c 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include @@ -19,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -136,8 +138,6 @@ void boost_redis_from_bulk(T& t, resp3::basic_node const& node, system:: from_bulk_impl::apply(t, node, ec); } -//================================================ - template class general_aggregate { private: @@ -174,6 +174,41 @@ class general_aggregate { } }; +template <> +class general_aggregate> { +private: + result* result_; + +public: + explicit general_aggregate(result* c = nullptr) + : result_(c) + { } + + void on_init() { } + void on_done() + { + if (result_->has_value()) { + result_->value().set_view(); + } + } + + template + void on_node(resp3::basic_node const& nd, system::error_code&) + { + BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); + switch (nd.data_type) { + case resp3::type::blob_error: + case resp3::type::simple_error: + *result_ = error{ + nd.data_type, + std::string{std::cbegin(nd.value), std::cend(nd.value)} + }; + break; + default: result_->value().add_node(nd); + } + } +}; + template class general_simple { private: diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index f24dc891..49006bf6 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -99,6 +99,14 @@ struct response_traits, Allocator>> static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } }; +template <> +struct response_traits { + using response_type = generic_flat_response; + using adapter_type = general_aggregate; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + template struct response_traits> { using response_type = response; diff --git a/include/boost/redis/adapter/detail/result_traits.hpp b/include/boost/redis/adapter/detail/result_traits.hpp index da625c99..2e158a39 100644 --- a/include/boost/redis/adapter/detail/result_traits.hpp +++ b/include/boost/redis/adapter/detail/result_traits.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include @@ -62,6 +63,13 @@ struct result_traits, Allocator>>> static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } }; +template <> +struct result_traits { + using response_type = generic_flat_response; + using adapter_type = adapter::detail::general_aggregate; + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + template using adapter_t = typename result_traits>::adapter_type; diff --git a/include/boost/redis/impl/response.ipp b/include/boost/redis/impl/response.ipp index a4b09a6e..21e7b287 100644 --- a/include/boost/redis/impl/response.ipp +++ b/include/boost/redis/impl/response.ipp @@ -11,15 +11,30 @@ namespace boost::redis { -void consume_one(generic_response& r, system::error_code& ec) +namespace { +template +auto& get_value(Container& c) +{ + return c; +} + +template <> +auto& get_value(flat_response_value& c) +{ + return c.view(); +} + +template +void consume_one_impl(Response& r, system::error_code& ec) { if (r.has_error()) return; // Nothing to consume. - if (std::empty(r.value())) + auto& value = get_value(r.value()); + if (std::empty(value)) return; // Nothing to consume. - auto const depth = r.value().front().depth; + auto const depth = value.front().depth; // To simplify we will refuse to consume any data-type that is not // a root node. I think there is no use for that and it is complex @@ -33,11 +48,17 @@ void consume_one(generic_response& r, system::error_code& ec) return e.depth == depth; }; - auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f); + auto match = std::find_if(std::next(std::cbegin(value)), std::cend(value), f); - r.value().erase(std::cbegin(r.value()), match); + value.erase(std::cbegin(value), match); } +} // namespace + +void consume_one(generic_response& r, system::error_code& ec) { consume_one_impl(r, ec); } + +void consume_one(generic_flat_response& r, system::error_code& ec) { consume_one_impl(r, ec); } + void consume_one(generic_response& r) { system::error_code ec; @@ -46,4 +67,12 @@ void consume_one(generic_response& r) throw system::system_error(ec); } +void consume_one(generic_flat_response& r) +{ + system::error_code ec; + consume_one(r, ec); + if (ec) + throw system::system_error(ec); +} + } // namespace boost::redis diff --git a/include/boost/redis/resp3/node.hpp b/include/boost/redis/resp3/node.hpp index 6e3f4aac..84fb5fca 100644 --- a/include/boost/redis/resp3/node.hpp +++ b/include/boost/redis/resp3/node.hpp @@ -59,6 +59,21 @@ using node = basic_node; /// A node in the response tree that does not own its data. using node_view = basic_node; +struct offset_string { + std::string_view data; + std::size_t offset{}; + std::size_t size{}; + + operator std::string() const { return std::string{data}; } + + friend std::ostream& operator<<(std::ostream& os, offset_string const& s) + { + return os << s.data; + } +}; + +using offset_node = basic_node; + } // namespace boost::redis::resp3 #endif // BOOST_REDIS_RESP3_NODE_HPP diff --git a/include/boost/redis/response.hpp b/include/boost/redis/response.hpp index 94fdd63d..304ca1e6 100644 --- a/include/boost/redis/response.hpp +++ b/include/boost/redis/response.hpp @@ -31,6 +31,82 @@ using response = std::tuple...>; */ using generic_response = adapter::result>; +/** + * Forward declaration to allow friendship with the template class + * that manages filling of flat_response_value. + */ +namespace adapter::detail { +template +class general_aggregate; +} + +struct flat_response_value { +public: + /// Reserve capacity for nodes and data storage. + void reserve(std::size_t num_nodes, std::size_t string_size) + { + data_.reserve(num_nodes * string_size); + view_.reserve(num_nodes); + } + + void clear() + { + data_.clear(); + view_.clear(); + } + + std::size_t size() const noexcept { return view_.size(); } + bool empty() noexcept { return view_.empty(); } + + resp3::offset_node& at(std::size_t index) { return view_.at(index); } + resp3::offset_node const& at(std::size_t index) const { return view_.at(index); } + + std::vector const& view() const { return view_; } + std::vector& view() { return view_; } + +private: + void set_view() + { + for (auto& node : view_) { + auto& offset_string = node.value; + offset_string.data = std::string_view{ + data_.data() + offset_string.offset, + offset_string.size}; + } + } + + template + void add_node(resp3::basic_node const& nd) + { + resp3::offset_string offset_string; + offset_string.offset = data_.size(); + offset_string.size = nd.value.size(); + + data_.append(nd.value.data(), nd.value.size()); + + resp3::offset_node new_node; + new_node.data_type = nd.data_type; + new_node.aggregate_size = nd.aggregate_size; + new_node.depth = nd.depth; + new_node.value = std::move(offset_string); + + view_.push_back(std::move(new_node)); + } + + template + friend class adapter::detail::general_aggregate; + + std::string data_; + std::vector view_; +}; + +/** @brief A memory-efficient generic response to a request. + * @ingroup high-level-api + * + * Uses a compact buffer to store RESP3 data with reduced allocations. + */ +using generic_flat_response = adapter::result; + /** @brief Consume on response from a generic response * * This function rotates the elements so that the start of the next @@ -72,12 +148,16 @@ using generic_response = adapter::result>; */ void consume_one(generic_response& r, system::error_code& ec); +/// Consume on response from a generic flat response +void consume_one(generic_flat_response& r, system::error_code& ec); + /** - * @brief Throwing overload of `consume_one`. + * @brief Throwing overloads of `consume_one`. * * @param r The response to modify. */ void consume_one(generic_response& r); +void consume_one(generic_flat_response& r); } // namespace boost::redis diff --git a/test/test_any_adapter.cpp b/test/test_any_adapter.cpp index f0345ac1..6d4ee82e 100644 --- a/test/test_any_adapter.cpp +++ b/test/test_any_adapter.cpp @@ -13,6 +13,7 @@ #include using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::response; using boost::redis::ignore; using boost::redis::any_adapter; @@ -24,10 +25,12 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types) response r1; response r2; generic_response r3; + generic_flat_response r4; BOOST_CHECK_NO_THROW(any_adapter{r1}); BOOST_CHECK_NO_THROW(any_adapter{r2}); BOOST_CHECK_NO_THROW(any_adapter{r3}); + BOOST_CHECK_NO_THROW(any_adapter{r4}); BOOST_CHECK_NO_THROW(any_adapter{ignore}); } diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index 78efa82c..62ff3f96 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -23,7 +23,7 @@ using connection = boost::redis::connection; using boost::redis::request; using boost::redis::ignore; using boost::redis::operation; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::consume_one; using namespace std::chrono_literals; @@ -34,7 +34,7 @@ namespace { struct push_callback { connection* conn1; connection* conn2; - generic_response* resp2; + generic_flat_response* resp2; request* req1; int i = 0; boost::asio::coroutine coro{}; @@ -52,7 +52,7 @@ struct push_callback { BOOST_TEST(resp2->has_value()); BOOST_TEST(!resp2->value().empty()); - std::clog << "Event> " << resp2->value().front().value << std::endl; + std::clog << "Event> " << resp2->value().view().front().value << std::endl; consume_one(*resp2); ++i; @@ -110,7 +110,7 @@ BOOST_AUTO_TEST_CASE(check_health) request req2; req2.push("MONITOR"); - generic_response resp2; + generic_flat_response resp2; conn2.set_receive_response(resp2); conn2.async_exec(req2, ignore, [&exec_finished](error_code ec, std::size_t) { diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index 4c69cdda..ccf62715 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -26,7 +26,7 @@ namespace net = boost::asio; using boost::redis::config; using boost::redis::connection; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::operation; using boost::redis::request; @@ -153,7 +153,7 @@ BOOST_AUTO_TEST_CASE(correct_database) request req; req.push("CLIENT", "LIST"); - generic_response resp; + generic_flat_response resp; bool exec_finished = false, run_finished = false; @@ -174,7 +174,7 @@ BOOST_AUTO_TEST_CASE(correct_database) BOOST_TEST_REQUIRE(run_finished); BOOST_TEST_REQUIRE(!resp.value().empty()); - auto const& value = resp.value().front().value; + std::string value = resp.value().view().front().value; auto const pos = value.find("db="); auto const index_str = value.substr(pos + 3, 1); auto const index = std::stoi(index_str); diff --git a/test/test_conn_exec_cancel.cpp b/test/test_conn_exec_cancel.cpp index 9c32a8cb..3e84218c 100644 --- a/test/test_conn_exec_cancel.cpp +++ b/test/test_conn_exec_cancel.cpp @@ -38,7 +38,7 @@ using boost::redis::operation; using boost::redis::error; using boost::redis::request; using boost::redis::response; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::logger; diff --git a/test/test_conn_exec_cancel2.cpp b/test/test_conn_exec_cancel2.cpp index 7b745fc6..08337461 100644 --- a/test/test_conn_exec_cancel2.cpp +++ b/test/test_conn_exec_cancel2.cpp @@ -26,7 +26,7 @@ using error_code = boost::system::error_code; using boost::redis::operation; using boost::redis::request; using boost::redis::response; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::config; @@ -40,7 +40,7 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable { auto ex = co_await net::this_coro::executor; - generic_response gresp; + generic_flat_response gresp; auto conn = std::make_shared(ex); run(conn); diff --git a/test/test_conn_exec_error.cpp b/test/test_conn_exec_error.cpp index 183ce509..94b912a6 100644 --- a/test/test_conn_exec_error.cpp +++ b/test/test_conn_exec_error.cpp @@ -21,7 +21,7 @@ using error_code = boost::system::error_code; using boost::redis::connection; using boost::redis::request; using boost::redis::response; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::error; @@ -266,7 +266,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) conn->async_exec(req1, ignore, c1); - generic_response gresp; + generic_flat_response gresp; conn->set_receive_response(gresp); auto c3 = [&](error_code ec, std::size_t) { diff --git a/test/test_low_level.cpp b/test/test_low_level.cpp index c173d750..5d5dab2e 100644 --- a/test/test_low_level.cpp +++ b/test/test_low_level.cpp @@ -29,6 +29,7 @@ using boost::system::error_code; using boost::redis::request; using boost::redis::response; using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::adapter::result; @@ -633,7 +634,7 @@ BOOST_AUTO_TEST_CASE(cancel_one_1) BOOST_AUTO_TEST_CASE(cancel_one_empty) { - generic_response resp; + generic_flat_response resp; BOOST_TEST(resp.has_value()); consume_one(resp); @@ -642,7 +643,7 @@ BOOST_AUTO_TEST_CASE(cancel_one_empty) BOOST_AUTO_TEST_CASE(cancel_one_has_error) { - generic_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}}; + generic_flat_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}}; BOOST_TEST(resp.has_error()); consume_one(resp); diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 9b198c4b..6a0e39dd 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -25,7 +25,7 @@ using boost::redis::adapter::result; using boost::redis::config; using boost::redis::detail::multiplexer; using boost::redis::detail::push_hello; -using boost::redis::generic_response; +using boost::redis::generic_flat_response; using boost::redis::ignore_t; using boost::redis::request; using boost::redis::resp3::detail::deserialize; @@ -264,7 +264,7 @@ std::ostream& operator<<(std::ostream& os, node const& nd) BOOST_AUTO_TEST_CASE(multiplexer_push) { multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); boost::system::error_code ec; @@ -276,17 +276,17 @@ BOOST_AUTO_TEST_CASE(multiplexer_push) // TODO: Provide operator << for generic_response so we can compare // the whole vector. BOOST_CHECK_EQUAL(resp.value().size(), 3u); - BOOST_CHECK_EQUAL(resp.value().at(1).value, "one"); - BOOST_CHECK_EQUAL(resp.value().at(2).value, "two"); + BOOST_CHECK_EQUAL(resp.value().at(1).value.data, "one"); + BOOST_CHECK_EQUAL(resp.value().at(2).value.data, "two"); - for (auto const& e : resp.value()) - std::cout << e << std::endl; + for (auto const& e : resp.value().view()) + std::cout << e.value << std::endl; } BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) { multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); std::string msg; @@ -307,13 +307,13 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) // TODO: Provide operator << for generic_response so we can compare // the whole vector. BOOST_CHECK_EQUAL(resp.value().size(), 3u); - BOOST_CHECK_EQUAL(resp.value().at(1).value, "one"); - BOOST_CHECK_EQUAL(resp.value().at(2).value, "two"); + BOOST_CHECK_EQUAL(resp.value().at(1).value.data, "one"); + BOOST_CHECK_EQUAL(resp.value().at(2).value.data, "two"); } struct test_item { request req; - generic_response resp; + generic_flat_response resp; std::shared_ptr elem_ptr; bool done = false; diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index db11df7f..f8cdef8a 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -21,8 +21,9 @@ using boost::system::error_code; using net::cancellation_type_t; using redis::detail::reader_fsm; using redis::detail::multiplexer; +using redis::generic_flat_response; using redis::detail::read_buffer; -using redis::generic_response; +using redis::generic_flat_response; using redis::any_adapter; using action = redis::detail::reader_fsm::action; @@ -44,7 +45,7 @@ void test_push() { read_buffer rbuf; multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; @@ -92,7 +93,7 @@ void test_read_needs_more() { read_buffer rbuf; multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; @@ -138,7 +139,7 @@ void test_read_error() { read_buffer rbuf; multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; @@ -169,7 +170,7 @@ void test_parse_error() { read_buffer rbuf; multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; @@ -200,7 +201,7 @@ void test_push_deliver_error() { read_buffer rbuf; multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; @@ -236,7 +237,7 @@ void test_max_read_buffer_size() read_buffer rbuf; rbuf.set_config({5, 7}); multiplexer mpx; - generic_response resp; + generic_flat_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec;