Skip to content

Commit 6f63e45

Browse files
authored
v2.0: fix: ensure vote packets can be retried (backport of #2605) (#2612)
1 parent 377d19d commit 6f63e45

File tree

2 files changed

+152
-48
lines changed

2 files changed

+152
-48
lines changed

core/src/banking_stage/latest_unprocessed_votes.rs

Lines changed: 81 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use {
1414
},
1515
solana_vote_program::vote_instruction::VoteInstruction,
1616
std::{
17+
cmp,
1718
collections::HashMap,
1819
ops::DerefMut,
1920
sync::{
@@ -166,12 +167,13 @@ impl LatestUnprocessedVotes {
166167
pub(crate) fn insert_batch(
167168
&self,
168169
votes: impl Iterator<Item = LatestValidatorVotePacket>,
170+
should_replenish_taken_votes: bool,
169171
) -> VoteBatchInsertionMetrics {
170172
let mut num_dropped_gossip = 0;
171173
let mut num_dropped_tpu = 0;
172174

173175
for vote in votes {
174-
if let Some(vote) = self.update_latest_vote(vote) {
176+
if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) {
175177
match vote.vote_source {
176178
VoteSource::Gossip => num_dropped_gossip += 1,
177179
VoteSource::Tpu => num_dropped_tpu += 1,
@@ -199,26 +201,41 @@ impl LatestUnprocessedVotes {
199201
pub fn update_latest_vote(
200202
&self,
201203
vote: LatestValidatorVotePacket,
204+
should_replenish_taken_votes: bool,
202205
) -> Option<LatestValidatorVotePacket> {
203206
let pubkey = vote.pubkey();
204207
let slot = vote.slot();
205208
let timestamp = vote.timestamp();
206209

210+
// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
211+
// We directly compare as options to prioritize votes for same slot with timestamp as
212+
// Some > None
213+
let allow_update = |latest_vote: &LatestValidatorVotePacket| -> bool {
214+
match slot.cmp(&latest_vote.slot()) {
215+
cmp::Ordering::Less => return false,
216+
cmp::Ordering::Greater => return true,
217+
cmp::Ordering::Equal => {}
218+
};
219+
220+
// Slots are equal, now check timestamp
221+
match timestamp.cmp(&latest_vote.timestamp()) {
222+
cmp::Ordering::Less => return false,
223+
cmp::Ordering::Greater => return true,
224+
cmp::Ordering::Equal => {}
225+
};
226+
227+
// Timestamps are equal, lastly check if vote was taken previously
228+
// and should be replenished
229+
should_replenish_taken_votes && latest_vote.is_vote_taken()
230+
};
231+
207232
let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
208233
vote: LatestValidatorVotePacket|
209234
-> Option<LatestValidatorVotePacket> {
210-
let (latest_slot, latest_timestamp) = latest_vote
211-
.read()
212-
.map(|vote| (vote.slot(), vote.timestamp()))
213-
.unwrap();
214-
// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
215-
// We directly compare as options to prioritize votes for same slot with timestamp as
216-
// Some > None
217-
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
235+
let should_try_update = allow_update(&latest_vote.read().unwrap());
236+
if should_try_update {
218237
let mut latest_vote = latest_vote.write().unwrap();
219-
let latest_slot = latest_vote.slot();
220-
let latest_timestamp = latest_vote.timestamp();
221-
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
238+
if allow_update(&latest_vote) {
222239
let old_vote = std::mem::replace(latest_vote.deref_mut(), vote);
223240
if old_vote.is_vote_taken() {
224241
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
@@ -536,10 +553,10 @@ mod tests {
536553
);
537554

538555
assert!(latest_unprocessed_votes
539-
.update_latest_vote(vote_a)
556+
.update_latest_vote(vote_a, false /* should replenish */)
540557
.is_none());
541558
assert!(latest_unprocessed_votes
542-
.update_latest_vote(vote_b)
559+
.update_latest_vote(vote_b, false /* should replenish */)
543560
.is_none());
544561
assert_eq!(2, latest_unprocessed_votes.len());
545562

@@ -569,15 +586,15 @@ mod tests {
569586
assert_eq!(
570587
1,
571588
latest_unprocessed_votes
572-
.update_latest_vote(vote_a)
589+
.update_latest_vote(vote_a, false /* should replenish */)
573590
.unwrap()
574591
.slot
575592
);
576593
// Drop current vote
577594
assert_eq!(
578595
6,
579596
latest_unprocessed_votes
580-
.update_latest_vote(vote_b)
597+
.update_latest_vote(vote_b, false /* should replenish */)
581598
.unwrap()
582599
.slot
583600
);
@@ -597,8 +614,8 @@ mod tests {
597614
&keypair_b,
598615
None,
599616
);
600-
latest_unprocessed_votes.update_latest_vote(vote_a);
601-
latest_unprocessed_votes.update_latest_vote(vote_b);
617+
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
618+
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
602619

603620
assert_eq!(2, latest_unprocessed_votes.len());
604621
assert_eq!(
@@ -627,8 +644,8 @@ mod tests {
627644
&keypair_b,
628645
Some(2),
629646
);
630-
latest_unprocessed_votes.update_latest_vote(vote_a);
631-
latest_unprocessed_votes.update_latest_vote(vote_b);
647+
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
648+
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
632649

633650
assert_eq!(2, latest_unprocessed_votes.len());
634651
assert_eq!(
@@ -653,8 +670,8 @@ mod tests {
653670
&keypair_b,
654671
Some(6),
655672
);
656-
latest_unprocessed_votes.update_latest_vote(vote_a);
657-
latest_unprocessed_votes.update_latest_vote(vote_b);
673+
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
674+
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
658675

659676
assert_eq!(2, latest_unprocessed_votes.len());
660677
assert_eq!(
@@ -679,8 +696,10 @@ mod tests {
679696
&keypair_b,
680697
Some(3),
681698
);
682-
latest_unprocessed_votes.update_latest_vote(vote_a);
683-
latest_unprocessed_votes.update_latest_vote(vote_b);
699+
latest_unprocessed_votes
700+
.update_latest_vote(vote_a.clone(), false /* should replenish */);
701+
latest_unprocessed_votes
702+
.update_latest_vote(vote_b.clone(), false /* should replenish */);
684703

685704
assert_eq!(2, latest_unprocessed_votes.len());
686705
assert_eq!(
@@ -691,6 +710,33 @@ mod tests {
691710
Some(6),
692711
latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey())
693712
);
713+
714+
// Drain all latest votes
715+
for packet in latest_unprocessed_votes
716+
.latest_votes_per_pubkey
717+
.read()
718+
.unwrap()
719+
.values()
720+
{
721+
packet.write().unwrap().take_vote().inspect(|_vote| {
722+
latest_unprocessed_votes
723+
.num_unprocessed_votes
724+
.fetch_sub(1, Ordering::Relaxed);
725+
});
726+
}
727+
assert_eq!(0, latest_unprocessed_votes.len());
728+
729+
// Same votes with same timestamps should not replenish without flag
730+
latest_unprocessed_votes
731+
.update_latest_vote(vote_a.clone(), false /* should replenish */);
732+
latest_unprocessed_votes
733+
.update_latest_vote(vote_b.clone(), false /* should replenish */);
734+
assert_eq!(0, latest_unprocessed_votes.len());
735+
736+
// Same votes with same timestamps should replenish with the flag
737+
latest_unprocessed_votes.update_latest_vote(vote_a, true /* should replenish */);
738+
latest_unprocessed_votes.update_latest_vote(vote_b, true /* should replenish */);
739+
assert_eq!(0, latest_unprocessed_votes.len());
694740
}
695741

696742
#[test]
@@ -711,7 +757,7 @@ mod tests {
711757
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
712758
i: usize| {
713759
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
714-
latest_unprocessed_votes.update_latest_vote(vote);
760+
latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */);
715761
};
716762

717763
let hdl = Builder::new()
@@ -756,7 +802,8 @@ mod tests {
756802
&keypairs[rng.gen_range(0..10)],
757803
None,
758804
);
759-
latest_unprocessed_votes.update_latest_vote(vote);
805+
latest_unprocessed_votes
806+
.update_latest_vote(vote, false /* should replenish */);
760807
}
761808
})
762809
.unwrap();
@@ -771,7 +818,8 @@ mod tests {
771818
&keypairs_tpu[rng.gen_range(0..10)],
772819
None,
773820
);
774-
latest_unprocessed_votes_tpu.update_latest_vote(vote);
821+
latest_unprocessed_votes_tpu
822+
.update_latest_vote(vote, false /* should replenish */);
775823
if i % 214 == 0 {
776824
// Simulate draining and processing packets
777825
let latest_votes_per_pubkey = latest_unprocessed_votes_tpu
@@ -807,8 +855,8 @@ mod tests {
807855

808856
let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None);
809857
let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None);
810-
latest_unprocessed_votes.update_latest_vote(vote_a);
811-
latest_unprocessed_votes.update_latest_vote(vote_b);
858+
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
859+
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
812860

813861
// Don't forward 0 stake accounts
814862
let forwarded = latest_unprocessed_votes
@@ -902,10 +950,10 @@ mod tests {
902950
let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None);
903951
let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None);
904952

905-
latest_unprocessed_votes.update_latest_vote(vote_a);
906-
latest_unprocessed_votes.update_latest_vote(vote_b);
907-
latest_unprocessed_votes.update_latest_vote(vote_c);
908-
latest_unprocessed_votes.update_latest_vote(vote_d);
953+
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
954+
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
955+
latest_unprocessed_votes.update_latest_vote(vote_c, false /* should replenish */);
956+
latest_unprocessed_votes.update_latest_vote(vote_d, false /* should replenish */);
909957
assert_eq!(4, latest_unprocessed_votes.len());
910958

911959
latest_unprocessed_votes.clear_forwarded_packets();

core/src/banking_stage/unprocessed_transaction_storage.rs

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -442,18 +442,18 @@ impl VoteStorage {
442442
&mut self,
443443
deserialized_packets: Vec<ImmutableDeserializedPacket>,
444444
) -> VoteBatchInsertionMetrics {
445-
self.latest_unprocessed_votes
446-
.insert_batch(
447-
deserialized_packets
448-
.into_iter()
449-
.filter_map(|deserialized_packet| {
450-
LatestValidatorVotePacket::new_from_immutable(
451-
Arc::new(deserialized_packet),
452-
self.vote_source,
453-
)
454-
.ok()
455-
}),
456-
)
445+
self.latest_unprocessed_votes.insert_batch(
446+
deserialized_packets
447+
.into_iter()
448+
.filter_map(|deserialized_packet| {
449+
LatestValidatorVotePacket::new_from_immutable(
450+
Arc::new(deserialized_packet),
451+
self.vote_source,
452+
)
453+
.ok()
454+
}),
455+
false, // should_replenish_taken_votes
456+
)
457457
}
458458

459459
fn filter_forwardable_packets_and_add_batches(
@@ -524,12 +524,15 @@ impl VoteStorage {
524524
)
525525
.ok()
526526
}),
527+
true, // should_replenish_taken_votes
527528
);
528529
} else {
529-
self.latest_unprocessed_votes
530-
.insert_batch(vote_packets.into_iter().filter_map(|packet| {
530+
self.latest_unprocessed_votes.insert_batch(
531+
vote_packets.into_iter().filter_map(|packet| {
531532
LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
532-
}));
533+
}),
534+
true, // should_replenish_taken_votes
535+
);
533536
}
534537
}
535538

@@ -998,6 +1001,7 @@ mod tests {
9981001
super::*,
9991002
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
10001003
solana_perf::packet::{Packet, PacketFlags},
1004+
solana_runtime::genesis_utils,
10011005
solana_sdk::{
10021006
hash::Hash,
10031007
signature::{Keypair, Signer},
@@ -1266,6 +1270,58 @@ mod tests {
12661270
Ok(())
12671271
}
12681272

1273+
#[test]
1274+
fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box<dyn Error>> {
1275+
let node_keypair = Keypair::new();
1276+
let genesis_config =
1277+
genesis_utils::create_genesis_config_with_leader(100, &node_keypair.pubkey(), 200)
1278+
.genesis_config;
1279+
let (bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
1280+
let vote_keypair = Keypair::new();
1281+
let mut vote = Packet::from_data(
1282+
None,
1283+
new_tower_sync_transaction(
1284+
TowerSync::default(),
1285+
Hash::new_unique(),
1286+
&node_keypair,
1287+
&vote_keypair,
1288+
&vote_keypair,
1289+
None,
1290+
),
1291+
)?;
1292+
vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true);
1293+
1294+
let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
1295+
Arc::new(LatestUnprocessedVotes::new()),
1296+
VoteSource::Tpu,
1297+
);
1298+
1299+
transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]);
1300+
assert_eq!(1, transaction_storage.len());
1301+
1302+
// When processing packets, return all packets as retryable so that they
1303+
// are reinserted into storage
1304+
let _ = transaction_storage.process_packets(
1305+
bank.clone(),
1306+
&BankingStageStats::default(),
1307+
&mut LeaderSlotMetricsTracker::new(0),
1308+
|packets, _payload| {
1309+
// Return all packets indexes as retryable
1310+
Some(
1311+
packets
1312+
.iter()
1313+
.enumerate()
1314+
.map(|(index, _packet)| index)
1315+
.collect_vec(),
1316+
)
1317+
},
1318+
);
1319+
1320+
// All packets should remain in the transaction storage
1321+
assert_eq!(1, transaction_storage.len());
1322+
Ok(())
1323+
}
1324+
12691325
#[test]
12701326
fn test_prepare_packets_to_forward() {
12711327
solana_logger::setup();

0 commit comments

Comments
 (0)