Skip to content

Commit 7de47b0

Browse files
committed
server(JSON): Remove parse method which used default heap
There were two parse methods, one which used the default heap and the other which used a thread local mi heap. The former one is removed here. The reason is to prepare for using a stateless allocator. The allocator is part of the type of the json object. A stateless allocator must not hold state of its own and simply forward requests to a memory resource pointer accessed from a static storage. In such a model, having two memory resources does not work because two allocators using different heaps are not interchangeable. It is possible to use two different types, but in that case the json object types become different, and it becomes impossible to work with them transparently. The switch to mi heap everywhere can cause one problem during accounting. The JSONAutoUpdater class uses memory usage before and after parsing to figure out how many bytes are used by a given object. If the objects parsed by the method which is removed in this PR are interspersed with JSONAutoUpdater, then this can cause a problem with accounting. At present there are no such cases in the code. Signed-off-by: Abhijat Malviya <[email protected]>
1 parent 978961f commit 7de47b0

File tree

10 files changed

+101
-32
lines changed

10 files changed

+101
-32
lines changed

src/core/compact_object.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1624,7 +1624,7 @@ MemoryResource* CompactObj::memory_resource() {
16241624

16251625
bool CompactObj::JsonConsT::DefragIfNeeded(PageUsage* page_usage) {
16261626
if (JsonType* old = json_ptr; ShouldDefragment(page_usage)) {
1627-
json_ptr = AllocateMR<JsonType>(DeepCopyJSON(old, memory_resource()));
1627+
json_ptr = AllocateMR<JsonType>(DeepCopyJSON(old));
16281628
DeleteMR<JsonType>(old);
16291629
return true;
16301630
}

src/core/compact_object_test.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ static void InitThreadStructs() {
8888
SmallString::InitThreadLocal(tlh);
8989
thread_local MiMemoryResource mi_resource(tlh);
9090
CompactObj::InitThreadLocal(&mi_resource);
91+
InitTLJsonHeap(&mi_resource);
9192
};
9293

9394
static void CheckEverythingDeallocated() {
@@ -455,14 +456,14 @@ TEST_F(CompactObjectTest, JsonTypeTest) {
455456
"children":[],"spouse":null}
456457
)";
457458
std::optional<JsonType> json_option2 =
458-
JsonFromString(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})", CompactObj::memory_resource());
459+
ParseJsonUsingShardHeap(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})");
459460

460461
cobj_.SetString(json_str, false);
461462
ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); // we set this as a string
462463
JsonType* failed_json = cobj_.GetJson();
463464
ASSERT_TRUE(failed_json == nullptr);
464465
ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING);
465-
std::optional<JsonType> json_option = JsonFromString(json_str, CompactObj::memory_resource());
466+
std::optional<JsonType> json_option = ParseJsonUsingShardHeap(json_str);
466467
ASSERT_TRUE(json_option.has_value());
467468
cobj_.SetJson(std::move(json_option.value()));
468469
ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type
@@ -477,7 +478,7 @@ TEST_F(CompactObjectTest, JsonTypeTest) {
477478
ASSERT_TRUE(json != nullptr);
478479
ASSERT_TRUE(json->contains("b"));
479480
ASSERT_FALSE(json->contains("firstName"));
480-
std::optional<JsonType> set_array = JsonFromString("", CompactObj::memory_resource());
481+
std::optional<JsonType> set_array = ParseJsonUsingShardHeap("");
481482
// now set it to string again
482483
cobj_.SetString(R"({"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}})", false);
483484
ASSERT_TRUE(cobj_.ObjType() == OBJ_STRING); // we set this as a string
@@ -504,7 +505,7 @@ TEST_F(CompactObjectTest, JsonTypeWithPathTest) {
504505
"title" : "The Night Watch",
505506
"author" : "Phillips, David Atlee"
506507
}]})";
507-
std::optional<JsonType> json_array = JsonFromString(books_json, CompactObj::memory_resource());
508+
std::optional<JsonType> json_array = ParseJsonUsingShardHeap(books_json);
508509
ASSERT_TRUE(json_array.has_value());
509510
cobj_.SetJson(std::move(json_array.value()));
510511
ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type

src/core/json/json_object.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,22 @@ std::optional<T> ParseWithDecoder(std::string_view input, json_decoder<T>&& deco
4646

4747
namespace dfly {
4848

49+
void InitTLJsonHeap(PMR_NS::memory_resource* mr) {
50+
detail::tl_mr = mr;
51+
}
52+
4953
std::optional<ShortLivedJSON> JsonFromString(std::string_view input) {
5054
return ParseWithDecoder(input, json_decoder<ShortLivedJSON>{});
5155
}
5256

53-
optional<JsonType> JsonFromString(string_view input, PMR_NS::memory_resource* mr) {
54-
return ParseWithDecoder(input, json_decoder<JsonType>{PMR_NS::polymorphic_allocator<char>{mr}});
57+
optional<JsonType> ParseJsonUsingShardHeap(string_view input) {
58+
return ParseWithDecoder(input, json_decoder<JsonType>{detail::StatelessJsonAllocator<char>{}});
5559
}
5660

57-
JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr) {
61+
JsonType DeepCopyJSON(const JsonType* j) {
5862
std::string serialized;
5963
j->dump(serialized);
60-
auto deserialized = JsonFromString(serialized, mr);
64+
auto deserialized = ParseJsonUsingShardHeap(serialized);
6165
DCHECK(deserialized.has_value());
6266
return std::move(deserialized.value());
6367
}

src/core/json/json_object.h

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#pragma once
66

7+
#include <glog/logging.h>
8+
79
#include <version> // for __cpp_lib_to_chars macro.
810

911
// std::from_chars is available in C++17 if __cpp_lib_to_chars is defined.
@@ -21,8 +23,55 @@
2123

2224
namespace dfly {
2325

26+
namespace detail {
27+
28+
inline thread_local PMR_NS::memory_resource* tl_mr = nullptr;
29+
30+
template <typename T> class StatelessJsonAllocator {
31+
public:
32+
using value_type = T;
33+
using size_type = std::size_t;
34+
using difference_type = std::ptrdiff_t;
35+
using is_always_equal = std::true_type;
36+
37+
template <typename U> StatelessJsonAllocator(const StatelessJsonAllocator<U>&) noexcept {
38+
}
39+
40+
StatelessJsonAllocator() noexcept {
41+
DCHECK_NE(tl_mr, nullptr) << "JSON allocator created without backing mimalloc heap";
42+
}
43+
44+
static value_type* allocate(size_type n) {
45+
void* ptr = tl_mr->allocate(n * sizeof(value_type), alignof(value_type));
46+
return static_cast<value_type*>(ptr);
47+
}
48+
49+
static void deallocate(value_type* ptr, size_type n) noexcept {
50+
tl_mr->deallocate(ptr, n * sizeof(value_type), alignof(value_type));
51+
}
52+
53+
static PMR_NS::memory_resource* resource() {
54+
return tl_mr;
55+
}
56+
};
57+
58+
template <typename T, typename U>
59+
bool operator==(const StatelessJsonAllocator<T>&, const StatelessJsonAllocator<U>&) noexcept {
60+
return true;
61+
}
62+
63+
template <typename T, typename U>
64+
bool operator!=(const StatelessJsonAllocator<T>&, const StatelessJsonAllocator<U>&) noexcept {
65+
return false;
66+
}
67+
68+
} // namespace detail
69+
70+
void InitTLJsonHeap(PMR_NS::memory_resource* mr);
71+
2472
using ShortLivedJSON = jsoncons::json;
25-
using JsonType = jsoncons::pmr::json;
73+
using JsonType =
74+
jsoncons::basic_json<char, jsoncons::sorted_policy, detail::StatelessJsonAllocator<char>>;
2675

2776
// A helper type to use in template functions which are expected to work with both ShortLivedJSON
2877
// and JsonType
@@ -36,12 +85,12 @@ std::optional<ShortLivedJSON> JsonFromString(std::string_view input);
3685

3786
// Parses string into JSON, using mimalloc heap for allocations. This method should only be used on
3887
// shards where mimalloc heap is initialized.
39-
std::optional<JsonType> JsonFromString(std::string_view input, PMR_NS::memory_resource* mr);
88+
std::optional<JsonType> ParseJsonUsingShardHeap(std::string_view input);
4089

4190
// Deep copy a JSON object, by first serializing it to a string and then deserializing the string.
4291
// The operation is intended to help during defragmentation, by copying into a page reserved for
4392
// malloc.
44-
JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr);
93+
JsonType DeepCopyJSON(const JsonType* j);
4594

4695
inline auto MakeJsonPathExpr(std::string_view path, std::error_code& ec)
4796
-> jsoncons::jsonpath::jsonpath_expression<JsonType> {

src/core/json/jsonpath_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class TestDriver : public Driver {
3939
template <typename JSON> JSON ValidJson(string_view str);
4040

4141
template <> JsonType ValidJson<JsonType>(string_view str) {
42-
auto res = ::dfly::JsonFromString(str, pmr::get_default_resource());
42+
auto res = ParseJsonUsingShardHeap(str);
4343
CHECK(res) << "Failed to parse json: " << str;
4444
return *res;
4545
}
@@ -91,6 +91,7 @@ class ScannerTest : public ::testing::Test {
9191
protected:
9292
ScannerTest() {
9393
driver_.lexer()->set_debug(1);
94+
InitTLJsonHeap(PMR_NS::get_default_resource());
9495
}
9596

9697
void SetInput(const std::string& str) {

src/core/page_usage_stats_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class PageUsageStatsTest : public ::testing::Test {
4545
}
4646

4747
PageUsageStatsTest() : m_(mi_heap_get_backing()) {
48+
InitTLJsonHeap(&m_);
4849
}
4950

5051
void SetUp() override {
@@ -191,7 +192,7 @@ TEST_F(PageUsageStatsTest, JSONCons) {
191192
// encoding.
192193
std::string_view data{R"#({"data": "some", "count": 1, "checked": false})#"};
193194

194-
auto parsed = JsonFromString(data, &m_);
195+
auto parsed = ParseJsonUsingShardHeap(data);
195196
EXPECT_TRUE(parsed.has_value());
196197

197198
c_obj_.SetJson(std::move(parsed.value()));

src/server/engine_shard.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb) {
422422

423423
CompactObj::InitThreadLocal(shard_->memory_resource());
424424
SmallString::InitThreadLocal(data_heap);
425+
InitTLJsonHeap(shard_->memory_resource());
425426

426427
shard_->shard_search_indices_ = std::make_unique<ShardDocIndices>();
427428
}

src/server/json_family.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ std::optional<std::string> ConvertJsonPathToJsonPointer(string_view json_path) {
475475
result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may
476476
still hold the JSON value, which can lead to incorrect memory tracking. */
477477
std::optional<JsonType> ShardJsonFromString(std::string_view input) {
478-
return JsonFromString(input, CompactObj::memory_resource());
478+
return ParseJsonUsingShardHeap(input);
479479
}
480480

481481
OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) {
@@ -556,8 +556,7 @@ OpResult<bool> SetPartialJson(const OpArgs& op_args, string_view key,
556556
path_exists = true;
557557
if (!is_nx_condition) {
558558
value_was_set = true;
559-
*val = JsonType(parsed_json.value(),
560-
std::pmr::polymorphic_allocator<char>{CompactObj::memory_resource()});
559+
*val = JsonType(parsed_json.value(), detail::StatelessJsonAllocator<char>{});
561560
}
562561
return {};
563562
};
@@ -1526,10 +1525,18 @@ auto OpMemory(const OpArgs& op_args, string_view key, const WrappedJsonPath& jso
15261525
ReadOnlyOperationOptions{false, CallbackResultOptions::DefaultReadOnlyOptions()});
15271526
}
15281527

1529-
// Returns json vector that represents the result of the json query.
1530-
auto OpResp(const OpArgs& op_args, string_view key, const WrappedJsonPath& json_path) {
1531-
auto cb = [](const string_view&, const JsonType& val) { return val; };
1532-
return JsonReadOnlyOperation<JsonType>(op_args, key, json_path, std::move(cb));
1528+
// Returns json vector that represents the result of the json query. A shard local
1529+
// heap allocated JSON cannot be copied and then destroyed on another shard because we use stateless
1530+
// allocators which forward all requests to thread local memory resource. So the value is first
1531+
// copied to the std allocator-backed type ShortLivedJSON.
1532+
OpResult<JsonCallbackResult<ShortLivedJSON>> OpResp(const OpArgs& op_args, string_view key,
1533+
const WrappedJsonPath& json_path) {
1534+
auto cb = [](const string_view&, const JsonType& val) {
1535+
string s;
1536+
val.dump(s);
1537+
return JsonFromString(s);
1538+
};
1539+
return JsonReadOnlyOperation<ShortLivedJSON>(op_args, key, json_path, std::move(cb));
15331540
}
15341541

15351542
// Returns boolean that represents the result of the operation.

src/server/json_family_memory_test.cc

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include "base/logging.h"
77
#include "facade/facade_test.h"
88
#include "server/command_registry.h"
9-
#include "server/json_family.h"
109
#include "server/test_utils.h"
1110

1211
using namespace testing;
@@ -19,25 +18,33 @@ namespace dfly {
1918

2019
class JsonFamilyMemoryTest : public BaseFamilyTest {
2120
public:
22-
static dfly::MiMemoryResource* GetMemoryResource() {
23-
static thread_local mi_heap_t* heap = mi_heap_new();
24-
static thread_local dfly::MiMemoryResource memory_resource{heap};
21+
static MiMemoryResource* GetMemoryResource() {
22+
thread_local mi_heap_t* heap = mi_heap_new();
23+
thread_local MiMemoryResource memory_resource{heap};
2524
return &memory_resource;
2625
}
2726

2827
protected:
28+
void SetUp() override {
29+
BaseFamilyTest::SetUp();
30+
// Make the core running the thread use the same resource as the rest of the test. Although
31+
// BaseFamilyTest initializes the heap on shards serving transactions, the core running the test
32+
// needs this initialized explicitly.
33+
InitTLJsonHeap(GetMemoryResource());
34+
}
35+
2936
auto GetJsonMemoryUsageFromDb(std::string_view key) {
3037
return Run({"MEMORY", "USAGE", key, "WITHOUTKEY"});
3138
}
3239
};
3340

3441
size_t GetMemoryUsage() {
35-
return static_cast<MiMemoryResource*>(JsonFamilyMemoryTest::GetMemoryResource())->used();
42+
return JsonFamilyMemoryTest::GetMemoryResource()->used();
3643
}
3744

3845
size_t GetJsonMemoryUsageFromString(std::string_view json_str) {
3946
size_t start = GetMemoryUsage();
40-
auto json = dfly::JsonFromString(json_str, JsonFamilyMemoryTest::GetMemoryResource());
47+
auto json = ParseJsonUsingShardHeap(json_str);
4148
if (!json) {
4249
return 0;
4350
}
@@ -111,9 +118,8 @@ TEST_F(JsonFamilyMemoryTest, JsonConsDelTest) {
111118

112119
size_t start = GetMemoryUsage();
113120

114-
auto json = dfly::JsonFromString(start_json, JsonFamilyMemoryTest::GetMemoryResource());
115-
void* ptr =
116-
JsonFamilyMemoryTest::GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType));
121+
auto json = ParseJsonUsingShardHeap(start_json);
122+
void* ptr = GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType));
117123
JsonType* json_on_heap = new (ptr) JsonType(std::move(json).value());
118124

119125
size_t memory_usage_before_erase = GetMemoryUsage() - start;

src/server/rdb_load.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -984,8 +984,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
984984
} else if (rdb_type_ == RDB_TYPE_JSON) {
985985
size_t start_size = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
986986
{
987-
auto json = JsonFromString(blob, CompactObj::memory_resource());
988-
if (json) {
987+
if (auto json = ParseJsonUsingShardHeap(blob)) {
989988
pv_->SetJson(std::move(*json));
990989
} else {
991990
LOG(INFO) << "Invalid JSON string during rdb load of JSON object: " << blob;

0 commit comments

Comments
 (0)