Skip to content

Commit 475f3ec

Browse files
committed
fix: add non-zero balance checks and moved tx ingestion to post-validation
1 parent 0970e8f commit 475f3ec

File tree

9 files changed

+217
-40
lines changed

9 files changed

+217
-40
lines changed

crates/actors/src/mempool_service.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ pub struct Inner {
201201
pub enum MempoolServiceMessage {
202202
/// Block Confirmed, read publish txs from block. Overwrite copies in mempool with proof
203203
BlockConfirmed(Arc<IrysBlockHeader>),
204+
/// Ingest validated block transactions into mempool after successful block validation.
205+
BlockTransactionsValidated {
206+
block: Arc<IrysBlockHeader>,
207+
transactions: Arc<crate::block_discovery::BlockTransactions>,
208+
},
204209
/// Ingress Chunk, Add to CachedChunks, generate_ingress_proof, gossip chunk
205210
IngestChunk(
206211
UnpackedChunk,
@@ -270,6 +275,7 @@ impl MempoolServiceMessage {
270275
pub fn variant_name(&self) -> &'static str {
271276
match self {
272277
Self::BlockConfirmed(_) => "BlockConfirmed",
278+
Self::BlockTransactionsValidated { .. } => "BlockTransactionsValidated",
273279
Self::IngestChunk(_, _) => "IngestChunk",
274280
Self::IngestChunkFireAndForget(_) => "IngestChunkFireAndForget",
275281
Self::IngestIngressProof(_, _) => "IngestIngressProof",
@@ -315,6 +321,24 @@ impl Inner {
315321
);
316322
}
317323
}
324+
MempoolServiceMessage::BlockTransactionsValidated {
325+
block,
326+
transactions,
327+
} => {
328+
let block_hash = block.block_hash;
329+
let block_height = block.height;
330+
if let Err(e) = self
331+
.handle_block_transactions_validated(block, transactions)
332+
.await
333+
{
334+
tracing::error!(
335+
"Failed to handle BlockTransactionsValidated for block {} (height {}): {:#}",
336+
block_hash,
337+
block_height,
338+
e
339+
);
340+
}
341+
}
318342
MempoolServiceMessage::IngestCommitmentTxFromApi(commitment_tx, response) => {
319343
let response_message = self
320344
.handle_ingress_commitment_tx_message_api(commitment_tx)

crates/actors/src/mempool_service/commitment_txs.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use crate::mempool_service::{validate_commitment_transaction, Inner, TxIngressError, TxReadError};
22
use irys_database::{commitment_tx_by_txid, db::IrysDatabaseExt as _};
33
use irys_domain::CommitmentSnapshotStatus;
4+
use irys_reth_node_bridge::ext::IrysRethRpcTestContextExt as _;
45
use irys_types::{
56
CommitmentTransaction, CommitmentValidationError, GossipBroadcastMessage, IrysAddress,
67
IrysTransactionCommon as _, IrysTransactionId, TxKnownStatus, H256,
78
};
8-
// Bring RPC extension trait into scope for test contexts; `as _` avoids unused import warnings
99
use std::collections::HashMap;
1010
use tracing::{debug, instrument, warn};
1111

@@ -157,10 +157,14 @@ impl Inner {
157157
self.precheck_commitment_ingress_common(commitment_tx)
158158
.await?;
159159

160+
// Reject transactions with zero balance
161+
self.validate_commitment_gossip_nonzero_balance(commitment_tx)
162+
.await?;
163+
160164
// Gossip path: check only static fields from config (shape).
161165
// - Validate `fee` and `value` to reject clearly wrong Stake/Pledge/Unpledge/Unstake txs.
162-
// - Do not check account balance here. That is verified on API ingress
163-
// and again during selection/block validation.
166+
// - We skip full balance validation for gossip, as balances may differ across forks.
167+
// However, we do check for non-zero balance above to prevent DoS.
164168
if let Err(e) = commitment_tx.validate_fee(&self.config.consensus) {
165169
self.mempool_state
166170
.put_recent_invalid(commitment_tx.id())
@@ -248,6 +252,34 @@ impl Inner {
248252
Ok(())
249253
}
250254

255+
/// Validates that a gossip commitment transaction signer has non-zero balance.
256+
#[tracing::instrument(level = "trace", skip_all, fields(tx.id = %tx.id, tx.signer = %tx.signer))]
257+
async fn validate_commitment_gossip_nonzero_balance(
258+
&self,
259+
tx: &CommitmentTransaction,
260+
) -> Result<(), TxIngressError> {
261+
let balance: irys_types::U256 = self
262+
.reth_node_adapter
263+
.rpc
264+
.get_balance_irys_canonical_and_pending(tx.signer, None)
265+
.await
266+
.map_err(|e| TxIngressError::BalanceFetchError {
267+
address: tx.signer.to_string(),
268+
reason: e.to_string(),
269+
})?;
270+
271+
if balance.is_zero() {
272+
tracing::debug!(
273+
tx.id = %tx.id,
274+
tx.signer = %tx.signer,
275+
"Rejecting gossip commitment tx from zero-balance account"
276+
);
277+
return Err(TxIngressError::Unfunded(tx.id));
278+
}
279+
280+
Ok(())
281+
}
282+
251283
/// Checks the database index for an existing commitment transaction by id.
252284
fn is_known_commitment_in_db(&self, tx_id: &H256) -> Result<bool, TxIngressError> {
253285
let known_in_db = self

crates/actors/src/mempool_service/data_txs.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,18 +160,22 @@ impl Inner {
160160
// Shared pre-checks: duplicate detection, signature, anchor/expiry, ledger parsing
161161
let (ledger, expiry_height) = self.precheck_data_ingress_common(&tx).await?;
162162

163+
// DoS protection: reject transactions from accounts with zero balance
164+
self.validate_gossip_nonzero_balance(&tx).await?;
165+
163166
// Protocol fee structure checks (Gossip: skip)
164167
//
165168
// Rationale:
166169
// - When we receive a gossiped tx, it may belong to a different fork with a different
167170
// EMA/pricing context. To avoid false rejections, we limit validation for Gossip
168171
// sources to signature + anchor checks only (performed above), and skip fee structure
169172
// checks here.
170-
// - Similarly, we skip balance and EMA pricing validation for gossip, as these are
171-
// canonical-chain-specific and may differ across forks.
173+
// - We skip full balance and EMA pricing validation for gossip, as these are
174+
// canonical-chain-specific and may differ across forks. However, we do check for
175+
// non-zero balance above to prevent DoS from completely unfunded accounts.
172176
match ledger {
173177
DataLedger::Publish => {
174-
// Gossip path: skip API-only checks here
178+
// Gossip path: skip API-only fee structure checks here
175179
}
176180
DataLedger::Submit => {
177181
// Submit ledger - a data transaction cannot target the submit ledger directly
@@ -280,6 +284,34 @@ impl Inner {
280284
Ok(())
281285
}
282286

287+
/// Validates that a gossip transaction signer has non-zero balance.
288+
#[tracing::instrument(level = "trace", skip_all, fields(tx.id = %tx.id, tx.signer = %tx.signer))]
289+
async fn validate_gossip_nonzero_balance(
290+
&self,
291+
tx: &DataTransactionHeader,
292+
) -> Result<(), TxIngressError> {
293+
let balance: U256 = self
294+
.reth_node_adapter
295+
.rpc
296+
.get_balance_irys_canonical_and_pending(tx.signer, None)
297+
.await
298+
.map_err(|e| TxIngressError::BalanceFetchError {
299+
address: tx.signer.to_string(),
300+
reason: e.to_string(),
301+
})?;
302+
303+
if balance.is_zero() {
304+
tracing::debug!(
305+
tx.id = %tx.id,
306+
tx.signer = %tx.signer,
307+
"Rejecting gossip tx from zero-balance account"
308+
);
309+
return Err(TxIngressError::Unfunded(tx.id));
310+
}
311+
312+
Ok(())
313+
}
314+
283315
/// Validates data transaction fees against the authoritative EMA pricing.
284316
/// Uses `ema_for_public_pricing()` which returns the stable price from 2 intervals ago.
285317
/// This is the price that users should use when calculating their transaction fees.

crates/actors/src/mempool_service/lifecycle.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,88 @@ impl Inner {
125125
Ok(())
126126
}
127127

128+
/// Ingests validated block transactions into mempool, bypassing gossip-path balance checks.
129+
#[instrument(skip_all, fields(block.hash = %block.block_hash, block.height = block.height))]
130+
pub async fn handle_block_transactions_validated(
131+
&self,
132+
block: Arc<IrysBlockHeader>,
133+
transactions: Arc<crate::block_discovery::BlockTransactions>,
134+
) -> Result<(), crate::mempool_service::TxIngressError> {
135+
debug!(
136+
"Ingesting {} commitment txs, {} data txs from validated block {}",
137+
transactions.commitment_txs.len(),
138+
transactions.all_data_txs().count(),
139+
block.block_hash
140+
);
141+
142+
for commitment_tx in &transactions.commitment_txs {
143+
if let Err(e) = self
144+
.ingest_validated_commitment_tx(commitment_tx.clone())
145+
.await
146+
{
147+
if !matches!(e, crate::mempool_service::TxIngressError::Skipped) {
148+
warn!(
149+
"Failed to ingest validated commitment tx {}: {:?}",
150+
commitment_tx.id, e
151+
);
152+
}
153+
}
154+
}
155+
156+
for data_tx in transactions.all_data_txs() {
157+
if let Err(e) = self.ingest_validated_data_tx(data_tx.clone()).await {
158+
if !matches!(e, crate::mempool_service::TxIngressError::Skipped) {
159+
warn!("Failed to ingest validated data tx {}: {:?}", data_tx.id, e);
160+
}
161+
}
162+
}
163+
164+
Ok(())
165+
}
166+
167+
/// Ingest a block-validated commitment tx, bypassing balance checks.
168+
#[instrument(skip_all, fields(tx.id = %tx.id))]
169+
async fn ingest_validated_commitment_tx(
170+
&self,
171+
tx: CommitmentTransaction,
172+
) -> Result<(), crate::mempool_service::TxIngressError> {
173+
if self
174+
.mempool_state
175+
.is_known_commitment_in_mempool(&tx.id, tx.signer)
176+
.await
177+
{
178+
return Err(crate::mempool_service::TxIngressError::Skipped);
179+
}
180+
181+
self.mempool_state
182+
.insert_commitment_and_mark_valid(&tx)
183+
.await?;
184+
185+
debug!("Ingested validated commitment tx {}", tx.id);
186+
Ok(())
187+
}
188+
189+
/// Ingest a block-validated data tx, bypassing balance/EMA checks.
190+
#[instrument(skip_all, fields(tx.id = %tx.id))]
191+
async fn ingest_validated_data_tx(
192+
&self,
193+
tx: irys_types::DataTransactionHeader,
194+
) -> Result<(), crate::mempool_service::TxIngressError> {
195+
if self
196+
.mempool_state
197+
.valid_submit_ledger_tx_cloned(&tx.id)
198+
.await
199+
.is_some()
200+
{
201+
return Err(crate::mempool_service::TxIngressError::Skipped);
202+
}
203+
204+
self.mempool_state.bounded_insert_data_tx(tx.clone()).await;
205+
206+
debug!("Ingested validated data tx {}", tx.id);
207+
Ok(())
208+
}
209+
128210
#[tracing::instrument(level = "trace", skip_all, fields(fork_parent.height = event.fork_parent.height))]
129211
pub async fn handle_reorg(&self, event: ReorgEvent) -> eyre::Result<()> {
130212
tracing::debug!(

crates/actors/src/validation_service.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,21 @@ impl ValidationService {
264264
result = coordinator.concurrent_tasks.join_next(), if !coordinator.concurrent_tasks.is_empty() => {
265265
match result {
266266
Some(Ok(validation)) => {
267+
// On successful validation, send transactions to mempool
268+
if matches!(validation.validation_result, ValidationResult::Valid) {
269+
if let Err(e) = self.inner.service_senders.mempool.send(
270+
crate::mempool_service::MempoolServiceMessage::BlockTransactionsValidated {
271+
block: validation.block.clone(),
272+
transactions: validation.transactions.clone(),
273+
}
274+
) {
275+
error!(
276+
block.hash = %validation.block_hash,
277+
custom.error = ?e,
278+
"Failed to send BlockTransactionsValidated to mempool"
279+
);
280+
}
281+
}
267282

268283
// Send the validation result to the block tree service
269284
if let Err(e) = self.inner.service_senders.block_tree.send(

crates/actors/src/validation_service/active_validations.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ impl PartialOrd for BlockPriorityMeta {
8181
#[derive(Debug)]
8282
pub(super) struct ConcurrentValidationResult {
8383
pub block_hash: BlockHash,
84+
pub block: Arc<IrysBlockHeader>,
85+
pub transactions: Arc<crate::block_discovery::BlockTransactions>,
8486
pub validation_result: ValidationResult,
8587
}
8688

@@ -341,14 +343,17 @@ impl ValidationCoordinator {
341343
match &result {
342344
VdfValidationResult::Valid => {
343345
let block_hash = task.block.block_hash;
346+
let block = Arc::clone(&task.block);
347+
let transactions = Arc::clone(&task.transactions);
344348

345349
self.concurrent_tasks.spawn(
346350
async move {
347-
// Execute the validation and return the result
348351
let validation_result = task.execute_concurrent().await;
349352

350353
ConcurrentValidationResult {
351354
block_hash,
355+
block,
356+
transactions,
352357
validation_result,
353358
}
354359
}

crates/chain/tests/multi_node/mempool_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2510,7 +2510,7 @@ async fn commitment_tx_valid_higher_fee_test(
25102510

25112511
#[rstest::rstest]
25122512
#[case::stake_enough_balance(irys_types::U256::from(20000000000000000000100_u128 /* stake cost */), 1, 0)]
2513-
#[case::stake_not_enough_balance(irys_types::U256::from(0), 0, 0)]
2513+
#[case::stake_not_enough_balance(irys_types::U256::from(1), 0, 0)]
25142514
#[case::pledge_15_enough_balance_for_1(
25152515
irys_types::U256::from(20000000000000000000100_u128 /*stake cost*/ + 950000000000000000100_u128 /* pledge 1 */ ),
25162516
2, // stake & 1 pledge

crates/p2p/src/block_pool.rs

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use irys_actors::services::ServiceSenders;
99
use irys_actors::{MempoolFacade, TxIngressError};
1010
use irys_database::block_header_by_hash;
1111
use irys_database::db::IrysDatabaseExt as _;
12+
use irys_database::reth_db::Database as _;
1213
use irys_domain::chain_sync_state::ChainSyncState;
1314

1415
#[cfg(test)]
@@ -820,33 +821,14 @@ where
820821
block_transactions.commitment_txs.len()
821822
);
822823

823-
// Insert transactions into mempool so validation service can find them later.
824-
for commitment_tx in &block_transactions.commitment_txs {
825-
if let Err(err) = self
826-
.mempool
827-
.handle_commitment_transaction_ingress_gossip(commitment_tx.clone())
828-
.await
829-
{
830-
if !matches!(err, TxIngressError::Skipped) {
831-
warn!(
832-
"Block pool: Failed to insert commitment tx {} into mempool for block {:?}: {:?}",
833-
commitment_tx.id(), current_block_hash, err
834-
);
835-
}
836-
}
837-
}
838-
for data_tx in block_transactions.all_data_txs() {
839-
if let Err(err) = self
840-
.mempool
841-
.handle_data_transaction_ingress_gossip(data_tx.clone())
842-
.await
843-
{
844-
if !matches!(err, TxIngressError::Skipped) {
845-
warn!(
846-
"Block pool: Failed to insert data tx {} into mempool for block {:?}: {:?}",
847-
data_tx.id, current_block_hash, err
848-
);
849-
}
824+
// Cache data roots before validation (needed for publish tx ingress proof validation).
825+
// Full tx ingestion happens after validation via BlockTransactionsValidated.
826+
for data_tx in block_transactions.get_ledger_txs(DataLedger::Submit) {
827+
if let Err(e) = self.db.update(|db_tx| {
828+
irys_database::cache_data_root(db_tx, data_tx, Some(&block_header))?;
829+
Ok::<_, eyre::Report>(())
830+
}) {
831+
warn!("Failed to cache data_root for tx {}: {:?}", data_tx.id, e);
850832
}
851833
}
852834

crates/p2p/src/tests/integration/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,17 @@ async fn heavy_should_fetch_missing_transactions_for_block() -> eyre::Result<()>
279279
tokio::time::sleep(Duration::from_millis(3000)).await;
280280

281281
{
282-
// Check that service 2 received and processed the transactions
283-
let service2_mempool_txs = fixture2.mempool_txs.read().expect("to read transactions");
282+
// Check that service 2 received the block (with transactions) for validation.
283+
// Note: Transactions are now ingested into mempool AFTER validation succeeds
284+
// via BlockTransactionsValidated, not during block_pool processing.
285+
let service2_discovery_blocks = fixture2
286+
.discovery_blocks
287+
.read()
288+
.expect("to read discovery blocks");
284289
eyre::ensure!(
285-
service2_mempool_txs.len() == 2,
286-
"Expected 2 transactions in service 2 mempool after block processing, but found {}",
287-
service2_mempool_txs.len()
290+
service2_discovery_blocks.len() == 1,
291+
"Expected 1 block in service 2 discovery after block processing, but found {}",
292+
service2_discovery_blocks.len()
288293
);
289294
};
290295

0 commit comments

Comments
 (0)