Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use {
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
Expand Down Expand Up @@ -160,7 +160,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub tower_storage: Arc<dyn TowerStorage>,
Expand Down Expand Up @@ -1901,7 +1901,7 @@ impl ReplayStage {
rpc_subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
Expand All @@ -1927,8 +1927,19 @@ impl ReplayStage {
.get(new_root)
.expect("Root bank doesn't exist");
let mut rooted_banks = root_bank.parents();
let oldest_parent = rooted_banks.last().map(|last| last.parent_slot());
rooted_banks.push(root_bank.clone());
let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
// The following differs from rooted_slots by including the parent slot of the oldest parent bank.
let rooted_slots_with_parents = bank_notification_sender
.as_ref()
.map_or(false, |sender| sender.should_send_parents)
.then(|| {
let mut new_chain = rooted_slots.clone();
new_chain.push(oldest_parent.unwrap_or_else(|| bank.parent_slot()));
new_chain
});

// Call leader schedule_cache.set_root() before blockstore.set_root() because
// bank_forks.root is consumed by repair_service to update gossip, so we don't want to
// get shreds for repair on gossip before we update leader schedule, otherwise they may
Expand Down Expand Up @@ -1964,8 +1975,16 @@ impl ReplayStage {
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Root(root_bank))
.sender
.send(BankNotification::NewRootBank(root_bank))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));

if let Some(new_chain) = rooted_slots_with_parents {
sender
.sender
.send(BankNotification::NewRootedChain(new_chain))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
}
latest_root_senders.iter().for_each(|s| {
if let Err(e) = s.send(new_root) {
Expand Down Expand Up @@ -2483,7 +2502,7 @@ impl ReplayStage {
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down Expand Up @@ -2606,6 +2625,7 @@ impl ReplayStage {
);
if let Some(sender) = bank_notification_sender {
sender
.sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
Expand Down Expand Up @@ -2678,7 +2698,7 @@ impl ReplayStage {
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down
4 changes: 2 additions & 2 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use {
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender,
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
Expand Down Expand Up @@ -119,7 +119,7 @@ impl Tvu {
verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender,
bank_notification_sender: Option<BankNotificationSender>,
bank_notification_sender: Option<BankNotificationSenderConfig>,
gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
Expand Down
10 changes: 7 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use {
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
BankNotificationSenderConfig, OptimisticallyConfirmedBank,
OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_completed_slots_service::RpcCompletedSlotsService,
Expand Down Expand Up @@ -878,7 +879,10 @@ impl Validator {
rpc_subscriptions.clone(),
confirmed_bank_subscribers,
)),
Some(bank_notification_sender),
Some(BankNotificationSenderConfig {
sender: bank_notification_sender,
should_send_parents: geyser_plugin_service.is_some(),
}),
)
} else {
(None, None, None, None)
Expand Down Expand Up @@ -1062,7 +1066,7 @@ impl Validator {
gossip_verified_vote_hash_sender,
replay_vote_receiver,
replay_vote_sender,
bank_notification_sender,
bank_notification_sender.map(|sender| sender.sender),
config.tpu_coalesce_ms,
cluster_confirmed_slot_sender,
&cost_model,
Expand Down
4 changes: 2 additions & 2 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
crossbeam_channel::Receiver,
log::*,
solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotification,
optimistically_confirmed_bank_tracker::SlotNotification,
transaction_notifier_interface::TransactionNotifierLock,
},
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl GeyserPluginService {
/// It is usually used to configure the connection information for the external data store.

pub fn new(
confirmed_bank_receiver: Receiver<BankNotification>,
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
info!(
Expand Down
16 changes: 8 additions & 8 deletions geyser-plugin-manager/src/slot_status_observer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::slot_status_notifier::SlotStatusNotifier,
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification,
solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -19,7 +19,7 @@ pub(crate) struct SlotStatusObserver {

impl SlotStatusObserver {
pub fn new(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
slot_status_notifier: SlotStatusNotifier,
) -> Self {
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Expand All @@ -43,7 +43,7 @@ impl SlotStatusObserver {
}

fn run_bank_notification_receiver(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
exit: Arc<AtomicBool>,
slot_status_notifier: SlotStatusNotifier,
) -> JoinHandle<()> {
Expand All @@ -53,23 +53,23 @@ impl SlotStatusObserver {
while !exit.load(Ordering::Relaxed) {
if let Ok(slot) = bank_notification_receiver.recv() {
match slot {
BankNotification::OptimisticallyConfirmed(slot) => {
SlotNotification::OptimisticallyConfirmed(slot) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_confirmed(slot, None);
}
BankNotification::Frozen(bank) => {
SlotNotification::Frozen((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_processed(bank.slot(), Some(bank.parent_slot()));
.notify_slot_processed(slot, Some(parent));
}
BankNotification::Root(bank) => {
SlotNotification::Root((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_rooted(bank.slot(), Some(bank.parent_slot()));
.notify_slot_rooted(slot, Some(parent));
}
}
}
Expand Down
Loading