Skip to content

Commit 0fbcbca

Browse files
authored
fix: Remove index docs on eviction and cluster migration (#6062)
When we evict keys or during slot migration we should also remove index docs. Signed-off-by: mkaruza <[email protected]>
1 parent 38cdb0f commit 0fbcbca

File tree

5 files changed

+132
-23
lines changed

5 files changed

+132
-23
lines changed

src/server/db_slice.cc

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotBuckets& eb, PrimeTable
230230
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
231231
RecordExpiryBlocking(cntx_.db_index, key);
232232
}
233-
db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table);
233+
db_slice_->Del(cntx_, DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)));
234234

235235
++evicted_;
236236
}
@@ -802,18 +802,25 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
802802
CreateDb(db_ind);
803803
}
804804

805-
void DbSlice::Del(Context cntx, Iterator it) {
805+
void DbSlice::Del(Context cntx, Iterator it, DbTable* db_table) {
806806
CHECK(IsValid(it));
807807

808-
auto& db = db_arr_[cntx.db_index];
808+
ExpIterator exp_it;
809+
DbTable* table = db_table ? db_table : db_arr_[cntx.db_index].get();
809810
auto obj_type = it->second.ObjType();
810811

811812
if (doc_del_cb_ && (obj_type == OBJ_JSON || obj_type == OBJ_HASH)) {
812813
string tmp;
813814
string_view key = it->first.GetSlice(&tmp);
814815
doc_del_cb_(key, cntx, it->second);
815816
}
816-
PerformDeletion(it, db.get());
817+
818+
if (it->second.HasExpire()) {
819+
exp_it = ExpIterator::FromPrime(table->expire.Find(it->first));
820+
DCHECK(!exp_it.is_done());
821+
}
822+
823+
PerformDeletionAtomic(it, exp_it, table);
817824
}
818825

819826
void DbSlice::DelMutable(Context cntx, ItAndUpdater it_updater) {
@@ -833,14 +840,19 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
833840
boost::intrusive_ptr<DbTable> table = db_arr_.front();
834841
size_t memory_before = table->table_memory() + table->stats.obj_memory_usage;
835842

843+
DbContext db_cntx;
844+
db_cntx.time_now_ms = GetCurrentTimeMs();
845+
db_cntx.db_index = table->index;
846+
836847
std::string tmp;
837848
auto iterate_bucket = [&](PrimeTable::bucket_iterator it) {
838849
it.AdvanceIfNotOccupied();
839850
while (!it.is_done()) {
840851
std::string_view key = it->first.GetSlice(&tmp);
841852
SlotId sid = KeySlot(key);
842853
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
843-
PerformDeletion(Iterator::FromPrime(it), table.get());
854+
// We use copy of table smart pointer and pass it as table because FLLUSHALL can drop table.
855+
Del(db_cntx, Iterator::FromPrime(it), table.get());
844856
++del_count;
845857
}
846858
++it;
@@ -1413,7 +1425,7 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c
14131425
db_arr_[db_ind]->prime.GetSegmentCount();
14141426
}
14151427

1416-
pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStepAtomic(DbIndex db_ind,
1428+
pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStepAtomic(DbIndex db_ind, const Context& cntx,
14171429
size_t starting_segment_id,
14181430
size_t increase_goal_bytes) {
14191431
// Disable flush journal changes to prevent preemtion
@@ -1476,7 +1488,8 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStepAtomic(DbIndex db_ind,
14761488

14771489
evicted_bytes += evict_it->first.MallocUsed() + evict_it->second.MallocUsed();
14781490
++evicted_items;
1479-
PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());
1491+
1492+
Del(cntx, Iterator(evict_it, StringOrView::FromView(key)));
14801493

14811494
// returns when whichever condition is met first
14821495
if ((evicted_items == max_eviction_per_hb) || (evicted_bytes >= increase_goal_bytes))
@@ -1824,16 +1837,6 @@ void DbSlice::PerformDeletionAtomic(const Iterator& del_it, const ExpIterator& e
18241837
}
18251838
}
18261839

1827-
void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) {
1828-
ExpIterator exp_it;
1829-
if (del_it->second.HasExpire()) {
1830-
exp_it = ExpIterator::FromPrime(table->expire.Find(del_it->first));
1831-
DCHECK(!exp_it.is_done());
1832-
}
1833-
1834-
PerformDeletionAtomic(del_it, exp_it, table);
1835-
}
1836-
18371840
void DbSlice::OnCbFinishBlocking() {
18381841
if (IsCacheMode()) {
18391842
// move fetched items to local variable

src/server/db_slice.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -322,11 +322,10 @@ class DbSlice {
322322
// Creates a database with index `db_ind`. If such database exists does nothing.
323323
void ActivateDb(DbIndex db_ind);
324324

325-
// Delete a key referred by its iterator.
326-
void PerformDeletion(Iterator del_it, DbTable* table);
327-
328325
// Deletes the iterator. The iterator must be valid.
329-
void Del(Context cntx, Iterator it);
326+
// Context argument is used only for document removal and it just needs
327+
// timestamp field. Last argument, db_table, is optional and is used only in FlushSlotsCb.
328+
void Del(Context cntx, Iterator it, DbTable* db_table = nullptr);
330329

331330
// Deletes a key after FindMutable(). Runs post_updater before deletion
332331
// to update memory accounting while the key is still valid.
@@ -456,7 +455,7 @@ class DbSlice {
456455
// Evicts items with dynamically allocated data from the primary table.
457456
// Does not shrink tables.
458457
// Returns number of (elements,bytes) freed due to evictions.
459-
std::pair<uint64_t, size_t> FreeMemWithEvictionStepAtomic(DbIndex db_indx,
458+
std::pair<uint64_t, size_t> FreeMemWithEvictionStepAtomic(DbIndex db_indx, const Context& cntx,
460459
size_t starting_segment_id,
461460
size_t increase_goal_bytes);
462461

src/server/engine_shard.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ void EngineShard::RetireExpiredAndEvict() {
688688
if (eviction_goal) {
689689
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
690690
auto [evicted_items, evicted_bytes] =
691-
db_slice.FreeMemWithEvictionStepAtomic(i, starting_segment_id, eviction_goal);
691+
db_slice.FreeMemWithEvictionStepAtomic(i, db_cntx, starting_segment_id, eviction_goal);
692692

693693
VLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
694694
<< " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes

tests/dragonfly/cluster_test.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3483,3 +3483,68 @@ async def test_SearchRequestDistribution(df_factory: DflyInstanceFactory):
34833483
await asyncio.sleep(3)
34843484
for node in nodes:
34853485
await wait_for_ft_index_creation(node.client, "idx")
3486+
3487+
3488+
async def verify_keys_match_number_of_index_docs(client, expected_num_keys):
3489+
# Get number of docs in index
3490+
index_info = await client.execute_command(f"FT.INFO idx")
3491+
index_info_num_docs = index_info[9]
3492+
3493+
# Get number of keys in database
3494+
keyspace_info = await client.info("keyspace")
3495+
keyspace_keys = keyspace_info["db0"]["keys"]
3496+
3497+
assert index_info_num_docs == keyspace_keys
3498+
assert index_info_num_docs == expected_num_keys
3499+
assert keyspace_keys == expected_num_keys
3500+
3501+
3502+
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
3503+
async def test_remove_docs_on_cluster_migration(df_factory):
3504+
instances = [
3505+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
3506+
]
3507+
3508+
df_factory.start_all(instances)
3509+
3510+
nodes = [(await create_node_info(instance)) for instance in instances]
3511+
nodes[0].slots = [(0, 16383)]
3512+
nodes[1].slots = []
3513+
3514+
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
3515+
3516+
# Create index on both nodes
3517+
await nodes[0].client.execute_command(
3518+
"FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "v", "TEXT"
3519+
)
3520+
3521+
# Populate node 0
3522+
keys = 100
3523+
for i in range(0, keys):
3524+
random_string = "".join(random.choices(string.ascii_letters + string.digits, k=1_000))
3525+
await nodes[0].client.execute_command("HSET", f"doc:{i}", "v", random_string)
3526+
3527+
# Verify on node 0 that keys are added and index is populated
3528+
await verify_keys_match_number_of_index_docs(nodes[0].client, keys)
3529+
3530+
nodes[0].migrations.append(
3531+
MigrationInfo("127.0.0.1", instances[1].port, [(0, 16383)], nodes[1].id)
3532+
)
3533+
logging.debug("Start migration")
3534+
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
3535+
3536+
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
3537+
3538+
nodes[0].migrations = []
3539+
nodes[0].slots = []
3540+
nodes[1].slots = [(0, 16383)]
3541+
logging.debug("finalize migration")
3542+
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
3543+
3544+
await asyncio.sleep(1)
3545+
3546+
# Verify on node 1 that keys are moved and index is populated
3547+
await verify_keys_match_number_of_index_docs(nodes[1].client, keys)
3548+
3549+
# Verify that node 0 doesn't have any keys and no index docs
3550+
await verify_keys_match_number_of_index_docs(nodes[0].client, 0)

tests/dragonfly/memory_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,45 @@ async def poll():
323323
df.stop()
324324
found = df.find_in_logs("Commands squashing current reply size is overlimit")
325325
assert len(found) > 0
326+
327+
328+
@pytest.mark.asyncio
329+
async def test_remove_docs_on_eviction(df_factory):
330+
max_memory = 256 * 1024**2 # 256MB
331+
df_server = df_factory.create(
332+
proactor_threads=1,
333+
cache_mode="yes",
334+
maxmemory=max_memory,
335+
vmodule="engine_shard=2",
336+
eviction_memory_budget_threshold=0.99,
337+
enable_heartbeat_rss_eviction="no",
338+
)
339+
df_server.start()
340+
client = df_server.client()
341+
342+
await client.execute_command(
343+
"FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "v", "TEXT"
344+
)
345+
346+
i = 0
347+
while True:
348+
random_string = "".join(random.choices(string.ascii_letters + string.digits, k=1_000))
349+
await client.execute_command("HSET", f"doc:{i}", "v", random_string)
350+
stats_info = await client.info("stats")
351+
# Done when see at least 50 evictions
352+
if stats_info["evicted_keys"] > 50:
353+
break
354+
i = i + 1
355+
356+
# Give some time to eviction stabilize
357+
await asyncio.sleep(1)
358+
359+
# Get number of docs in index
360+
index_info = await client.execute_command(f"FT.INFO idx")
361+
index_info_num_docs = index_info[9]
362+
363+
# Get number of keys in database
364+
keyspace_info = await client.info("keyspace")
365+
keyspace_keys = keyspace_info["db0"]["keys"]
366+
367+
assert index_info_num_docs == keyspace_keys

0 commit comments

Comments
 (0)