@@ -170,9 +170,13 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
170
170
sync_fb_.JoinIfNeeded ();
171
171
DVLOG (1 ) << " MainReplicationFb stopped " << this ;
172
172
acks_fb_.JoinIfNeeded ();
173
- for (auto & flow : shard_flows_) {
174
- flow.reset ();
175
- }
173
+
174
+ proactor_->Await ([this ]() {
175
+ for (auto & flow : shard_flows_) {
176
+ flow.reset ();
177
+ }
178
+ shard_flows_.clear ();
179
+ });
176
180
177
181
if (last_journal_LSNs_.has_value ()) {
178
182
return LastMasterSyncData{master_context_.master_repl_id , last_journal_LSNs_.value ()};
@@ -501,29 +505,41 @@ error_code Replica::InitiatePSync() {
501
505
return error_code{};
502
506
}
503
507
508
+ void Replica::InitializeShardFlows () {
509
+ decltype (shard_flows_) shard_flows_copy;
510
+ shard_flows_copy.resize (master_context_.num_flows );
511
+ DCHECK (!shard_flows_copy.empty ());
512
+ thread_flow_map_ = Partition (shard_flows_copy.size ());
513
+ const size_t pool_sz = shard_set->pool ()->size ();
514
+
515
+ shard_set->pool ()->AwaitFiberOnAll ([pool_sz, this , &shard_flows_copy](auto index, auto * ctx) {
516
+ for (unsigned i = index; i < shard_flows_copy.size (); i += pool_sz) {
517
+ uint64_t partial_sync_lsn = 0 ;
518
+ if (shard_flows_[i]) {
519
+ partial_sync_lsn = shard_flows_[i]->JournalExecutedCount ();
520
+ }
521
+ shard_flows_copy[i].reset (
522
+ new DflyShardReplica (server (), master_context_, i, &service_, multi_shard_exe_));
523
+ if (partial_sync_lsn > 0 ) {
524
+ shard_flows_[i]->SetRecordsExecuted (partial_sync_lsn);
525
+ }
526
+ }
527
+ });
528
+ // now update shard_flows on proactor thread
529
+ shard_flows_ = std::move (shard_flows_copy);
530
+ }
531
+
504
532
// Initialize and start sub-replica for each flow.
505
533
error_code Replica::InitiateDflySync (std::optional<LastMasterSyncData> last_master_sync_data) {
506
534
auto start_time = absl::Now ();
507
535
508
536
// Initialize MultiShardExecution.
509
537
multi_shard_exe_.reset (new MultiShardExecution ());
510
538
511
- // Initialize shard flows.
512
- shard_flows_.resize (master_context_.num_flows );
513
- DCHECK (!shard_flows_.empty ());
514
- for (unsigned i = 0 ; i < shard_flows_.size (); ++i) {
515
- // Transfer LSN state for partial sync
516
- uint64_t partial_sync_lsn = 0 ;
517
- if (shard_flows_[i]) {
518
- partial_sync_lsn = shard_flows_[i]->JournalExecutedCount ();
519
- }
520
- shard_flows_[i].reset (
521
- new DflyShardReplica (server (), master_context_, i, &service_, multi_shard_exe_));
522
- if (partial_sync_lsn > 0 ) {
523
- shard_flows_[i]->SetRecordsExecuted (partial_sync_lsn);
524
- }
525
- }
526
- thread_flow_map_ = Partition (shard_flows_.size ());
539
+ // Initialize shard flows. The update to the shard_flows_ should be done by this thread.
540
+ // Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
541
+ // below.
542
+ InitializeShardFlows ();
527
543
528
544
// Blocked on until all flows got full sync cut.
529
545
BlockingCounter sync_block{unsigned (shard_flows_.size ())};
@@ -1215,6 +1231,7 @@ auto Replica::GetSummary() const -> Summary {
1215
1231
// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
1216
1232
// it's unlikely to cause a real bug.
1217
1233
for (const auto & flow : shard_flows_) { // Get last io time from all sub flows.
1234
+ DCHECK (Proactor () == ProactorBase::me ());
1218
1235
last_io_time = std::max (last_io_time, flow->LastIoTime ());
1219
1236
}
1220
1237
@@ -1246,25 +1263,14 @@ auto Replica::GetSummary() const -> Summary {
1246
1263
return res;
1247
1264
};
1248
1265
1249
- if (Sock ())
1250
- return Proactor ()->AwaitBrief (f);
1251
-
1252
- /* *
1253
- * when this branch happens: there is a very short grace period
1254
- * where Sock() is not initialized, yet the server can
1255
- * receive ROLE/INFO commands. That period happens when launching
1256
- * an instance with '--replicaof' and then immediately
1257
- * sending a command.
1258
- *
1259
- * In that instance, we have to run f() on the current fiber.
1260
- */
1261
- return f ();
1266
+ return Proactor ()->AwaitBrief (f);
1262
1267
}
1263
1268
1264
1269
std::vector<uint64_t > Replica::GetReplicaOffset () const {
1265
1270
std::vector<uint64_t > flow_rec_count;
1266
1271
flow_rec_count.resize (shard_flows_.size ());
1267
1272
for (const auto & flow : shard_flows_) {
1273
+ DCHECK (flow.get ());
1268
1274
uint32_t flow_id = flow->FlowId ();
1269
1275
uint64_t rec_count = flow->JournalExecutedCount ();
1270
1276
DCHECK_LT (flow_id, shard_flows_.size ());
0 commit comments