diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 6859991cc68d..e755c894ca3b 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -1624,7 +1624,7 @@ MemoryResource* CompactObj::memory_resource() { bool CompactObj::JsonConsT::DefragIfNeeded(PageUsage* page_usage) { if (JsonType* old = json_ptr; ShouldDefragment(page_usage)) { - json_ptr = AllocateMR(DeepCopyJSON(old, memory_resource())); + json_ptr = AllocateMR(DeepCopyJSON(old)); DeleteMR(old); return true; } diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 75a15d7b7c37..3f9d2cfd4125 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -88,6 +88,7 @@ static void InitThreadStructs() { SmallString::InitThreadLocal(tlh); thread_local MiMemoryResource mi_resource(tlh); CompactObj::InitThreadLocal(&mi_resource); + InitTLJsonHeap(&mi_resource); }; static void CheckEverythingDeallocated() { @@ -455,14 +456,14 @@ TEST_F(CompactObjectTest, JsonTypeTest) { "children":[],"spouse":null} )"; std::optional json_option2 = - JsonFromString(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})", CompactObj::memory_resource()); + ParseJsonUsingShardHeap(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})"); cobj_.SetString(json_str, false); ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); // we set this as a string JsonType* failed_json = cobj_.GetJson(); ASSERT_TRUE(failed_json == nullptr); ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); - std::optional json_option = JsonFromString(json_str, CompactObj::memory_resource()); + std::optional json_option = ParseJsonUsingShardHeap(json_str); ASSERT_TRUE(json_option.has_value()); cobj_.SetJson(std::move(json_option.value())); ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type @@ -477,7 +478,7 @@ TEST_F(CompactObjectTest, JsonTypeTest) { ASSERT_TRUE(json != nullptr); ASSERT_TRUE(json->contains("b")); ASSERT_FALSE(json->contains("firstName")); - std::optional set_array = JsonFromString("", CompactObj::memory_resource()); + std::optional set_array = ParseJsonUsingShardHeap(""); // now set it to string again cobj_.SetString(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})", false); ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); // we set this as a string @@ -504,7 +505,7 @@ TEST_F(CompactObjectTest, JsonTypeWithPathTest) { "title" : "The Night Watch", "author" : "Phillips, David Atlee" }]})"; - std::optional json_array = JsonFromString(books_json, CompactObj::memory_resource()); + std::optional json_array = ParseJsonUsingShardHeap(books_json); ASSERT_TRUE(json_array.has_value()); cobj_.SetJson(std::move(json_array.value())); ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type diff --git a/src/core/json/json_object.cc b/src/core/json/json_object.cc index d0f12d7b9431..b6e885b4991b 100644 --- a/src/core/json/json_object.cc +++ b/src/core/json/json_object.cc @@ -46,18 +46,22 @@ std::optional ParseWithDecoder(std::string_view input, json_decoder&& deco namespace dfly { -std::optional JsonFromString(std::string_view input) { - return ParseWithDecoder(input, json_decoder{}); +void InitTLJsonHeap(PMR_NS::memory_resource* mr) { + detail::tl_mr = mr; } -optional JsonFromString(string_view input, PMR_NS::memory_resource* mr) { - return ParseWithDecoder(input, json_decoder{PMR_NS::polymorphic_allocator{mr}}); +std::optional JsonFromString(std::string_view input) { + return ParseWithDecoder(input, json_decoder{}); } -JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr) { +optional ParseJsonUsingShardHeap(string_view input) { + return ParseWithDecoder(input, json_decoder{detail::StatelessJsonAllocator{}}); +} + +JsonType DeepCopyJSON(const JsonType* j) { std::string serialized; j->dump(serialized); - auto deserialized = JsonFromString(serialized, mr); + auto deserialized = ParseJsonUsingShardHeap(serialized); DCHECK(deserialized.has_value()); return std::move(deserialized.value()); } diff --git a/src/core/json/json_object.h b/src/core/json/json_object.h index b9e72eaf8b0d..3b175e915678 100644 --- a/src/core/json/json_object.h +++ b/src/core/json/json_object.h @@ -4,6 +4,8 @@ #pragma once +#include + #include // for __cpp_lib_to_chars macro. // std::from_chars is available in C++17 if __cpp_lib_to_chars is defined. @@ -21,10 +23,57 @@ namespace dfly { -using ShortLivedJSON = jsoncons::json; -using JsonType = jsoncons::pmr::json; +namespace detail { + +inline thread_local PMR_NS::memory_resource* tl_mr = nullptr; + +template class StatelessJsonAllocator { + public: + using value_type = T; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + using is_always_equal = std::true_type; + + template StatelessJsonAllocator(const StatelessJsonAllocator&) noexcept { + } + + StatelessJsonAllocator() noexcept { + DCHECK_NE(tl_mr, nullptr) << "json allocator created without backing memory resource"; + }; + + static value_type* allocate(size_type n) { + void* ptr = tl_mr->allocate(n * sizeof(value_type), alignof(value_type)); + return static_cast(ptr); + } + + static void deallocate(value_type* ptr, size_type n) noexcept { + tl_mr->deallocate(ptr, n * sizeof(value_type), alignof(value_type)); + } + + static PMR_NS::memory_resource* resource() { + return tl_mr; + } +}; + +template +bool operator==(const StatelessJsonAllocator&, const StatelessJsonAllocator&) noexcept { + return true; +} + +template +bool operator!=(const StatelessJsonAllocator&, const StatelessJsonAllocator&) noexcept { + return false; +} + +} // namespace detail + +void InitTLJsonHeap(PMR_NS::memory_resource* mr); + +using TmpJson = jsoncons::json; +using JsonType = + jsoncons::basic_json>; -// A helper type to use in template functions which are expected to work with both ShortLivedJSON +// A helper type to use in template functions which are expected to work with both TmpJson // and JsonType template using JsonWithAllocator = jsoncons::basic_json; @@ -32,21 +81,24 @@ using JsonWithAllocator = jsoncons::basic_json JsonFromString(std::string_view input); +std::optional JsonFromString(std::string_view input); // Parses string into JSON, using mimalloc heap for allocations. This method should only be used on // shards where mimalloc heap is initialized. -std::optional JsonFromString(std::string_view input, PMR_NS::memory_resource* mr); +std::optional ParseJsonUsingShardHeap(std::string_view input); // Deep copy a JSON object, by first serializing it to a string and then deserializing the string. // The operation is intended to help during defragmentation, by copying into a page reserved for // malloc. -JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr); +JsonType DeepCopyJSON(const JsonType* j); -inline auto MakeJsonPathExpr(std::string_view path, std::error_code& ec) - -> jsoncons::jsonpath::jsonpath_expression { - return jsoncons::jsonpath::make_expression>( - jsoncons::allocator_set>(), path, ec); +template +auto MakeJsonPathExpr(std::string_view path, std::error_code& ec) + -> jsoncons::jsonpath::jsonpath_expression { + using ResultAllocT = typename Json::allocator_type; + using TmpAllocT = std::allocator; + using AllocSetT = jsoncons::allocator_set; + return jsoncons::jsonpath::make_expression(AllocSetT(), path, ec); } } // namespace dfly diff --git a/src/core/json/jsonpath_test.cc b/src/core/json/jsonpath_test.cc index ecce8e78bc66..f2fa1ce22324 100644 --- a/src/core/json/jsonpath_test.cc +++ b/src/core/json/jsonpath_test.cc @@ -39,7 +39,7 @@ class TestDriver : public Driver { template JSON ValidJson(string_view str); template <> JsonType ValidJson(string_view str) { - auto res = ::dfly::JsonFromString(str, pmr::get_default_resource()); + auto res = ParseJsonUsingShardHeap(str); CHECK(res) << "Failed to parse json: " << str; return *res; } @@ -89,6 +89,11 @@ bool is_array(FlatJson ref) { class ScannerTest : public ::testing::Test { protected: + void SetUp() override { + Test::SetUp(); + InitTLJsonHeap(PMR_NS::get_default_resource()); + } + ScannerTest() { driver_.lexer()->set_debug(1); } diff --git a/src/core/page_usage_stats_test.cc b/src/core/page_usage_stats_test.cc index c110c45ce477..07c5d6a14d65 100644 --- a/src/core/page_usage_stats_test.cc +++ b/src/core/page_usage_stats_test.cc @@ -45,6 +45,7 @@ class PageUsageStatsTest : public ::testing::Test { } PageUsageStatsTest() : m_(mi_heap_get_backing()) { + InitTLJsonHeap(&m_); } void SetUp() override { @@ -191,7 +192,7 @@ TEST_F(PageUsageStatsTest, JSONCons) { // encoding. std::string_view data{R"#({"data": "some", "count": 1, "checked": false})#"}; - auto parsed = JsonFromString(data, &m_); + auto parsed = ParseJsonUsingShardHeap(data); EXPECT_TRUE(parsed.has_value()); c_obj_.SetJson(std::move(parsed.value())); diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index 3082cabb6879..58550dd58add 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -127,7 +127,7 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, namespace { constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv; -template optional ReadNumeric(const ShortLivedJSON& obj) { +template optional ReadNumeric(const TmpJson& obj) { if (!obj.is_number()) { LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj; return nullopt; @@ -136,7 +136,7 @@ template optional ReadNumeric(const ShortLivedJSON& obj) { return obj.as(); } -optional GetClusterSlotRanges(const ShortLivedJSON& slots) { +optional GetClusterSlotRanges(const TmpJson& slots) { if (!slots.is_array()) { LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots; return nullopt; @@ -162,7 +162,7 @@ optional GetClusterSlotRanges(const ShortLivedJSON& slots) { return SlotRanges(ranges); } -optional ParseClusterNode(const ShortLivedJSON& json) { +optional ParseClusterNode(const TmpJson& json) { if (!json.is_object()) { LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json; return nullopt; @@ -221,7 +221,7 @@ optional ParseClusterNode(const ShortLivedJSON& json) { return node; } -optional> ParseMigrations(const ShortLivedJSON& json) { +optional> ParseMigrations(const TmpJson& json) { std::vector res; if (json.is_null()) { return res; @@ -251,7 +251,7 @@ optional> ParseMigrations(const ShortLivedJSON& json) return res; } -optional BuildClusterConfigFromJson(const ShortLivedJSON& json) { +optional BuildClusterConfigFromJson(const TmpJson& json) { std::vector config; if (!json.is_array()) { @@ -309,7 +309,7 @@ optional BuildClusterConfigFromJson(const ShortLivedJSON& jso /* static */ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, std::string_view json_str) { - optional json_config = JsonFromString(json_str); + optional json_config = JsonFromString(json_str); if (!json_config.has_value()) { LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str; return nullptr; diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index c6e32fe1b005..abb03c33d72b 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -422,6 +422,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb) { CompactObj::InitThreadLocal(shard_->memory_resource()); SmallString::InitThreadLocal(data_heap); + InitTLJsonHeap(shard_->memory_resource()); shard_->shard_search_indices_ = std::make_unique(); } diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 4f7e16d86bf1..f3e1a0b546ef 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -342,7 +342,7 @@ void SendJsonString(const OpResult& result, RedisReplyBuilder* rb) { if (result) { const string& json_str = result.value(); if (rb->IsResp3()) { - if (const std::optional parsed_json = JsonFromString(json_str)) { + if (const std::optional parsed_json = JsonFromString(json_str)) { Send(parsed_json.value(), rb); return; } @@ -475,7 +475,7 @@ std::optional ConvertJsonPathToJsonPointer(string_view json_path) { result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may still hold the JSON value, which can lead to incorrect memory tracking. */ std::optional ShardJsonFromString(std::string_view input) { - return JsonFromString(input, CompactObj::memory_resource()); + return ParseJsonUsingShardHeap(input); } OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) { @@ -556,8 +556,7 @@ OpResult SetPartialJson(const OpArgs& op_args, string_view key, path_exists = true; if (!is_nx_condition) { value_was_set = true; - *val = JsonType(parsed_json.value(), - std::pmr::polymorphic_allocator{CompactObj::memory_resource()}); + *val = JsonType(parsed_json.value(), detail::StatelessJsonAllocator{}); } return {}; }; @@ -1526,10 +1525,19 @@ auto OpMemory(const OpArgs& op_args, string_view key, const WrappedJsonPath& jso ReadOnlyOperationOptions{false, CallbackResultOptions::DefaultReadOnlyOptions()}); } -// Returns json vector that represents the result of the json query. -auto OpResp(const OpArgs& op_args, string_view key, const WrappedJsonPath& json_path) { - auto cb = [](const string_view&, const JsonType& val) { return val; }; - return JsonReadOnlyOperation(op_args, key, json_path, std::move(cb)); +// Returns json vector that represents the result of the json query. A shard local allocated JSON +// cannot be copied and then destroyed on another shard. We use stateless allocators which forward +// all requests to thread local memory resource which uses mimalloc, and the coordinator thread may +// not have a mimalloc backed heap set up. So the value is first copied to the std allocator-backed +// type TmpJson. +OpResult> OpResp(const OpArgs& op_args, string_view key, + const WrappedJsonPath& json_path) { + auto cb = [](const string_view&, const JsonType& val) { + string s; + val.dump(s); + return JsonFromString(s); + }; + return JsonReadOnlyOperation(op_args, key, json_path, std::move(cb)); } // Returns boolean that represents the result of the operation. @@ -2004,7 +2012,7 @@ void JsonFamily::StrAppend(CmdArgList args, const CommandContext& cmd_cntx) { WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path)); // We try parsing the value into json string object first. - optional parsed_json = JsonFromString(value); + optional parsed_json = JsonFromString(value); if (!parsed_json || !parsed_json->is_string()) { return builder->SendError("expected string value", kSyntaxErrType); }; diff --git a/src/server/json_family_memory_test.cc b/src/server/json_family_memory_test.cc index 6cd6243d6438..fc68d7e26447 100644 --- a/src/server/json_family_memory_test.cc +++ b/src/server/json_family_memory_test.cc @@ -6,7 +6,6 @@ #include "base/logging.h" #include "facade/facade_test.h" #include "server/command_registry.h" -#include "server/json_family.h" #include "server/test_utils.h" using namespace testing; @@ -19,25 +18,33 @@ namespace dfly { class JsonFamilyMemoryTest : public BaseFamilyTest { public: - static dfly::MiMemoryResource* GetMemoryResource() { - static thread_local mi_heap_t* heap = mi_heap_new(); - static thread_local dfly::MiMemoryResource memory_resource{heap}; + static MiMemoryResource* GetMemoryResource() { + thread_local mi_heap_t* heap = mi_heap_new(); + thread_local MiMemoryResource memory_resource{heap}; return &memory_resource; } protected: + void SetUp() override { + BaseFamilyTest::SetUp(); + // Make the core running the thread use the same resource as the rest of the test. Although + // BaseFamilyTest initializes the heap on shards serving transactions, the core running the test + // needs this initialized explicitly. + InitTLJsonHeap(GetMemoryResource()); + } + auto GetJsonMemoryUsageFromDb(std::string_view key) { return Run({"MEMORY", "USAGE", key, "WITHOUTKEY"}); } }; size_t GetMemoryUsage() { - return static_cast(JsonFamilyMemoryTest::GetMemoryResource())->used(); + return JsonFamilyMemoryTest::GetMemoryResource()->used(); } size_t GetJsonMemoryUsageFromString(std::string_view json_str) { size_t start = GetMemoryUsage(); - auto json = dfly::JsonFromString(json_str, JsonFamilyMemoryTest::GetMemoryResource()); + auto json = ParseJsonUsingShardHeap(json_str); if (!json) { return 0; } @@ -111,9 +118,8 @@ TEST_F(JsonFamilyMemoryTest, JsonConsDelTest) { size_t start = GetMemoryUsage(); - auto json = dfly::JsonFromString(start_json, JsonFamilyMemoryTest::GetMemoryResource()); - void* ptr = - JsonFamilyMemoryTest::GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType)); + auto json = ParseJsonUsingShardHeap(start_json); + void* ptr = GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType)); JsonType* json_on_heap = new (ptr) JsonType(std::move(json).value()); size_t memory_usage_before_erase = GetMemoryUsage() - start; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 883e4e3e6fc8..e7b7903a2f1c 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -984,8 +984,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { } else if (rdb_type_ == RDB_TYPE_JSON) { size_t start_size = static_cast(CompactObj::memory_resource())->used(); { - auto json = JsonFromString(blob, CompactObj::memory_resource()); - if (json) { + if (auto json = ParseJsonUsingShardHeap(blob)) { pv_->SetJson(std::move(*json)); } else { LOG(INFO) << "Invalid JSON string during rdb load of JSON object: " << blob; diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index 3e12dbfd4698..232ade9afa47 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -81,7 +81,7 @@ bool SendErrorIfOccurred(const ParseResult& result, CmdArgParser* parser, bool IsValidJsonPath(string_view path) { error_code ec; - MakeJsonPathExpr(path, ec); + MakeJsonPathExpr(path, ec); return !ec; }