Skip to content

Commit 477ff30

Browse files
committed
feat(geyser): add deshred transaction notifier
Adds a new geyser plugin notification that fires when transactions are deserialized from shreds (deshredded), before any replay or execution occurs. This hooks into CompletedDataSetsService which receives callbacks from the blockstore when data sets (entries) are formed from incoming shreds. This is the earliest point where complete transaction data is available. Key differences from the existing transaction notifier: - Fires before replay/execution, not after - Does not include TransactionStatusMeta (no execution results available yet) - Cannot provide accurate transaction/entry indices since data sets arrive as shreds complete, not in slot order New geyser plugin interface: - ReplicaDeshredTransactionInfo: contains signature, is_vote, and transaction - notify_deshred_transaction(): called for each transaction when deshredded - deshred_transaction_notifications_enabled(): opt-in for plugins Implementation: - DeshredTransactionNotifier trait in solana-ledger - DeshredTransactionNotifierImpl in solana-geyser-plugin-manager - Integration in CompletedDataSetsService and Validator
1 parent b5ca513 commit 477ff30

File tree

9 files changed

+242
-4
lines changed

9 files changed

+242
-4
lines changed

core/src/completed_data_sets_service.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@
77
use {
88
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
99
solana_entry::entry::Entry,
10-
solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo},
10+
solana_ledger::{
11+
blockstore::{Blockstore, CompletedDataSetInfo},
12+
deshred_transaction_notifier_interface::DeshredTransactionNotifierArc,
13+
},
1114
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
15+
solana_message::VersionedMessage,
1216
solana_signature::Signature,
17+
solana_transaction::{
18+
simple_vote_transaction_checker::is_simple_vote_transaction_impl,
19+
versioned::VersionedTransaction,
20+
},
1321
std::{
1422
sync::{
1523
atomic::{AtomicBool, Ordering},
@@ -23,6 +31,20 @@ use {
2331
pub type CompletedDataSetsReceiver = Receiver<Vec<CompletedDataSetInfo>>;
2432
pub type CompletedDataSetsSender = Sender<Vec<CompletedDataSetInfo>>;
2533

34+
/// Check if a versioned transaction is a simple vote transaction.
35+
/// This avoids cloning by extracting the required data directly.
36+
fn is_simple_vote_transaction(tx: &VersionedTransaction) -> bool {
37+
let is_legacy = matches!(&tx.message, VersionedMessage::Legacy(_));
38+
let (account_keys, instructions) = match &tx.message {
39+
VersionedMessage::Legacy(msg) => (&msg.account_keys[..], &msg.instructions[..]),
40+
VersionedMessage::V0(msg) => (&msg.account_keys[..], &msg.instructions[..]),
41+
};
42+
let instruction_programs = instructions
43+
.iter()
44+
.filter_map(|ix| account_keys.get(ix.program_id_index as usize));
45+
is_simple_vote_transaction_impl(&tx.signatures, is_legacy, instruction_programs)
46+
}
47+
2648
pub struct CompletedDataSetsService {
2749
thread_hdl: JoinHandle<()>,
2850
}
@@ -32,6 +54,7 @@ impl CompletedDataSetsService {
3254
completed_sets_receiver: CompletedDataSetsReceiver,
3355
blockstore: Arc<Blockstore>,
3456
rpc_subscriptions: Arc<RpcSubscriptions>,
57+
deshred_transaction_notifier: Option<DeshredTransactionNotifierArc>,
3558
exit: Arc<AtomicBool>,
3659
max_slots: Arc<MaxSlots>,
3760
) -> Self {
@@ -47,6 +70,7 @@ impl CompletedDataSetsService {
4770
&completed_sets_receiver,
4871
&blockstore,
4972
&rpc_subscriptions,
73+
&deshred_transaction_notifier,
5074
&max_slots,
5175
) {
5276
break;
@@ -62,13 +86,29 @@ impl CompletedDataSetsService {
6286
completed_sets_receiver: &CompletedDataSetsReceiver,
6387
blockstore: &Blockstore,
6488
rpc_subscriptions: &RpcSubscriptions,
89+
deshred_transaction_notifier: &Option<DeshredTransactionNotifierArc>,
6590
max_slots: &Arc<MaxSlots>,
6691
) -> Result<(), RecvTimeoutError> {
6792
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
6893
let handle_completed_data_set_info = |completed_data_set_info| {
6994
let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
7095
match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) {
7196
Ok(entries) => {
97+
// Notify deshred transactions if notifier is enabled
98+
if let Some(notifier) = deshred_transaction_notifier {
99+
for entry in entries.iter() {
100+
for tx in &entry.transactions {
101+
if let Some(signature) = tx.signatures.first() {
102+
let is_vote = is_simple_vote_transaction(tx);
103+
notifier.notify_deshred_transaction(
104+
slot, signature, is_vote, tx,
105+
);
106+
}
107+
}
108+
}
109+
}
110+
111+
// Existing: notify signatures for RPC subscriptions
72112
let transactions = Self::get_transaction_signatures(entries);
73113
if !transactions.is_empty() {
74114
rpc_subscriptions.notify_signatures_received((slot, transactions));

core/src/validator.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -822,26 +822,29 @@ impl Validator {
822822
let (
823823
accounts_update_notifier,
824824
transaction_notifier,
825+
deshred_transaction_notifier,
825826
entry_notifier,
826827
block_metadata_notifier,
827828
slot_status_notifier,
828829
) = if let Some(service) = &geyser_plugin_service {
829830
(
830831
service.get_accounts_update_notifier(),
831832
service.get_transaction_notifier(),
833+
service.get_deshred_transaction_notifier(),
832834
service.get_entry_notifier(),
833835
service.get_block_metadata_notifier(),
834836
service.get_slot_status_notifier(),
835837
)
836838
} else {
837-
(None, None, None, None, None)
839+
(None, None, None, None, None, None)
838840
};
839841

840842
info!(
841843
"Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
842-
entry_notifier: {}",
844+
deshred_transaction_notifier: {}, entry_notifier: {}",
843845
accounts_update_notifier.is_some(),
844846
transaction_notifier.is_some(),
847+
deshred_transaction_notifier.is_some(),
845848
entry_notifier.is_some()
846849
);
847850

@@ -1307,6 +1310,7 @@ impl Validator {
13071310
completed_data_sets_receiver,
13081311
blockstore.clone(),
13091312
rpc_subscriptions.clone(),
1313+
deshred_transaction_notifier.clone(),
13101314
exit.clone(),
13111315
max_slots.clone(),
13121316
);

geyser-plugin-interface/src/geyser_plugin_interface.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,29 @@ pub enum ReplicaTransactionInfoVersions<'a> {
191191
V0_0_3(&'a ReplicaTransactionInfoV3<'a>),
192192
}
193193

194+
/// Information about a transaction after deshredding (when entries are formed from shreds).
195+
/// This is sent before any execution occurs.
196+
/// Unlike ReplicaTransactionInfo, this does not include TransactionStatusMeta
197+
/// since execution has not happened yet.
198+
#[derive(Clone, Debug)]
199+
#[repr(C)]
200+
pub struct ReplicaDeshredTransactionInfo<'a> {
201+
/// The transaction signature, used for identifying the transaction.
202+
pub signature: &'a Signature,
203+
204+
/// Indicates if the transaction is a simple vote transaction.
205+
pub is_vote: bool,
206+
207+
/// The versioned transaction.
208+
pub transaction: &'a VersionedTransaction,
209+
}
210+
211+
/// A wrapper to future-proof ReplicaDeshredTransactionInfo handling.
212+
#[repr(u32)]
213+
pub enum ReplicaDeshredTransactionInfoVersions<'a> {
214+
V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>),
215+
}
216+
194217
#[derive(Clone, Debug)]
195218
#[repr(C)]
196219
pub struct ReplicaEntryInfo<'a> {
@@ -471,6 +494,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
471494
fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> {
472495
Ok(())
473496
}
497+
498+
/// Called when a transaction is deshredded (entries formed from shreds).
499+
/// This is triggered before any execution occurs. Unlike notify_transaction,
500+
/// this does not include execution metadata (TransactionStatusMeta).
501+
#[allow(unused_variables)]
502+
fn notify_deshred_transaction(
503+
&self,
504+
transaction: ReplicaDeshredTransactionInfoVersions,
505+
slot: Slot,
506+
) -> Result<()> {
507+
Ok(())
508+
}
474509

475510
/// Check if the plugin is interested in account data
476511
/// Default is true -- if the plugin is not interested in
@@ -500,4 +535,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
500535
fn entry_notifications_enabled(&self) -> bool {
501536
false
502537
}
538+
539+
/// Check if the plugin is interested in deshred transaction data.
540+
/// Default is false -- if the plugin is interested in receiving
541+
/// transactions when they are deshredded, return true.
542+
fn deshred_transaction_notifications_enabled(&self) -> bool {
543+
false
544+
}
503545
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/// Module responsible for notifying plugins of transactions when deshredded
2+
use {
3+
crate::geyser_plugin_manager::GeyserPluginManager,
4+
agave_geyser_plugin_interface::geyser_plugin_interface::{
5+
ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions,
6+
},
7+
log::*,
8+
solana_clock::Slot,
9+
solana_ledger::deshred_transaction_notifier_interface::DeshredTransactionNotifier,
10+
solana_measure::measure::Measure,
11+
solana_metrics::*,
12+
solana_signature::Signature,
13+
solana_transaction::versioned::VersionedTransaction,
14+
std::sync::{Arc, RwLock},
15+
};
16+
17+
/// This implementation of DeshredTransactionNotifier is passed to the CompletedDataSetsService
18+
/// at validator startup. CompletedDataSetsService invokes the notify_deshred_transaction method
19+
/// when entries are formed from shreds. The implementation in turn invokes the
20+
/// notify_deshred_transaction of each plugin enabled with deshred transaction notification
21+
/// managed by the GeyserPluginManager.
22+
pub(crate) struct DeshredTransactionNotifierImpl {
23+
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
24+
}
25+
26+
impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl {
27+
fn notify_deshred_transaction(
28+
&self,
29+
slot: Slot,
30+
signature: &Signature,
31+
is_vote: bool,
32+
transaction: &VersionedTransaction,
33+
) {
34+
let mut measure =
35+
Measure::start("geyser-plugin-notify_plugins_of_deshred_transaction_info");
36+
let transaction_info = Self::build_replica_deshred_transaction_info(
37+
signature,
38+
is_vote,
39+
transaction,
40+
);
41+
42+
let plugin_manager = self.plugin_manager.read().unwrap();
43+
44+
if plugin_manager.plugins.is_empty() {
45+
return;
46+
}
47+
48+
for plugin in plugin_manager.plugins.iter() {
49+
if !plugin.deshred_transaction_notifications_enabled() {
50+
continue;
51+
}
52+
match plugin.notify_deshred_transaction(
53+
ReplicaDeshredTransactionInfoVersions::V0_0_1(&transaction_info),
54+
slot,
55+
) {
56+
Err(err) => {
57+
error!(
58+
"Failed to notify deshred transaction, error: ({}) to plugin {}",
59+
err,
60+
plugin.name()
61+
)
62+
}
63+
Ok(_) => {
64+
trace!(
65+
"Successfully notified deshred transaction to plugin {}",
66+
plugin.name()
67+
);
68+
}
69+
}
70+
}
71+
measure.stop();
72+
inc_new_counter_debug!(
73+
"geyser-plugin-notify_plugins_of_deshred_transaction_info-us",
74+
measure.as_us() as usize,
75+
10000,
76+
10000
77+
);
78+
}
79+
}
80+
81+
impl DeshredTransactionNotifierImpl {
82+
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
83+
Self { plugin_manager }
84+
}
85+
86+
fn build_replica_deshred_transaction_info<'a>(
87+
signature: &'a Signature,
88+
is_vote: bool,
89+
transaction: &'a VersionedTransaction,
90+
) -> ReplicaDeshredTransactionInfo<'a> {
91+
ReplicaDeshredTransactionInfo {
92+
signature,
93+
is_vote,
94+
transaction,
95+
}
96+
}
97+
}

geyser-plugin-manager/src/geyser_plugin_manager.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@ impl GeyserPluginManager {
109109
false
110110
}
111111

112+
/// Check if there is any plugin interested in deshred transaction data
113+
pub fn deshred_transaction_notifications_enabled(&self) -> bool {
114+
for plugin in &self.plugins {
115+
if plugin.deshred_transaction_notifications_enabled() {
116+
return true;
117+
}
118+
}
119+
false
120+
}
121+
112122
/// Admin RPC request handler
113123
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
114124
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())

geyser-plugin-manager/src/geyser_plugin_service.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@ use {
55
block_metadata_notifier_interface::BlockMetadataNotifierArc,
66
entry_notifier::EntryNotifierImpl,
77
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
8+
deshred_transaction_notifier::DeshredTransactionNotifierImpl,
89
slot_status_notifier::SlotStatusNotifierImpl,
910
slot_status_observer::SlotStatusObserver,
1011
transaction_notifier::TransactionNotifierImpl,
1112
},
1213
crossbeam_channel::Receiver,
1314
log::*,
1415
solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier,
15-
solana_ledger::entry_notifier_interface::EntryNotifierArc,
16+
solana_ledger::{
17+
entry_notifier_interface::EntryNotifierArc,
18+
deshred_transaction_notifier_interface::DeshredTransactionNotifierArc,
19+
},
1620
solana_rpc::{
1721
optimistically_confirmed_bank_tracker::SlotNotification,
1822
slot_status_notifier::SlotStatusNotifier,
@@ -36,6 +40,7 @@ pub struct GeyserPluginService {
3640
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
3741
accounts_update_notifier: Option<AccountsUpdateNotifier>,
3842
transaction_notifier: Option<TransactionNotifierArc>,
43+
deshred_transaction_notifier: Option<DeshredTransactionNotifierArc>,
3944
entry_notifier: Option<EntryNotifierArc>,
4045
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
4146
slot_status_notifier: Option<SlotStatusNotifier>,
@@ -91,6 +96,9 @@ impl GeyserPluginService {
9196
plugin_manager.account_data_snapshot_notifications_enabled();
9297
let transaction_notifications_enabled =
9398
plugin_manager.transaction_notifications_enabled() || geyser_plugin_always_enabled;
99+
let deshred_transaction_notifications_enabled =
100+
plugin_manager.deshred_transaction_notifications_enabled()
101+
|| geyser_plugin_always_enabled;
94102
let entry_notifications_enabled =
95103
plugin_manager.entry_notifications_enabled() || geyser_plugin_always_enabled;
96104
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
@@ -114,6 +122,15 @@ impl GeyserPluginService {
114122
None
115123
};
116124

125+
let deshred_transaction_notifier: Option<DeshredTransactionNotifierArc> =
126+
if deshred_transaction_notifications_enabled {
127+
let deshred_transaction_notifier =
128+
DeshredTransactionNotifierImpl::new(plugin_manager.clone());
129+
Some(Arc::new(deshred_transaction_notifier))
130+
} else {
131+
None
132+
};
133+
117134
let entry_notifier: Option<EntryNotifierArc> = if entry_notifications_enabled {
118135
let entry_notifier = EntryNotifierImpl::new(plugin_manager.clone());
119136
Some(Arc::new(entry_notifier))
@@ -127,6 +144,7 @@ impl GeyserPluginService {
127144
Option<SlotStatusNotifier>,
128145
) = if account_data_notifications_enabled
129146
|| transaction_notifications_enabled
147+
|| deshred_transaction_notifications_enabled
130148
|| entry_notifications_enabled
131149
{
132150
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
@@ -157,6 +175,7 @@ impl GeyserPluginService {
157175
plugin_manager,
158176
accounts_update_notifier,
159177
transaction_notifier,
178+
deshred_transaction_notifier,
160179
entry_notifier,
161180
block_metadata_notifier,
162181
slot_status_notifier,
@@ -181,6 +200,10 @@ impl GeyserPluginService {
181200
self.transaction_notifier.clone()
182201
}
183202

203+
pub fn get_deshred_transaction_notifier(&self) -> Option<DeshredTransactionNotifierArc> {
204+
self.deshred_transaction_notifier.clone()
205+
}
206+
184207
pub fn get_entry_notifier(&self) -> Option<EntryNotifierArc> {
185208
self.entry_notifier.clone()
186209
}

geyser-plugin-manager/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod block_metadata_notifier_interface;
1313
pub mod entry_notifier;
1414
pub mod geyser_plugin_manager;
1515
pub mod geyser_plugin_service;
16+
pub mod deshred_transaction_notifier;
1617
pub mod slot_status_notifier;
1718
pub mod slot_status_observer;
1819
pub mod transaction_notifier;

0 commit comments

Comments
 (0)