Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ chrono = { version = "0.4", features = ["serde"] }
revm = { version = "34.0.0", default-features = false }
revm-primitives = "22"

blst = "0.3.16"
c-kzg = { version = "2.1.5", features = ["ethereum_kzg_settings"] }
alloy-dyn-abi = "1.5.6"
alloy-evm = { version = "0.27.2", default-features = false }
alloy-primitives = { version = "1.5.6", default-features = false, features = [
Expand All @@ -167,7 +169,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-consensus = { version = "1.7.3", default-features = false }
alloy-contract = { version = "1.7.3", default-features = false }
alloy-core = { version = "1.5.6", default-features = false }
alloy-eips = { version = "1.7.3", default-features = false }
alloy-eips = { version = "1.7.3", default-features = false, features = ["kzg"] }
alloy-genesis = { version = "1.7.3", default-features = false }
alloy-network = { version = "1.7.3", default-features = false }
alloy-provider = { version = "1.7.3", default-features = false }
Expand Down
665 changes: 665 additions & 0 deletions KZG_PROOFS.md

Large diffs are not rendered by default.

189 changes: 189 additions & 0 deletions crates/actors/src/blob_extraction_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use irys_types::H256;
use reth::revm::primitives::B256;
use reth_transaction_pool::blobstore::BlobStore;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, warn};

use crate::mempool_service::MempoolServiceMessage;

#[derive(Debug)]
pub enum BlobExtractionMessage {
ExtractBlobs {
block_hash: H256,
blob_tx_hashes: Vec<B256>,
},
}

/// Extracts EIP-4844 blob data from the Reth blob store after block production,
/// converts blobs into Irys chunks with IngressProofV2, and injects synthetic
/// data transactions into the mempool.
pub struct BlobExtractionService<S: BlobStore> {
blob_store: S,
mempool_sender: tokio::sync::mpsc::UnboundedSender<MempoolServiceMessage>,
config: irys_types::Config,
}

impl<S: BlobStore> BlobExtractionService<S> {
pub fn spawn_service(
blob_store: S,
mempool_sender: tokio::sync::mpsc::UnboundedSender<MempoolServiceMessage>,
config: irys_types::Config,
rx: UnboundedReceiver<BlobExtractionMessage>,
runtime_handle: tokio::runtime::Handle,
) {
let service = Self {
blob_store,
mempool_sender,
config,
};

runtime_handle.spawn(service.start(rx));
}

async fn start(self, mut rx: UnboundedReceiver<BlobExtractionMessage>) {
debug!("Blob extraction service started");
while let Some(msg) = rx.recv().await {
match msg {
BlobExtractionMessage::ExtractBlobs {
block_hash,
blob_tx_hashes,
} => {
if let Err(e) = self.handle_extract_blobs(block_hash, &blob_tx_hashes) {
warn!(
block.hash = %block_hash,
error = %e,
"Failed to extract blobs from block",
);
}
}
}
}
debug!("Blob extraction service stopped");
}

fn handle_extract_blobs(&self, block_hash: H256, blob_tx_hashes: &[B256]) -> eyre::Result<()> {
if !self.config.consensus.enable_blobs {
warn!("Received blob extraction request but blobs are disabled");
return Ok(());
}

let signer = self.config.irys_signer();
let chain_id = self.config.consensus.chain_id;
let anchor: H256 = block_hash;
let mut total_blobs = 0_u64;

for tx_hash in blob_tx_hashes {
let sidecar_variant = match self.blob_store.get(*tx_hash) {
Ok(Some(s)) => s,
Ok(None) => {
warn!(tx.hash = %tx_hash, "Blob sidecar not found in store (may be pruned)");
continue;
}
Err(e) => {
warn!(tx.hash = %tx_hash, error = ?e, "Blob store error");
continue;
}
};

let sidecar = match sidecar_variant.as_eip4844() {
Some(s) => s,
None => {
warn!(tx.hash = %tx_hash, "Sidecar is not EIP-4844 format, skipping");
continue;
}
};

eyre::ensure!(
sidecar.commitments.len() == sidecar.blobs.len(),
"sidecar commitment count ({}) != blob count ({})",
sidecar.commitments.len(),
sidecar.blobs.len(),
);
for (blob, commitment) in sidecar.blobs.iter().zip(sidecar.commitments.iter()) {
self.process_single_blob(
&signer,
blob.as_ref(),
commitment.as_ref(),
chain_id,
anchor,
)?;
total_blobs += 1;
}
}

if total_blobs > 0 {
debug!(
block.hash = %block_hash,
blobs.count = total_blobs,
txs.count = blob_tx_hashes.len(),
"Extracted blobs from block",
);
}

Ok(())
}

fn process_single_blob(
&self,
signer: &irys_types::irys::IrysSigner,
blob_data: &[u8],
commitment_bytes: &[u8; 48],
chain_id: u64,
anchor: H256,
) -> eyre::Result<()> {
use irys_types::ingress::generate_ingress_proof_v2_from_blob;
use irys_types::kzg::KzgCommitmentBytes;

let proof = generate_ingress_proof_v2_from_blob(
signer,
blob_data,
commitment_bytes,
chain_id,
anchor,
)?;

let data_root = proof.data_root();

// Blob is a single chunk (index 0) — store its KZG commitment for custody verification
let per_chunk_commitments = vec![(0_u32, KzgCommitmentBytes::from(*commitment_bytes))];

let chunk_size = u64::try_from(irys_types::kzg::CHUNK_SIZE_FOR_KZG)
.map_err(|_| eyre::eyre!("chunk size overflow"))?;

let tx_header = irys_types::transaction::DataTransactionHeader::V1(
irys_types::transaction::DataTransactionHeaderV1WithMetadata {
tx: irys_types::transaction::DataTransactionHeaderV1 {
id: H256::zero(),
anchor,
signer: signer.address(),
data_root,
data_size: chunk_size,
header_size: 0,
term_fee: Default::default(),
perm_fee: None,
ledger_id: u32::from(irys_types::block::DataLedger::Submit),
chain_id,
signature: Default::default(),
bundle_format: None,
},
metadata: irys_types::transaction::DataTransactionMetadata::new(),
},
);

let chunk_data = irys_types::kzg::zero_pad_to_chunk_size(blob_data)?;

if let Err(e) = self
.mempool_sender
.send(MempoolServiceMessage::IngestBlobDerivedTx {
tx_header,
ingress_proof: proof,
chunk_data,
per_chunk_commitments,
})
{
warn!(data_root = %data_root, error = %e, "Failed to send blob-derived tx to mempool");
}

Ok(())
}
}
7 changes: 4 additions & 3 deletions crates/actors/src/block_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,21 +603,21 @@ impl BlockDiscoveryServiceInner {
})?;
// Validate the anchors
for proof in tx_proofs.iter() {
if !valid_ingress_anchor_blocks.contains(&proof.anchor) {
if !valid_ingress_anchor_blocks.contains(&proof.anchor()) {
info!(
"valid ingress anchor blocks: {:?}, bt_finished_height {} min_ingress_proof_anchor_height {} anchor {}, ID {}",
&valid_ingress_anchor_blocks,
&bt_finished_height,
&min_ingress_proof_anchor_height,
&proof.anchor,
&proof.anchor(),
&proof.id()
);
return Err(BlockDiscoveryError::InvalidAnchor {
item_type: AnchorItemType::IngressProof {
promotion_target_id: tx_header.id,
id: proof.id(),
},
anchor: proof.anchor,
anchor: proof.anchor(),
});
}
}
Expand Down Expand Up @@ -960,6 +960,7 @@ pub async fn build_block_body_for_processed_block_header(
block_hash: block_header.block_hash,
data_transactions: data_txs,
commitment_transactions: commitment_txs,
custody_proofs: Vec::new(),
};

Ok(block_body)
Expand Down
48 changes: 47 additions & 1 deletion crates/actors/src/block_producer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
blob_extraction_service::BlobExtractionMessage,
block_discovery::{BlockDiscoveryError, BlockDiscoveryFacade as _, BlockDiscoveryFacadeImpl},
mempool_guard::MempoolReadGuard,
mempool_service::{MempoolServiceMessage, MempoolTxs},
Expand Down Expand Up @@ -797,6 +798,34 @@ pub trait BlockProdStrategy {
.broadcast_block(block, stats, &eth_built_payload)
.await?;
let Some(block) = block else { return Ok(None) };
// Extract blobs from any EIP-4844 transactions in the produced block
if self.inner().config.consensus.enable_blobs {
let blob_tx_hashes: Vec<B256> = eth_built_payload
.block()
.body()
.transactions
.iter()
.filter(|tx| tx.is_eip4844())
.map(|tx| *tx.hash())
.collect();

if !blob_tx_hashes.is_empty() {
debug!(
block.hash = %block.block_hash,
blob_txs = blob_tx_hashes.len(),
"Triggering blob extraction for EIP-4844 transactions",
);
if let Err(e) = self.inner().service_senders.blob_extraction.send(
BlobExtractionMessage::ExtractBlobs {
block_hash: block.block_hash,
blob_tx_hashes,
},
) {
warn!(error = %e, "Failed to send blob extraction request");
}
}
}

Ok(Some((block, eth_built_payload)))
}

Expand Down Expand Up @@ -1169,7 +1198,23 @@ pub trait BlockProdStrategy {
let block_signer = self.inner().config.irys_signer();
block_signer.sign_block_header(&mut irys_block)?;

// Build BlockTransactions from the mempool bundle
let custody_proofs = if self.inner().config.consensus.enable_custody_proofs {
let (tx, rx) = oneshot::channel();
if let Err(e) = self
.inner()
.service_senders
.custody_proof
.send(crate::custody_proof_service::CustodyProofMessage::TakePendingProofs(tx))
{
warn!(error = %e, "Failed to request pending custody proofs");
Vec::new()
} else {
rx.await.unwrap_or_default()
}
} else {
Vec::new()
};

let mut all_data_txs = Vec::new();
all_data_txs.extend(mempool_bundle.submit_txs);
all_data_txs.extend(mempool_bundle.publish_txs.txs);
Expand All @@ -1178,6 +1223,7 @@ pub trait BlockProdStrategy {
block_hash: irys_block.block_hash,
commitment_transactions: mempool_bundle.commitment_txs,
data_transactions: all_data_txs,
custody_proofs,
};

let sealed_block = IrysSealedBlock::new(irys_block, block_body)?;
Expand Down
15 changes: 15 additions & 0 deletions crates/actors/src/block_tree_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,21 @@ impl BlockTreeServiceInner {
);
}

if state == ChainState::Onchain && self.config.consensus.enable_custody_proofs {
let msg = crate::custody_proof_service::CustodyProofMessage::NewBlock {
vdf_output: arc_block.vdf_limiter_info.output,
block_height: height,
};
if let Err(e) = self.service_senders.custody_proof.send(msg) {
tracing::warn!(
block.hash = ?block_hash,
block.height = height,
error = %e,
"Failed to send custody proof new block trigger",
);
}
}

Ok(())
}

Expand Down
Loading
Loading