Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ dyn-clone = "1.0.20"
eager = "0.1.0"
ed25519-dalek = "=1.0.1"
ed25519-dalek-bip32 = "0.2.0"
enum-iterator = "1.5.0"
enum-iterator = "2.3.0"
env_logger = "0.11.8"
fast-math = "0.1"
fd-lock = "3.0.13"
Expand Down
42 changes: 41 additions & 1 deletion core/src/completed_data_sets_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_entry::entry::Entry,
solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo},
solana_ledger::{
blockstore::{Blockstore, CompletedDataSetInfo},
deshred_transaction_notifier_interface::DeshredTransactionNotifierArc,
},
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_message::VersionedMessage,
solana_signature::Signature,
solana_transaction::{
simple_vote_transaction_checker::is_simple_vote_transaction_impl,
versioned::VersionedTransaction,
},
std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -23,6 +31,20 @@ use {
pub type CompletedDataSetsReceiver = Receiver<Vec<CompletedDataSetInfo>>;
pub type CompletedDataSetsSender = Sender<Vec<CompletedDataSetInfo>>;

/// Check if a versioned transaction is a simple vote transaction.
/// This avoids cloning by extracting the required data directly.
fn is_simple_vote_transaction(tx: &VersionedTransaction) -> bool {
let is_legacy = matches!(&tx.message, VersionedMessage::Legacy(_));
let (account_keys, instructions) = match &tx.message {
VersionedMessage::Legacy(msg) => (&msg.account_keys[..], &msg.instructions[..]),
VersionedMessage::V0(msg) => (&msg.account_keys[..], &msg.instructions[..]),
};
let instruction_programs = instructions
.iter()
.filter_map(|ix| account_keys.get(ix.program_id_index as usize));
is_simple_vote_transaction_impl(&tx.signatures, is_legacy, instruction_programs)
}

pub struct CompletedDataSetsService {
thread_hdl: JoinHandle<()>,
}
Expand All @@ -32,6 +54,7 @@ impl CompletedDataSetsService {
completed_sets_receiver: CompletedDataSetsReceiver,
blockstore: Arc<Blockstore>,
rpc_subscriptions: Arc<RpcSubscriptions>,
deshred_transaction_notifier: Option<DeshredTransactionNotifierArc>,
exit: Arc<AtomicBool>,
max_slots: Arc<MaxSlots>,
) -> Self {
Expand All @@ -47,6 +70,7 @@ impl CompletedDataSetsService {
&completed_sets_receiver,
&blockstore,
&rpc_subscriptions,
&deshred_transaction_notifier,
&max_slots,
) {
break;
Expand All @@ -62,13 +86,29 @@ impl CompletedDataSetsService {
completed_sets_receiver: &CompletedDataSetsReceiver,
blockstore: &Blockstore,
rpc_subscriptions: &RpcSubscriptions,
deshred_transaction_notifier: &Option<DeshredTransactionNotifierArc>,
max_slots: &Arc<MaxSlots>,
) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let handle_completed_data_set_info = |completed_data_set_info| {
let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) {
Ok(entries) => {
// Notify deshred transactions if notifier is enabled
if let Some(notifier) = deshred_transaction_notifier {
for entry in entries.iter() {
for tx in &entry.transactions {
if let Some(signature) = tx.signatures.first() {
let is_vote = is_simple_vote_transaction(tx);
notifier.notify_deshred_transaction(
slot, signature, is_vote, tx,
);
}
}
}
}

// Existing: notify signatures for RPC subscriptions
let transactions = Self::get_transaction_signatures(entries);
if !transactions.is_empty() {
rpc_subscriptions.notify_signatures_received((slot, transactions));
Expand Down
8 changes: 6 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,26 +822,29 @@ impl Validator {
let (
accounts_update_notifier,
transaction_notifier,
deshred_transaction_notifier,
entry_notifier,
block_metadata_notifier,
slot_status_notifier,
) = if let Some(service) = &geyser_plugin_service {
(
service.get_accounts_update_notifier(),
service.get_transaction_notifier(),
service.get_deshred_transaction_notifier(),
service.get_entry_notifier(),
service.get_block_metadata_notifier(),
service.get_slot_status_notifier(),
)
} else {
(None, None, None, None, None)
(None, None, None, None, None, None)
};

info!(
"Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
entry_notifier: {}",
deshred_transaction_notifier: {}, entry_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some(),
deshred_transaction_notifier.is_some(),
entry_notifier.is_some()
);

Expand Down Expand Up @@ -1307,6 +1310,7 @@ impl Validator {
completed_data_sets_receiver,
blockstore.clone(),
rpc_subscriptions.clone(),
deshred_transaction_notifier.clone(),
exit.clone(),
max_slots.clone(),
);
Expand Down
42 changes: 42 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,29 @@ pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_3(&'a ReplicaTransactionInfoV3<'a>),
}

/// Information about a transaction after deshredding (when entries are formed from shreds).
/// This is sent before any execution occurs.
/// Unlike ReplicaTransactionInfo, this does not include TransactionStatusMeta
/// since execution has not happened yet.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaDeshredTransactionInfo<'a> {
/// The transaction signature, used for identifying the transaction.
pub signature: &'a Signature,

/// Indicates if the transaction is a simple vote transaction.
pub is_vote: bool,

/// The versioned transaction.
pub transaction: &'a VersionedTransaction,
}

/// A wrapper to future-proof ReplicaDeshredTransactionInfo handling.
#[repr(u32)]
pub enum ReplicaDeshredTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>),
}

#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaEntryInfo<'a> {
Expand Down Expand Up @@ -471,6 +494,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> {
Ok(())
}

/// Called when a transaction is deshredded (entries formed from shreds).
/// This is triggered before any execution occurs. Unlike notify_transaction,
/// this does not include execution metadata (TransactionStatusMeta).
#[allow(unused_variables)]
fn notify_deshred_transaction(
&self,
transaction: ReplicaDeshredTransactionInfoVersions,
slot: Slot,
) -> Result<()> {
Ok(())
}

/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
Expand Down Expand Up @@ -500,4 +535,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}

/// Check if the plugin is interested in deshred transaction data.
/// Default is false -- if the plugin is interested in receiving
/// transactions when they are deshredded, return true.
fn deshred_transaction_notifications_enabled(&self) -> bool {
false
}
}
97 changes: 97 additions & 0 deletions geyser-plugin-manager/src/deshred_transaction_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/// Module responsible for notifying plugins of transactions when deshredded
use {
crate::geyser_plugin_manager::GeyserPluginManager,
agave_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions,
},
log::*,
solana_clock::Slot,
solana_ledger::deshred_transaction_notifier_interface::DeshredTransactionNotifier,
solana_measure::measure::Measure,
solana_metrics::*,
solana_signature::Signature,
solana_transaction::versioned::VersionedTransaction,
std::sync::{Arc, RwLock},
};

/// This implementation of DeshredTransactionNotifier is passed to the CompletedDataSetsService
/// at validator startup. CompletedDataSetsService invokes the notify_deshred_transaction method
/// when entries are formed from shreds. The implementation in turn invokes the
/// notify_deshred_transaction of each plugin enabled with deshred transaction notification
/// managed by the GeyserPluginManager.
pub(crate) struct DeshredTransactionNotifierImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}

impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl {
fn notify_deshred_transaction(
&self,
slot: Slot,
signature: &Signature,
is_vote: bool,
transaction: &VersionedTransaction,
) {
let mut measure =
Measure::start("geyser-plugin-notify_plugins_of_deshred_transaction_info");
let transaction_info = Self::build_replica_deshred_transaction_info(
signature,
is_vote,
transaction,
);

let plugin_manager = self.plugin_manager.read().unwrap();

if plugin_manager.plugins.is_empty() {
return;
}

for plugin in plugin_manager.plugins.iter() {
if !plugin.deshred_transaction_notifications_enabled() {
continue;
}
match plugin.notify_deshred_transaction(
ReplicaDeshredTransactionInfoVersions::V0_0_1(&transaction_info),
slot,
) {
Err(err) => {
error!(
"Failed to notify deshred transaction, error: ({}) to plugin {}",
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully notified deshred transaction to plugin {}",
plugin.name()
);
}
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_deshred_transaction_info-us",
measure.as_us() as usize,
10000,
10000
);
}
}

impl DeshredTransactionNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
Self { plugin_manager }
}

fn build_replica_deshred_transaction_info<'a>(
signature: &'a Signature,
is_vote: bool,
transaction: &'a VersionedTransaction,
) -> ReplicaDeshredTransactionInfo<'a> {
ReplicaDeshredTransactionInfo {
signature,
is_vote,
transaction,
}
}
}
10 changes: 10 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ impl GeyserPluginManager {
false
}

/// Check if there is any plugin interested in deshred transaction data
pub fn deshred_transaction_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.deshred_transaction_notifications_enabled() {
return true;
}
}
false
}

/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
Expand Down
Loading