Skip to content

Commit 31d1144

Browse files
authored
feat(cluster): migrate thread on same shard config (#5554)
--------- Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 5c5ca75 commit 31d1144

File tree

5 files changed

+28
-2
lines changed

5 files changed

+28
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,3 +20,4 @@ releases
2020
.hypothesis
2121
.secrets
2222
cmake-build-debug
23+
.venv/

src/server/cluster/cluster_family.cc

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -977,6 +977,16 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
977977

978978
builder->SendOk();
979979

980+
// Try migrating the connection if we have the same shard configuration
981+
if (migration->ShardNum() == shard_set->size() &&
982+
int32_t(shard_id) != fb2::ProactorBase::me()->GetPoolIndex()) {
983+
DCHECK_LT(shard_id, shard_set->size());
984+
if (bool success = cntx->conn()->Migrate(shard_set->pool()->at(shard_id)); !success) {
985+
builder->SendError("invalid state");
986+
return;
987+
}
988+
}
989+
980990
migration->StartFlow(shard_id, cntx->conn()->socket());
981991
}
982992

src/server/cluster/incoming_slot_migration.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,10 @@ class IncomingSlotMigration {
5050
return source_id_;
5151
}
5252

53+
size_t ShardNum() const {
54+
return shard_flows_.size();
55+
}
56+
5357
// Switch to FATAL state and store error message
5458
void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
5559
errors_count_.fetch_add(1, std::memory_order_relaxed);

src/server/db_slice.cc

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -802,14 +802,17 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
802802
uint64_t next_version = 0;
803803
uint64_t del_count = 0;
804804

805+
// Explicitly copy table smart pointer to keep reference count up (flushall drops it)
806+
boost::intrusive_ptr<DbTable> table = db_arr_.front();
807+
805808
std::string tmp;
806809
auto iterate_bucket = [&](PrimeTable::bucket_iterator it) {
807810
it.AdvanceIfNotOccupied();
808811
while (!it.is_done()) {
809812
std::string_view key = it->first.GetSlice(&tmp);
810813
SlotId sid = KeySlot(key);
811814
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
812-
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get());
815+
PerformDeletion(Iterator::FromPrime(it), table.get());
813816
++del_count;
814817
}
815818
++it;
@@ -836,7 +839,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
836839
next_version = RegisterOnChange(std::move(on_change));
837840

838841
ServerState& etl = *ServerState::tlocal();
839-
PrimeTable* pt = &db_arr_[0]->prime;
842+
PrimeTable* pt = &table->prime;
840843
PrimeTable::Cursor cursor;
841844

842845
do {

tests/dragonfly/cluster_test.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1330,6 +1330,14 @@ async def test_cluster_flushall_during_migration(
13301330

13311331
assert await nodes[0].client.dbsize() == 0
13321332

1333+
# Push config that causes mass async slot deletion on nodes[1]
1334+
nodes[0].slots = [(0, 16383)]
1335+
nodes[1].slots = []
1336+
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
1337+
1338+
# Issue flushall right after pushing new config so it runs at the same time as disowned slots are flushed
1339+
await nodes[1].client.execute_command("flushall")
1340+
13331341

13341342
@pytest.mark.parametrize("interrupt", [False, True])
13351343
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})

0 commit comments

Comments
 (0)