Skip to content

Commit fd3b4f3

Browse files
v2.0: replay: do not start leader for a block we already have shreds for (backport of #2416) (#2484)
* replay: do not start leader for a block we already have shreds for (#2416) * replay: do not start leader for a block we already have shreds for * pr feedback: comment, move existing check to blockstore fn * move blockstore read after tick height check * pr feedback: resuse blockstore fn in next_leader_slot (cherry picked from commit 15dbe7f) # Conflicts: # poh/src/poh_recorder.rs * fix conflicts --------- Co-authored-by: Ashwin Sekar <[email protected]> Co-authored-by: Ashwin Sekar <[email protected]>
1 parent 1f0c8f4 commit fd3b4f3

File tree

4 files changed

+157
-15
lines changed

4 files changed

+157
-15
lines changed

core/src/replay_stage.rs

Lines changed: 137 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2023,6 +2023,14 @@ impl ReplayStage {
20232023
}
20242024
}
20252025

2026+
/// Checks if it is time for us to start producing a leader block.
2027+
/// Fails if:
2028+
/// - Current PoH has not satisfied criteria to start my leader block
2029+
/// - Startup verification is not complete
2030+
/// - Bank forks already contains a bank for this leader slot
2031+
/// - We have not landed a vote yet and the `wait_for_vote_to_start_leader` flag is set
2032+
/// - We have failed the propagated check
2033+
/// Returns whether a new working bank was created and inserted into bank forks.
20262034
#[allow(clippy::too_many_arguments)]
20272035
fn maybe_start_leader(
20282036
my_pubkey: &Pubkey,
@@ -2036,7 +2044,7 @@ impl ReplayStage {
20362044
banking_tracer: &Arc<BankingTracer>,
20372045
has_new_vote_been_rooted: bool,
20382046
track_transaction_indexes: bool,
2039-
) {
2047+
) -> bool {
20402048
// all the individual calls to poh_recorder.read() are designed to
20412049
// increase granularity, decrease contention
20422050

@@ -2050,7 +2058,7 @@ impl ReplayStage {
20502058
} => (poh_slot, parent_slot),
20512059
PohLeaderStatus::NotReached => {
20522060
trace!("{} poh_recorder hasn't reached_leader_slot", my_pubkey);
2053-
return;
2061+
return false;
20542062
}
20552063
};
20562064

@@ -2066,12 +2074,12 @@ impl ReplayStage {
20662074

20672075
if !parent.is_startup_verification_complete() {
20682076
info!("startup verification incomplete, so skipping my leader slot");
2069-
return;
2077+
return false;
20702078
}
20712079

20722080
if bank_forks.read().unwrap().get(poh_slot).is_some() {
20732081
warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot);
2074-
return;
2082+
return false;
20752083
}
20762084
trace!(
20772085
"{} poh_slot {} parent_slot {}",
@@ -2083,7 +2091,7 @@ impl ReplayStage {
20832091
if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
20842092
if !has_new_vote_been_rooted {
20852093
info!("Haven't landed a vote, so skipping my leader slot");
2086-
return;
2094+
return false;
20872095
}
20882096

20892097
trace!(
@@ -2095,7 +2103,7 @@ impl ReplayStage {
20952103

20962104
// I guess I missed my slot
20972105
if next_leader != *my_pubkey {
2098-
return;
2106+
return false;
20992107
}
21002108

21012109
datapoint_info!(
@@ -2129,7 +2137,7 @@ impl ReplayStage {
21292137
latest_unconfirmed_leader_slot,
21302138
);
21312139
}
2132-
return;
2140+
return false;
21332141
}
21342142

21352143
let root_slot = bank_forks.read().unwrap().root();
@@ -2164,8 +2172,10 @@ impl ReplayStage {
21642172
.write()
21652173
.unwrap()
21662174
.set_bank(tpu_bank, track_transaction_indexes);
2175+
true
21672176
} else {
21682177
error!("{} No next leader found", my_pubkey);
2178+
false
21692179
}
21702180
}
21712181

@@ -9173,4 +9183,124 @@ pub(crate) mod tests {
91739183
.is_candidate(&(5, bank_forks.bank_hash(5).unwrap()))
91749184
.unwrap());
91759185
}
9186+
9187+
/// Checks that `ReplayStage::maybe_start_leader` does not start a leader
/// block for a slot whose shreds are already in the blockstore, and that it
/// does start one for the following slot once PoH has ticked further.
#[test]
fn test_skip_leader_slot_for_existing_slot() {
    solana_logger::setup();

    let ReplayBlockstoreComponents {
        blockstore,
        my_pubkey,
        leader_schedule_cache,
        poh_recorder,
        vote_simulator,
        rpc_subscriptions,
        ..
    } = replay_blockstore_components(None, 1, None);
    let VoteSimulator {
        bank_forks,
        mut progress,
        ..
    } = vote_simulator;

    // The starting point is the current working bank, which is expected to
    // be both complete and frozen.
    let working_bank = bank_forks.read().unwrap().working_bank();
    assert!(working_bank.is_complete());
    assert!(working_bank.is_frozen());
    // Without this, maybe_start_leader() bails out early with
    // "startup verification incomplete".
    working_bank.set_startup_verification_complete();

    // Put shreds into the blockstore for a slot two past the current bank,
    // without creating a Bank for it in BankForks. This emulates a block
    // that was previously produced and added to BankForks but later removed,
    // e.g. because its Bank was not on the major fork.
    let initial_slot = working_bank.slot();
    let dummy_slot = initial_slot + 2;
    let good_slot = dummy_slot + 1;
    let (shreds, _) = make_slot_entries(
        dummy_slot,
        initial_slot,
        10,   // num_entries
        true, // merkle_variant
    );
    blockstore.insert_shreds(shreds, None, false).unwrap();

    // Bring the PoH recorder back to the completed bank so that it starts
    // from a known state.
    ReplayStage::reset_poh_recorder(
        &my_pubkey,
        &blockstore,
        working_bank.clone(),
        &poh_recorder,
        &leader_schedule_cache,
    );

    // Number of ticks fed to the PoH recorder per round: the loops below run
    // `num_poh_ticks + 1` times, where `num_poh_ticks` is
    // `ticks_per_slot * hashes_per_tick + 1`.
    let num_poh_ticks =
        (working_bank.ticks_per_slot() * working_bank.hashes_per_tick().unwrap()) + 1;
    {
        let mut recorder = poh_recorder.write().unwrap();
        for _ in 0..=num_poh_ticks {
            recorder.tick();
        }
    }

    let poh_recorder = Arc::new(poh_recorder);
    let (retransmit_slots_sender, _) = unbounded();
    let (banking_tracer, _) = BankingTracer::new(None).unwrap();
    // No vote has actually been rooted, but passing `true` keeps
    // maybe_start_leader() from skipping the leader slot for that reason,
    // which is irrelevant to what this test checks.
    let has_new_vote_been_rooted = true;
    let track_transaction_indexes = false;

    // Both attempts below use exactly the same arguments, with a fresh
    // `SkippedSlotsInfo` on every call.
    let mut try_start_leader = || {
        ReplayStage::maybe_start_leader(
            &my_pubkey,
            &bank_forks,
            &poh_recorder,
            &leader_schedule_cache,
            &rpc_subscriptions,
            &mut progress,
            &retransmit_slots_sender,
            &mut SkippedSlotsInfo::default(),
            &banking_tracer,
            has_new_vote_been_rooted,
            track_transaction_indexes,
        )
    };

    // First attempt: the leader must not be started for dummy_slot.
    assert_matches!(
        poh_recorder.read().unwrap().reached_leader_slot(&my_pubkey),
        PohLeaderStatus::NotReached
    );
    assert!(!try_start_leader());

    // Feed the PoH recorder the same number of ticks a second time.
    {
        let mut recorder = poh_recorder.write().unwrap();
        for _ in 0..=num_poh_ticks {
            recorder.tick();
        }
    }

    // Second attempt: the leader should now be started, for dummy_slot + 1.
    assert!(try_start_leader());

    // The working bank is now the freshly created leader bank. Its slot must
    // be good_slot rather than dummy_slot: the blockstore already held shreds
    // for dummy_slot before maybe_start_leader() ran, so choosing it would
    // have produced a duplicate block.
    let working_bank = bank_forks.read().unwrap().working_bank();
    assert_eq!(working_bank.slot(), good_slot);
    assert_eq!(working_bank.parent_slot(), initial_slot);
}
91769306
}

ledger/src/blockstore.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3964,6 +3964,13 @@ impl Blockstore {
39643964
Ok(duplicate_slots_iterator.map(|(slot, _)| slot))
39653965
}
39663966

3967+
/// Reports whether this blockstore already holds shreds for `slot`.
///
/// Looks up the slot's metadata with `self.meta(slot)` and returns `true`
/// only when an entry exists and its `received` value is greater than zero.
/// A slot with no metadata entry yields `false`.
///
/// # Panics
///
/// Panics if the metadata lookup returns an error, since the result is
/// unwrapped.
pub fn has_existing_shreds_for_slot(&self, slot: Slot) -> bool {
    matches!(self.meta(slot).unwrap(), Some(meta) if meta.received > 0)
}
3973+
39673974
/// Returns the max root or 0 if it does not exist
39683975
pub fn max_root(&self) -> Slot {
39693976
self.max_root.load(Ordering::Relaxed)

ledger/src/leader_schedule_cache.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,10 @@ impl LeaderScheduleCache {
139139
.map(move |i| i as Slot + first_slot)
140140
})
141141
.skip_while(|slot| {
142-
match blockstore {
143-
None => false,
144-
// Skip slots we have already sent a shred for.
145-
Some(blockstore) => match blockstore.meta(*slot).unwrap() {
146-
Some(meta) => meta.received > 0,
147-
None => false,
148-
},
149-
}
142+
// Skip slots we already have shreds for
143+
blockstore
144+
.map(|bs| bs.has_existing_shreds_for_slot(*slot))
145+
.unwrap_or(false)
150146
});
151147
let first_slot = schedule.next()?;
152148
let max_slot = first_slot.saturating_add(max_slot_range);

poh/src/poh_recorder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,15 @@ impl PohRecorder {
569569
self.leader_first_tick_height_including_grace_ticks
570570
{
571571
if self.reached_leader_tick(my_pubkey, leader_first_tick_height_including_grace_ticks) {
572+
if self.blockstore.has_existing_shreds_for_slot(next_poh_slot) {
573+
// We already have existing shreds for this slot. This can happen when this block was previously
574+
// created and added to BankForks, however a recent PoH reset caused this bank to be removed
575+
// as it was not part of the rooted fork. If this slot is not the first slot for this leader,
576+
// and the first slot was previously ticked over, the check in `leader_schedule_cache::next_leader_slot`
577+
// will not suffice, as it only checks if there are shreds for the first slot.
578+
return PohLeaderStatus::NotReached;
579+
}
580+
572581
assert!(next_tick_height >= self.start_tick_height);
573582
let poh_slot = next_poh_slot;
574583
let parent_slot = self.start_slot();

0 commit comments

Comments
 (0)