Skip to content

Commit b676a6d

Browse files
authored
fix(server): Support FLUSH(ALL) SYNC (#5821)
1 parent 9eba4bc commit b676a6d

File tree

5 files changed

+38
-39
lines changed

5 files changed

+38
-39
lines changed

src/server/db_slice.cc

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -893,7 +893,7 @@ void DbSlice::FlushSlots(const cluster::SlotRanges& slot_ranges) {
893893
}).Detach();
894894
}
895895

896-
void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
896+
util::fb2::Fiber DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
897897
bool clear_tiered = owner_->tiered_storage() != nullptr;
898898

899899
if (clear_tiered)
@@ -902,7 +902,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
902902
DbTableArray flush_db_arr(db_arr_.size());
903903

904904
for (DbIndex index : indexes) {
905-
if (!index) {
905+
if (index == 0) { // TODO: Async dealloc?
906906
owner_->search_indices()->DropAllIndices();
907907
}
908908

@@ -925,20 +925,17 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
925925
ServerState::kGlibcmalloc);
926926
};
927927

928-
fb2::Fiber("flush_dbs", std::move(cb)).Detach();
928+
return {"flush_dbs", std::move(cb)};
929929
}
930930

931-
void DbSlice::FlushDb(DbIndex db_ind) {
931+
util::fb2::Fiber DbSlice::FlushDb(DbIndex db_ind) {
932932
DVLOG(1) << "Flushing db " << db_ind;
933933

934934
// clear client tracking map.
935935
client_tracking_map_.clear();
936936

937-
if (db_ind != kDbAll) {
938-
// Flush a single database if a specific index is provided
939-
FlushDbIndexes({db_ind});
940-
return;
941-
}
937+
if (db_ind != kDbAll) // Flush a single database if a specific index is provided
938+
return FlushDbIndexes({db_ind});
942939

943940
std::vector<DbIndex> indexes;
944941
indexes.reserve(db_arr_.size());
@@ -948,7 +945,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
948945
}
949946
}
950947

951-
FlushDbIndexes(indexes);
948+
return FlushDbIndexes(indexes);
952949
}
953950

954951
void DbSlice::AddExpire(DbIndex db_ind, const Iterator& main_it, uint64_t at) {

src/server/db_slice.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ class DbSlice {
331331
constexpr static DbIndex kDbAll = 0xFFFF;
332332

333333
// Flushes db_ind or all databases if kDbAll is passed
334-
void FlushDb(DbIndex db_ind);
334+
util::fb2::Fiber FlushDb(DbIndex db_ind);
335335

336336
// Flushes the data of given slot ranges.
337337
void FlushSlots(const cluster::SlotRanges& slot_ranges);
@@ -551,7 +551,7 @@ class DbSlice {
551551
bool force_update);
552552

553553
void FlushSlotsFb(const cluster::SlotSet& slot_ids);
554-
void FlushDbIndexes(const std::vector<DbIndex>& indexes);
554+
util::fb2::Fiber FlushDbIndexes(const std::vector<DbIndex>& indexes);
555555

556556
// Invalidate all watched keys in database. Used on FLUSH.
557557
void InvalidateDbWatches(DbIndex db_indx);

src/server/server_family.cc

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,10 +1284,7 @@ void ServerFamily::FlushAll(Namespace* ns) {
12841284
boost::intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
12851285
flush_trans->InitByArgs(ns, 0, {});
12861286
VLOG(1) << "Performing flush";
1287-
error_code ec = Drakarys(flush_trans.get(), DbSlice::kDbAll);
1288-
if (ec) {
1289-
LOG(ERROR) << "Error flushing db " << ec.message();
1290-
}
1287+
Drakarys(flush_trans.get(), DbSlice::kDbAll, false);
12911288
}
12921289

12931290
// Load starts as many fibers as there are files to load each one separately.
@@ -2206,17 +2203,20 @@ bool ServerFamily::TEST_IsSaving() const {
22062203
return is_saving.load(std::memory_order_relaxed);
22072204
}
22082205

2209-
error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) {
2206+
void ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind, bool wait) {
22102207
VLOG(1) << "Drakarys";
22112208

2209+
vector<fb2::Fiber> fibers(shard_set->size());
22122210
transaction->Execute(
2213-
[db_ind](Transaction* t, EngineShard* shard) {
2214-
t->GetDbSlice(shard->shard_id()).FlushDb(db_ind);
2211+
[db_ind, &fibers](Transaction* t, EngineShard* shard) {
2212+
fibers[shard->shard_id()] = t->GetDbSlice(shard->shard_id()).FlushDb(db_ind);
22152213
return OpStatus::OK;
22162214
},
22172215
true);
22182216

2219-
return error_code{};
2217+
auto action = wait ? &fb2::Fiber::JoinIfNeeded : &fb2::Fiber::Detach;
2218+
for (auto& f : fibers)
2219+
(f.*action)();
22202220
}
22212221

22222222
SaveInfoData ServerFamily::GetLastSaveInfo() const {
@@ -2286,20 +2286,14 @@ void ServerFamily::SendInvalidationMessages() const {
22862286
}
22872287

22882288
void ServerFamily::FlushDb(CmdArgList args, const CommandContext& cmd_cntx) {
2289-
DCHECK(cmd_cntx.tx);
2290-
Drakarys(cmd_cntx.tx, cmd_cntx.tx->GetDbIndex());
2291-
SendInvalidationMessages();
2292-
cmd_cntx.rb->SendOk();
2293-
}
2289+
if (args.size() > 1)
2290+
return cmd_cntx.rb->SendError(kSyntaxErr);
22942291

2295-
void ServerFamily::FlushAll(CmdArgList args, const CommandContext& cmd_cntx) {
2296-
if (args.size() > 1) {
2297-
cmd_cntx.rb->SendError(kSyntaxErr);
2298-
return;
2299-
}
2292+
bool sync = CmdArgParser{args}.Check("SYNC");
2293+
string_view cmd_name = cmd_cntx.tx->GetCId()->name();
2294+
DbIndex index = cmd_name == "FLUSHALL" ? DbSlice::kDbAll : cmd_cntx.tx->GetDbIndex();
23002295

2301-
DCHECK(cmd_cntx.tx);
2302-
Drakarys(cmd_cntx.tx, DbSlice::kDbAll);
2296+
Drakarys(cmd_cntx.tx, index, sync);
23032297
SendInvalidationMessages();
23042298
cmd_cntx.rb->SendOk();
23052299
}
@@ -4037,10 +4031,10 @@ void ServerFamily::Register(CommandRegistry* registry) {
40374031
<< CI{"CONFIG", CO::ADMIN | CO::LOADING | CO::DANGEROUS, -2, 0, 0, acl::kConfig}.HFUNC(Config)
40384032
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, acl::kDbSize}.HFUNC(DbSize)
40394033
<< CI{"DEBUG", CO::ADMIN | CO::LOADING, -2, 0, 0, acl::kDebug}.HFUNC(Debug)
4040-
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS | CO::DANGEROUS, 1, 0, 0, acl::kFlushDB}.HFUNC(
4034+
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS | CO::DANGEROUS, -1, 0, 0, acl::kFlushDB}.HFUNC(
40414035
FlushDb)
40424036
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS | CO::DANGEROUS, -1, 0, 0, acl::kFlushAll}
4043-
.HFUNC(FlushAll)
4037+
.HFUNC(FlushDb)
40444038
<< CI{"INFO", CO::LOADING, -1, 0, 0, acl::kInfo}.HFUNC(Info)
40454039
<< CI{"HELLO", CO::LOADING, -1, 0, 0, acl::kHello}.HFUNC(Hello)
40464040
<< CI{"LASTSAVE", CO::LOADING | CO::FAST, 1, 0, 0, acl::kLastSave}.HFUNC(LastSave)

src/server/server_family.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ class ServerFamily {
250250

251251
// Burns down and destroys all the data in the database.
252252
// if kDbAll is passed, burns all the databases to the ground.
253-
std::error_code Drakarys(Transaction* transaction, DbIndex db_ind);
253+
// If `wait` is true, joins all per-shard flush fibers (waiting for them to finish and decommit); otherwise the fibers are detached.
254+
void Drakarys(Transaction* transaction, DbIndex db_ind, bool wait);
254255

255256
SaveInfoData GetLastSaveInfo() const;
256257

@@ -344,7 +345,6 @@ class ServerFamily {
344345
void Dfly(CmdArgList args, const CommandContext& cmd_cntx);
345346
void Memory(CmdArgList args, const CommandContext& cmd_cntx);
346347
void FlushDb(CmdArgList args, const CommandContext& cmd_cntx);
347-
void FlushAll(CmdArgList args, const CommandContext& cmd_cntx);
348348
void Info(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
349349
void Hello(CmdArgList args, const CommandContext& cmd_cntx);
350350
void LastSave(CmdArgList args, const CommandContext& cmd_cntx);

tests/dragonfly/memory_test.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
# We limit to 5gb just in case to sanity check the gh runner. Otherwise, if we ask for too much
2525
# memory it might force the gh runner to run out of memory (since OOM killer might not even
2626
# get a chance to run).
27-
async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements):
27+
async def test_rss_used_mem_gap(df_factory: DflyInstanceFactory, type, keys, val_size, elements):
2828
dbfilename = f"dump_{tmp_file_name()}"
2929
instance = df_factory.create(
3030
proactor_threads=2,
@@ -74,9 +74,17 @@ async def check_memory():
7474

7575
assert await client.execute_command("SAVE", "DF") == True
7676
assert await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs") == "OK"
77-
#
77+
7878
await check_memory()
79-
await client.execute_command("FLUSHALL")
79+
80+
# FLUSHALL SYNC waits for the flush to finish and for memory to be decommitted, so INFO can be sent immediately after
81+
p = client.pipeline(transaction=False)
82+
p.execute_command("FLUSHALL", "SYNC") # flushall(asynchronous=False) will just issue FLUSHALL$
83+
p.info("memory")
84+
85+
info = (await p.execute())[-1]
86+
assert info["used_memory"] < 2 * 1_000_000 # Table memory
87+
assert info["used_memory_rss"] < min_rss / 10 # RSS must have been freed
8088

8189

8290
#

0 commit comments

Comments
 (0)