Skip to content

Commit dfb30f6

Browse files
committed
one to rule them all
1 parent 6ed0f0c commit dfb30f6

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

src/server/replica.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -508,11 +508,15 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
508508
// Initialize shard flows.
509509
shard_flows_.resize(master_context_.num_flows);
510510
DCHECK(!shard_flows_.empty());
511-
for (unsigned i = 0; i < shard_flows_.size(); ++i) {
512-
shard_flows_[i].reset(
513-
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
514-
}
515511
thread_flow_map_ = Partition(shard_flows_.size());
512+
const size_t pool_sz = shard_set->pool()->size();
513+
514+
shard_set->pool()->AwaitFiberOnAll([pool_sz, this](auto index, auto* ctx) {
515+
for (unsigned i = index; i < shard_flows_.size(); i += pool_sz) {
516+
shard_flows_[i].reset(
517+
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
518+
}
519+
});
516520

517521
// Blocked on until all flows got full sync cut.
518522
BlockingCounter sync_block{unsigned(shard_flows_.size())};

0 commit comments

Comments
 (0)