Skip to content

Commit 978961f

Browse files
authored
server: Parse JSON on destination shard (#6061)
* server: Parse JSON on shard. JSON is parsed on the shard to make sure that the shard's thread-local memory resource is used for allocation. This is done in preparation for using a stateless allocator. With a stateless allocator, it will not be possible to safely parse the search value on the coordinator shard, because currently the coordinator shard uses the default memory resource and the destination shard uses mimalloc. It is possible that the coordinator shard does not have a mimalloc heap, in which case it will not be able to parse using such a heap. To avoid this, the parsing is pushed to the destination shard, and the error handling is delayed until the transaction hop finishes. * core: Introduce short lived json type. This is the default heap-allocated json type from jsoncons, using std::allocator. It is renamed to clearly indicate its use in a non-db context. * server: Use std allocated JSON where applicable. Signed-off-by: Abhijat Malviya <[email protected]>
1 parent b5f3a1d commit 978961f

File tree

4 files changed

+92
-66
lines changed

4 files changed

+92
-66
lines changed

src/core/json/json_object.cc

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
#include "base/logging.h"
88

99
using namespace jsoncons;
10-
using namespace std;
1110

12-
namespace dfly {
11+
namespace {
1312

14-
optional<JsonType> JsonFromString(string_view input, PMR_NS::memory_resource* mr) {
15-
error_code ec;
13+
template <typename T>
14+
std::optional<T> ParseWithDecoder(std::string_view input, json_decoder<T>&& decoder) {
15+
std::error_code ec;
1616
auto JsonErrorHandler = [](json_errc ec, const ser_context&) {
1717
VLOG(1) << "Error while decode JSON: " << make_error_code(ec).message();
1818
return false;
@@ -28,10 +28,9 @@ optional<JsonType> JsonFromString(string_view input, PMR_NS::memory_resource* mr
2828

2929
/* The maximum possible JSON nesting depth is either the specified json_nesting_depth_limit or
3030
half of the input size. Since nesting a JSON object requires at least 2 characters. */
31-
auto parser_options = jsoncons::json_options{}.max_nesting_depth(
31+
auto parser_options = json_options{}.max_nesting_depth(
3232
std::min(json_nesting_depth_limit, uint32_t(input.size() / 2)));
3333

34-
json_decoder<JsonType> decoder(std::pmr::polymorphic_allocator<char>{mr});
3534
json_parser parser(parser_options, JsonErrorHandler);
3635

3736
parser.update(input);
@@ -40,7 +39,19 @@ optional<JsonType> JsonFromString(string_view input, PMR_NS::memory_resource* mr
4039
if (!ec && decoder.is_valid()) {
4140
return decoder.get_result();
4241
}
43-
return nullopt;
42+
return std::nullopt;
43+
}
44+
45+
} // namespace
46+
47+
namespace dfly {
48+
49+
std::optional<ShortLivedJSON> JsonFromString(std::string_view input) {
50+
return ParseWithDecoder(input, json_decoder<ShortLivedJSON>{});
51+
}
52+
53+
optional<JsonType> JsonFromString(string_view input, PMR_NS::memory_resource* mr) {
54+
return ParseWithDecoder(input, json_decoder<JsonType>{PMR_NS::polymorphic_allocator<char>{mr}});
4455
}
4556

4657
JsonType DeepCopyJSON(const JsonType* j, PMR_NS::memory_resource* mr) {

src/core/json/json_object.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,21 @@
2121

2222
namespace dfly {
2323

24+
using ShortLivedJSON = jsoncons::json;
2425
using JsonType = jsoncons::pmr::json;
2526

26-
// Build a json object from string. If the string is not legal json, will return nullopt
27+
// A helper type to use in template functions which are expected to work with both ShortLivedJSON
28+
// and JsonType
29+
template <typename Allocator>
30+
using JsonWithAllocator = jsoncons::basic_json<char, jsoncons::sorted_policy, Allocator>;
31+
32+
// Parses string into JSON. Any allocations are done using the std allocator. This method should be
33+
// used for generic JSON parsing, in particular, it should not be used to parse objects which will
34+
// be stored in the db, as the backing storage is not managed by mimalloc.
35+
std::optional<ShortLivedJSON> JsonFromString(std::string_view input);
36+
37+
// Parses string into JSON, using mimalloc heap for allocations. This method should only be used on
38+
// shards where mimalloc heap is initialized.
2739
std::optional<JsonType> JsonFromString(std::string_view input, PMR_NS::memory_resource* mr);
2840

2941
// Deep copy a JSON object, by first serializing it to a string and then deserializing the string.

src/server/cluster/cluster_config.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
127127
namespace {
128128
constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;
129129

130-
template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
130+
template <typename T> optional<T> ReadNumeric(const ShortLivedJSON& obj) {
131131
if (!obj.is_number()) {
132132
LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj;
133133
return nullopt;
@@ -136,7 +136,7 @@ template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
136136
return obj.as<T>();
137137
}
138138

139-
optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
139+
optional<SlotRanges> GetClusterSlotRanges(const ShortLivedJSON& slots) {
140140
if (!slots.is_array()) {
141141
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
142142
return nullopt;
@@ -162,7 +162,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
162162
return SlotRanges(ranges);
163163
}
164164

165-
optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
165+
optional<ClusterExtendedNodeInfo> ParseClusterNode(const ShortLivedJSON& json) {
166166
if (!json.is_object()) {
167167
LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json;
168168
return nullopt;
@@ -221,7 +221,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
221221
return node;
222222
}
223223

224-
optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
224+
optional<std::vector<MigrationInfo>> ParseMigrations(const ShortLivedJSON& json) {
225225
std::vector<MigrationInfo> res;
226226
if (json.is_null()) {
227227
return res;
@@ -251,7 +251,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
251251
return res;
252252
}
253253

254-
optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
254+
optional<ClusterShardInfos> BuildClusterConfigFromJson(const ShortLivedJSON& json) {
255255
std::vector<ClusterShardInfo> config;
256256

257257
if (!json.is_array()) {
@@ -309,7 +309,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
309309
/* static */
310310
shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
311311
std::string_view json_str) {
312-
optional<JsonType> json_config = JsonFromString(json_str, PMR_NS::get_default_resource());
312+
optional<ShortLivedJSON> json_config = JsonFromString(json_str);
313313
if (!json_config.has_value()) {
314314
LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str;
315315
return nullptr;

src/server/json_family.cc

Lines changed: 55 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,12 @@ void Send(const std::vector<std::string>& vec, RedisReplyBuilder* rb) {
235235
Send(vec.begin(), vec.end(), rb);
236236
}
237237

238-
void Send(const JsonType& value, RedisReplyBuilder* rb) {
238+
template <typename Allocator>
239+
void Send(const JsonWithAllocator<Allocator>& value, RedisReplyBuilder* rb) {
239240
if (value.is_double()) {
240241
Send(value.as_double(), rb);
241242
} else if (value.is_number()) {
242-
Send(value.as_integer<long>(), rb);
243+
Send(value.template as_integer<long>(), rb);
243244
} else if (value.is_bool()) {
244245
rb->SendSimpleString(value.as_bool() ? "true" : "false");
245246
} else if (value.is_null()) {
@@ -341,9 +342,7 @@ void SendJsonString(const OpResult<string>& result, RedisReplyBuilder* rb) {
341342
if (result) {
342343
const string& json_str = result.value();
343344
if (rb->IsResp3()) {
344-
std::optional<JsonType> parsed_json =
345-
JsonFromString(json_str, PMR_NS::get_default_resource());
346-
if (parsed_json) {
345+
if (const std::optional<ShortLivedJSON> parsed_json = JsonFromString(json_str)) {
347346
Send(parsed_json.value(), rb);
348347
return;
349348
}
@@ -470,18 +469,13 @@ std::optional<std::string> ConvertJsonPathToJsonPointer(string_view json_path) {
470469
return pointer;
471470
}
472471

473-
// Use this method on the coordinator thread
474-
std::optional<JsonType> JsonFromString(std::string_view input) {
475-
return dfly::JsonFromString(input, PMR_NS::get_default_resource());
476-
}
477-
478472
/* Use this method on the shard thread
479473
480474
If you do memory tracking, make sure to initialize it before calling this method, and reset the
481475
result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may
482476
still hold the JSON value, which can lead to incorrect memory tracking. */
483477
std::optional<JsonType> ShardJsonFromString(std::string_view input) {
484-
return dfly::JsonFromString(input, CompactObj::memory_resource());
478+
return JsonFromString(input, CompactObj::memory_resource());
485479
}
486480

487481
OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) {
@@ -1307,7 +1301,19 @@ auto OpArrTrim(const OpArgs& op_args, string_view key, const WrappedJsonPath& pa
13071301
// Returns numeric vector that represents the new length of the array at each path.
13081302
OpResult<JsonCallbackResult<OptSize>> OpArrInsert(const OpArgs& op_args, string_view key,
13091303
const WrappedJsonPath& json_path, int index,
1310-
const vector<JsonType>& new_values) {
1304+
const vector<string_view>& new_values) {
1305+
vector<JsonType> parsed_values;
1306+
parsed_values.reserve(new_values.size());
1307+
1308+
for (const auto& nv : new_values) {
1309+
const optional<JsonType> v = ShardJsonFromString(nv);
1310+
if (!v) {
1311+
return OpStatus::SYNTAX_ERR;
1312+
}
1313+
1314+
parsed_values.emplace_back(std::move(*v));
1315+
}
1316+
13111317
bool out_of_boundaries_encountered = false;
13121318

13131319
// Insert user-supplied value into the supplied index that should be valid.
@@ -1336,7 +1342,7 @@ OpResult<JsonCallbackResult<OptSize>> OpArrInsert(const OpArgs& op_args, string_
13361342
}
13371343

13381344
auto it = GetJsonArrayIterator(val, insert_before_index);
1339-
for (auto& new_val : new_values) {
1345+
for (auto& new_val : parsed_values) {
13401346
it = val->insert(it, new_val);
13411347
it++;
13421348
}
@@ -1350,14 +1356,26 @@ OpResult<JsonCallbackResult<OptSize>> OpArrInsert(const OpArgs& op_args, string_
13501356
return res;
13511357
}
13521358

1353-
auto OpArrAppend(const OpArgs& op_args, string_view key, const WrappedJsonPath& path,
1354-
const vector<JsonType>& append_values) {
1359+
OpResult<JsonCallbackResult<optional<optional<unsigned long>>>> OpArrAppend(
1360+
const OpArgs& op_args, string_view key, const WrappedJsonPath& path,
1361+
const vector<string_view>& append_values) {
1362+
vector<JsonType> parsed_values;
1363+
parsed_values.reserve(append_values.size());
1364+
1365+
for (const auto& v : append_values) {
1366+
const optional<JsonType> parsed = ShardJsonFromString(v);
1367+
if (!parsed) {
1368+
return OpStatus::SYNTAX_ERR;
1369+
}
1370+
parsed_values.emplace_back(std::move(*parsed));
1371+
}
1372+
13551373
auto cb = [&](std::optional<std::string_view>,
13561374
JsonType* val) -> MutateCallbackResult<std::optional<std::size_t>> {
13571375
if (!val->is_array()) {
13581376
return {};
13591377
}
1360-
for (auto& new_val : append_values) {
1378+
for (auto& new_val : parsed_values) {
13611379
val->emplace_back(new_val);
13621380
}
13631381
return {false, val->size()};
@@ -1368,8 +1386,15 @@ auto OpArrAppend(const OpArgs& op_args, string_view key, const WrappedJsonPath&
13681386
// Returns a numeric vector representing each JSON value first index of the JSON scalar.
13691387
// An index value of -1 represents unfound in the array.
13701388
// JSON scalar has types of string, boolean, null, and number.
1371-
auto OpArrIndex(const OpArgs& op_args, string_view key, const WrappedJsonPath& json_path,
1372-
const JsonType& search_val, int start_index, int end_index) {
1389+
OpResult<JsonCallbackResult<optional<long>>> OpArrIndex(const OpArgs& op_args, string_view key,
1390+
const WrappedJsonPath& json_path,
1391+
string_view search_val, int start_index,
1392+
int end_index) {
1393+
const optional<JsonType> search_value_json = ShardJsonFromString(search_val);
1394+
if (!search_value_json) {
1395+
return OpStatus::SYNTAX_ERR;
1396+
}
1397+
13731398
auto cb = [&](const string_view&, const JsonType& val) -> std::optional<long> {
13741399
if (!val.is_array()) {
13751400
return std::nullopt;
@@ -1403,7 +1428,7 @@ auto OpArrIndex(const OpArgs& op_args, string_view key, const WrappedJsonPath& j
14031428
size_t pos = -1;
14041429
auto it = GetJsonArrayIterator(val, pos_start_index);
14051430
while (it != val.array_range().end()) {
1406-
if (JsonAreEquals(search_val, *it)) {
1431+
if (JsonAreEquals(search_value_json, *it)) {
14071432
pos = pos_start_index;
14081433
break;
14091434
}
@@ -1710,8 +1735,7 @@ void JsonFamily::Resp(CmdArgList args, const CommandContext& cmd_cntx) {
17101735
};
17111736

17121737
auto result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
1713-
auto* rb = static_cast<RedisReplyBuilder*>(builder);
1714-
reply_generic::Send(result, rb);
1738+
reply_generic::Send(result, builder);
17151739
}
17161740

17171741
void JsonFamily::Debug(CmdArgList args, const CommandContext& cmd_cntx) {
@@ -1826,11 +1850,7 @@ void JsonFamily::ArrIndex(CmdArgList args, const CommandContext& cmd_cntx) {
18261850
auto* builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
18271851
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));
18281852

1829-
optional<JsonType> search_value = JsonFromString(parser.Next());
1830-
if (!search_value) {
1831-
builder->SendError(kSyntaxErr);
1832-
return;
1833-
}
1853+
string_view search_value = parser.Next();
18341854

18351855
int start_index = 0;
18361856
if (parser.HasNext()) {
@@ -1851,12 +1871,11 @@ void JsonFamily::ArrIndex(CmdArgList args, const CommandContext& cmd_cntx) {
18511871
}
18521872

18531873
auto cb = [&](Transaction* t, EngineShard* shard) {
1854-
return OpArrIndex(t->GetOpArgs(shard), key, json_path, *search_value, start_index, end_index);
1874+
return OpArrIndex(t->GetOpArgs(shard), key, json_path, search_value, start_index, end_index);
18551875
};
18561876

18571877
auto result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
1858-
auto* rb = static_cast<RedisReplyBuilder*>(builder);
1859-
reply_generic::Send(result, rb);
1878+
reply_generic::Send(result, builder);
18601879
}
18611880

18621881
void JsonFamily::ArrInsert(CmdArgList args, const CommandContext& cmd_cntx) {
@@ -1873,15 +1892,9 @@ void JsonFamily::ArrInsert(CmdArgList args, const CommandContext& cmd_cntx) {
18731892

18741893
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));
18751894

1876-
vector<JsonType> new_values;
1895+
vector<string_view> new_values;
18771896
for (size_t i = 3; i < args.size(); i++) {
1878-
optional<JsonType> val = JsonFromString(ArgS(args, i));
1879-
if (!val) {
1880-
builder->SendError(kSyntaxErr);
1881-
return;
1882-
}
1883-
1884-
new_values.emplace_back(std::move(*val));
1897+
new_values.emplace_back(ArgS(args, i));
18851898
}
18861899

18871900
auto cb = [&](Transaction* t, EngineShard* shard) {
@@ -1900,26 +1913,17 @@ void JsonFamily::ArrAppend(CmdArgList args, const CommandContext& cmd_cntx) {
19001913
auto* builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
19011914
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));
19021915

1903-
vector<JsonType> append_values;
1904-
1905-
// TODO: there is a bug here, because we parse json using the allocator from
1906-
// the coordinator thread, and we pass it to the shard thread, which is not safe.
1916+
vector<string_view> append_values;
19071917
for (size_t i = 2; i < args.size(); ++i) {
1908-
optional<JsonType> converted_val = JsonFromString(ArgS(args, i));
1909-
if (!converted_val) {
1910-
builder->SendError(kSyntaxErr);
1911-
return;
1912-
}
1913-
append_values.emplace_back(converted_val);
1918+
append_values.emplace_back(ArgS(args, i));
19141919
}
19151920

19161921
auto cb = [&](Transaction* t, EngineShard* shard) {
19171922
return OpArrAppend(t->GetOpArgs(shard), key, json_path, append_values);
19181923
};
19191924

19201925
auto result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
1921-
auto* rb = static_cast<RedisReplyBuilder*>(builder);
1922-
reply_generic::Send(result, rb);
1926+
reply_generic::Send(result, builder);
19231927
}
19241928

19251929
void JsonFamily::ArrTrim(CmdArgList args, const CommandContext& cmd_cntx) {
@@ -2000,7 +2004,7 @@ void JsonFamily::StrAppend(CmdArgList args, const CommandContext& cmd_cntx) {
20002004
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));
20012005

20022006
// We try parsing the value into json string object first.
2003-
optional<JsonType> parsed_json = JsonFromString(value);
2007+
optional<ShortLivedJSON> parsed_json = JsonFromString(value);
20042008
if (!parsed_json || !parsed_json->is_string()) {
20052009
return builder->SendError("expected string value", kSyntaxErrType);
20062010
};
@@ -2011,8 +2015,7 @@ void JsonFamily::StrAppend(CmdArgList args, const CommandContext& cmd_cntx) {
20112015
};
20122016

20132017
auto result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
2014-
auto* rb = static_cast<RedisReplyBuilder*>(builder);
2015-
reply_generic::Send(result, rb);
2018+
reply_generic::Send(result, builder);
20162019
}
20172020

20182021
void JsonFamily::ObjKeys(CmdArgList args, const CommandContext& cmd_cntx) {

0 commit comments

Comments
 (0)