Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::generic_flat_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::config;
Expand Down Expand Up @@ -54,6 +55,8 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");

// Alternatively, you can use generic_flat_response here, but keep in mind
// that to access elements you need to call .view() on resp.value()
generic_response resp;
conn->set_receive_response(resp);

Expand Down
27 changes: 27 additions & 0 deletions include/boost/redis/adapter/detail/adapters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>

#include <boost/assert.hpp>

Expand Down Expand Up @@ -170,6 +171,32 @@ class general_aggregate {
}
};

template <>
class general_aggregate<result<flat_response_value>> {
private:
result<flat_response_value>* result_;

public:
explicit general_aggregate(result<flat_response_value>* c = nullptr)
: result_(c)
{ }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
switch (nd.data_type) {
case resp3::type::blob_error:
case resp3::type::simple_error:
*result_ = error{
nd.data_type,
std::string{std::cbegin(nd.value), std::cend(nd.value)}
};
break;
default: result_->value().push_back(nd);
}
}
};

template <class Node>
class general_simple {
private:
Expand Down
8 changes: 8 additions & 0 deletions include/boost/redis/adapter/detail/response_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ struct response_traits<response<Ts...>> {
static auto adapt(response_type& r) noexcept { return adapter_type{r}; }
};

template <>
struct response_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = vector_adapter<response_type>;

static auto adapt(response_type& v) noexcept { return adapter_type{v}; }
};

template <class Adapter>
class wrapper {
public:
Expand Down
8 changes: 8 additions & 0 deletions include/boost/redis/adapter/detail/result_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <boost/redis/error.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>

#include <boost/mp11.hpp>

Expand Down Expand Up @@ -62,6 +63,13 @@ struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>>
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

template <>
struct result_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = adapter::detail::general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

template <class T>
using adapter_t = typename result_traits<std::decay_t<T>>::adapter_type;

Expand Down
37 changes: 25 additions & 12 deletions include/boost/redis/impl/response.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,30 @@

namespace boost::redis {

void consume_one(generic_response& r, system::error_code& ec)
namespace {
template <typename Container>
auto& get_value(Container& c)
{
return c;
}

template <>
auto& get_value(flat_response_value& c)
{
return c.view();
}

template <typename Response>
void consume_one_impl(Response& r, system::error_code& ec)
{
if (r.has_error())
return; // Nothing to consume.

if (std::empty(r.value()))
auto& value = get_value(r.value());
if (std::empty(value))
return; // Nothing to consume.

auto const depth = r.value().front().depth;
auto const depth = value.front().depth;

// To simplify we will refuse to consume any data-type that is not
// a root node. I think there is no use for that and it is complex
Expand All @@ -33,17 +48,15 @@ void consume_one(generic_response& r, system::error_code& ec)
return e.depth == depth;
};

auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f);
auto match = std::find_if(std::next(std::cbegin(value)), std::cend(value), f);

r.value().erase(std::cbegin(r.value()), match);
value.erase(std::cbegin(value), match);
}

void consume_one(generic_response& r)
{
system::error_code ec;
consume_one(r, ec);
if (ec)
throw system::system_error(ec);
}
} // namespace

void consume_one(generic_response& r, system::error_code& ec) { consume_one_impl(r, ec); }

void consume_one(generic_flat_response& r, system::error_code& ec) { consume_one_impl(r, ec); }

} // namespace boost::redis
8 changes: 8 additions & 0 deletions include/boost/redis/resp3/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ using node = basic_node<std::string>;
*/
using node_view = basic_node<std::string_view>;

struct offset_string {
std::string_view data;
std::size_t offset{};
std::size_t size{};
};

using offset_node = basic_node<offset_string>;

} // namespace boost::redis::resp3

#endif // BOOST_REDIS_RESP3_NODE_HPP
58 changes: 57 additions & 1 deletion include/boost/redis/response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,52 @@ using response = std::tuple<adapter::result<Ts>...>;
*/
using generic_response = adapter::result<std::vector<resp3::node>>;

struct flat_response_value {
public:
/// Reserve capacity for nodes and data storage.
void reserve(std::size_t num_nodes, std::size_t string_size)
{
data_.reserve(num_nodes * string_size);
view_.reserve(num_nodes);
}

std::vector<resp3::offset_node> const& view() const { return view_; }
std::vector<resp3::offset_node>& view() { return view_; }

template <typename String>
void push_back(const resp3::basic_node<String>& nd)
{
resp3::offset_string offset_string;
offset_string.offset = data_.size();
offset_string.size = nd.value.size();

data_.append(nd.value.data());

offset_string.data = std::string_view{
data_.data() + offset_string.offset,
offset_string.size};

resp3::offset_node new_node;
new_node.data_type = nd.data_type;
new_node.aggregate_size = nd.aggregate_size;
new_node.depth = nd.depth;
new_node.value = std::move(offset_string);

view_.push_back(std::move(new_node));
}

private:
std::string data_;
std::vector<resp3::offset_node> view_;
};

/** @brief A memory-efficient generic response to a request.
* @ingroup high-level-api
*
* Uses a compact buffer to store RESP3 data with reduced allocations.
*/
using generic_flat_response = adapter::result<flat_response_value>;

/** @brief Consume on response from a generic response
*
* This function rotates the elements so that the start of the next
Expand Down Expand Up @@ -72,8 +118,18 @@ using generic_response = adapter::result<std::vector<resp3::node>>;
*/
void consume_one(generic_response& r, system::error_code& ec);

/// Consume on response from a generic flat response
void consume_one(generic_flat_response& r, system::error_code& ec);

/// Throwing overload of `consume_one`.
void consume_one(generic_response& r);
template <typename Response>
void consume_one(Response& r)
{
system::error_code ec;
consume_one(r, ec);
if (ec)
throw system::system_error(ec);
}

} // namespace boost::redis

Expand Down