Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions crates/actors/src/mempool_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ pub struct Inner {
pub enum MempoolServiceMessage {
/// Block Confirmed, read publish txs from block. Overwrite copies in mempool with proof
BlockConfirmed(Arc<IrysBlockHeader>),
/// Ingest validated block transactions into mempool after successful block validation.
BlockTransactionsValidated {
block: Arc<IrysBlockHeader>,
transactions: Arc<crate::block_discovery::BlockTransactions>,
},
/// Ingress Chunk, Add to CachedChunks, generate_ingress_proof, gossip chunk
IngestChunk(
UnpackedChunk,
Expand Down Expand Up @@ -270,6 +275,7 @@ impl MempoolServiceMessage {
pub fn variant_name(&self) -> &'static str {
match self {
Self::BlockConfirmed(_) => "BlockConfirmed",
Self::BlockTransactionsValidated { .. } => "BlockTransactionsValidated",
Self::IngestChunk(_, _) => "IngestChunk",
Self::IngestChunkFireAndForget(_) => "IngestChunkFireAndForget",
Self::IngestIngressProof(_, _) => "IngestIngressProof",
Expand Down Expand Up @@ -315,6 +321,24 @@ impl Inner {
);
}
}
MempoolServiceMessage::BlockTransactionsValidated {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me; the only thing is I don't entirely get why we would re-ingest those after the successful validation here — is it to account for the transactions that made it to the mempool not from the gossip, but from the block body?

Copy link
Contributor Author

@glottologist glottologist Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so that they can be propagated to other nodes

block,
transactions,
} => {
let block_hash = block.block_hash;
let block_height = block.height;
if let Err(e) = self
.handle_block_transactions_validated(block, transactions)
.await
{
tracing::error!(
"Failed to handle BlockTransactionsValidated for block {} (height {}): {:#}",
block_hash,
block_height,
e
);
}
}
MempoolServiceMessage::IngestCommitmentTxFromApi(commitment_tx, response) => {
let response_message = self
.handle_ingress_commitment_tx_message_api(commitment_tx)
Expand Down
38 changes: 35 additions & 3 deletions crates/actors/src/mempool_service/commitment_txs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::mempool_service::{validate_commitment_transaction, Inner, TxIngressError, TxReadError};
use irys_database::{commitment_tx_by_txid, db::IrysDatabaseExt as _};
use irys_domain::CommitmentSnapshotStatus;
use irys_reth_node_bridge::ext::IrysRethRpcTestContextExt as _;
use irys_types::{
CommitmentTransaction, CommitmentValidationError, GossipBroadcastMessage, IrysAddress,
IrysTransactionCommon as _, IrysTransactionId, TxKnownStatus, H256,
};
// Bring RPC extension trait into scope for test contexts; `as _` avoids unused import warnings
use std::collections::HashMap;
use tracing::{debug, instrument, warn};

Expand Down Expand Up @@ -157,10 +157,14 @@ impl Inner {
self.precheck_commitment_ingress_common(commitment_tx)
.await?;

// Reject transactions with zero balance
self.validate_commitment_gossip_nonzero_balance(commitment_tx)
.await?;

// Gossip path: check only static fields from config (shape).
// - Validate `fee` and `value` to reject clearly wrong Stake/Pledge/Unpledge/Unstake txs.
// - Do not check account balance here. That is verified on API ingress
// and again during selection/block validation.
// - We skip full balance validation for gossip, as balances may differ across forks.
// However, we do check for non-zero balance above to prevent DoS.
if let Err(e) = commitment_tx.validate_fee(&self.config.consensus) {
self.mempool_state
.put_recent_invalid(commitment_tx.id())
Expand Down Expand Up @@ -248,6 +252,34 @@ impl Inner {
Ok(())
}

/// Validates that a gossip commitment transaction signer has non-zero balance.
#[tracing::instrument(level = "trace", skip_all, fields(tx.id = %tx.id(), tx.signer = %tx.signer()))]
async fn validate_commitment_gossip_nonzero_balance(
    &self,
    tx: &CommitmentTransaction,
) -> Result<(), TxIngressError> {
    let signer = tx.signer();

    // Fetch the signer's balance (canonical chain + pending) via the reth RPC bridge;
    // an RPC failure is surfaced as a distinct ingress error rather than a rejection.
    let balance: irys_types::U256 = match self
        .reth_node_adapter
        .rpc
        .get_balance_irys_canonical_and_pending(signer, None)
        .await
    {
        Ok(balance) => balance,
        Err(e) => {
            return Err(TxIngressError::BalanceFetchError {
                address: signer.to_string(),
                reason: e.to_string(),
            })
        }
    };

    // A non-zero balance is all we require on the gossip path.
    if !balance.is_zero() {
        return Ok(());
    }

    tracing::debug!(
        tx.id = %tx.id(),
        tx.signer = %signer,
        "Rejecting gossip commitment tx from zero-balance account"
    );
    Err(TxIngressError::Unfunded(tx.id()))
}

/// Checks the database index for an existing commitment transaction by id.
fn is_known_commitment_in_db(&self, tx_id: &H256) -> Result<bool, TxIngressError> {
let known_in_db = self
Expand Down
38 changes: 35 additions & 3 deletions crates/actors/src/mempool_service/data_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,22 @@ impl Inner {
// Shared pre-checks: duplicate detection, signature, anchor/expiry, ledger parsing
let (ledger, expiry_height) = self.precheck_data_ingress_common(&tx).await?;

// DoS protection: reject transactions from accounts with zero balance
self.validate_gossip_nonzero_balance(&tx).await?;

// Protocol fee structure checks (Gossip: skip)
//
// Rationale:
// - When we receive a gossiped tx, it may belong to a different fork with a different
// EMA/pricing context. To avoid false rejections, we limit validation for Gossip
// sources to signature + anchor checks only (performed above), and skip fee structure
// checks here.
// - Similarly, we skip balance and EMA pricing validation for gossip, as these are
// canonical-chain-specific and may differ across forks.
// - We skip full balance and EMA pricing validation for gossip, as these are
// canonical-chain-specific and may differ across forks. However, we do check for
// non-zero balance above to prevent DoS from completely unfunded accounts.
match ledger {
DataLedger::Publish => {
// Gossip path: skip API-only checks here
// Gossip path: skip API-only fee structure checks here
}
DataLedger::Submit => {
// Submit ledger - a data transaction cannot target the submit ledger directly
Expand Down Expand Up @@ -280,6 +284,34 @@ impl Inner {
Ok(())
}

/// Validates that a gossip transaction signer has non-zero balance.
#[tracing::instrument(level = "trace", skip_all, fields(tx.id = %tx.id, tx.signer = %tx.signer))]
async fn validate_gossip_nonzero_balance(
    &self,
    tx: &DataTransactionHeader,
) -> Result<(), TxIngressError> {
    // Fetch the signer's balance (canonical chain + pending) via the reth RPC bridge;
    // an RPC failure is surfaced as a distinct ingress error rather than a rejection.
    let balance: U256 = match self
        .reth_node_adapter
        .rpc
        .get_balance_irys_canonical_and_pending(tx.signer, None)
        .await
    {
        Ok(balance) => balance,
        Err(e) => {
            return Err(TxIngressError::BalanceFetchError {
                address: tx.signer.to_string(),
                reason: e.to_string(),
            })
        }
    };

    // A non-zero balance is all we require on the gossip path.
    if !balance.is_zero() {
        return Ok(());
    }

    tracing::debug!(
        tx.id = %tx.id,
        tx.signer = %tx.signer,
        "Rejecting gossip tx from zero-balance account"
    );
    Err(TxIngressError::Unfunded(tx.id))
}

/// Validates data transaction fees against the authoritative EMA pricing.
/// Uses `ema_for_public_pricing()` which returns the stable price from 2 intervals ago.
/// This is the price that users should use when calculating their transaction fees.
Expand Down
83 changes: 83 additions & 0 deletions crates/actors/src/mempool_service/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,89 @@ impl Inner {
Ok(())
}

/// Ingests validated block transactions into mempool, bypassing gossip-path balance checks.
#[instrument(skip_all, fields(block.hash = %block.block_hash, block.height = block.height))]
pub async fn handle_block_transactions_validated(
&self,
block: Arc<IrysBlockHeader>,
transactions: Arc<crate::block_discovery::BlockTransactions>,
) -> Result<(), crate::mempool_service::TxIngressError> {
debug!(
"Ingesting {} commitment txs, {} data txs from validated block {}",
transactions.commitment_txs.len(),
transactions.all_data_txs().count(),
block.block_hash
);

for commitment_tx in &transactions.commitment_txs {
if let Err(e) = self
.ingest_validated_commitment_tx(commitment_tx.clone())
.await
{
if !matches!(e, crate::mempool_service::TxIngressError::Skipped) {
warn!(
"Failed to ingest validated commitment tx {}: {:?}",
commitment_tx.id(),
e
);
}
}
}

for data_tx in transactions.all_data_txs() {
if let Err(e) = self.ingest_validated_data_tx(data_tx.clone()).await {
if !matches!(e, crate::mempool_service::TxIngressError::Skipped) {
warn!("Failed to ingest validated data tx {}: {:?}", data_tx.id, e);
}
}
}

Ok(())
}

/// Ingest a block-validated commitment tx, bypassing balance checks.
#[instrument(skip_all, fields(tx.id = %tx.id()))]
async fn ingest_validated_commitment_tx(
    &self,
    tx: CommitmentTransaction,
) -> Result<(), crate::mempool_service::TxIngressError> {
    let tx_id = tx.id();

    // Already present in the mempool -> nothing to do; signal via `Skipped`
    // so the caller can ignore it without logging a warning.
    let already_known = self
        .mempool_state
        .is_known_commitment_in_mempool(&tx_id, tx.signer())
        .await;
    if already_known {
        return Err(crate::mempool_service::TxIngressError::Skipped);
    }

    self.mempool_state
        .insert_commitment_and_mark_valid(&tx)
        .await?;

    debug!("Ingested validated commitment tx {}", tx_id);
    Ok(())
}

/// Ingest a block-validated data tx, bypassing balance/EMA checks.
///
/// Returns `TxIngressError::Skipped` when the tx already exists in the
/// valid submit-ledger mempool cache; otherwise inserts it into the
/// bounded data-tx store.
#[instrument(skip_all, fields(tx.id = %tx.id))]
async fn ingest_validated_data_tx(
    &self,
    tx: irys_types::DataTransactionHeader,
) -> Result<(), crate::mempool_service::TxIngressError> {
    if self
        .mempool_state
        .valid_submit_ledger_tx_cloned(&tx.id)
        .await
        .is_some()
    {
        return Err(crate::mempool_service::TxIngressError::Skipped);
    }

    // Capture the id before moving `tx` into the mempool. The previous code
    // cloned the entire header (`tx.clone()`) solely so the id could be
    // logged afterwards; copying the id alone avoids that allocation.
    let tx_id = tx.id;
    self.mempool_state.bounded_insert_data_tx(tx).await;

    debug!("Ingested validated data tx {}", tx_id);
    Ok(())
}

#[tracing::instrument(level = "trace", skip_all, fields(fork_parent.height = event.fork_parent.height))]
pub async fn handle_reorg(&self, event: ReorgEvent) -> eyre::Result<()> {
tracing::debug!(
Expand Down
15 changes: 15 additions & 0 deletions crates/actors/src/validation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ impl ValidationService {
result = coordinator.concurrent_tasks.join_next(), if !coordinator.concurrent_tasks.is_empty() => {
match result {
Some(Ok(validation)) => {
// On successful validation, send transactions to mempool
if matches!(validation.validation_result, ValidationResult::Valid) {
if let Err(e) = self.inner.service_senders.mempool.send(
crate::mempool_service::MempoolServiceMessage::BlockTransactionsValidated {
block: validation.block.clone(),
transactions: validation.transactions.clone(),
}
) {
error!(
block.hash = %validation.block_hash,
custom.error = ?e,
"Failed to send BlockTransactionsValidated to mempool"
);
}
}

// Send the validation result to the block tree service
if let Err(e) = self.inner.service_senders.block_tree.send(
Expand Down
7 changes: 6 additions & 1 deletion crates/actors/src/validation_service/active_validations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl PartialOrd for BlockPriorityMeta {
/// Outcome of a concurrently executed block validation task, returned to the
/// service loop when the task joins.
#[derive(Debug)]
pub(super) struct ConcurrentValidationResult {
/// Hash of the validated block (also available via `block`; carried
/// separately so it can be referenced without touching the header).
pub block_hash: BlockHash,
/// Header of the block that was validated.
pub block: Arc<IrysBlockHeader>,
/// Transactions bundled with the block; on successful validation these are
/// sent to the mempool (see the `BlockTransactionsValidated` message).
pub transactions: Arc<crate::block_discovery::BlockTransactions>,
/// Result of the concurrent validation step.
pub validation_result: ValidationResult,
}

Expand Down Expand Up @@ -341,14 +343,17 @@ impl ValidationCoordinator {
match &result {
VdfValidationResult::Valid => {
let block_hash = task.block.block_hash;
let block = Arc::clone(&task.block);
let transactions = Arc::clone(&task.transactions);

self.concurrent_tasks.spawn(
async move {
// Execute the validation and return the result
let validation_result = task.execute_concurrent().await;

ConcurrentValidationResult {
block_hash,
block,
transactions,
validation_result,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/chain/tests/multi_node/mempool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2510,7 +2510,7 @@ async fn commitment_tx_valid_higher_fee_test(

#[rstest::rstest]
#[case::stake_enough_balance(irys_types::U256::from(20000000000000000000100_u128 /* stake cost */), 1, 0)]
#[case::stake_not_enough_balance(irys_types::U256::from(0), 0, 0)]
#[case::stake_not_enough_balance(irys_types::U256::from(1), 0, 0)]
#[case::pledge_15_enough_balance_for_1(
irys_types::U256::from(20000000000000000000100_u128 /*stake cost*/ + 950000000000000000100_u128 /* pledge 1 */ ),
2, // stake & 1 pledge
Expand Down
36 changes: 9 additions & 27 deletions crates/p2p/src/block_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use irys_actors::services::ServiceSenders;
use irys_actors::{MempoolFacade, TxIngressError};
use irys_database::block_header_by_hash;
use irys_database::db::IrysDatabaseExt as _;
use irys_database::reth_db::Database as _;
use irys_domain::chain_sync_state::ChainSyncState;

#[cfg(test)]
Expand Down Expand Up @@ -820,33 +821,14 @@ where
block_transactions.commitment_txs.len()
);

// Insert transactions into mempool so validation service can find them later.
for commitment_tx in &block_transactions.commitment_txs {
if let Err(err) = self
.mempool
.handle_commitment_transaction_ingress_gossip(commitment_tx.clone())
.await
{
if !matches!(err, TxIngressError::Skipped) {
warn!(
"Block pool: Failed to insert commitment tx {} into mempool for block {:?}: {:?}",
commitment_tx.id(), current_block_hash, err
);
}
}
}
for data_tx in block_transactions.all_data_txs() {
if let Err(err) = self
.mempool
.handle_data_transaction_ingress_gossip(data_tx.clone())
.await
{
if !matches!(err, TxIngressError::Skipped) {
warn!(
"Block pool: Failed to insert data tx {} into mempool for block {:?}: {:?}",
data_tx.id, current_block_hash, err
);
}
// Cache data roots before validation (needed for publish tx ingress proof validation).
// Full tx ingestion happens after validation via BlockTransactionsValidated.
for data_tx in block_transactions.get_ledger_txs(DataLedger::Submit) {
if let Err(e) = self.db.update(|db_tx| {
irys_database::cache_data_root(db_tx, data_tx, Some(&block_header))?;
Ok::<_, eyre::Report>(())
}) {
warn!("Failed to cache data_root for tx {}: {:?}", data_tx.id, e);
}
}
Comment on lines +824 to 833
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the cache_data_root function definition and its signature
ast-grep --pattern 'fn cache_data_root($$$) { $$$ }'

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Search for how publish transaction validation depends on cached data roots
rg "cache_data_root|publish.*ingress.*proof" -A 5 -B 5

Repository: Irys-xyz/irys

Length of output: 28417


🏁 Script executed:

# Check the irys_database module to understand cache_data_root
fd "irys_database" -t f | head -20

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Read the cache_data_root function implementation
sed -n '1,100p' crates/database/src/database.rs | grep -A 50 "fn cache_data_root"

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Get more context around cache_data_root implementation
sed -n '1,500p' crates/database/src/database.rs | tail -100

Repository: Irys-xyz/irys

Length of output: 2802


🏁 Script executed:

# Search for how publish tx validation uses cached data roots
rg "cache_data_root|CachedDataRoots" -A 3 -B 3 crates/actors/src/block_validation.rs

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Look for publish transaction ingress proof validation logic
rg "ingress.*proof.*validat|publish.*ledger" -i crates/actors/src/block_validation.rs | head -20

Repository: Irys-xyz/irys

Length of output: 1471


🏁 Script executed:

# Find where store_ingress_proof_checked and store_external_ingress_proof_checked are called
rg "store_ingress_proof_checked|store_external_ingress_proof_checked" -B 3 -A 3

Repository: Irys-xyz/irys

Length of output: 3910


🏁 Script executed:

# Get the full cache_data_root implementation from database.rs
rg -A 30 "pub fn cache_data_root" crates/database/src/database.rs

Repository: Irys-xyz/irys

Length of output: 1299


🏁 Script executed:

# Check what errors cache_data_root can return and under what conditions
rg "cache_data_root" -B 5 -A 10 crates/database/src/database.rs | head -80

Repository: Irys-xyz/irys

Length of output: 618


🏁 Script executed:

# Check the sequence of operations in block_pool.rs around the cache_data_root call
sed -n '800,850p' crates/p2p/src/block_pool.rs

Repository: Irys-xyz/irys

Length of output: 2112


🏁 Script executed:

# Check where store_ingress_proof_checked gets called in block validation/discovery flow
rg "store_ingress_proof|store_external_ingress_proof" -B 5 crates/actors/src/block_discovery.rs crates/actors/src/block_validation.rs 2>/dev/null | head -40

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Check what happens during publish transaction processing in block discovery
sed -n '1,100p' crates/actors/src/block_discovery.rs

Repository: Irys-xyz/irys

Length of output: 3636


🏁 Script executed:

# Search for the full cache_data_root implementation to see all possible error cases
rg -A 50 "pub fn cache_data_root" crates/database/src/database.rs | head -80

Repository: Irys-xyz/irys

Length of output: 2009


🏁 Script executed:

# Find where ingress proofs are stored during block validation/processing
rg "store_ingress_proof|store_external_ingress_proof" -B 10 crates/actors/src/block_validation.rs crates/actors/src/block_discovery.rs | head -100

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Check how publish ledger validation processes ingress proofs
rg -A 20 "validate_publish_ledger|PublishLedger" crates/actors/src/block_validation.rs | head -80

Repository: Irys-xyz/irys

Length of output: 3578


🏁 Script executed:

# Search for where ingress proofs are being written to the database during block processing
rg "IngressProofs|store.*proof" crates/actors/src/block_validation.rs crates/actors/src/block_discovery.rs

Repository: Irys-xyz/irys

Length of output: 471


🏁 Script executed:

# Look at the full block validation flow - where ingress proofs are stored
rg "BlockTransactionsValidated|BlockValidated" -A 10 crates/actors/src/mempool_service/lifecycle.rs | head -60

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Search for where publish transaction proofs are processed in block discovery
rg -A 30 "process.*publish|handle_block" crates/actors/src/block_discovery.rs | head -80

Repository: Irys-xyz/irys

Length of output: 1622


🏁 Script executed:

# Check the block handler flow that uses cached data roots
rg "cached_data_root_by_data_root" -B 5 -A 10

Repository: Irys-xyz/irys

Length of output: 17469


🏁 Script executed:

# Look at what happens after BlockTransactionsValidated - where proofs are stored
rg "BlockTransactionsValidated" -A 20 crates/actors/src/mempool_service/lifecycle.rs

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Find the exact panic location in block_validation.rs where cached_data_root is accessed
rg -B 5 -A 15 'panic.*CachedDataRoot should be found' crates/actors/src/block_validation.rs

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Check if there are other places in block validation that assume cached data roots exist
rg 'cached_data_root_by_data_root' -A 5 crates/actors/src/block_validation.rs | head -60

Repository: Irys-xyz/irys

Length of output: 654


🏁 Script executed:

# Search for all callers of store_ingress_proof_checked to see how failures are handled
rg 'store_ingress_proof_checked|store_external_ingress_proof_checked' -B 2 -A 5 | grep -A 5 "block.*validat"

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Confirm the full context of the panic and when it's triggered
sed -n '550,620p' crates/actors/src/block_validation.rs

Repository: Irys-xyz/irys

Length of output: 2628


🏁 Script executed:

# Check how often this validation path is executed (is it critical path?)
rg "validate_ingress_proof|validate_publish_ledger" -B 3 -A 10 crates/actors/src/block_validation.rs | head -80

Repository: Irys-xyz/irys

Length of output: 39


🏁 Script executed:

# Find the exact validation function that uses cached_data_root and panics if missing
rg -B 10 -A 30 "CachedDataRoot should be found" crates/actors/src/block_validation.rs

Repository: Irys-xyz/irys

Length of output: 1817


🏁 Script executed:

# Check where this validation is called in the block validation flow
rg "validate_ingress_proof_signer_uniqueness|validate.*publish" -A 20 crates/actors/src/block_validation.rs | head -100

Repository: Irys-xyz/irys

Length of output: 1843


Cache failures in block processing can cause validation panics downstream.

The code silently continues after cache_data_root failures in the Submit ledger, but publish transaction validation in block_validation.rs has an explicit panic requirement for these cached entries when validating ingress proofs. If caching fails, the block will reach validation only to panic with "CachedDataRoot should be found for data_root..." during ingress proof validation—a critical validation path.

Rather than silent degradation followed by validation crashes, consider treating cache failures as fatal for the block processing step, or add explicit validation to catch missing cached data roots before they cause panics.


Expand Down
15 changes: 10 additions & 5 deletions crates/p2p/src/tests/integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,17 @@ async fn heavy_should_fetch_missing_transactions_for_block() -> eyre::Result<()>
tokio::time::sleep(Duration::from_millis(3000)).await;

{
// Check that service 2 received and processed the transactions
let service2_mempool_txs = fixture2.mempool_txs.read().expect("to read transactions");
// Check that service 2 received the block (with transactions) for validation.
// Note: Transactions are now ingested into mempool AFTER validation succeeds
// via BlockTransactionsValidated, not during block_pool processing.
let service2_discovery_blocks = fixture2
.discovery_blocks
.read()
.expect("to read discovery blocks");
eyre::ensure!(
service2_mempool_txs.len() == 2,
"Expected 2 transactions in service 2 mempool after block processing, but found {}",
service2_mempool_txs.len()
service2_discovery_blocks.len() == 1,
"Expected 1 block in service 2 discovery after block processing, but found {}",
service2_discovery_blocks.len()
);
};

Expand Down
Loading