Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,7 @@ MemoryResource* CompactObj::memory_resource() {

bool CompactObj::JsonConsT::DefragIfNeeded(PageUsage* page_usage) {
if (JsonType* old = json_ptr; ShouldDefragment(page_usage)) {
json_ptr = AllocateMR<JsonType>(DeepCopyJSON(old, memory_resource()));
json_ptr = AllocateMR<JsonType>(DeepCopyJSON(old));
DeleteMR<JsonType>(old);
return true;
}
Expand Down
9 changes: 5 additions & 4 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ static void InitThreadStructs() {
SmallString::InitThreadLocal(tlh);
thread_local MiMemoryResource mi_resource(tlh);
CompactObj::InitThreadLocal(&mi_resource);
InitTLJsonHeap(&mi_resource);
};

static void CheckEverythingDeallocated() {
Expand Down Expand Up @@ -455,14 +456,14 @@ TEST_F(CompactObjectTest, JsonTypeTest) {
"children":[],"spouse":null}
)";
std::optional<JsonType> json_option2 =
JsonFromString(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})", CompactObj::memory_resource());
ParseJsonUsingShardHeap(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})");

cobj_.SetString(json_str, false);
ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); // we set this as a string
JsonType* failed_json = cobj_.GetJson();
ASSERT_TRUE(failed_json == nullptr);
ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING);
std::optional<JsonType> json_option = JsonFromString(json_str, CompactObj::memory_resource());
std::optional<JsonType> json_option = ParseJsonUsingShardHeap(json_str);
ASSERT_TRUE(json_option.has_value());
cobj_.SetJson(std::move(json_option.value()));
ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type
Expand All @@ -477,7 +478,7 @@ TEST_F(CompactObjectTest, JsonTypeTest) {
ASSERT_TRUE(json != nullptr);
ASSERT_TRUE(json->contains("b"));
ASSERT_FALSE(json->contains("firstName"));
std::optional<JsonType> set_array = JsonFromString("", CompactObj::memory_resource());
std::optional<JsonType> set_array = ParseJsonUsingShardHeap("");
// now set it to string again
cobj_.SetString(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})", false);
ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); // we set this as a string
Expand All @@ -504,7 +505,7 @@ TEST_F(CompactObjectTest, JsonTypeWithPathTest) {
"title" : "The Night Watch",
"author" : "Phillips, David Atlee"
}]})";
std::optional<JsonType> json_array = JsonFromString(books_json, CompactObj::memory_resource());
std::optional<JsonType> json_array = ParseJsonUsingShardHeap(books_json);
ASSERT_TRUE(json_array.has_value());
cobj_.SetJson(std::move(json_array.value()));
ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type
Expand Down
16 changes: 10 additions & 6 deletions src/core/json/json_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,22 @@ std::optional<T> ParseWithDecoder(std::string_view input, json_decoder<T>&& deco

namespace dfly {

std::optional<ShortLivedJSON> JsonFromString(std::string_view input) {
return ParseWithDecoder(input, json_decoder<ShortLivedJSON>{});
void InitTLJsonHeap(PMR_NS::memory_resource* mr) {
detail::tl_mr = mr;
}

optional<JsonType> JsonFromString(string_view input, PMR_NS::memory_resource* mr) {
return ParseWithDecoder(input, json_decoder<JsonType>{PMR_NS::polymorphic_allocator<char>{mr}});
std::optional<TmpJson> JsonFromString(std::string_view input) {
return ParseWithDecoder(input, json_decoder<TmpJson>{});
}

JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr) {
optional<JsonType> ParseJsonUsingShardHeap(string_view input) {
return ParseWithDecoder(input, json_decoder<JsonType>{detail::StatelessJsonAllocator<char>{}});
}

JsonType DeepCopyJSON(const JsonType* j) {
std::string serialized;
j->dump(serialized);
auto deserialized = JsonFromString(serialized, mr);
auto deserialized = ParseJsonUsingShardHeap(serialized);
DCHECK(deserialized.has_value());
return std::move(deserialized.value());
}
Expand Down
72 changes: 62 additions & 10 deletions src/core/json/json_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <glog/logging.h>

#include <version> // for __cpp_lib_to_chars macro.

// std::from_chars is available in C++17 if __cpp_lib_to_chars is defined.
Expand All @@ -21,32 +23,82 @@

namespace dfly {

using ShortLivedJSON = jsoncons::json;
using JsonType = jsoncons::pmr::json;
namespace detail {

inline thread_local PMR_NS::memory_resource* tl_mr = nullptr;

template <typename T> class StatelessJsonAllocator {
public:
using value_type = T;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
using is_always_equal = std::true_type;

template <typename U> StatelessJsonAllocator(const StatelessJsonAllocator<U>&) noexcept {
}

StatelessJsonAllocator() noexcept {
DCHECK_NE(tl_mr, nullptr) << "json allocator created without backing memory resource";
};

static value_type* allocate(size_type n) {
void* ptr = tl_mr->allocate(n * sizeof(value_type), alignof(value_type));
return static_cast<value_type*>(ptr);
}

static void deallocate(value_type* ptr, size_type n) noexcept {
tl_mr->deallocate(ptr, n * sizeof(value_type), alignof(value_type));
}

static PMR_NS::memory_resource* resource() {
return tl_mr;
}
};

template <typename T, typename U>
bool operator==(const StatelessJsonAllocator<T>&, const StatelessJsonAllocator<U>&) noexcept {
return true;
}

template <typename T, typename U>
bool operator!=(const StatelessJsonAllocator<T>&, const StatelessJsonAllocator<U>&) noexcept {
return false;
}

} // namespace detail

void InitTLJsonHeap(PMR_NS::memory_resource* mr);

using TmpJson = jsoncons::json;
using JsonType =
jsoncons::basic_json<char, jsoncons::sorted_policy, detail::StatelessJsonAllocator<char>>;

// A helper type to use in template functions which are expected to work with both ShortLivedJSON
// A helper type to use in template functions which are expected to work with both TmpJson
// and JsonType
template <typename Allocator>
using JsonWithAllocator = jsoncons::basic_json<char, jsoncons::sorted_policy, Allocator>;

// Parses string into JSON. Any allocatons are done using the std allocator. This method should be
// used for generic JSON parsing, in particular, it should not be used to parse objects which will
// be stored in the db, as the backing storage is not managed by mimalloc.
std::optional<ShortLivedJSON> JsonFromString(std::string_view input);
std::optional<TmpJson> JsonFromString(std::string_view input);

// Parses string into JSON, using mimalloc heap for allocations. This method should only be used on
// shards where mimalloc heap is initialized.
std::optional<JsonType> JsonFromString(std::string_view input, PMR_NS::memory_resource* mr);
std::optional<JsonType> ParseJsonUsingShardHeap(std::string_view input);

// Deep copy a JSON object, by first serializing it to a string and then deserializing the string.
// The operation is intended to help during defragmentation, by copying into a page reserved for
// malloc.
JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr);
JsonType DeepCopyJSON(const JsonType* j);

inline auto MakeJsonPathExpr(std::string_view path, std::error_code& ec)
-> jsoncons::jsonpath::jsonpath_expression<JsonType> {
return jsoncons::jsonpath::make_expression<JsonType, std::allocator<char>>(
jsoncons::allocator_set<JsonType::allocator_type, std::allocator<char>>(), path, ec);
template <typename Json = JsonType>
auto MakeJsonPathExpr(std::string_view path, std::error_code& ec)
-> jsoncons::jsonpath::jsonpath_expression<Json> {
using ResultAllocT = typename Json::allocator_type;
using TmpAllocT = std::allocator<char>;
using AllocSetT = jsoncons::allocator_set<ResultAllocT, TmpAllocT>;
return jsoncons::jsonpath::make_expression<Json, TmpAllocT>(AllocSetT(), path, ec);
}

} // namespace dfly
7 changes: 6 additions & 1 deletion src/core/json/jsonpath_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TestDriver : public Driver {
template <typename JSON> JSON ValidJson(string_view str);

template <> JsonType ValidJson<JsonType>(string_view str) {
auto res = ::dfly::JsonFromString(str, pmr::get_default_resource());
auto res = ParseJsonUsingShardHeap(str);
CHECK(res) << "Failed to parse json: " << str;
return *res;
}
Expand Down Expand Up @@ -89,6 +89,11 @@ bool is_array(FlatJson ref) {

class ScannerTest : public ::testing::Test {
protected:
void SetUp() override {
Test::SetUp();
InitTLJsonHeap(PMR_NS::get_default_resource());
}

ScannerTest() {
driver_.lexer()->set_debug(1);
}
Expand Down
3 changes: 2 additions & 1 deletion src/core/page_usage_stats_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class PageUsageStatsTest : public ::testing::Test {
}

PageUsageStatsTest() : m_(mi_heap_get_backing()) {
InitTLJsonHeap(&m_);
}

void SetUp() override {
Expand Down Expand Up @@ -191,7 +192,7 @@ TEST_F(PageUsageStatsTest, JSONCons) {
// encoding.
std::string_view data{R"#({"data": "some", "count": 1, "checked": false})#"};

auto parsed = JsonFromString(data, &m_);
auto parsed = ParseJsonUsingShardHeap(data);
EXPECT_TRUE(parsed.has_value());

c_obj_.SetJson(std::move(parsed.value()));
Expand Down
12 changes: 6 additions & 6 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
namespace {
constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;

template <typename T> optional<T> ReadNumeric(const ShortLivedJSON& obj) {
template <typename T> optional<T> ReadNumeric(const TmpJson& obj) {
if (!obj.is_number()) {
LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
Expand All @@ -136,7 +136,7 @@ template <typename T> optional<T> ReadNumeric(const ShortLivedJSON& obj) {
return obj.as<T>();
}

optional<SlotRanges> GetClusterSlotRanges(const ShortLivedJSON& slots) {
optional<SlotRanges> GetClusterSlotRanges(const TmpJson& slots) {
if (!slots.is_array()) {
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
Expand All @@ -162,7 +162,7 @@ optional<SlotRanges> GetClusterSlotRanges(const ShortLivedJSON& slots) {
return SlotRanges(ranges);
}

optional<ClusterExtendedNodeInfo> ParseClusterNode(const ShortLivedJSON& json) {
optional<ClusterExtendedNodeInfo> ParseClusterNode(const TmpJson& json) {
if (!json.is_object()) {
LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
Expand Down Expand Up @@ -221,7 +221,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const ShortLivedJSON& json) {
return node;
}

optional<std::vector<MigrationInfo>> ParseMigrations(const ShortLivedJSON& json) {
optional<std::vector<MigrationInfo>> ParseMigrations(const TmpJson& json) {
std::vector<MigrationInfo> res;
if (json.is_null()) {
return res;
Expand Down Expand Up @@ -251,7 +251,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const ShortLivedJSON& json)
return res;
}

optional<ClusterShardInfos> BuildClusterConfigFromJson(const ShortLivedJSON& json) {
optional<ClusterShardInfos> BuildClusterConfigFromJson(const TmpJson& json) {
std::vector<ClusterShardInfo> config;

if (!json.is_array()) {
Expand Down Expand Up @@ -309,7 +309,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const ShortLivedJSON& jso
/* static */
shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
std::string_view json_str) {
optional<ShortLivedJSON> json_config = JsonFromString(json_str);
optional<TmpJson> json_config = JsonFromString(json_str);
if (!json_config.has_value()) {
LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str;
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb) {

CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(data_heap);
InitTLJsonHeap(shard_->memory_resource());

shard_->shard_search_indices_ = std::make_unique<ShardDocIndices>();
}
Expand Down
26 changes: 17 additions & 9 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ void SendJsonString(const OpResult<string>& result, RedisReplyBuilder* rb) {
if (result) {
const string& json_str = result.value();
if (rb->IsResp3()) {
if (const std::optional<ShortLivedJSON> parsed_json = JsonFromString(json_str)) {
if (const std::optional<TmpJson> parsed_json = JsonFromString(json_str)) {
Send(parsed_json.value(), rb);
return;
}
Expand Down Expand Up @@ -475,7 +475,7 @@ std::optional<std::string> ConvertJsonPathToJsonPointer(string_view json_path) {
result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may
still hold the JSON value, which can lead to incorrect memory tracking. */
std::optional<JsonType> ShardJsonFromString(std::string_view input) {
return JsonFromString(input, CompactObj::memory_resource());
return ParseJsonUsingShardHeap(input);
}

OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) {
Expand Down Expand Up @@ -556,8 +556,7 @@ OpResult<bool> SetPartialJson(const OpArgs& op_args, string_view key,
path_exists = true;
if (!is_nx_condition) {
value_was_set = true;
*val = JsonType(parsed_json.value(),
std::pmr::polymorphic_allocator<char>{CompactObj::memory_resource()});
*val = JsonType(parsed_json.value(), detail::StatelessJsonAllocator<char>{});
}
return {};
};
Expand Down Expand Up @@ -1526,10 +1525,19 @@ auto OpMemory(const OpArgs& op_args, string_view key, const WrappedJsonPath& jso
ReadOnlyOperationOptions{false, CallbackResultOptions::DefaultReadOnlyOptions()});
}

// Returns json vector that represents the result of the json query.
auto OpResp(const OpArgs& op_args, string_view key, const WrappedJsonPath& json_path) {
auto cb = [](const string_view&, const JsonType& val) { return val; };
return JsonReadOnlyOperation<JsonType>(op_args, key, json_path, std::move(cb));
// Returns json vector that represents the result of the json query. A shard local allocated JSON
// cannot be copied and then destroyed on another shard. We use stateless allocators which forward
// all requests to thread local memory resource which uses mimalloc, and the coordinator thread may
// not have a mimalloc backed heap set up. So the value is first copied to the std allocator-backed
// type TmpJson.
OpResult<JsonCallbackResult<TmpJson>> OpResp(const OpArgs& op_args, string_view key,
const WrappedJsonPath& json_path) {
auto cb = [](const string_view&, const JsonType& val) {
string s;
val.dump(s);
return JsonFromString(s);
Comment on lines +1536 to +1538
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunate side effect of not being able to send JsonType object to the coordinator thread, it can only be destroyed on the shard that created it. We have to copy it into a safe type before sending it to coordinator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: mimalloc will deallocate the object from another thread, but the coordinator thread might not have mimalloc heap set up on it.

};
return JsonReadOnlyOperation<TmpJson>(op_args, key, json_path, std::move(cb));
}

// Returns boolean that represents the result of the operation.
Expand Down Expand Up @@ -2004,7 +2012,7 @@ void JsonFamily::StrAppend(CmdArgList args, const CommandContext& cmd_cntx) {
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));

// We try parsing the value into json string object first.
optional<ShortLivedJSON> parsed_json = JsonFromString(value);
optional<TmpJson> parsed_json = JsonFromString(value);
if (!parsed_json || !parsed_json->is_string()) {
return builder->SendError("expected string value", kSyntaxErrType);
};
Expand Down
Loading
Loading