From 3c45a6d82eecacb1ca131a2652e41bdafd38d57a Mon Sep 17 00:00:00 2001 From: jason Date: Thu, 18 Dec 2025 17:15:12 +0000 Subject: [PATCH] fix: add non-zero balance checks and moved tx ingestion to post-validation --- crates/actors/src/mempool_service.rs | 24 ++++++ .../src/mempool_service/commitment_txs.rs | 38 ++++++++- crates/actors/src/mempool_service/data_txs.rs | 38 ++++++++- .../actors/src/mempool_service/lifecycle.rs | 83 +++++++++++++++++++ crates/actors/src/validation_service.rs | 15 ++++ .../validation_service/active_validations.rs | 7 +- .../chain/tests/multi_node/mempool_tests.rs | 2 +- crates/p2p/src/block_pool.rs | 36 ++------ crates/p2p/src/tests/integration/mod.rs | 15 ++-- 9 files changed, 218 insertions(+), 40 deletions(-) diff --git a/crates/actors/src/mempool_service.rs b/crates/actors/src/mempool_service.rs index 1dda45d49b..7412c98e57 100644 --- a/crates/actors/src/mempool_service.rs +++ b/crates/actors/src/mempool_service.rs @@ -201,6 +201,11 @@ pub struct Inner { pub enum MempoolServiceMessage { /// Block Confirmed, read publish txs from block. Overwrite copies in mempool with proof BlockConfirmed(Arc), + /// Ingest validated block transactions into mempool after successful block validation. + BlockTransactionsValidated { + block: Arc, + transactions: Arc, + }, /// Ingress Chunk, Add to CachedChunks, generate_ingress_proof, gossip chunk IngestChunk( UnpackedChunk, @@ -270,6 +275,7 @@ impl MempoolServiceMessage { pub fn variant_name(&self) -> &'static str { match self { Self::BlockConfirmed(_) => "BlockConfirmed", + Self::BlockTransactionsValidated { .. } => "BlockTransactionsValidated", Self::IngestChunk(_, _) => "IngestChunk", Self::IngestChunkFireAndForget(_) => "IngestChunkFireAndForget", Self::IngestIngressProof(_, _) => "IngestIngressProof", @@ -315,6 +321,24 @@ impl Inner { ); } } + MempoolServiceMessage::BlockTransactionsValidated { + block, + transactions, + } => { + let block_hash = block.block_hash; + let block_height = block.height; + if let Err(e) = self + .handle_block_transactions_validated(block, transactions) + .await + { + tracing::error!( + "Failed to handle BlockTransactionsValidated for block {} (height {}): {:#}", + block_hash, + block_height, + e + ); + } + } MempoolServiceMessage::IngestCommitmentTxFromApi(commitment_tx, response) => { let response_message = self .handle_ingress_commitment_tx_message_api(commitment_tx) diff --git a/crates/actors/src/mempool_service/commitment_txs.rs b/crates/actors/src/mempool_service/commitment_txs.rs index c5d6f00b1a..4e2c4c2244 100644 --- a/crates/actors/src/mempool_service/commitment_txs.rs +++ b/crates/actors/src/mempool_service/commitment_txs.rs @@ -1,11 +1,11 @@ use crate::mempool_service::{validate_commitment_transaction, Inner, TxIngressError, TxReadError}; use irys_database::{commitment_tx_by_txid, db::IrysDatabaseExt as _}; use irys_domain::CommitmentSnapshotStatus; +use irys_reth_node_bridge::ext::IrysRethRpcTestContextExt as _; use irys_types::{ CommitmentTransaction, CommitmentValidationError, GossipBroadcastMessage, IrysAddress, IrysTransactionCommon as _, IrysTransactionId, TxKnownStatus, H256, }; -// Bring RPC extension trait into scope for test contexts; `as _` avoids unused import warnings use std::collections::HashMap; use tracing::{debug, instrument, warn}; @@ -157,10 +157,14 @@ impl Inner { self.precheck_commitment_ingress_common(commitment_tx) .await?; + // Reject transactions with zero balance + self.validate_commitment_gossip_nonzero_balance(commitment_tx) + .await?; + // Gossip path: check only static fields from config (shape). // - Validate `fee` and `value` to reject clearly wrong Stake/Pledge/Unpledge/Unstake txs. - // - Do not check account balance here. That is verified on API ingress - // and again during selection/block validation. + // - We skip full balance validation for gossip, as balances may differ across forks. + // However, we do check for non-zero balance above to prevent DoS. if let Err(e) = commitment_tx.validate_fee(&self.config.consensus) { self.mempool_state .put_recent_invalid(commitment_tx.id()) @@ -248,6 +252,34 @@ impl Inner { Ok(()) } + /// Validates that a gossip commitment transaction signer has non-zero balance. + #[tracing::instrument(level = "trace", skip_all, fields(tx.id = %tx.id(), tx.signer = %tx.signer()))] + async fn validate_commitment_gossip_nonzero_balance( + &self, + tx: &CommitmentTransaction, + ) -> Result<(), TxIngressError> { + let balance: irys_types::U256 = self + .reth_node_adapter + .rpc + .get_balance_irys_canonical_and_pending(tx.signer(), None) + .await + .map_err(|e| TxIngressError::BalanceFetchError { + address: tx.signer().to_string(), + reason: e.to_string(), + })?; + + if balance.is_zero() { + tracing::debug!( + tx.id = %tx.id(), + tx.signer = %tx.signer(), + "Rejecting gossip commitment tx from zero-balance account" + ); + return Err(TxIngressError::Unfunded(tx.id())); + } + + Ok(()) + } + /// Checks the database index for an existing commitment transaction by id. fn is_known_commitment_in_db(&self, tx_id: &H256) -> Result { let known_in_db = self diff --git a/crates/actors/src/mempool_service/data_txs.rs b/crates/actors/src/mempool_service/data_txs.rs index 9a41bf7122..8d73e2742b 100644 --- a/crates/actors/src/mempool_service/data_txs.rs +++ b/crates/actors/src/mempool_service/data_txs.rs @@ -160,6 +160,9 @@ impl Inner { // Shared pre-checks: duplicate detection, signature, anchor/expiry, ledger parsing let (ledger, expiry_height) = self.precheck_data_ingress_common(&tx).await?; + // DoS protection: reject transactions from accounts with zero balance + self.validate_gossip_nonzero_balance(&tx).await?; + // Protocol fee structure checks (Gossip: skip) // // Rationale: @@ -167,11 +170,12 @@ impl Inner { // EMA/pricing context. To avoid false rejections, we limit validation for Gossip // sources to signature + anchor checks only (performed above), and skip fee structure // checks here. - // - Similarly, we skip balance and EMA pricing validation for gossip, as these are - // canonical-chain-specific and may differ across forks. + // - We skip full balance and EMA pricing validation for gossip, as these are + // canonical-chain-specific and may differ across forks. However, we do check for + // non-zero balance above to prevent DoS from completely unfunded accounts. match ledger { DataLedger::Publish => { - // Gossip path: skip API-only checks here + // Gossip path: skip API-only fee structure checks here } DataLedger::Submit => { // Submit ledger - a data transaction cannot target the submit ledger directly @@ -280,6 +284,34 @@ impl Inner { Ok(()) } + /// Validates that a gossip transaction signer has non-zero balance. + #[tracing::instrument(level = "trace", skip_all, fields(tx.id = %tx.id, tx.signer = %tx.signer))] + async fn validate_gossip_nonzero_balance( + &self, + tx: &DataTransactionHeader, + ) -> Result<(), TxIngressError> { + let balance: U256 = self + .reth_node_adapter + .rpc + .get_balance_irys_canonical_and_pending(tx.signer, None) + .await + .map_err(|e| TxIngressError::BalanceFetchError { + address: tx.signer.to_string(), + reason: e.to_string(), + })?; + + if balance.is_zero() { + tracing::debug!( + tx.id = %tx.id, + tx.signer = %tx.signer, + "Rejecting gossip tx from zero-balance account" + ); + return Err(TxIngressError::Unfunded(tx.id)); + } + + Ok(()) + } + /// Validates data transaction fees against the authoritative EMA pricing. /// Uses `ema_for_public_pricing()` which returns the stable price from 2 intervals ago. /// This is the price that users should use when calculating their transaction fees. diff --git a/crates/actors/src/mempool_service/lifecycle.rs b/crates/actors/src/mempool_service/lifecycle.rs index 195bc29898..20a1b2d25a 100644 --- a/crates/actors/src/mempool_service/lifecycle.rs +++ b/crates/actors/src/mempool_service/lifecycle.rs @@ -125,6 +125,89 @@ impl Inner { Ok(()) } + /// Ingests validated block transactions into mempool, bypassing gossip-path balance checks. + #[instrument(skip_all, fields(block.hash = %block.block_hash, block.height = block.height))] + pub async fn handle_block_transactions_validated( + &self, + block: Arc, + transactions: Arc, + ) -> Result<(), crate::mempool_service::TxIngressError> { + debug!( + "Ingesting {} commitment txs, {} data txs from validated block {}", + transactions.commitment_txs.len(), + transactions.all_data_txs().count(), + block.block_hash + ); + + for commitment_tx in &transactions.commitment_txs { + if let Err(e) = self + .ingest_validated_commitment_tx(commitment_tx.clone()) + .await + { + if !matches!(e, crate::mempool_service::TxIngressError::Skipped) { + warn!( + "Failed to ingest validated commitment tx {}: {:?}", + commitment_tx.id(), + e + ); + } + } + } + + for data_tx in transactions.all_data_txs() { + if let Err(e) = self.ingest_validated_data_tx(data_tx.clone()).await { + if !matches!(e, crate::mempool_service::TxIngressError::Skipped) { + warn!("Failed to ingest validated data tx {}: {:?}", data_tx.id, e); + } + } + } + + Ok(()) + } + + /// Ingest a block-validated commitment tx, bypassing balance checks. + #[instrument(skip_all, fields(tx.id = %tx.id()))] + async fn ingest_validated_commitment_tx( + &self, + tx: CommitmentTransaction, + ) -> Result<(), crate::mempool_service::TxIngressError> { + if self + .mempool_state + .is_known_commitment_in_mempool(&tx.id(), tx.signer()) + .await + { + return Err(crate::mempool_service::TxIngressError::Skipped); + } + + self.mempool_state + .insert_commitment_and_mark_valid(&tx) + .await?; + + debug!("Ingested validated commitment tx {}", tx.id()); + Ok(()) + } + + /// Ingest a block-validated data tx, bypassing balance/EMA checks. + #[instrument(skip_all, fields(tx.id = %tx.id))] + async fn ingest_validated_data_tx( + &self, + tx: irys_types::DataTransactionHeader, + ) -> Result<(), crate::mempool_service::TxIngressError> { + if self + .mempool_state + .valid_submit_ledger_tx_cloned(&tx.id) + .await + .is_some() + { + return Err(crate::mempool_service::TxIngressError::Skipped); + } + + self.mempool_state.bounded_insert_data_tx(tx.clone()).await; + + debug!("Ingested validated data tx {}", tx.id); + Ok(()) + } + #[tracing::instrument(level = "trace", skip_all, fields(fork_parent.height = event.fork_parent.height))] pub async fn handle_reorg(&self, event: ReorgEvent) -> eyre::Result<()> { tracing::debug!( diff --git a/crates/actors/src/validation_service.rs b/crates/actors/src/validation_service.rs index de901ea563..d96db15661 100644 --- a/crates/actors/src/validation_service.rs +++ b/crates/actors/src/validation_service.rs @@ -264,6 +264,21 @@ impl ValidationService { result = coordinator.concurrent_tasks.join_next(), if !coordinator.concurrent_tasks.is_empty() => { match result { Some(Ok(validation)) => { + // On successful validation, send transactions to mempool + if matches!(validation.validation_result, ValidationResult::Valid) { + if let Err(e) = self.inner.service_senders.mempool.send( + crate::mempool_service::MempoolServiceMessage::BlockTransactionsValidated { + block: validation.block.clone(), + transactions: validation.transactions.clone(), + } + ) { + error!( + block.hash = %validation.block_hash, + custom.error = ?e, + "Failed to send BlockTransactionsValidated to mempool" + ); + } + } // Send the validation result to the block tree service if let Err(e) = self.inner.service_senders.block_tree.send( diff --git a/crates/actors/src/validation_service/active_validations.rs b/crates/actors/src/validation_service/active_validations.rs index 7a0f23ce1e..749f3deb60 100644 --- a/crates/actors/src/validation_service/active_validations.rs +++ b/crates/actors/src/validation_service/active_validations.rs @@ -81,6 +81,8 @@ impl PartialOrd for BlockPriorityMeta { #[derive(Debug)] pub(super) struct ConcurrentValidationResult { pub block_hash: BlockHash, + pub block: Arc, + pub transactions: Arc, pub validation_result: ValidationResult, } @@ -341,14 +343,17 @@ impl ValidationCoordinator { match &result { VdfValidationResult::Valid => { let block_hash = task.block.block_hash; + let block = Arc::clone(&task.block); + let transactions = Arc::clone(&task.transactions); self.concurrent_tasks.spawn( async move { - // Execute the validation and return the result let validation_result = task.execute_concurrent().await; ConcurrentValidationResult { block_hash, + block, + transactions, validation_result, } } diff --git a/crates/chain/tests/multi_node/mempool_tests.rs b/crates/chain/tests/multi_node/mempool_tests.rs index 14df8335dc..ab4e3f33e0 100644 --- a/crates/chain/tests/multi_node/mempool_tests.rs +++ b/crates/chain/tests/multi_node/mempool_tests.rs @@ -2510,7 +2510,7 @@ async fn commitment_tx_valid_higher_fee_test( #[rstest::rstest] #[case::stake_enough_balance(irys_types::U256::from(20000000000000000000100_u128 /* stake cost */), 1, 0)] -#[case::stake_not_enough_balance(irys_types::U256::from(0), 0, 0)] +#[case::stake_not_enough_balance(irys_types::U256::from(1), 0, 0)] #[case::pledge_15_enough_balance_for_1( irys_types::U256::from(20000000000000000000100_u128 /*stake cost*/ + 950000000000000000100_u128 /* pledge 1 */ ), 2, // stake & 1 pledge diff --git a/crates/p2p/src/block_pool.rs b/crates/p2p/src/block_pool.rs index aebd4ad4fb..fa2e073889 100644 --- a/crates/p2p/src/block_pool.rs +++ b/crates/p2p/src/block_pool.rs @@ -9,6 +9,7 @@ use irys_actors::services::ServiceSenders; use irys_actors::{MempoolFacade, TxIngressError}; use irys_database::block_header_by_hash; use irys_database::db::IrysDatabaseExt as _; +use irys_database::reth_db::Database as _; use irys_domain::chain_sync_state::ChainSyncState; #[cfg(test)] @@ -820,33 +821,14 @@ where block_transactions.commitment_txs.len() ); - // Insert transactions into mempool so validation service can find them later. - for commitment_tx in &block_transactions.commitment_txs { - if let Err(err) = self - .mempool - .handle_commitment_transaction_ingress_gossip(commitment_tx.clone()) - .await - { - if !matches!(err, TxIngressError::Skipped) { - warn!( - "Block pool: Failed to insert commitment tx {} into mempool for block {:?}: {:?}", - commitment_tx.id(), current_block_hash, err - ); - } - } - } - for data_tx in block_transactions.all_data_txs() { - if let Err(err) = self - .mempool - .handle_data_transaction_ingress_gossip(data_tx.clone()) - .await - { - if !matches!(err, TxIngressError::Skipped) { - warn!( - "Block pool: Failed to insert data tx {} into mempool for block {:?}: {:?}", - data_tx.id, current_block_hash, err - ); - } + // Cache data roots before validation (needed for publish tx ingress proof validation). + // Full tx ingestion happens after validation via BlockTransactionsValidated. + for data_tx in block_transactions.get_ledger_txs(DataLedger::Submit) { + if let Err(e) = self.db.update(|db_tx| { + irys_database::cache_data_root(db_tx, data_tx, Some(&block_header))?; + Ok::<_, eyre::Report>(()) + }) { + warn!("Failed to cache data_root for tx {}: {:?}", data_tx.id, e); } } diff --git a/crates/p2p/src/tests/integration/mod.rs b/crates/p2p/src/tests/integration/mod.rs index aae1b779eb..5d71dd7ba2 100644 --- a/crates/p2p/src/tests/integration/mod.rs +++ b/crates/p2p/src/tests/integration/mod.rs @@ -279,12 +279,17 @@ async fn heavy_should_fetch_missing_transactions_for_block() -> eyre::Result<()> tokio::time::sleep(Duration::from_millis(3000)).await; { - // Check that service 2 received and processed the transactions - let service2_mempool_txs = fixture2.mempool_txs.read().expect("to read transactions"); + // Check that service 2 received the block (with transactions) for validation. + // Note: Transactions are now ingested into mempool AFTER validation succeeds + // via BlockTransactionsValidated, not during block_pool processing. + let service2_discovery_blocks = fixture2 + .discovery_blocks + .read() + .expect("to read discovery blocks"); eyre::ensure!( - service2_mempool_txs.len() == 2, - "Expected 2 transactions in service 2 mempool after block processing, but found {}", - service2_mempool_txs.len() + service2_discovery_blocks.len() == 1, + "Expected 1 block in service 2 discovery after block processing, but found {}", + service2_discovery_blocks.len() ); };