Skip to content

Commit 4612b28

Browse files
authored
fix: replication for Z...STORE commands (#5728)
* fix: replication for Z...STORE commands
1 parent a6a42ad commit 4612b28

File tree

4 files changed

+55
-9
lines changed

4 files changed

+55
-9
lines changed

src/server/set_family.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
488488
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
489489
}
490490
}
491-
return 0;
491+
return OpStatus::OK;
492492
}
493493

494494
// We can use std::nullopt here because we check the type later.
@@ -544,6 +544,8 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
544544
res = StringSetWrapper{co, op_args.db_cntx}.Add(vals, UINT32_MAX, false);
545545
}
546546

547+
// TODO: consider optimization to record real command if the replica is in stable_sync state
548+
// and there is no slot migration process going on.
547549
if (journal_update && op_args.shard->journal()) {
548550
if (overwrite) {
549551
RecordJournal(op_args, "DEL"sv, ArgSlice{key});

src/server/zset_family.cc

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ void OutputScoredArrayResult(const OpResult<ScoredArray>& result, SinkReplyBuild
152152
rb->SendScoredArray(result.value(), true /* with scores */);
153153
}
154154

155-
OpResult<DbSlice::ItAndUpdater> FindZEntry(const ZSetFamily::ZParams& zparams,
156-
const OpArgs& op_args, string_view key,
157-
size_t member_len) {
155+
OpResult<DbSlice::ItAndUpdater> PrepareZEntry(const ZSetFamily::ZParams& zparams,
156+
const OpArgs& op_args, string_view key,
157+
size_t member_len) {
158158
auto& db_slice = op_args.GetDbSlice();
159159
if (zparams.flags & ZADD_IN_XX) {
160160
return db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET);
@@ -1551,7 +1551,8 @@ void ZBooleanOperation(CmdArgList args, string_view cmd, bool is_union, bool sto
15511551
auto store_cb = [&, dest_shard = Shard(dest_key, maps.size())](Transaction* t,
15521552
EngineShard* shard) {
15531553
if (shard->shard_id() == dest_shard)
1554-
ZSetFamily::OpAdd(t->GetOpArgs(shard), ZSetFamily::ZParams{.override = true}, dest_key,
1554+
ZSetFamily::OpAdd(t->GetOpArgs(shard),
1555+
ZSetFamily::ZParams{.override = true, .journal_update = true}, dest_key,
15551556
smvec);
15561557
return OpStatus::OK;
15571558
};
@@ -1696,7 +1697,8 @@ void ZRangeInternal(CmdArgList args, ZSetFamily::RangeParams range_params, Trans
16961697
mvec[i++] = {score, str};
16971698
}
16981699

1699-
add_result = ZSetFamily::OpAdd(t->GetOpArgs(shard), ZSetFamily::ZParams{.override = true},
1700+
add_result = ZSetFamily::OpAdd(t->GetOpArgs(shard),
1701+
ZSetFamily::ZParams{.override = true, .journal_update = true},
17001702
*range_params.store_key, mvec);
17011703

17021704
return OpStatus::OK;
@@ -1967,6 +1969,9 @@ OpResult<ZSetFamily::AddResult> ZSetFamily::OpAdd(const OpArgs& op_args,
19671969
if (res_it && IsValid(res_it->it)) {
19681970
res_it->post_updater.Run();
19691971
db_slice.Del(op_args.db_cntx, res_it->it);
1972+
if (zparams.journal_update && op_args.shard->journal()) {
1973+
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
1974+
}
19701975
}
19711976
return OpStatus::OK;
19721977
}
@@ -1976,7 +1981,7 @@ OpResult<ZSetFamily::AddResult> ZSetFamily::OpAdd(const OpArgs& op_args,
19761981
size_t field_len = members.size() > server.zset_max_listpack_entries
19771982
? UINT32_MAX
19781983
: members.front().second.size();
1979-
auto res_it = FindZEntry(zparams, op_args, key, field_len);
1984+
auto res_it = PrepareZEntry(zparams, op_args, key, field_len);
19801985

19811986
if (!res_it)
19821987
return res_it.status();
@@ -2039,6 +2044,26 @@ OpResult<ZSetFamily::AddResult> ZSetFamily::OpAdd(const OpArgs& op_args,
20392044

20402045
if (op_status != OpStatus::OK)
20412046
return op_status;
2047+
2048+
// TODO: consider optimization to record real command if the replica is in stable_sync state
2049+
// and there is no slot migration process going on.
2050+
if (zparams.journal_update && op_args.shard->journal()) {
2051+
if (zparams.override) {
2052+
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
2053+
}
2054+
2055+
vector<string> scores;
2056+
vector<string_view> mapped;
2057+
scores.reserve(members.size());
2058+
mapped.reserve(members.size() * 2 + 1);
2059+
mapped.push_back(key);
2060+
for (const auto& [score, member] : members) {
2061+
scores.push_back(absl::StrCat(score));
2062+
mapped.push_back(scores.back());
2063+
mapped.push_back(member);
2064+
}
2065+
RecordJournal(op_args, "ZADD"sv, mapped);
2066+
}
20422067
return aresult;
20432068
}
20442069

@@ -2336,7 +2361,8 @@ void ZSetFamily::ZDiffStore(CmdArgList args, const CommandContext& cmd_cntx) {
23362361

23372362
auto store_cb = [&](Transaction* t, EngineShard* shard) {
23382363
if (shard->shard_id() == dest_shard)
2339-
ZSetFamily::OpAdd(t->GetOpArgs(shard), ZSetFamily::ZParams{.override = true}, dest_key,
2364+
ZSetFamily::OpAdd(t->GetOpArgs(shard),
2365+
ZSetFamily::ZParams{.override = true, .journal_update = true}, dest_key,
23402366
smvec);
23412367
return OpStatus::OK;
23422368
};
@@ -2836,7 +2862,7 @@ constexpr uint32_t kZUnionStore = WRITE | SORTEDSET | SLOW;
28362862
} // namespace acl
28372863

28382864
void ZSetFamily::Register(CommandRegistry* registry) {
2839-
constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::DENYOOM;
2865+
constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::DENYOOM | CO::NO_AUTOJOURNAL;
28402866
registry->StartFamily();
28412867
// TODO: to add support for SCRIPT for BZPOPMIN, BZPOPMAX similarly to BLPOP.
28422868
*registry

src/server/zset_family.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class ZSetFamily {
6464
unsigned flags = 0; // mask of ZADD_IN_ macros.
6565
bool ch = false; // Corresponds to CH option.
6666
bool override = false;
67+
bool journal_update = false;
6768
};
6869

6970
using ScoredMember = std::pair<std::string, double>;

tests/dragonfly/replication_test.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,22 @@ async def check_expire(key):
674674
# Check SUNIONSTORE turns into DEL and SADD
675675
await check_list_ooo("SUNIONSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"])
676676

677+
# Check ZDIFFSTORE turns into DEL and ZADD
678+
await c_master.execute_command("zadd zet1 1 v1 2 v2 3 v3")
679+
await c_master.execute_command("zadd zet2 1 v1 2 v2")
680+
await skip_cmd()
681+
await skip_cmd()
682+
await check_list("ZDIFFSTORE k 2 zet1 zet2", [r"DEL k", r"ZADD k 3 v3"])
683+
684+
# Check ZINTERSTORE turns into DEL and ZADD
685+
await check_list("ZINTERSTORE k 2 zet1 zet2", [r"DEL k", r"ZADD k (.*?)"])
686+
687+
# Check ZRANGESTORE turns into SREM and ZADD
688+
await check_list_ooo("ZRANGESTORE k zet1 2 -1", [r"DEL k", r"ZADD k 3 v3"])
689+
690+
# Check ZUNIONSTORE turns into DEL and ZADD
691+
await check_list_ooo("ZUNIONSTORE k 2 zet1 zet2", [r"DEL k", r"ZADD k (.*?)"])
692+
677693
await c_master.set("k1", "1000")
678694
await c_master.set("k2", "1100")
679695
await skip_cmd()
@@ -751,6 +767,7 @@ async def check_expire(key):
751767
[r"XTRIM k-stream MINID 0", r"SREM k-one-element-set value[12]"],
752768
)
753769

770+
# TODO next Z-tests won't work with no-point-in-time replication
754771
# check BZMPOP turns into ZPOPMAX and ZPOPMIN command
755772
await c_master.zadd("key", {"a": 1, "b": 2, "c": 3})
756773
await skip_cmd()

0 commit comments

Comments
 (0)