From 42de5ef79f493eb8430bfa0f135f9d96c83f0869 Mon Sep 17 00:00:00 2001 From: Wilfred Almeida <60785452+WilfredAlmeida@users.noreply.github.com> Date: Tue, 2 Sep 2025 10:10:13 +0000 Subject: [PATCH 1/8] bigtable: add counters Signed-off-by: GitHub --- storage-bigtable/src/bigtable.rs | 53 ++++++++++ storage-bigtable/src/lib.rs | 174 +++++++++++++++++++++++++++++++ 2 files changed, 227 insertions(+) diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index d615b5bdac..e7776a608a 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -9,6 +9,7 @@ use { backoff::{future::retry, Error as BackoffError, ExponentialBackoff}, log::*, std::{ + collections::HashMap, str::FromStr, time::{Duration, Instant}, }, @@ -781,6 +782,31 @@ impl) -> InterceptedRequestResult> BigTable { .collect()) } + pub async fn get_bincode_cells2( + &mut self, + table: &str, + keys: &[RowKey], + ) -> Result<(HashMap>, usize)> + where + T: serde::de::DeserializeOwned, + { + let mut size = 0; + let rows = self + .get_multi_row_data(table, keys) + .await? + .into_iter() + .map(|(key, row_data)| { + size += row_data.len(); + let key_str = key.to_string(); + ( + key, + deserialize_bincode_cell_data(&row_data, table, key_str), + ) + }) + .collect(); + Ok((rows, size)) + } + pub async fn get_protobuf_cell

(&mut self, table: &str, key: RowKey) -> Result

where P: prost::Message + Default, @@ -827,6 +853,33 @@ impl) -> InterceptedRequestResult> BigTable { })) } + pub async fn get_protobuf_or_bincode_cells2<'a, B, P>( + &mut self, + table: &'a str, + row_keys: impl IntoIterator, + ) -> Result, usize)> + 'a> + where + B: serde::de::DeserializeOwned, + P: prost::Message + Default, + { + Ok(self + .get_multi_row_data( + table, + row_keys.into_iter().collect::>().as_slice(), + ) + .await? + .into_iter() + .map(|(key, row_data)| { + let size = row_data.iter().fold(0, |acc, row| acc + row.1.len()); + let key_str = key.to_string(); + ( + key, + deserialize_protobuf_or_bincode_cell_data(&row_data, table, key_str).unwrap(), + size, + ) + })) + } + pub async fn put_bincode_cells( &mut self, table: &str, diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 7c0b3aed5b..fbcfc287cd 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -85,6 +85,16 @@ impl std::convert::From for Error { } } +impl Error { + pub fn is_rpc_unauthenticated(&self) -> bool { + if let Error::BigTableError(bigtable::Error::Rpc(status)) = self { + status.code() == tonic::Code::Unauthenticated + } else { + false + } + } +} + pub type Result = std::result::Result; // Convert a slot to its bucket representation whereby lower slots are always lexically ordered @@ -742,6 +752,170 @@ impl LedgerStorage { } } + // Fetches and gets a vector of confirmed blocks via a multirow fetch + pub async fn get_confirmed_blocks_with_data2<'a>( + &self, + slots: &'a [Slot], + ) -> Result, usize)> + 'a> { + debug!( + "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}", + slots + ); + inc_new_counter_debug!("storage-bigtable-query", 1); + let mut bigtable = self.connection.client(); + let row_keys = slots.iter().copied().map(slot_to_blocks_key); + Ok(bigtable + .get_protobuf_or_bincode_cells2("blocks", row_keys) + .await? 
+ .map( + |(row_key, block_cell_data, size): ( + RowKey, + bigtable::CellData, + usize, + )| { + let block = match block_cell_data { + bigtable::CellData::Bincode(block) => block.into(), + bigtable::CellData::Protobuf(block) => match block.try_into() { + Ok(block) => block, + Err(_) => return (None, size), + }, + }; + (Some((key_to_slot(&row_key).unwrap(), block)), size) + }, + )) + } + + /// Fetch blocks and transactions + pub async fn get_confirmed_blocks_transactions( + &self, + blocks: &[Slot], + transactions: &[String], + transactions_status: &[String], + ) -> Result<( + Vec<(Slot, ConfirmedBlock)>, + Vec, + HashMap, + usize, + )> { + let mut bigtable = self.connection.client(); + + let mut blocks_resp = Vec::with_capacity(blocks.len()); + let mut transactions_resp = Vec::with_capacity(transactions.len()); + let mut transactions_status_resp = HashMap::new(); + let mut size = 0; + + // Collect slots for request + let mut blocks_map: HashMap> = HashMap::new(); + for block in blocks { + blocks_map.entry(*block).or_default(); + } + + // Fetch transactions info and collect slots + if !transactions.is_empty() || !transactions_status.is_empty() { + let mut keys = Vec::with_capacity(transactions.len() + transactions_status.len()); + keys.extend(transactions.iter().cloned()); + keys.extend(transactions_status.iter().cloned()); + + let (mut cells, bt_size) = bigtable + .get_bincode_cells2::("tx", keys.as_slice()) + .await?; + size += bt_size; + + for signature in transactions_status { + if let Some(Ok(info)) = cells.get(signature) { + transactions_status_resp.insert( + signature.clone(), + TransactionStatus { + slot: info.slot, + confirmations: None, + status: match &info.err { + Some(err) => Err(err.clone()), + None => Ok(()), + }, + err: info.err.clone(), + confirmation_status: Some(TransactionConfirmationStatus::Finalized), + }, + ); + } + } + for signature in transactions { + if let Some((signature, Ok(TransactionInfo { slot, index, .. 
}))) = + cells.remove_entry(signature) + { + blocks_map.entry(slot).or_default().push((index, signature)); + } + } + } + + // Fetch blocks + if !blocks_map.is_empty() { + let keys = blocks_map.keys().copied().collect::>(); + let cells = self.get_confirmed_blocks_with_data2(&keys).await?; + for (maybe_slot_block, row_size) in cells { + size += row_size; + if let Some((slot, block)) = maybe_slot_block { + if let Some(entries) = blocks_map.get(&slot) { + for (index, signature) in entries.iter() { + if let Some(tx_with_meta) = block.transactions.get(*index as usize) { + if tx_with_meta.transaction_signature().to_string() != *signature { + warn!( + "Transaction info or confirmed block for {} is corrupt", + signature + ); + } else { + transactions_resp.push(ConfirmedTransactionWithStatusMeta { + slot, + tx_with_meta: tx_with_meta.clone(), + block_time: block.block_time, + }); + } + } + } + blocks_resp.push((slot, block)); + } + } + } + } + + Ok(( + blocks_resp, + transactions_resp, + transactions_status_resp, + size, + )) + } + + /// Fetch TX index for transactions + pub async fn get_txindex( + &self, + transactions: &[String], + ) -> Result<(Vec>, usize)> { + let mut bigtable = self.connection.client(); + + let mut response = Vec::with_capacity(transactions.len()); + let mut size = 0; + + // Fetch transactions info and collect slots + if transactions.is_empty() { + response.resize(transactions.len(), None); + } else { + let (cells, bt_size) = bigtable + .get_bincode_cells2::("tx", transactions) + .await?; + size += bt_size; + + for signature in transactions { + if let Some(Ok(TransactionInfo { slot, index, .. 
})) = cells.get(signature) { + response.push(Some((*slot, *index))); + } else { + response.push(None); + } + } + } + + Ok((response, size)) + } + /// Get confirmed signatures for the provided address, in descending ledger order /// /// address: address to search for From 2dfb77ab60f7190dc5c7e5a3583468f33c58b0c6 Mon Sep 17 00:00:00 2001 From: Wilfred Almeida <60785452+WilfredAlmeida@users.noreply.github.com> Date: Tue, 2 Sep 2025 10:13:05 +0000 Subject: [PATCH 2/8] rpc: add method sanitize_transaction (#176) Signed-off-by: GitHub --- rpc-client-types/src/config.rs | 10 ++++++ rpc-client-types/src/request.rs | 2 ++ rpc-client/src/nonblocking/rpc_client.rs | 20 +++++++++++ rpc/src/rpc.rs | 42 ++++++++++++++++++++++++ 4 files changed, 74 insertions(+) diff --git a/rpc-client-types/src/config.rs b/rpc-client-types/src/config.rs index d28b88ff4f..2db5e04fdd 100644 --- a/rpc-client-types/src/config.rs +++ b/rpc-client-types/src/config.rs @@ -26,6 +26,16 @@ pub struct RpcSendTransactionConfig { pub min_context_slot: Option, } +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RcpSanitizeTransactionConfig { + pub sig_verify: bool, + #[serde(flatten)] + pub commitment: Option, + pub encoding: Option, + pub min_context_slot: Option, +} + #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RpcSimulateTransactionAccountsConfig { diff --git a/rpc-client-types/src/request.rs b/rpc-client-types/src/request.rs index ce7d980a99..ebbf42120d 100644 --- a/rpc-client-types/src/request.rs +++ b/rpc-client-types/src/request.rs @@ -67,6 +67,7 @@ pub enum RpcRequest { RegisterNode, RequestAirdrop, SendTransaction, + SanitizeTransaction, SimulateTransaction, SignVote, } @@ -132,6 +133,7 @@ impl fmt::Display for RpcRequest { RpcRequest::RegisterNode => "registerNode", RpcRequest::RequestAirdrop => "requestAirdrop", RpcRequest::SendTransaction => 
"sendTransaction", + RpcRequest::SanitizeTransaction => "sanitizeTransaction", RpcRequest::SimulateTransaction => "simulateTransaction", RpcRequest::SignVote => "signVote", }; diff --git a/rpc-client/src/nonblocking/rpc_client.rs b/rpc-client/src/nonblocking/rpc_client.rs index 8a4cfd99fc..4c8765c33b 100644 --- a/rpc-client/src/nonblocking/rpc_client.rs +++ b/rpc-client/src/nonblocking/rpc_client.rs @@ -1240,6 +1240,26 @@ impl RpcClient { } } + pub async fn sanitize_transaction( + &self, + transaction: &impl SerializableTransaction, + config: RcpSanitizeTransactionConfig, + ) -> ClientResult<()> { + let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64); + let commitment = config.commitment.unwrap_or_default(); + let config = RcpSanitizeTransactionConfig { + encoding: Some(encoding), + commitment: Some(commitment), + ..config + }; + let serialized_encoded = serialize_and_encode(transaction, encoding)?; + self.send( + RpcRequest::SanitizeTransaction, + json!([serialized_encoded, config]), + ) + .await + } + /// Simulates sending a transaction. 
/// /// If the transaction fails, then the [`err`] field of the returned diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 94ba7ee588..0df2c958fd 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -3499,6 +3499,14 @@ pub mod rpc_full { config: Option, ) -> Result; + #[rpc(meta, name = "sanitizeTransaction")] + fn sanitize_transaction( + &self, + meta: Self::Metadata, + data: String, + config: Option, + ) -> Result<()>; + #[rpc(meta, name = "simulateTransaction")] fn simulate_transaction( &self, @@ -3940,6 +3948,40 @@ pub mod rpc_full { ) } + fn sanitize_transaction( + &self, + meta: Self::Metadata, + data: String, + config: Option, + ) -> Result<()> { + let RcpSanitizeTransactionConfig { + sig_verify, + commitment, + encoding, + min_context_slot, + } = config.unwrap_or_default(); + let tx_encoding = encoding.unwrap_or(UiTransactionEncoding::Base58); + let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| { + Error::invalid_params(format!( + "unsupported encoding: {tx_encoding}. 
Supported encodings: base58, base64" + )) + })?; + let (_wire_transaction, unsanitized_tx) = + decode_and_deserialize::(data, binary_encoding)?; + + let bank = &*meta.get_bank_with_config(RpcContextConfig { + commitment, + min_context_slot, + })?; + let transaction = + sanitize_transaction(unsanitized_tx, bank, bank.get_reserved_account_keys())?; + if sig_verify { + verify_transaction(&transaction)?; + } + + Ok(()) + } + fn simulate_transaction( &self, meta: Self::Metadata, From 1b261fe4b678a50cd4cfcee36a94885cf1932315 Mon Sep 17 00:00:00 2001 From: Wilfred Almeida <60785452+WilfredAlmeida@users.noreply.github.com> Date: Tue, 2 Sep 2025 10:41:33 +0000 Subject: [PATCH 3/8] rpc: add option `skip_sanitize` to send_transaction (#177) Signed-off-by: GitHub --- rpc-client-types/src/config.rs | 2 + rpc/src/rpc.rs | 87 +++++++++++++++++++++------------- 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/rpc-client-types/src/config.rs b/rpc-client-types/src/config.rs index 2db5e04fdd..4de435160c 100644 --- a/rpc-client-types/src/config.rs +++ b/rpc-client-types/src/config.rs @@ -20,6 +20,8 @@ pub struct RpcSignatureStatusConfig { pub struct RpcSendTransactionConfig { #[serde(default)] pub skip_preflight: bool, + #[serde(default)] + pub skip_sanitize: bool, pub preflight_commitment: Option, pub encoding: Option, pub max_retries: Option, diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 0df2c958fd..30c144cfb0 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -3505,6 +3505,7 @@ pub mod rpc_full { meta: Self::Metadata, data: String, config: Option, + enable_static_instruction_limit: bool, ) -> Result<()>; #[rpc(meta, name = "simulateTransaction")] @@ -3810,6 +3811,7 @@ pub mod rpc_full { debug!("send_transaction rpc request received"); let RpcSendTransactionConfig { skip_preflight, + skip_sanitize, preflight_commitment, encoding, max_retries, @@ -3834,35 +3836,54 @@ pub mod rpc_full { min_context_slot, })?; - let transaction = sanitize_transaction( - 
unsanitized_tx, - preflight_bank, - preflight_bank.get_reserved_account_keys(), - preflight_bank - .feature_set - .is_active(&agave_feature_set::static_instruction_limit::id()), - )?; - let blockhash = *transaction.message().recent_blockhash(); - let message_hash = *transaction.message_hash(); - let signature = *transaction.signature(); + let recent_blockhash = *unsanitized_tx.message.recent_blockhash(); + let (signature, sanitized_tx, message_hash) = if skip_preflight && skip_sanitize { + unsanitized_tx.sanitize().map_err(|_err| { + Error::invalid_params(format!( + "invalid transaction: {}", + TransactionError::SanitizeFailure + )) + })?; + let message_hash = unsanitized_tx.message.hash(); + (unsanitized_tx.signatures[0], None, message_hash) + } else { + let tx = sanitize_transaction( + unsanitized_tx, + preflight_bank, + preflight_bank.get_reserved_account_keys(), + preflight_bank + .feature_set + .is_active(&agave_feature_set::static_instruction_limit::id()), + )?; + let message_hash = *tx.message_hash(); + (*tx.signature(), Some(tx), message_hash) + }; let mut last_valid_block_height = preflight_bank - .get_blockhash_last_valid_block_height(&blockhash) + .get_blockhash_last_valid_block_height(&recent_blockhash) .unwrap_or(0); - let durable_nonce_info = transaction - .get_durable_nonce() - .map(|&pubkey| (pubkey, blockhash)); - if durable_nonce_info.is_some() || (skip_preflight && last_valid_block_height == 0) { - // While it uses a defined constant, this last_valid_block_height value is chosen arbitrarily. - // It provides a fallback timeout for durable-nonce transaction retries in case of - // malicious packing of the retry queue. Durable-nonce transactions are otherwise - // retried until the nonce is advanced. 
- last_valid_block_height = preflight_bank.block_height() + MAX_PROCESSING_AGE as u64; + let mut durable_nonce_info = None; + if let Some(sanitized_tx) = &sanitized_tx { + durable_nonce_info = sanitized_tx + .get_durable_nonce() + .map(|&pubkey| (pubkey, recent_blockhash)); + if durable_nonce_info.is_some() || (skip_preflight && last_valid_block_height == 0) + { + // While it uses a defined constant, this last_valid_block_height value is chosen arbitrarily. + // It provides a fallback timeout for durable-nonce transaction retries in case of + // malicious packing of the retry queue. Durable-nonce transactions are otherwise + // retried until the nonce is advanced. + last_valid_block_height = + preflight_bank.block_height() + MAX_PROCESSING_AGE as u64; + } } if !skip_preflight { - let verification_error = transaction.verify().err(); + let Some(sanitized_tx) = sanitized_tx else { + return Err(Error::invalid_params("sanitized transaction should exists")); + }; + let verification_error = sanitized_tx.verify().err(); if verification_error.is_none() && !meta.config.skip_preflight_health_check { match meta.health.check() { @@ -3884,12 +3905,6 @@ pub mod rpc_full { } } - let simulation_result = if let Some(err) = verification_error { - TransactionSimulationResult::new_error(err) - } else { - preflight_bank.simulate_transaction(&transaction, false) - }; - if let TransactionSimulationResult { result: Err(err), logs, @@ -3903,7 +3918,7 @@ pub mod rpc_full { post_balances: _, pre_token_balances: _, post_token_balances: _, - } = simulation_result + } = preflight_bank.simulate_transaction(&sanitized_tx, false) { match err { TransactionError::BlockhashNotFound => { @@ -3940,7 +3955,7 @@ pub mod rpc_full { meta, message_hash, signature, - blockhash, + recent_blockhash, wire_transaction, last_valid_block_height, durable_nonce_info, @@ -3953,6 +3968,7 @@ pub mod rpc_full { meta: Self::Metadata, data: String, config: Option, + enable_static_instruction_limit: bool, ) -> Result<()> { 
let RcpSanitizeTransactionConfig { sig_verify, @@ -3973,10 +3989,15 @@ pub mod rpc_full { commitment, min_context_slot, })?; - let transaction = - sanitize_transaction(unsanitized_tx, bank, bank.get_reserved_account_keys())?; + let transaction = sanitize_transaction( + unsanitized_tx, + bank, + bank.get_reserved_account_keys(), + enable_static_instruction_limit, + )?; if sig_verify { - verify_transaction(&transaction)?; + // verify_transaction(&transaction)?; + transaction.verify().err(); } Ok(()) From a0246927ba9b0fcfe13f5ca9ed15165ec89efbd5 Mon Sep 17 00:00:00 2001 From: Wilfred Almeida <60785452+WilfredAlmeida@users.noreply.github.com> Date: Tue, 2 Sep 2025 10:45:53 +0000 Subject: [PATCH 4/8] rpc: add config option to getRPF (#217) Signed-off-by: GitHub --- rpc-client-types/src/config.rs | 6 + rpc/src/rpc.rs | 28 ++- runtime/src/prioritization_fee.rs | 255 +++++++++++++----------- runtime/src/prioritization_fee_cache.rs | 108 +++++----- 4 files changed, 229 insertions(+), 168 deletions(-) diff --git a/rpc-client-types/src/config.rs b/rpc-client-types/src/config.rs index 4de435160c..a0b0a7d73c 100644 --- a/rpc-client-types/src/config.rs +++ b/rpc-client-types/src/config.rs @@ -355,3 +355,9 @@ pub struct RpcContextConfig { pub commitment: Option, pub min_context_slot: Option, } + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcRecentPrioritizationFeesConfig { + pub percentile: Option, +} diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 30c144cfb0..ee4c409465 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2386,10 +2386,18 @@ impl JsonRpcRequestProcessor { fn get_recent_prioritization_fees( &self, pubkeys: Vec, + percentile: Option, ) -> Result> { - Ok(self - .prioritization_fee_cache - .get_prioritization_fees(&pubkeys) + let cache = match percentile { + Some(percentile) => self + .prioritization_fee_cache + .get_prioritization_fees2(&pubkeys, percentile), + None => self + 
.prioritization_fee_cache + .get_prioritization_fees(&pubkeys), + }; + + Ok(cache .into_iter() .map(|(slot, prioritization_fee)| RpcPrioritizationFee { slot, @@ -3606,6 +3614,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, pubkey_strs: Option>, + config: Option, ) -> Result>; } @@ -4353,6 +4362,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, pubkey_strs: Option>, + config: Option, ) -> Result> { let pubkey_strs = pubkey_strs.unwrap_or_default(); debug!( @@ -4368,7 +4378,17 @@ pub mod rpc_full { .into_iter() .map(|pubkey_str| verify_pubkey(&pubkey_str)) .collect::>>()?; - meta.get_recent_prioritization_fees(pubkeys) + + let RpcRecentPrioritizationFeesConfig { percentile } = config.unwrap_or_default(); + if let Some(percentile) = percentile { + if percentile > 10_000 { + return Err(Error::invalid_params( + "Percentile is too big; max value is 10000".to_owned(), + )); + } + } + + meta.get_recent_prioritization_fees(pubkeys, percentile) } } } diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs index 52324d5516..bcf2e0b962 100644 --- a/runtime/src/prioritization_fee.rs +++ b/runtime/src/prioritization_fee.rs @@ -23,14 +23,14 @@ struct PrioritizationFeeMetrics { // Count of attempted update on finalized PrioritizationFee attempted_update_on_finalized_fee_count: Saturating, - // Total transaction fees of non-vote transactions included in this slot. + // Total prioritization fees included in this slot. total_prioritization_fee: Saturating, - // The minimum compute unit price of prioritized transactions in this slot. - min_compute_unit_price: Option, + // The minimum prioritization fee of prioritized transactions in this slot. + min_prioritization_fee: Option, - // The maximum compute unit price of prioritized transactions in this slot. - max_compute_unit_price: u64, + // The maximum prioritization fee of prioritized transactions in this slot. 
+ max_prioritization_fee: u64, // Accumulated time spent on tracking prioritization fee for each slot. total_update_elapsed_us: Saturating, @@ -49,8 +49,8 @@ impl PrioritizationFeeMetrics { self.attempted_update_on_finalized_fee_count += val; } - fn update_compute_unit_price(&mut self, cu_price: u64) { - if cu_price == 0 { + fn update_prioritization_fee(&mut self, fee: u64) { + if fee == 0 { self.non_prioritized_transactions_count += 1; return; } @@ -58,11 +58,11 @@ impl PrioritizationFeeMetrics { // update prioritized transaction fee metrics. self.prioritized_transactions_count += 1; - self.max_compute_unit_price = self.max_compute_unit_price.max(cu_price); + self.max_prioritization_fee = self.max_prioritization_fee.max(fee); - self.min_compute_unit_price = Some( - self.min_compute_unit_price - .map_or(cu_price, |min_cu_price| min_cu_price.min(cu_price)), + self.min_prioritization_fee = Some( + self.min_prioritization_fee + .map_or(fee, |min_fee| min_fee.min(fee)), ); } @@ -75,8 +75,8 @@ impl PrioritizationFeeMetrics { attempted_update_on_finalized_fee_count: Saturating(attempted_update_on_finalized_fee_count), total_prioritization_fee: Saturating(total_prioritization_fee), - min_compute_unit_price, - max_compute_unit_price, + min_prioritization_fee, + max_prioritization_fee, total_update_elapsed_us: Saturating(total_update_elapsed_us), } = self; datapoint_info!( @@ -113,11 +113,11 @@ impl PrioritizationFeeMetrics { i64 ), ( - "min_compute_unit_price", - min_compute_unit_price.unwrap_or(0) as i64, + "min_prioritization_fee", + min_prioritization_fee.unwrap_or(0) as i64, i64 ), - ("max_compute_unit_price", max_compute_unit_price as i64, i64), + ("max_prioritization_fee", max_prioritization_fee as i64, i64), ( "total_update_elapsed_us", total_update_elapsed_us as i64, @@ -144,14 +144,14 @@ pub enum PrioritizationFeeError { /// Block minimum prioritization fee stats, includes the minimum prioritization fee for a transaction in this /// block; and the minimum fee for 
each writable account in all transactions in this block. The only relevant /// write account minimum fees are those greater than the block minimum transaction fee, because the minimum fee needed to land -/// a transaction is determined by Max( min_compute_unit_price, min_writable_account_fees(key), ...) -#[derive(Debug)] +/// a transaction is determined by Max( min_transaction_fee, min_writable_account_fees(key), ...) +#[derive(Debug, Default)] pub struct PrioritizationFee { - // The minimum prioritization fee of transactions that landed in this block. - min_compute_unit_price: u64, + // Prioritization fee of transactions that landed in this block. + transaction_fees: Vec, - // The minimum prioritization fee of each writable account in transactions in this block. - min_writable_account_fees: HashMap, + // Prioritization fee of each writable account in transactions in this block. + writable_account_fees: HashMap>, // Default to `false`, set to `true` when a block is completed, therefore the minimum fees recorded // are finalized, and can be made available for use (e.g., RPC query) @@ -161,43 +161,23 @@ pub struct PrioritizationFee { metrics: PrioritizationFeeMetrics, } -impl Default for PrioritizationFee { - fn default() -> Self { - PrioritizationFee { - min_compute_unit_price: u64::MAX, - min_writable_account_fees: HashMap::new(), - is_finalized: false, - metrics: PrioritizationFeeMetrics::default(), - } - } -} - impl PrioritizationFee { /// Update self for minimum transaction fee in the block and minimum fee for each writable account. 
- pub fn update( - &mut self, - compute_unit_price: u64, - prioritization_fee: u64, - writable_accounts: Vec, - ) { + pub fn update(&mut self, transaction_fee: u64, writable_accounts: Vec) { let (_, update_us) = measure_us!({ if !self.is_finalized { - if compute_unit_price < self.min_compute_unit_price { - self.min_compute_unit_price = compute_unit_price; - } + self.transaction_fees.push(transaction_fee); for write_account in writable_accounts { - self.min_writable_account_fees + self.writable_account_fees .entry(write_account) - .and_modify(|write_lock_fee| { - *write_lock_fee = std::cmp::min(*write_lock_fee, compute_unit_price) - }) - .or_insert(compute_unit_price); + .or_default() + .push(transaction_fee); } self.metrics - .accumulate_total_prioritization_fee(prioritization_fee); - self.metrics.update_compute_unit_price(compute_unit_price); + .accumulate_total_prioritization_fee(transaction_fee); + self.metrics.update_prioritization_fee(transaction_fee); } else { self.metrics .increment_attempted_update_on_finalized_fee_count(1); @@ -207,38 +187,54 @@ impl PrioritizationFee { self.metrics.accumulate_total_update_elapsed_us(update_us); } - /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are - /// removed to reduce memory footprint when mark_block_completed() is called. 
- fn prune_irrelevant_writable_accounts(&mut self) { - self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; - self.min_writable_account_fees - .retain(|_, account_fee| account_fee > &mut self.min_compute_unit_price); - self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; - } - pub fn mark_block_completed(&mut self) -> Result<(), PrioritizationFeeError> { if self.is_finalized { return Err(PrioritizationFeeError::BlockIsAlreadyFinalized); } - self.prune_irrelevant_writable_accounts(); self.is_finalized = true; + + self.transaction_fees.sort(); + for fees in self.writable_account_fees.values_mut() { + fees.sort() + } + + self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; + self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; + Ok(()) } - pub fn get_min_compute_unit_price(&self) -> Option { - (self.min_compute_unit_price != u64::MAX).then_some(self.min_compute_unit_price) + pub fn get_min_transaction_fee(&self) -> Option { + self.transaction_fees.first().copied() + } + + fn get_percentile(fees: &[u64], percentile: u16) -> Option { + let index = (percentile as usize).min(9_999) * fees.len() / 10_000; + fees.get(index).copied() + } + + pub fn get_transaction_fee(&self, percentile: u16) -> Option { + Self::get_percentile(&self.transaction_fees, percentile) + } + + pub fn get_min_writable_account_fee(&self, key: &Pubkey) -> Option { + self.writable_account_fees + .get(key) + .and_then(|fees| fees.first().copied()) } - pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option { - self.min_writable_account_fees.get(key).copied() + pub fn get_writable_account_fee(&self, key: &Pubkey, percentile: u16) -> Option { + self.writable_account_fees + .get(key) + .and_then(|fees| Self::get_percentile(fees, percentile)) } - pub fn get_writable_account_fees(&self) -> impl Iterator { - self.min_writable_account_fees.iter() + pub fn 
get_writable_account_fees(&self) -> impl Iterator)> { + self.writable_account_fees.iter() } pub fn get_writable_accounts_count(&self) -> usize { - self.min_writable_account_fees.len() + self.writable_account_fees.len() } pub fn is_finalized(&self) -> bool { @@ -247,6 +243,33 @@ impl PrioritizationFee { pub fn report_metrics(&self, slot: Slot) { self.metrics.report(slot); + + // report this slot's min_transaction_fee and top 10 min_writable_account_fees + let min_transaction_fee = self.get_min_transaction_fee().unwrap_or(0); + datapoint_info!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", "block", String), + ("min_prioritization_fee", min_transaction_fee as i64, i64), + ); + + let mut accounts_fees: Vec<(&Pubkey, u64)> = self + .get_writable_account_fees() + .filter_map(|(account, fees)| { + fees.first() + .copied() + .map(|min_account_fee| (account, min_transaction_fee.min(min_account_fee))) + }) + .collect(); + accounts_fees.sort_by(|lh, rh| rh.1.cmp(&lh.1)); + for (account_key, fee) in accounts_fees.into_iter().take(10) { + datapoint_trace!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", account_key.to_string(), String), + ("min_prioritization_fee", fee as i64, i64), + ); + } } } @@ -260,128 +283,128 @@ mod tests { let write_account_a = Pubkey::new_unique(); let write_account_b = Pubkey::new_unique(); let write_account_c = Pubkey::new_unique(); - let tx_fee = 10; let mut prioritization_fee = PrioritizationFee::default(); - assert!(prioritization_fee.get_min_compute_unit_price().is_none()); + assert!(prioritization_fee.get_min_transaction_fee().is_none()); // Assert for 1st transaction - // [cu_px, write_accounts...] --> [block, account_a, account_b, account_c] + // [fee, write_accounts...] 
--> [block, account_a, account_b, account_c] // ----------------------------------------------------------------------- // [5, a, b ] --> [5, 5, 5, nil ] { - prioritization_fee.update(5, tx_fee, vec![write_account_a, write_account_b]); - assert_eq!(5, prioritization_fee.get_min_compute_unit_price().unwrap()); + prioritization_fee.update(5, vec![write_account_a, write_account_b]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert!(prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .is_none()); + + prioritization_fee.is_finalized = false; } // Assert for second transaction: - // [cu_px, write_accounts...] --> [block, account_a, account_b, account_c] + // [fee, write_accounts...] 
--> [block, account_a, account_b, account_c] // ----------------------------------------------------------------------- // [9, b, c ] --> [5, 5, 5, 9 ] { - prioritization_fee.update(9, tx_fee, vec![write_account_b, write_account_c]); - assert_eq!(5, prioritization_fee.get_min_compute_unit_price().unwrap()); + prioritization_fee.update(9, vec![write_account_b, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert_eq!( 9, prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .unwrap() ); + + prioritization_fee.is_finalized = false; } // Assert for third transaction: - // [cu_px, write_accounts...] --> [block, account_a, account_b, account_c] + // [fee, write_accounts...] 
--> [block, account_a, account_b, account_c] // ----------------------------------------------------------------------- // [2, a, c ] --> [2, 2, 5, 2 ] { - prioritization_fee.update(2, tx_fee, vec![write_account_a, write_account_c]); - assert_eq!(2, prioritization_fee.get_min_compute_unit_price().unwrap()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 2, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert_eq!( 2, prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .unwrap() ); + + prioritization_fee.is_finalized = false; } - // assert after prune, account a and c should be removed from cache to save space + // assert after sort { - prioritization_fee.prune_irrelevant_writable_accounts(); - assert_eq!(1, prioritization_fee.min_writable_account_fees.len()); - assert_eq!(2, prioritization_fee.get_min_compute_unit_price().unwrap()); - assert!(prioritization_fee - .get_writable_account_fee(&write_account_a) - .is_none()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!(3, prioritization_fee.writable_account_fees.len()); + assert_eq!( + 2, + prioritization_fee + .get_min_writable_account_fee(&write_account_a) + .unwrap() + ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 2, + prioritization_fee + 
.get_min_writable_account_fee(&write_account_c) .unwrap() ); - assert!(prioritization_fee - .get_writable_account_fee(&write_account_c) - .is_none()); } } - #[test] - fn test_total_prioritization_fee() { - let mut prioritization_fee = PrioritizationFee::default(); - prioritization_fee.update(0, 10, vec![]); - assert_eq!(10, prioritization_fee.metrics.total_prioritization_fee.0); - - prioritization_fee.update(10, u64::MAX, vec![]); - assert_eq!( - u64::MAX, - prioritization_fee.metrics.total_prioritization_fee.0 - ); - - prioritization_fee.update(10, 100, vec![]); - assert_eq!( - u64::MAX, - prioritization_fee.metrics.total_prioritization_fee.0 - ); - } - #[test] fn test_mark_block_completed() { let mut prioritization_fee = PrioritizationFee::default(); diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 6dc4dde146..a1a6b282d4 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -1,5 +1,5 @@ use { - crate::{bank::Bank, prioritization_fee::PrioritizationFee}, + crate::{bank::Bank, prioritization_fee::*}, crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError}, log::*, solana_accounts_db::account_locks::validate_account_locks, @@ -47,9 +47,6 @@ struct PrioritizationFeeCacheMetrics { // Accumulated time spent on finalizing block prioritization fees. total_block_finalize_elapsed_us: AtomicU64, - - // Accumulated time spent on calculate transaction fees. 
- total_calculate_prioritization_fee_elapsed_us: AtomicU64, } impl PrioritizationFeeCacheMetrics { @@ -83,11 +80,6 @@ impl PrioritizationFeeCacheMetrics { .fetch_add(val, Ordering::Relaxed); } - fn accumulate_total_calculate_prioritization_fee_elapsed_us(&self, val: u64) { - self.total_calculate_prioritization_fee_elapsed_us - .fetch_add(val, Ordering::Relaxed); - } - fn report(&self, slot: Slot) { datapoint_info!( "block_prioritization_fee_counters", @@ -125,12 +117,6 @@ impl PrioritizationFeeCacheMetrics { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "total_calculate_prioritization_fee_elapsed_us", - self.total_calculate_prioritization_fee_elapsed_us - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ); } } @@ -140,8 +126,7 @@ enum CacheServiceUpdate { TransactionUpdate { slot: Slot, bank_id: BankId, - compute_unit_price: u64, - prioritization_fee: u64, + transaction_fee: u64, writable_accounts: Vec, }, BankFinalized { @@ -248,25 +233,18 @@ impl PrioritizationFeeCache { .map(|(_, key)| *key) .collect(); - let (prioritization_fee, calculate_prioritization_fee_us) = measure_us!({ - solana_fee_structure::FeeBudgetLimits::from(compute_budget_limits) - .prioritization_fee - }); - self.metrics - .accumulate_total_calculate_prioritization_fee_elapsed_us( - calculate_prioritization_fee_us, - ); - self.sender .send(CacheServiceUpdate::TransactionUpdate { slot: bank.slot(), bank_id: bank.bank_id(), - compute_unit_price: compute_budget_limits.compute_unit_price, - prioritization_fee, + transaction_fee: compute_budget_limits.compute_unit_price, writable_accounts, }) .unwrap_or_else(|err| { - warn!("prioritization fee cache transaction updates failed: {err:?}"); + warn!( + "prioritization fee cache transaction updates failed: {:?}", + err + ); }); } }); @@ -281,7 +259,10 @@ impl PrioritizationFeeCache { self.sender .send(CacheServiceUpdate::BankFinalized { slot, bank_id }) .unwrap_or_else(|err| { - warn!("prioritization fee cache signalling bank frozen failed: {err:?}") + 
warn!( + "prioritization fee cache signalling bank frozen failed: {:?}", + err + ) }); } @@ -290,8 +271,7 @@ impl PrioritizationFeeCache { unfinalized: &mut UnfinalizedPrioritizationFees, slot: Slot, bank_id: BankId, - compute_unit_price: u64, - prioritization_fee: u64, + transaction_fee: u64, writable_accounts: Vec, metrics: &PrioritizationFeeCacheMetrics, ) { @@ -300,7 +280,7 @@ impl PrioritizationFeeCache { .or_default() .entry(bank_id) .or_default() - .update(compute_unit_price, prioritization_fee, writable_accounts)); + .update(transaction_fee, writable_accounts)); metrics.accumulate_total_entry_update_elapsed_us(entry_update_us); metrics.accumulate_successful_transaction_update_count(1); } @@ -339,15 +319,15 @@ impl PrioritizationFeeCache { // It should be rare that optimistically confirmed bank had no prioritized // transactions, but duplicated and unconfirmed bank had. if pre_purge_bank_count > 0 && post_purge_bank_count == 0 { - warn!( - "Finalized bank has empty prioritization fee cache. slot {slot} bank id \ - {bank_id}" - ); + warn!("Finalized bank has empty prioritization fee cache. 
slot {slot} bank id {bank_id}"); } if let Some(prioritization_fee) = &mut prioritization_fee { if let Err(err) = prioritization_fee.mark_block_completed() { - error!("Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {err:?}"); + error!( + "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", + err + ); } prioritization_fee.report_metrics(slot); } @@ -394,15 +374,13 @@ impl PrioritizationFeeCache { CacheServiceUpdate::TransactionUpdate { slot, bank_id, - compute_unit_price, - prioritization_fee, + transaction_fee, writable_accounts, } => Self::update_cache( &mut unfinalized, slot, bank_id, - compute_unit_price, - prioritization_fee, + transaction_fee, writable_accounts, &metrics, ), @@ -436,11 +414,36 @@ impl PrioritizationFeeCache { .iter() .map(|(slot, slot_prioritization_fee)| { let mut fee = slot_prioritization_fee - .get_min_compute_unit_price() + .get_min_transaction_fee() .unwrap_or_default(); for account_key in account_keys { if let Some(account_fee) = - slot_prioritization_fee.get_writable_account_fee(account_key) + slot_prioritization_fee.get_min_writable_account_fee(account_key) + { + fee = std::cmp::max(fee, account_fee); + } + } + (*slot, fee) + }) + .collect() + } + + pub fn get_prioritization_fees2( + &self, + account_keys: &[Pubkey], + percentile: u16, + ) -> Vec<(Slot, u64)> { + self.cache + .read() + .unwrap() + .iter() + .map(|(slot, slot_prioritization_fee)| { + let mut fee = slot_prioritization_fee + .get_transaction_fee(percentile) + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + slot_prioritization_fee.get_writable_account_fee(account_key, percentile) { fee = std::cmp::max(fee, account_fee); } @@ -571,10 +574,19 @@ mod tests { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); let lock = prioritization_fee_cache.cache.read().unwrap(); let fee = lock.get(&slot).unwrap(); - assert_eq!(2, fee.get_min_compute_unit_price().unwrap()); - 
assert!(fee.get_writable_account_fee(&write_account_a).is_none()); - assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); - assert!(fee.get_writable_account_fee(&write_account_c).is_none()); + assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 2, + fee.get_min_writable_account_fee(&write_account_a).unwrap() + ); + assert_eq!( + 5, + fee.get_min_writable_account_fee(&write_account_b).unwrap() + ); + assert_eq!( + 2, + fee.get_min_writable_account_fee(&write_account_c).unwrap() + ); } } From 44ca455d1c0378612031dda415df23e7437ecdce Mon Sep 17 00:00:00 2001 From: Wilfred Almeida <60785452+WilfredAlmeida@users.noreply.github.com> Date: Tue, 2 Sep 2025 10:51:47 +0000 Subject: [PATCH 5/8] rpc: add rollback to getLatestBlockhash (agave#2023) Signed-off-by: GitHub --- rpc-client-types/src/config.rs | 9 ++++++++ rpc-client/src/nonblocking/rpc_client.rs | 21 ++++++++++++++++++ rpc/src/rpc.rs | 28 ++++++++++++++++++++---- runtime/src/bank_forks.rs | 13 +++++++++-- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/rpc-client-types/src/config.rs b/rpc-client-types/src/config.rs index a0b0a7d73c..0e71254deb 100644 --- a/rpc-client-types/src/config.rs +++ b/rpc-client-types/src/config.rs @@ -361,3 +361,12 @@ pub struct RpcContextConfig { pub struct RpcRecentPrioritizationFeesConfig { pub percentile: Option, } + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcLatestBlockhashConfig { + #[serde(flatten)] + pub context: RpcContextConfig, + #[serde(default)] + pub rollback: usize, +} diff --git a/rpc-client/src/nonblocking/rpc_client.rs b/rpc-client/src/nonblocking/rpc_client.rs index 4c8765c33b..dbaffee6ad 100644 --- a/rpc-client/src/nonblocking/rpc_client.rs +++ b/rpc-client/src/nonblocking/rpc_client.rs @@ -4796,6 +4796,27 @@ impl RpcClient { Ok(blockhash) } + pub async fn get_latest_blockhash_with_config( + &self, + config: RpcLatestBlockhashConfig, 
+ ) -> ClientResult<(Hash, u64)> { + let RpcBlockhash { + blockhash, + last_valid_block_height, + } = self + .send::>(RpcRequest::GetLatestBlockhash, json!([config])) + .await? + .value; + let blockhash = blockhash.parse().map_err(|_| { + ClientError::new_with_request( + RpcError::ParseError("Hash".to_string()).into(), + RpcRequest::GetLatestBlockhash, + ) + })?; + Ok((blockhash, last_valid_block_height)) + } + + #[allow(deprecated)] pub async fn get_latest_blockhash_with_commitment( &self, commitment: CommitmentConfig, diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index ee4c409465..6ea02864c1 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2349,8 +2349,28 @@ impl JsonRpcRequestProcessor { } } - fn get_latest_blockhash(&self, config: RpcContextConfig) -> Result> { - let bank = self.get_bank_with_config(config)?; + fn get_latest_blockhash( + &self, + config: RpcLatestBlockhashConfig, + ) -> Result> { + let mut bank = self.get_bank_with_config(config.context)?; + if config.rollback > MAX_PROCESSING_AGE { + return Err(Error::invalid_params(format!("rollback exceeds {MAX_PROCESSING_AGE}"))); + } + if config.rollback > 0 { + let r_bank_forks = self.bank_forks.read().unwrap(); + for _ in 0..config.rollback { + bank = match r_bank_forks.get(bank.parent_slot()).or_else(|| { + r_bank_forks + .banks_frozen + .get(&bank.parent_slot()) + .map(Clone::clone) + }) { + Some(bank) => bank, + None => return Err(Error::invalid_params("failed to rollback block")), + }; + } + } let blockhash = bank.last_blockhash(); let last_valid_block_height = bank .get_blockhash_last_valid_block_height(&blockhash) @@ -3583,7 +3603,7 @@ pub mod rpc_full { fn get_latest_blockhash( &self, meta: Self::Metadata, - config: Option, + config: Option, ) -> Result>; #[rpc(meta, name = "isBlockhashValid")] @@ -4306,7 +4326,7 @@ pub mod rpc_full { fn get_latest_blockhash( &self, meta: Self::Metadata, - config: Option, + config: Option, ) -> Result> { debug!("get_latest_blockhash rpc request received"); 
meta.get_latest_blockhash(config.unwrap_or_default()) diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 1a24cca363..66605ae53b 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -11,13 +11,13 @@ use { }, arc_swap::ArcSwap, log::*, - solana_clock::{BankId, Slot}, + solana_clock::{BankId, Slot, MAX_PROCESSING_AGE}, solana_hash::Hash, solana_measure::measure::Measure, solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph}, solana_unified_scheduler_logic::SchedulingMode, std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, ops::Index, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -93,6 +93,7 @@ struct SetRootTimings { pub struct BankForks { banks: HashMap, + pub banks_frozen: BTreeMap>, descendants: HashMap>, root: Arc, working_slot: Slot, @@ -151,6 +152,7 @@ impl BankForks { working_bank: Arc::new(ArcSwap::from(root_bank.clone())), }, banks, + banks_frozen: Default::default(), descendants, in_vote_only_mode: Arc::new(AtomicBool::new(false)), highest_slot_at_startup: 0, @@ -309,6 +311,13 @@ impl BankForks { pub fn remove(&mut self, slot: Slot) -> Option { let bank = self.banks.remove(&slot)?; + if bank.is_frozen() { + self.banks_frozen + .insert(bank.slot(), bank.clone_without_scheduler()); + while self.banks_frozen.len() > MAX_PROCESSING_AGE { + self.banks_frozen.pop_first(); + } + } for parent in bank.proper_ancestors() { let Entry::Occupied(mut entry) = self.descendants.entry(parent) else { panic!("this should not happen!"); From 7fe1c585555a395b8743f03b345a103781633e79 Mon Sep 17 00:00:00 2001 From: linuskendall Date: Mon, 15 Jan 2024 08:33:41 +0000 Subject: [PATCH 6/8] rpc: add solend patch (gPA filter) Adds a patch that allows more complex queries for gPA. 
--- rpc-client-types/src/filter.rs | 245 +++++++++++++++++++++++++++++++++ rpc/src/filter.rs | 1 + rpc/src/rpc.rs | 2 + 3 files changed, 248 insertions(+) diff --git a/rpc-client-types/src/filter.rs b/rpc-client-types/src/filter.rs index cdd11554cc..57a7401e33 100644 --- a/rpc-client-types/src/filter.rs +++ b/rpc-client-types/src/filter.rs @@ -1,6 +1,8 @@ use { base64::{prelude::BASE64_STANDARD, Engine}, serde::{Deserialize, Serialize}, + solana_account::{AccountSharedData, ReadableAccount}, + spl_generic_token::{token::GenericTokenAccount, token_2022::Account}, std::borrow::Cow, thiserror::Error, }; @@ -15,6 +17,7 @@ pub enum RpcFilterType { DataSize(u64), Memcmp(Memcmp), TokenAccountState, + ValueCmp(ValueCmp), } impl RpcFilterType { @@ -55,6 +58,22 @@ impl RpcFilterType { } } RpcFilterType::TokenAccountState => Ok(()), + RpcFilterType::ValueCmp(_) => Ok(()), + } + } + + #[deprecated( + since = "2.0.0", + note = "Use solana_rpc::filter::filter_allows instead" + )] + pub fn allows(&self, account: &AccountSharedData) -> bool { + match self { + RpcFilterType::DataSize(size) => account.data().len() as u64 == *size, + RpcFilterType::Memcmp(compare) => compare.bytes_match(account.data()), + RpcFilterType::TokenAccountState => Account::valid_account_data(account.data()), + RpcFilterType::ValueCmp(compare) => { + compare.values_match(account.data()).unwrap_or(false) + } } } } @@ -67,6 +86,8 @@ pub enum RpcFilterError { Base58DecodeError(#[from] bs58::decode::Error), #[error("base64 decode error")] Base64DecodeError(#[from] base64::DecodeError), + #[error("invalid ValueCmp filter")] + InvalidValueCmp, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -208,6 +229,178 @@ impl Memcmp { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ValueCmp { + pub left: Operand, + comparator: Comparator, + pub right: Operand, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Operand { + Mem { + offset: 
usize, + value_type: ValueType, + }, + Constant(String), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum ValueType { + U8, + U16, + U32, + U64, + U128, +} + +enum WrappedValueType { + U8(u8), + U16(u16), + U32(u32), + U64(u64), + U128(u128), +} + +impl ValueCmp { + fn parse_mem_into_value_type( + o: &Operand, + data: &[u8], + ) -> Result { + match o { + Operand::Mem { offset, value_type } => match value_type { + ValueType::U8 => { + if *offset >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + + Ok(WrappedValueType::U8(data[*offset])) + } + ValueType::U16 => { + if *offset + 1 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U16(u16::from_le_bytes( + data[*offset..*offset + 2].try_into().unwrap(), + ))) + } + ValueType::U32 => { + if *offset + 3 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U32(u32::from_le_bytes( + data[*offset..*offset + 4].try_into().unwrap(), + ))) + } + ValueType::U64 => { + if *offset + 7 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U64(u64::from_le_bytes( + data[*offset..*offset + 8].try_into().unwrap(), + ))) + } + ValueType::U128 => { + if *offset + 15 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U128(u128::from_le_bytes( + data[*offset..*offset + 16].try_into().unwrap(), + ))) + } + }, + _ => Err(RpcFilterError::InvalidValueCmp), + } + } + + pub fn values_match(&self, data: &[u8]) -> Result { + match (&self.left, &self.right) { + (left @ Operand::Mem { .. }, right @ Operand::Mem { .. 
}) => { + let left = Self::parse_mem_into_value_type(left, data)?; + let right = Self::parse_mem_into_value_type(right, data)?; + + match (left, right) { + (WrappedValueType::U8(left), WrappedValueType::U8(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U16(left), WrappedValueType::U16(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U32(left), WrappedValueType::U32(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U64(left), WrappedValueType::U64(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U128(left), WrappedValueType::U128(right)) => { + Ok(self.comparator.compare(left, right)) + } + _ => Err(RpcFilterError::InvalidValueCmp), + } + } + (left @ Operand::Mem { .. }, Operand::Constant(constant)) => { + match Self::parse_mem_into_value_type(left, data)? { + WrappedValueType::U8(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U16(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U32(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U64(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U128(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + } + } + _ => Err(RpcFilterError::InvalidValueCmp), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Comparator { + Eq = 0, + Ne, + Gt, + Ge, + Lt, + Le, +} + +impl Comparator { + // write a generic function to 
compare two values + pub fn compare(&self, left: T, right: T) -> bool { + match self { + Comparator::Eq => left == right, + Comparator::Ne => left != right, + Comparator::Gt => left > right, + Comparator::Ge => left >= right, + Comparator::Lt => left < right, + Comparator::Le => left <= right, + } + } +} + #[cfg(test)] mod tests { use { @@ -441,4 +634,56 @@ mod tests { serde_json::from_str::(BYTES_FILTER_WITH_ENCODING).unwrap() ); } + + #[test] + fn test_values_match() { + // test all the ValueCmp cases + let data = vec![1, 2, 3, 4, 5]; + + let filter = ValueCmp { + left: Operand::Mem { + offset: 1, + value_type: ValueType::U8, + }, + comparator: Comparator::Eq, + right: Operand::Constant("2".to_string()), + }; + + assert!(ValueCmp { + left: Operand::Mem { + offset: 1, + value_type: ValueType::U8 + }, + comparator: Comparator::Eq, + right: Operand::Constant("2".to_string()) + } + .values_match(&data) + .unwrap()); + + assert!(ValueCmp { + left: Operand::Mem { + offset: 1, + value_type: ValueType::U8 + }, + comparator: Comparator::Lt, + right: Operand::Constant("3".to_string()) + } + .values_match(&data) + .unwrap()); + + assert!(ValueCmp { + left: Operand::Mem { + offset: 0, + value_type: ValueType::U32 + }, + comparator: Comparator::Eq, + right: Operand::Constant("67305985".to_string()) + } + .values_match(&data) + .unwrap()); + + // serialize + let s = serde_json::to_string(&filter).unwrap(); + println!("{}", s); + } } diff --git a/rpc/src/filter.rs b/rpc/src/filter.rs index 6c2b13d20f..4af5751cf8 100644 --- a/rpc/src/filter.rs +++ b/rpc/src/filter.rs @@ -9,5 +9,6 @@ pub fn filter_allows(filter: &RpcFilterType, account: &AccountSharedData) -> boo RpcFilterType::DataSize(size) => account.data().len() as u64 == *size, RpcFilterType::Memcmp(compare) => compare.bytes_match(account.data()), RpcFilterType::TokenAccountState => Account::valid_account_data(account.data()), + RpcFilterType::ValueCmp(compare) => compare.values_match(account.data()).unwrap_or(false), } } 
diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 6ea02864c1..bfe3e8697d 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2608,6 +2608,7 @@ fn get_spl_token_owner_filter( } } RpcFilterType::TokenAccountState => token_account_state_filter = true, + RpcFilterType::ValueCmp(_) => {} } } if data_size_filter == Some(account_packed_len as u64) @@ -2659,6 +2660,7 @@ fn get_spl_token_mint_filter( } } RpcFilterType::TokenAccountState => token_account_state_filter = true, + RpcFilterType::ValueCmp(_) => {} } } if data_size_filter == Some(account_packed_len as u64) From d5b1f25b29c74da6915d1f848f958b3c79e3f3b7 Mon Sep 17 00:00:00 2001 From: leafaar Date: Wed, 14 Jan 2026 14:52:47 -0300 Subject: [PATCH 7/8] feat(geyser): add deshred transaction notifier Adds a new geyser plugin notification that fires when transactions are deserialized from shreds (deshredded), before any replay or execution occurs. This hooks into CompletedDataSetsService which receives callbacks from the blockstore when data sets (entries) are formed from incoming shreds. This is the earliest point where complete transaction data is available. 
Key differences from the existing transaction notifier: - Fires before replay/execution, not after - Does not include TransactionStatusMeta (no execution results available yet) - Cannot provide accurate transaction/entry indices since data sets arrive as shreds complete, not in slot order New geyser plugin interface: - ReplicaDeshredTransactionInfo: contains signature, is_vote, and transaction - notify_deshred_transaction(): called for each transaction when deshredded - deshred_transaction_notifications_enabled(): opt-in for plugins Implementation: - DeshredTransactionNotifier trait in solana-ledger - DeshredTransactionNotifierImpl in solana-geyser-plugin-manager - Integration in CompletedDataSetsService and Validator --- core/src/completed_data_sets_service.rs | 42 +++++++- core/src/validator.rs | 8 +- .../src/geyser_plugin_interface.rs | 42 ++++++++ .../src/deshred_transaction_notifier.rs | 97 +++++++++++++++++++ .../src/geyser_plugin_manager.rs | 10 ++ .../src/geyser_plugin_service.rs | 25 ++++- geyser-plugin-manager/src/lib.rs | 1 + .../deshred_transaction_notifier_interface.rs | 20 ++++ ledger/src/lib.rs | 1 + 9 files changed, 242 insertions(+), 4 deletions(-) create mode 100644 geyser-plugin-manager/src/deshred_transaction_notifier.rs create mode 100644 ledger/src/deshred_transaction_notifier_interface.rs diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs index 840e46b4c8..6293ac471f 100644 --- a/core/src/completed_data_sets_service.rs +++ b/core/src/completed_data_sets_service.rs @@ -7,9 +7,17 @@ use { crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, solana_entry::entry::Entry, - solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo}, + solana_ledger::{ + blockstore::{Blockstore, CompletedDataSetInfo}, + deshred_transaction_notifier_interface::DeshredTransactionNotifierArc, + }, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, + solana_message::VersionedMessage, 
solana_signature::Signature, + solana_transaction::{ + simple_vote_transaction_checker::is_simple_vote_transaction_impl, + versioned::VersionedTransaction, + }, std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -23,6 +31,20 @@ use { pub type CompletedDataSetsReceiver = Receiver>; pub type CompletedDataSetsSender = Sender>; +/// Check if a versioned transaction is a simple vote transaction. +/// This avoids cloning by extracting the required data directly. +fn is_simple_vote_transaction(tx: &VersionedTransaction) -> bool { + let is_legacy = matches!(&tx.message, VersionedMessage::Legacy(_)); + let (account_keys, instructions) = match &tx.message { + VersionedMessage::Legacy(msg) => (&msg.account_keys[..], &msg.instructions[..]), + VersionedMessage::V0(msg) => (&msg.account_keys[..], &msg.instructions[..]), + }; + let instruction_programs = instructions + .iter() + .filter_map(|ix| account_keys.get(ix.program_id_index as usize)); + is_simple_vote_transaction_impl(&tx.signatures, is_legacy, instruction_programs) +} + pub struct CompletedDataSetsService { thread_hdl: JoinHandle<()>, } @@ -32,6 +54,7 @@ impl CompletedDataSetsService { completed_sets_receiver: CompletedDataSetsReceiver, blockstore: Arc, rpc_subscriptions: Arc, + deshred_transaction_notifier: Option, exit: Arc, max_slots: Arc, ) -> Self { @@ -47,6 +70,7 @@ impl CompletedDataSetsService { &completed_sets_receiver, &blockstore, &rpc_subscriptions, + &deshred_transaction_notifier, &max_slots, ) { break; @@ -62,6 +86,7 @@ impl CompletedDataSetsService { completed_sets_receiver: &CompletedDataSetsReceiver, blockstore: &Blockstore, rpc_subscriptions: &RpcSubscriptions, + deshred_transaction_notifier: &Option, max_slots: &Arc, ) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); @@ -69,6 +94,21 @@ impl CompletedDataSetsService { let CompletedDataSetInfo { slot, indices } = completed_data_set_info; match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) 
{ Ok(entries) => { + // Notify deshred transactions if notifier is enabled + if let Some(notifier) = deshred_transaction_notifier { + for entry in entries.iter() { + for tx in &entry.transactions { + if let Some(signature) = tx.signatures.first() { + let is_vote = is_simple_vote_transaction(tx); + notifier.notify_deshred_transaction( + slot, signature, is_vote, tx, + ); + } + } + } + } + + // Existing: notify signatures for RPC subscriptions let transactions = Self::get_transaction_signatures(entries); if !transactions.is_empty() { rpc_subscriptions.notify_signatures_received((slot, transactions)); diff --git a/core/src/validator.rs b/core/src/validator.rs index 0da1864e1e..3e281e7250 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -822,6 +822,7 @@ impl Validator { let ( accounts_update_notifier, transaction_notifier, + deshred_transaction_notifier, entry_notifier, block_metadata_notifier, slot_status_notifier, @@ -829,19 +830,21 @@ impl Validator { ( service.get_accounts_update_notifier(), service.get_transaction_notifier(), + service.get_deshred_transaction_notifier(), service.get_entry_notifier(), service.get_block_metadata_notifier(), service.get_slot_status_notifier(), ) } else { - (None, None, None, None, None) + (None, None, None, None, None, None) }; info!( "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \ - entry_notifier: {}", + deshred_transaction_notifier: {}, entry_notifier: {}", accounts_update_notifier.is_some(), transaction_notifier.is_some(), + deshred_transaction_notifier.is_some(), entry_notifier.is_some() ); @@ -1307,6 +1310,7 @@ impl Validator { completed_data_sets_receiver, blockstore.clone(), rpc_subscriptions.clone(), + deshred_transaction_notifier.clone(), exit.clone(), max_slots.clone(), ); diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index b3dc245c05..c43af99ee4 100644 --- 
a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -191,6 +191,29 @@ pub enum ReplicaTransactionInfoVersions<'a> { V0_0_3(&'a ReplicaTransactionInfoV3<'a>), } +/// Information about a transaction after deshredding (when entries are formed from shreds). +/// This is sent before any execution occurs. +/// Unlike ReplicaTransactionInfo, this does not include TransactionStatusMeta +/// since execution has not happened yet. +#[derive(Clone, Debug)] +#[repr(C)] +pub struct ReplicaDeshredTransactionInfo<'a> { + /// The transaction signature, used for identifying the transaction. + pub signature: &'a Signature, + + /// Indicates if the transaction is a simple vote transaction. + pub is_vote: bool, + + /// The versioned transaction. + pub transaction: &'a VersionedTransaction, +} + +/// A wrapper to future-proof ReplicaDeshredTransactionInfo handling. +#[repr(u32)] +pub enum ReplicaDeshredTransactionInfoVersions<'a> { + V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>), +} + #[derive(Clone, Debug)] #[repr(C)] pub struct ReplicaEntryInfo<'a> { @@ -471,6 +494,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> { Ok(()) } + + /// Called when a transaction is deshredded (entries formed from shreds). + /// This is triggered before any execution occurs. Unlike notify_transaction, + /// this does not include execution metadata (TransactionStatusMeta). 
+ #[allow(unused_variables)] + fn notify_deshred_transaction( + &self, + transaction: ReplicaDeshredTransactionInfoVersions, + slot: Slot, + ) -> Result<()> { + Ok(()) + } /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in @@ -500,4 +535,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { fn entry_notifications_enabled(&self) -> bool { false } + + /// Check if the plugin is interested in deshred transaction data. + /// Default is false -- if the plugin is interested in receiving + /// transactions when they are deshredded, return true. + fn deshred_transaction_notifications_enabled(&self) -> bool { + false + } } diff --git a/geyser-plugin-manager/src/deshred_transaction_notifier.rs b/geyser-plugin-manager/src/deshred_transaction_notifier.rs new file mode 100644 index 0000000000..c5c6b503b0 --- /dev/null +++ b/geyser-plugin-manager/src/deshred_transaction_notifier.rs @@ -0,0 +1,97 @@ +/// Module responsible for notifying plugins of transactions when deshredded +use { + crate::geyser_plugin_manager::GeyserPluginManager, + agave_geyser_plugin_interface::geyser_plugin_interface::{ + ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions, + }, + log::*, + solana_clock::Slot, + solana_ledger::deshred_transaction_notifier_interface::DeshredTransactionNotifier, + solana_measure::measure::Measure, + solana_metrics::*, + solana_signature::Signature, + solana_transaction::versioned::VersionedTransaction, + std::sync::{Arc, RwLock}, +}; + +/// This implementation of DeshredTransactionNotifier is passed to the CompletedDataSetsService +/// at validator startup. CompletedDataSetsService invokes the notify_deshred_transaction method +/// when entries are formed from shreds. The implementation in turn invokes the +/// notify_deshred_transaction of each plugin enabled with deshred transaction notification +/// managed by the GeyserPluginManager. 
+pub(crate) struct DeshredTransactionNotifierImpl { + plugin_manager: Arc>, +} + +impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl { + fn notify_deshred_transaction( + &self, + slot: Slot, + signature: &Signature, + is_vote: bool, + transaction: &VersionedTransaction, + ) { + let mut measure = + Measure::start("geyser-plugin-notify_plugins_of_deshred_transaction_info"); + let transaction_info = Self::build_replica_deshred_transaction_info( + signature, + is_vote, + transaction, + ); + + let plugin_manager = self.plugin_manager.read().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter() { + if !plugin.deshred_transaction_notifications_enabled() { + continue; + } + match plugin.notify_deshred_transaction( + ReplicaDeshredTransactionInfoVersions::V0_0_1(&transaction_info), + slot, + ) { + Err(err) => { + error!( + "Failed to notify deshred transaction, error: ({}) to plugin {}", + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully notified deshred transaction to plugin {}", + plugin.name() + ); + } + } + } + measure.stop(); + inc_new_counter_debug!( + "geyser-plugin-notify_plugins_of_deshred_transaction_info-us", + measure.as_us() as usize, + 10000, + 10000 + ); + } +} + +impl DeshredTransactionNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } + + fn build_replica_deshred_transaction_info<'a>( + signature: &'a Signature, + is_vote: bool, + transaction: &'a VersionedTransaction, + ) -> ReplicaDeshredTransactionInfo<'a> { + ReplicaDeshredTransactionInfo { + signature, + is_vote, + transaction, + } + } +} diff --git a/geyser-plugin-manager/src/geyser_plugin_manager.rs b/geyser-plugin-manager/src/geyser_plugin_manager.rs index 08354df044..309b89a6e6 100644 --- a/geyser-plugin-manager/src/geyser_plugin_manager.rs +++ b/geyser-plugin-manager/src/geyser_plugin_manager.rs @@ -109,6 +109,16 @@ impl GeyserPluginManager { false } + /// Check if 
there is any plugin interested in deshred transaction data + pub fn deshred_transaction_notifications_enabled(&self) -> bool { + for plugin in &self.plugins { + if plugin.deshred_transaction_notifications_enabled() { + return true; + } + } + false + } + /// Admin RPC request handler pub(crate) fn list_plugins(&self) -> JsonRpcResult> { Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect()) diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 29c9cc03a4..6784ade6c2 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -5,6 +5,7 @@ use { block_metadata_notifier_interface::BlockMetadataNotifierArc, entry_notifier::EntryNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest}, + deshred_transaction_notifier::DeshredTransactionNotifierImpl, slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, @@ -12,7 +13,10 @@ use { crossbeam_channel::Receiver, log::*, solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier, - solana_ledger::entry_notifier_interface::EntryNotifierArc, + solana_ledger::{ + entry_notifier_interface::EntryNotifierArc, + deshred_transaction_notifier_interface::DeshredTransactionNotifierArc, + }, solana_rpc::{ optimistically_confirmed_bank_tracker::SlotNotification, slot_status_notifier::SlotStatusNotifier, @@ -36,6 +40,7 @@ pub struct GeyserPluginService { plugin_manager: Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + deshred_transaction_notifier: Option, entry_notifier: Option, block_metadata_notifier: Option, slot_status_notifier: Option, @@ -91,6 +96,9 @@ impl GeyserPluginService { plugin_manager.account_data_snapshot_notifications_enabled(); let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled() || 
geyser_plugin_always_enabled; + let deshred_transaction_notifications_enabled = + plugin_manager.deshred_transaction_notifications_enabled() + || geyser_plugin_always_enabled; let entry_notifications_enabled = plugin_manager.entry_notifications_enabled() || geyser_plugin_always_enabled; let plugin_manager = Arc::new(RwLock::new(plugin_manager)); @@ -114,6 +122,15 @@ impl GeyserPluginService { None }; + let deshred_transaction_notifier: Option = + if deshred_transaction_notifications_enabled { + let deshred_transaction_notifier = + DeshredTransactionNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(deshred_transaction_notifier)) + } else { + None + }; + let entry_notifier: Option = if entry_notifications_enabled { let entry_notifier = EntryNotifierImpl::new(plugin_manager.clone()); Some(Arc::new(entry_notifier)) @@ -127,6 +144,7 @@ impl GeyserPluginService { Option, ) = if account_data_notifications_enabled || transaction_notifications_enabled + || deshred_transaction_notifications_enabled || entry_notifications_enabled { let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); @@ -157,6 +175,7 @@ impl GeyserPluginService { plugin_manager, accounts_update_notifier, transaction_notifier, + deshred_transaction_notifier, entry_notifier, block_metadata_notifier, slot_status_notifier, @@ -181,6 +200,10 @@ impl GeyserPluginService { self.transaction_notifier.clone() } + pub fn get_deshred_transaction_notifier(&self) -> Option { + self.deshred_transaction_notifier.clone() + } + pub fn get_entry_notifier(&self) -> Option { self.entry_notifier.clone() } diff --git a/geyser-plugin-manager/src/lib.rs b/geyser-plugin-manager/src/lib.rs index 8ecdbb2d23..8ca2eafbac 100644 --- a/geyser-plugin-manager/src/lib.rs +++ b/geyser-plugin-manager/src/lib.rs @@ -13,6 +13,7 @@ pub mod block_metadata_notifier_interface; pub mod entry_notifier; pub mod geyser_plugin_manager; pub mod geyser_plugin_service; +pub mod deshred_transaction_notifier; pub mod 
slot_status_notifier; pub mod slot_status_observer; pub mod transaction_notifier; diff --git a/ledger/src/deshred_transaction_notifier_interface.rs b/ledger/src/deshred_transaction_notifier_interface.rs new file mode 100644 index 0000000000..e21f723c63 --- /dev/null +++ b/ledger/src/deshred_transaction_notifier_interface.rs @@ -0,0 +1,20 @@ +use { + solana_clock::Slot, + solana_signature::Signature, + solana_transaction::versioned::VersionedTransaction, + std::sync::Arc, +}; + +/// Trait for notifying about transactions when they are deshredded. +/// This is called when entries are formed from shreds, before any execution occurs. +pub trait DeshredTransactionNotifier { + fn notify_deshred_transaction( + &self, + slot: Slot, + signature: &Signature, + is_vote: bool, + transaction: &VersionedTransaction, + ); +} + +pub type DeshredTransactionNotifierArc = Arc; diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index e7bf5659a4..fb08b87bbd 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -29,6 +29,7 @@ pub mod blockstore_options; pub mod blockstore_processor; pub mod entry_notifier_interface; pub mod entry_notifier_service; +pub mod deshred_transaction_notifier_interface; pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; From 07f9e7e226f50dda418df68b660e213bcb930c48 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 13:52:50 +0000 Subject: [PATCH 8/8] build(deps): bump enum-iterator from 0.8.1 to 1.4.0 Bumps [enum-iterator](https://github.com/stephaneyfx/enum-iterator) from 0.8.1 to 1.4.0. - [Release notes](https://github.com/stephaneyfx/enum-iterator/releases) - [Commits](https://github.com/stephaneyfx/enum-iterator/compare/0.8.1...1.4.0) --- updated-dependencies: - dependency-name: enum-iterator dependency-type: direct:production update-type: version-update:semver-major ... 
Signed-off-by: dependabot[bot] --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4493c29be..25ba94601f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2869,18 +2869,18 @@ dependencies = [ [[package]] name = "enum-iterator" -version = "1.5.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fd242f399be1da0a5354aa462d57b4ab2b4ee0683cc552f7c007d2d12d36e94" +checksum = "a4549325971814bda7a44061bf3fe7e487d447cba01e4220a4b454d630d7a016" dependencies = [ "enum-iterator-derive", ] [[package]] name = "enum-iterator-derive" -version = "1.3.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03cdc46ec28bd728e67540c528013c6a10eb69a02eb31078a1bda695438cbfb8" +checksum = "685adfa4d6f3d765a26bc5dbc936577de9abf756c1feeb3089b01dd395034842" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 7690b732d5..f7e2861cd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -266,7 +266,7 @@ dyn-clone = "1.0.20" eager = "0.1.0" ed25519-dalek = "=1.0.1" ed25519-dalek-bip32 = "0.2.0" -enum-iterator = "1.5.0" +enum-iterator = "2.3.0" env_logger = "0.11.8" fast-math = "0.1" fd-lock = "3.0.13"