diff --git a/anchor-evm/src/evm_transaction_manager.rs b/anchor-evm/src/evm_transaction_manager.rs
index fa47b4cb1..4b9e161fd 100644
--- a/anchor-evm/src/evm_transaction_manager.rs
+++ b/anchor-evm/src/evm_transaction_manager.rs
@@ -1,6 +1,7 @@
 use std::time::Duration;

 use alloy::{
+    hex,
     network::EthereumWallet,
     primitives::{Address, FixedBytes, U256},
     providers::{Provider, ProviderBuilder},
@@ -9,7 +10,9 @@ use alloy::{
 };
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
-use ceramic_anchor_service::{DetachedTimeEvent, MerkleNodes, RootTimeEvent, TransactionManager};
+use ceramic_anchor_service::{
+    ChainInclusionData, DetachedTimeEvent, MerkleNodes, RootTimeEvent, TransactionManager,
+};
 use ceramic_core::{Cid, SerializeExt};
 use tokio::time::{interval, sleep};
 use tracing::{debug, info, warn};
@@ -82,6 +85,18 @@ pub struct EvmTransactionManager {
     config: EvmConfig,
 }

+/// Result of submitting and confirming an anchor transaction
+struct AnchorResult {
+    /// The transaction hash (0x-prefixed)
+    tx_hash: String,
+    /// The block hash containing the transaction
+    block_hash: String,
+    /// The block timestamp (Unix timestamp in seconds)
+    timestamp: u64,
+    /// The transaction input data (0x-prefixed function selector + hash)
+    tx_input: String,
+}
+
 impl EvmTransactionManager {
     /// Create a new EVM transaction manager
     pub async fn new(config: EvmConfig) -> Result<Self> {
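The hardcoded selector 0x97ad09eb used below follows the standard EVM ABI rule: the first four bytes of keccak256 over the canonical function signature. A minimal sketch of that derivation, assuming (the signature is not shown in this diff) that the anchor contract function is anchorDagCbor(bytes32):

    // Sketch: derive a 4-byte EVM function selector with alloy's keccak256.
    use alloy::primitives::keccak256;

    fn selector(signature: &str) -> [u8; 4] {
        let digest = keccak256(signature.as_bytes());
        [digest[0], digest[1], digest[2], digest[3]]
    }

    // Assumption: selector("anchorDagCbor(bytes32)") is the source of 0x97ad09eb.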
@@ -134,7 +149,7 @@ impl EvmTransactionManager {
     }

     /// Submit an anchor transaction and wait for confirmation with retry logic
-    async fn submit_and_wait(&self, root_cid: Cid) -> Result<String> {
+    async fn submit_and_wait(&self, root_cid: Cid) -> Result<AnchorResult> {
         info!(
             "Anchoring root CID: {} on chain {}",
             root_cid, self.config.chain_id
@@ -255,7 +270,29 @@ impl EvmTransactionManager {
                         let gas_used = starting_balance.saturating_sub(ending_balance);
                         info!("Total gas cost: {} wei", gas_used);
                     }
-                    return Ok(tx_hash);
+
+                    // Get block hash from receipt
+                    let block_hash = receipt
+                        .block_hash
+                        .ok_or_else(|| anyhow!("Transaction receipt missing block hash"))?;
+
+                    // Fetch block to get timestamp (false = don't include full transactions)
+                    let block = provider
+                        .get_block_by_number(block_number.into(), false)
+                        .await?
+                        .ok_or_else(|| anyhow!("Block {} not found", block_number))?;
+
+                    // Construct tx_input: function selector (0x97ad09eb) + 32-byte hash
+                    let root_hash = Self::cid_to_bytes32(&root_cid)?;
+                    let tx_input =
+                        format!("0x97ad09eb{}", hex::encode(root_hash.as_slice()));
+
+                    return Ok(AnchorResult {
+                        tx_hash,
+                        block_hash: format!("0x{:x}", block_hash),
+                        timestamp: block.header.timestamp,
+                        tx_input,
+                    });
                 }
                 Err(e) => {
                     warn!("Transaction confirmation failed: {}", e);
@@ -287,7 +324,7 @@ impl EvmTransactionManager {
                 || error_str.contains("replacement transaction underpriced"))
             {
                 for prev_tx in previous_tx_hashes.iter().rev() {
-                    if let Ok(Some(_)) = provider
+                    if let Ok(Some(prev_receipt)) = provider
                         .get_transaction_receipt(prev_tx.parse().unwrap_or_default())
                         .await
                     {
@@ -297,7 +334,32 @@ impl EvmTransactionManager {
                         {
                             info!("Ending wallet balance: {} wei", ending_balance);
                         }
-                        return Ok(prev_tx.clone());
+
+                        // Get block info from the previous receipt
+                        let block_hash = prev_receipt.block_hash.ok_or_else(|| {
+                            anyhow!("Previous transaction receipt missing block hash")
+                        })?;
+                        let block_number = prev_receipt.block_number.ok_or_else(|| {
+                            anyhow!("Previous transaction receipt missing block number")
+                        })?;
+
+                        // Fetch block to get timestamp (false = don't include full transactions)
+                        let block = provider
+                            .get_block_by_number(block_number.into(), false)
+                            .await?
+                            .ok_or_else(|| anyhow!("Block {} not found", block_number))?;
+
+                        // Construct tx_input from root_cid
+                        let root_hash = Self::cid_to_bytes32(&root_cid)?;
+                        let tx_input =
+                            format!("0x97ad09eb{}", hex::encode(root_hash.as_slice()));
+
+                        return Ok(AnchorResult {
+                            tx_hash: prev_tx.clone(),
+                            block_hash: format!("0x{:x}", block_hash),
+                            timestamp: block.header.timestamp,
+                            tx_input,
+                        });
                     }
                 }
             }
@@ -428,10 +490,11 @@ impl EvmTransactionManager {
 impl TransactionManager for EvmTransactionManager {
     async fn anchor_root(&self, root: Cid) -> Result<RootTimeEvent> {
         // Submit transaction and wait for confirmation
-        let tx_hash = self.submit_and_wait(root).await?;
+        let anchor_result = self.submit_and_wait(root).await?;

         // Build anchor proof from transaction details
-        let proof = ProofBuilder::build_proof(self.config.chain_id, tx_hash, root)?;
+        let proof =
+            ProofBuilder::build_proof(self.config.chain_id, anchor_result.tx_hash.clone(), root)?;
         let proof_cid = proof.to_cid()?;

         // Create detached time event
@@ -441,12 +504,22 @@ impl TransactionManager for EvmTransactionManager {
             proof: proof_cid,
         };

+        // Create chain inclusion data for persistence
+        let chain_inclusion = ChainInclusionData {
+            chain_id: format!("eip155:{}", self.config.chain_id),
+            transaction_hash: anchor_result.tx_hash,
+            transaction_input: anchor_result.tx_input,
+            block_hash: anchor_result.block_hash,
+            timestamp: anchor_result.timestamp,
+        };
+
         // Return root time event with no additional remote Merkle nodes
         // (all nodes are local since we built the entire tree)
         Ok(RootTimeEvent {
             proof,
             detached_time_event,
             remote_merkle_nodes: MerkleNodes::default(),
+            chain_inclusion: Some(chain_inclusion),
         })
     }
 }
diff --git a/anchor-remote/src/cas_remote.rs b/anchor-remote/src/cas_remote.rs
index 37dbcf861..f66bf6ba8 100644
--- a/anchor-remote/src/cas_remote.rs
+++ b/anchor-remote/src/cas_remote.rs
@@ -220,6 +220,7 @@ async fn parse_anchor_response(anchor_response: String) -> Result<RootTimeEvent> {
         proof,
         detached_time_event,
         remote_merkle_nodes,
+        chain_inclusion: None,
     })
 }
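Self::cid_to_bytes32 is referenced above but not shown in this diff. A minimal sketch of what it presumably does, pulling the raw 32-byte multihash digest out of the root CID; the clay rollout bug handled in the time.rs validator further down was a shifted variant of exactly this step, where a leading 0x20 byte from the multihash encoding survived into the payload:

    // Sketch (assumed helper): extract the 32-byte digest from the CID's multihash.
    use alloy::primitives::FixedBytes;
    use ceramic_core::Cid;

    fn cid_to_bytes32(cid: &Cid) -> anyhow::Result<FixedBytes<32>> {
        let digest = cid.hash().digest();
        anyhow::ensure!(digest.len() == 32, "expected a 32-byte digest");
        Ok(FixedBytes::from_slice(digest))
    }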
diff --git a/anchor-service/src/anchor.rs b/anchor-service/src/anchor.rs
--- a/anchor-service/src/anchor.rs
+++ b/anchor-service/src/anchor.rs
@@ -101,6 +103,8 @@ pub struct TimeEventBatch {
     /// The proof of the anchor
     pub proof: AnchorProof,
     /// The raw time events
     pub raw_time_events: RawTimeEvents,
+    /// Chain inclusion data for self-anchored events (None for remote CAS)
+    pub chain_inclusion: Option<ChainInclusionData>,
 }

 impl std::fmt::Debug for TimeEventBatch {
@@ -109,6 +113,7 @@ impl std::fmt::Debug for TimeEventBatch {
             .field("merkle_nodes", &self.merkle_nodes)
             .field("proof", &self.proof)
             .field("raw_time_events", &self.raw_time_events)
+            .field("chain_inclusion", &self.chain_inclusion)
             .finish()
     }
 }
@@ -125,12 +130,19 @@ impl TimeEventBatch {
             merkle_nodes,
             proof,
             raw_time_events,
+            chain_inclusion,
         } = self;
         let events = raw_time_events
             .events
             .into_iter()
             .map(|(anchor_request, time_event)| {
-                Self::build_time_event_insertable(&proof, &merkle_nodes, time_event, anchor_request)
+                Self::build_time_event_insertable(
+                    &proof,
+                    &merkle_nodes,
+                    time_event,
+                    anchor_request,
+                    chain_inclusion.clone(),
+                )
             })
             .collect::<Result<Vec<_>>>()?;
         Ok(events)
     }
@@ -142,6 +154,7 @@
         merkle_nodes: &MerkleNodes,
         time_event: RawTimeEvent,
         anchor_request: AnchorRequest,
+        chain_inclusion: Option<ChainInclusionData>,
     ) -> Result<TimeEventInsertable> {
         let time_event_cid = time_event.to_cid().context(format!(
             "could not serialize time event for {} with batch proof {}",
@@ -180,6 +193,7 @@
             ))?,
             cid: time_event_cid,
             event: TimeEvent::new(time_event.clone(), proof.clone(), blocks_in_path),
+            chain_inclusion,
         })
     }
@@ -223,6 +237,8 @@ pub struct TimeEventInsertable {
     pub cid: Cid,
     /// The parsed structure containing the Time Event
     pub event: TimeEvent,
+    /// Chain inclusion data for self-anchored events (None for remote CAS)
+    pub chain_inclusion: Option<ChainInclusionData>,
 }

 impl std::fmt::Debug for TimeEventInsertable {
@@ -231,6 +247,7 @@ impl std::fmt::Debug for TimeEventInsertable {
             .field("event_id", &self.event_id.to_string())
             .field("cid", &self.cid.to_string())
             .field("event", &self.event)
+            .field("chain_inclusion", &self.chain_inclusion)
             .finish()
     }
 }
diff --git a/anchor-service/src/anchor_batch.rs b/anchor-service/src/anchor_batch.rs
index 283a958b9..2d66a0109 100644
--- a/anchor-service/src/anchor_batch.rs
+++ b/anchor-service/src/anchor_batch.rs
@@ -192,6 +192,7 @@ impl AnchorService {
             proof,
             detached_time_event,
             mut remote_merkle_nodes,
+            chain_inclusion,
         } = self.tx_manager.anchor_root(root_cid).await?;
         let time_events = build_time_events(anchor_requests, &detached_time_event, count)?;
         remote_merkle_nodes.extend(local_merkle_nodes);
@@ -199,6 +200,7 @@ impl AnchorService {
             merkle_nodes: remote_merkle_nodes,
             proof,
             raw_time_events: time_events,
+            chain_inclusion,
         })
     }
diff --git a/anchor-service/src/cas_mock.rs b/anchor-service/src/cas_mock.rs
index 9604f9c88..930763e24 100644
--- a/anchor-service/src/cas_mock.rs
+++ b/anchor-service/src/cas_mock.rs
@@ -32,6 +32,7 @@ impl TransactionManager for MockCas {
                 proof,
             },
             remote_merkle_nodes: Default::default(),
+            chain_inclusion: None,
         })
     }
 }
diff --git a/anchor-service/src/lib.rs b/anchor-service/src/lib.rs
index 2057a4f68..2c6969eed 100644
--- a/anchor-service/src/lib.rs
+++ b/anchor-service/src/lib.rs
@@ -12,4 +12,6 @@ mod transaction_manager;
 pub use anchor::{AnchorRequest, MerkleNode, MerkleNodes, TimeEventBatch, TimeEventInsertable};
 pub use anchor_batch::{AnchorService, Store};
 pub use cas_mock::{MockAnchorEventService, MockCas};
-pub use transaction_manager::{DetachedTimeEvent, RootTimeEvent, TransactionManager};
+pub use transaction_manager::{
+    ChainInclusionData, DetachedTimeEvent, RootTimeEvent, TransactionManager,
+};
diff --git a/anchor-service/src/test-data/test_anchor_batch_with_10_requests.test.txt b/anchor-service/src/test-data/test_anchor_batch_with_10_requests.test.txt
index 1a69533e1..5a001c706 100644
--- a/anchor-service/src/test-data/test_anchor_batch_with_10_requests.test.txt
+++ b/anchor-service/src/test-data/test_anchor_batch_with_10_requests.test.txt
@@ -185,4 +185,5 @@ TimeEventBatch {
         },
     ),
     ],
+    chain_inclusion: None,
 }
diff --git
a/anchor-service/src/test-data/test_anchor_batch_with_1_request.test.txt b/anchor-service/src/test-data/test_anchor_batch_with_1_request.test.txt index 98e921928..a5afb7cbc 100644 --- a/anchor-service/src/test-data/test_anchor_batch_with_1_request.test.txt +++ b/anchor-service/src/test-data/test_anchor_batch_with_1_request.test.txt @@ -22,4 +22,5 @@ TimeEventBatch { }, ), ], + chain_inclusion: None, } diff --git a/anchor-service/src/test-data/test_anchor_batch_with_less_than_pow2_requests.test.txt b/anchor-service/src/test-data/test_anchor_batch_with_less_than_pow2_requests.test.txt index c95c1704e..a9f63be6a 100644 --- a/anchor-service/src/test-data/test_anchor_batch_with_less_than_pow2_requests.test.txt +++ b/anchor-service/src/test-data/test_anchor_batch_with_less_than_pow2_requests.test.txt @@ -275,4 +275,5 @@ TimeEventBatch { }, ), ], + chain_inclusion: None, } diff --git a/anchor-service/src/test-data/test_anchor_batch_with_more_than_pow2_requests.test.txt b/anchor-service/src/test-data/test_anchor_batch_with_more_than_pow2_requests.test.txt index ab5ea0de8..84ea9fc8c 100644 --- a/anchor-service/src/test-data/test_anchor_batch_with_more_than_pow2_requests.test.txt +++ b/anchor-service/src/test-data/test_anchor_batch_with_more_than_pow2_requests.test.txt @@ -329,4 +329,5 @@ TimeEventBatch { }, ), ], + chain_inclusion: None, } diff --git a/anchor-service/src/test-data/test_anchor_batch_with_pow2_requests.test.txt b/anchor-service/src/test-data/test_anchor_batch_with_pow2_requests.test.txt index 14c2aefac..f4649b356 100644 --- a/anchor-service/src/test-data/test_anchor_batch_with_pow2_requests.test.txt +++ b/anchor-service/src/test-data/test_anchor_batch_with_pow2_requests.test.txt @@ -293,4 +293,5 @@ TimeEventBatch { }, ), ], + chain_inclusion: None, } diff --git a/anchor-service/src/test-data/test_anchor_service_run.txt b/anchor-service/src/test-data/test_anchor_service_run.txt index b66d492dd..7dd871a10 100644 --- a/anchor-service/src/test-data/test_anchor_service_run.txt +++ b/anchor-service/src/test-data/test_anchor_service_run.txt @@ -34,6 +34,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B75170171122072D0B0181556FFCE636242C424188FF826014A651DD225D511200E9C85374865", @@ -70,6 +71,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B7517017112206FEF93C0B67F1C93B2574F46BFBC7274592E8C2349921722BB426117B6082726", @@ -106,6 +108,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B7517017112204AC042CCE1C67A1523D01720FA4B5AAB5E5E3632DC10DC1451C9F19B86901610", @@ -142,6 +145,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B7517017112200FDC7F813BADA92906BC21E1C9523E2C08FDB6D370EDD8F8517AF15E50CEDC68", @@ -178,6 +182,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B751701711220D93A47DF5839FC52001D506F5D54ABFBBAAA75AE81801CE68017BE8B674D2527", @@ -214,6 +219,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B7517017112208FD9C6536FEEFE3026D8838398261AE9EB8F250B16E8195358F5893364D53FCA", @@ -250,6 +256,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: 
"CE010500BA25076D730241E745CC7C072FF729EA683B751701711220F84574367985116A160937E9DFA2FAF7A02013A001C4030DE78EF71C580B0F41", @@ -286,6 +293,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B75170171122031F627FD20CF062DBC6F34FE250D077C8711F1EF4B18F8E4777D0B021C13A067", @@ -314,6 +322,7 @@ ], ], }, + chain_inclusion: None, }, TimeEventInsertable { event_id: "CE010500BA25076D730241E745CC7C072FF729EA683B7517017112205502894B152BB78E4EE8E5C665AE5D92AF339642A10EB481F7FC877CF117F0FE", @@ -342,5 +351,6 @@ ], ], }, + chain_inclusion: None, }, ] diff --git a/anchor-service/src/test-data/test_anchor_service_run_1.txt b/anchor-service/src/test-data/test_anchor_service_run_1.txt index da9ecb853..4014d20ab 100644 --- a/anchor-service/src/test-data/test_anchor_service_run_1.txt +++ b/anchor-service/src/test-data/test_anchor_service_run_1.txt @@ -17,5 +17,6 @@ }, blocks_in_path: [], }, + chain_inclusion: None, }, ] diff --git a/anchor-service/src/transaction_manager.rs b/anchor-service/src/transaction_manager.rs index 96ec61681..82b09f33a 100644 --- a/anchor-service/src/transaction_manager.rs +++ b/anchor-service/src/transaction_manager.rs @@ -7,6 +7,23 @@ use ceramic_event::unvalidated::AnchorProof; use crate::anchor::MerkleNodes; +/// Chain inclusion data from the blockchain transaction. +/// This contains the data needed to verify the anchor proof against the blockchain, +/// and is persisted to avoid redundant RPC calls during pipeline validation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChainInclusionData { + /// The chain ID in CAIP-2 format (e.g., "eip155:100") + pub chain_id: String, + /// The transaction hash (e.g., "0x...") + pub transaction_hash: String, + /// The transaction input data (function selector + root hash, e.g., "0x97ad09eb...") + pub transaction_input: String, + /// The block hash containing the transaction + pub block_hash: String, + /// The block timestamp (Unix timestamp in seconds) + pub timestamp: u64, +} + /// A struct containing a blockchain proof CID, the path prefix to the CID in the anchored Merkle tree and the /// corresponding Merkle tree nodes. pub struct RootTimeEvent { @@ -16,6 +33,8 @@ pub struct RootTimeEvent { pub detached_time_event: DetachedTimeEvent, /// the Merkle tree nodes from the remote anchoring service pub remote_merkle_nodes: MerkleNodes, + /// Chain inclusion data for self-anchored events (None for remote CAS) + pub chain_inclusion: Option, } impl std::fmt::Debug for RootTimeEvent { @@ -30,6 +49,7 @@ impl std::fmt::Debug for RootTimeEvent { .field("proof", &self.proof) .field("detached_time_event", &self.detached_time_event) .field("remote_merkle_nodes", &merkle_tree_nodes) + .field("chain_inclusion", &self.chain_inclusion) .finish() } } diff --git a/event-svc/src/blockchain/mod.rs b/event-svc/src/blockchain/mod.rs index 5e35e113c..ac893a6aa 100644 --- a/event-svc/src/blockchain/mod.rs +++ b/event-svc/src/blockchain/mod.rs @@ -10,3 +10,98 @@ pub mod eth_rpc; pub(crate) fn tx_hash_try_from_cid(cid: Cid) -> anyhow::Result { Ok(TxHash::from_str(&hex::encode(cid.hash().digest()))?) 
} + +#[cfg(test)] +mod tests { + use super::*; + use multihash_codetable::{Code, MultihashDigest}; + + /// Ethereum transaction codec for IPLD (from multicodec table) + /// This matches the constant in anchor-evm/src/proof_builder.rs + const ETH_TX_CODEC: u64 = 0x93; + + /// Simulate what ProofBuilder::tx_hash_to_cid does + fn simulate_proof_builder_tx_hash_to_cid(tx_hash: &str) -> Cid { + let hex_str = tx_hash.strip_prefix("0x").unwrap_or(tx_hash); + let tx_bytes = hex::decode(hex_str).unwrap(); + let multihash = Code::Keccak256.wrap(&tx_bytes).unwrap(); + Cid::new_v1(ETH_TX_CODEC, multihash) + } + + #[test] + fn test_tx_hash_roundtrip_self_anchor_to_lookup() { + // critical invariant: what self-anchoring stores must match what lookup queries + let original_tx_hash = "0xa1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; + + // Step 1: ProofBuilder creates a CID from the tx_hash (simulated here) + let tx_cid = simulate_proof_builder_tx_hash_to_cid(original_tx_hash); + + // Step 2: Validation converts CID back to tx_hash for lookup + let lookup_tx_hash = tx_hash_try_from_cid(tx_cid).unwrap().to_string(); + + // Step 3: Verify they match (both should be 0x-prefixed lowercase hex) + assert_eq!( + original_tx_hash.to_lowercase(), + lookup_tx_hash.to_lowercase(), + "Self-anchored tx_hash must match lookup tx_hash for chain proof discovery to work" + ); + } + + #[test] + fn test_tx_hash_roundtrip_without_0x_prefix() { + // Test that round-trip works even without 0x prefix in original + let original_tx_hash = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; + + let tx_cid = simulate_proof_builder_tx_hash_to_cid(original_tx_hash); + let lookup_tx_hash = tx_hash_try_from_cid(tx_cid).unwrap().to_string(); + + // TxHash::to_string() always adds 0x prefix + assert_eq!( + format!("0x{}", original_tx_hash.to_lowercase()), + lookup_tx_hash.to_lowercase() + ); + } + + #[test] + fn test_tx_hash_format_matches_alloy_txhash() { + // Verify that our round-trip produces the same format as alloy's TxHash::to_string() + let tx_hash_hex = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; + let tx_cid = simulate_proof_builder_tx_hash_to_cid(tx_hash_hex); + + let recovered = tx_hash_try_from_cid(tx_cid).unwrap(); + + // Create TxHash directly from the same hex + let direct = TxHash::from_str(tx_hash_hex).unwrap(); + + assert_eq!(recovered, direct); + assert_eq!(recovered.to_string(), direct.to_string()); + } + + #[test] + fn test_tx_hash_roundtrip_mixed_case() { + // Test that mixed-case hex input produces correct round-trip. + // Ethereum tx hashes are case-insensitive, but our storage uses lowercase. + // This verifies the normalization works correctly. 
+        let mixed_case_tx_hash =
+            "0xAbCdEf1234567890AbCdEf1234567890AbCdEf1234567890AbCdEf1234567890";
+
+        // Step 1: ProofBuilder creates a CID (hex decoding is case-insensitive)
+        let tx_cid = simulate_proof_builder_tx_hash_to_cid(mixed_case_tx_hash);
+
+        // Step 2: Validation converts CID back to tx_hash for lookup
+        let lookup_tx_hash = tx_hash_try_from_cid(tx_cid).unwrap().to_string();
+
+        // Step 3: The lookup should produce lowercase (TxHash::to_string() uses lowercase)
+        assert_eq!(
+            lookup_tx_hash, "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
+            "Mixed-case input should normalize to lowercase for lookup"
+        );
+
+        // Step 4: Verify case-insensitive equality with original
+        assert_eq!(
+            mixed_case_tx_hash.to_lowercase(),
+            lookup_tx_hash,
+            "Normalized tx_hash must match original (case-insensitive)"
+        );
+    }
+}
diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs
index 9e2a3a40b..29b74cfc7 100644
--- a/event-svc/src/event/service.rs
+++ b/event-svc/src/event/service.rs
@@ -544,23 +544,42 @@ impl EventService {
     }

     /// Get the chain proof for a given event from the database.
-    /// All proofs should have been validated and stored during the event validation phase (v0.55.0+).
-    async fn discover_chain_proof(
+    /// All proofs should have been validated and stored during the event validation phase (v0.55.0+),
+    /// but if one can't be found we fall back to discovering it dynamically through the RPC provider.
+    pub(crate) async fn discover_chain_proof(
         &self,
         event: &ceramic_event::unvalidated::TimeEvent,
     ) -> std::result::Result<ChainProof, crate::eth_rpc::Error> {
         let tx_hash = event.proof().tx_hash();
         let tx_hash = tx_hash_try_from_cid(tx_hash).unwrap().to_string();
-        self.event_access
+        if let Some(proof) = self
+            .event_access
             .get_chain_proof(event.proof().chain_id(), &tx_hash)
             .await
             .map_err(|e| crate::eth_rpc::Error::Application(e.into()))?
-            .ok_or_else(|| {
-                crate::eth_rpc::Error::InvalidProof(format!(
-                    "Chain proof for tx {} not found in database.",
-                    tx_hash
-                ))
-            })
+        {
+            return Ok(proof);
+        }
+
+        warn!(
+            "Chain proof for tx {} not found in database, validating and storing it now.",
+            tx_hash
+        );
+
+        // Try using the RPC provider and store the proof
+        let proof = self
+            .event_validator
+            .time_event_validator()
+            .validate_chain_inclusion(event)
+            .await?;
+
+        let proof = ChainProof::from(proof);
+        self.event_access
+            .persist_chain_inclusion_proofs(std::slice::from_ref(&proof))
+            .await
+            .map_err(|e| crate::eth_rpc::Error::Application(e.into()))?;
+
+        Ok(proof)
     }
 }
diff --git a/event-svc/src/event/store.rs b/event-svc/src/event/store.rs
index 146894d7d..ac447b97b 100644
--- a/event-svc/src/event/store.rs
+++ b/event-svc/src/event/store.rs
@@ -11,7 +11,7 @@ use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a};
 use tracing::info;

 use crate::event::{DeliverableRequirement, EventService};
-use crate::store::{BlockAccess, EventInsertable};
+use crate::store::{BlockAccess, ChainProof, EventInsertable};
 use crate::Error;

 use super::service::{InsertResult, ValidationError, ValidationRequirement};
@@ -260,6 +260,28 @@ impl ceramic_anchor_service::Store for EventService {
         items: Vec<TimeEventInsertable>,
         informant: NodeId,
     ) -> Result<()> {
+        // Extract chain inclusion data from the first item that has it.
+        // All items in a batch share the same chain inclusion data since they're from the same anchor tx.
+        let chain_inclusion = items.iter().find_map(|i| i.chain_inclusion.clone());
+
+        // Persist chain inclusion proof FIRST for self-anchored events.
+ // This aligns with the external event flow (service.rs persists proofs before events) + // and ensures events are never orphaned without their chain proofs. + // + // NOTE: These operations are NOT wrapped in a single transaction. Failure scenarios: + // - If proof persistence fails: early return, no events inserted (safe) + // - If proof succeeds but event insertion fails: orphaned proof remains in DB + // This is acceptable because: + // 1. Orphaned proofs are harmless (just extra data, no foreign key constraints) + // 2. The proof can be used if the events are retried later + if let Some(chain_data) = chain_inclusion { + let proof: ChainProof = chain_data.into(); + self.event_access + .persist_chain_inclusion_proofs(&[proof]) + .await + .context("failed to persist chain inclusion proof for self-anchored events")?; + } + let items = items .into_iter() .map(|insertable| { @@ -277,6 +299,7 @@ impl ceramic_anchor_service::Store for EventService { .insert_many(items.iter()) .await .context("anchoring insert_many failed")?; + Ok(()) } diff --git a/event-svc/src/event/validator/event.rs b/event-svc/src/event/validator/event.rs index 5e92f935f..18f09fcd5 100644 --- a/event-svc/src/event/validator/event.rs +++ b/event-svc/src/event/validator/event.rs @@ -276,6 +276,9 @@ impl EventValidator { } } + pub fn time_event_validator(&self) -> &TimeEventValidator { + &self.time_event_validator + } } #[cfg(test)] diff --git a/event-svc/src/event/validator/time.rs b/event-svc/src/event/validator/time.rs index 1e14dd496..5ae8f60aa 100644 --- a/event-svc/src/event/validator/time.rs +++ b/event-svc/src/event/validator/time.rs @@ -90,7 +90,35 @@ impl TimeEventValidator { // Compare the root hash in the TimeEvent's AnchorProof to the root hash that was actually // included in the transaction onchain. We compare hashes (not full CIDs) because the // blockchain only stores the hash - the codec is not preserved on-chain. - if chain_proof.root_cid.hash() != event.proof().root().hash() { + let chain_digest = chain_proof.root_cid.hash().digest(); + let event_proof_root = event.proof().root(); + let proof_digest = event_proof_root.hash().digest(); + + if chain_digest != proof_digest { + // During clay self-anchor rollout, some anchors were made with the incorrect data. + // anchor-evm left the codec byte (0x20) as a prefix, resulting in the last byte of + // data being discarded. As a fallback, we allow matching on 31 bytes before 2026-02-01 + if chain_id.to_string() == "eip155:100" + && chain_proof.timestamp.as_unix_ts() < 1769904000u64 + && chain_digest.first() == Some(&0x20) + { + warn!( + "falling back to relaxed check for codec-shifted anchor (chain digest={}, proof digest={})", + hex::encode(chain_digest), + hex::encode(proof_digest), + ); + + if chain_digest.get(1..) 
== proof_digest.get(..31) { + warn!("relaxed check passed, accepting proof with shifted digest"); + return Ok(chain_proof); + } + + return Err(eth_rpc::Error::InvalidProof(format!( + "relaxed check failed: shifted digest mismatch (chain digest={}, proof digest={})", + hex::encode(chain_digest), + hex::encode(proof_digest) + ))); + } return Err(eth_rpc::Error::InvalidProof(format!( "the root hash is not in the transaction (anchor proof root={}, blockchain transaction root={})", event.proof().root(), @@ -347,4 +375,109 @@ mod test { }, } } + + /// Create a Gnosis chain time event for testing the codec-shifted digest fallback + fn time_event_gnosis() -> unvalidated::TimeEvent { + unvalidated::Builder::time() + .with_id( + Cid::from_str("bagcqcerar2aga7747dm6fota3iipogz4q55gkaamcx2weebs6emvtvie2oha") + .unwrap(), + ) + .with_tx( + "eip155:100".into(), // Gnosis chain + Cid::from_str("bagjqcgzadp7fstu7fz5tfi474ugsjqx5h6yvevn54w5m4akayhegdsonwciq") + .unwrap(), + "f(bytes32)".into(), + ) + .with_root(0, ipld_core::ipld! {[Cid::from_str("bagcqcerae5oqoglzjjgz53enwsttl7mqglp5eoh2llzbbvfktmzxleeiffbq").unwrap(), Ipld::Null, Cid::from_str("bafyreifjkogkhyqvr2gtymsndsfg3wpr7fg4q5r3opmdxoddfj4s2dyuoa").unwrap()]}) + .build() + .expect("should be valid time event") + } + + /// Creates a CID with a shifted digest simulating the anchor-evm bug: + /// [0x20, original[0..31]] instead of [original[0..32]] + fn create_shifted_digest_cid(original_cid: &Cid) -> Cid { + let original_digest = original_cid.hash().digest(); + let mut shifted = [0u8; 32]; + shifted[0] = 0x20; // codec byte that was accidentally included + shifted[1..].copy_from_slice(&original_digest[..31]); + + let mh = multihash::Multihash::<64>::wrap(0x12, &shifted).expect("valid multihash"); + Cid::new_v1(original_cid.codec(), mh) + } + + async fn get_mock_gnosis_provider( + input: unvalidated::AnchorProof, + root_cid: Cid, + timestamp: Timestamp, + ) -> TimeEventValidator { + let mut mock_provider = MockEthRpcProviderTest::new(); + let chain_id = caip2::ChainId::from_str("eip155:100").expect("eip155:100 is a valid chain"); + + mock_provider + .expect_chain_id() + .once() + .return_const(chain_id.clone()); + mock_provider + .expect_get_chain_inclusion_proof() + .once() + .with(predicate::eq(input)) + .return_once(move |_| { + Ok(eth_rpc::ChainInclusionProof { + timestamp, + root_cid, + block_hash: "0x0".to_string(), + metadata: ChainProofMetadata { + chain_id, + tx_hash: "0x0".to_string(), + tx_input: "0x0".to_string(), + }, + }) + }); + TimeEventValidator::new_with_providers(vec![Arc::new(mock_provider)]) + } + + #[test(tokio::test)] + async fn valid_proof_codec_shifted_digest_gnosis() { + let event = time_event_gnosis(); + let shifted_root = create_shifted_digest_cid(&event.proof().root()); + // Timestamp before 2026-02-01 cutoff (1769904000) + let old_timestamp = Timestamp::from_unix_ts(1700000000); + + let verifier = + get_mock_gnosis_provider(event.proof().clone(), shifted_root, old_timestamp.clone()) + .await; + + match verifier.validate_chain_inclusion(&event).await { + Ok(proof) => { + assert_eq!(proof.timestamp, old_timestamp); + } + Err(e) => panic!("should have passed with relaxed check: {:?}", e), + } + } + + #[test(tokio::test)] + async fn invalid_proof_codec_shifted_after_cutoff() { + let event = time_event_gnosis(); + let shifted_root = create_shifted_digest_cid(&event.proof().root()); + // Timestamp AFTER 2026-02-01 cutoff - should reject + let future_timestamp = Timestamp::from_unix_ts(1769904001); + + let verifier = + 
get_mock_gnosis_provider(event.proof().clone(), shifted_root, future_timestamp).await; + + match verifier.validate_chain_inclusion(&event).await { + Ok(v) => panic!("should have failed after cutoff: {:?}", v), + Err(e) => match e { + eth_rpc::Error::InvalidProof(msg) => { + assert!( + msg.contains("the root hash is not in the transaction"), + "{}", + msg + ); + } + err => panic!("got wrong error: {:?}", err), + }, + } + } } diff --git a/event-svc/src/store/sql/entities/chain_proof.rs b/event-svc/src/store/sql/entities/chain_proof.rs index 127e750b8..9f72fe145 100644 --- a/event-svc/src/store/sql/entities/chain_proof.rs +++ b/event-svc/src/store/sql/entities/chain_proof.rs @@ -31,3 +31,85 @@ impl From for ChainProof { } } } + +impl From for ChainProof { + fn from(value: ceramic_anchor_service::ChainInclusionData) -> Self { + Self { + chain_id: value.chain_id, + transaction_hash: value.transaction_hash, + transaction_input: value.transaction_input, + block_hash: value.block_hash, + timestamp: value + .timestamp + .try_into() + .expect("chain proof timestamp overflow"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ceramic_anchor_service::ChainInclusionData; + + #[test] + fn test_chain_proof_from_chain_inclusion_data() { + let data = ChainInclusionData { + chain_id: "eip155:100".to_string(), + transaction_hash: "0xabc123def456abc123def456abc123def456abc123def456abc123def456abc1" + .to_string(), + transaction_input: "0x97ad09eb1234567890abcdef1234567890abcdef1234567890abcdef12345678" + .to_string(), + block_hash: "0xdef456abc123def456abc123def456abc123def456abc123def456abc123def4" + .to_string(), + timestamp: 1704067200, + }; + + let proof: ChainProof = data.into(); + + assert_eq!(proof.chain_id, "eip155:100"); + assert_eq!( + proof.transaction_hash, + "0xabc123def456abc123def456abc123def456abc123def456abc123def456abc1" + ); + assert_eq!( + proof.transaction_input, + "0x97ad09eb1234567890abcdef1234567890abcdef1234567890abcdef12345678" + ); + assert_eq!( + proof.block_hash, + "0xdef456abc123def456abc123def456abc123def456abc123def456abc123def4" + ); + assert_eq!(proof.timestamp, 1704067200); + } + + #[test] + fn test_chain_proof_from_chain_inclusion_data_preserves_chain_id_format() { + for chain_id in ["eip155:1", "eip155:100", "eip155:137", "eip155:42161"] { + let data = ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: "0x".to_string() + &"a".repeat(64), + transaction_input: "0x".to_string() + &"b".repeat(72), + block_hash: "0x".to_string() + &"c".repeat(64), + timestamp: 1704067200, + }; + + let proof: ChainProof = data.into(); + assert_eq!(proof.chain_id, chain_id); + } + } + + #[test] + fn test_chain_proof_timestamp_zero() { + let data = ChainInclusionData { + chain_id: "eip155:1".to_string(), + transaction_hash: "0x".to_string() + &"a".repeat(64), + transaction_input: "0x".to_string() + &"b".repeat(72), + block_hash: "0x".to_string() + &"c".repeat(64), + timestamp: 0, + }; + + let proof: ChainProof = data.into(); + assert_eq!(proof.timestamp, 0); + } +} diff --git a/event-svc/src/tests/mod.rs b/event-svc/src/tests/mod.rs index ba43b46b5..a80ba9a3e 100644 --- a/event-svc/src/tests/mod.rs +++ b/event-svc/src/tests/mod.rs @@ -1,6 +1,7 @@ mod event; mod migration; mod ordering; +mod self_anchor; use std::{str::FromStr, sync::Arc}; diff --git a/event-svc/src/tests/self_anchor.rs b/event-svc/src/tests/self_anchor.rs new file mode 100644 index 000000000..d45538e47 --- /dev/null +++ b/event-svc/src/tests/self_anchor.rs @@ -0,0 +1,788 @@ +//! 
Tests for self-anchored events via anchor-evm. +//! +//! These tests verify that chain inclusion proofs created by self-anchoring +//! can be correctly stored and retrieved for event validation. + +use ceramic_anchor_service::ChainInclusionData; +use ceramic_sql::sqlite::SqlitePool; +use cid::Cid; +use multihash_codetable::{Code, MultihashDigest}; + +use crate::blockchain::tx_hash_try_from_cid; +use crate::store::{ChainProof, EventAccess}; + +/// Ethereum transaction codec for IPLD (from multicodec table) +/// This matches the constant in anchor-evm/src/proof_builder.rs +const ETH_TX_CODEC: u64 = 0x93; + +/// Simulate what ProofBuilder::tx_hash_to_cid does in anchor-evm +fn tx_hash_to_cid(tx_hash: &str) -> Cid { + let hex_str = tx_hash.strip_prefix("0x").unwrap_or(tx_hash); + let tx_bytes = hex::decode(hex_str).unwrap(); + let multihash = Code::Keccak256.wrap(&tx_bytes).unwrap(); + Cid::new_v1(ETH_TX_CODEC, multihash) +} + +/// Test that chain proofs persisted by self-anchoring can be retrieved by the lookup path. +/// +/// This is the critical round-trip test that verifies: +/// 1. Self-anchoring stores chain proof with tx_hash as "0x{hex}" +/// 2. Validation lookup converts tx_hash CID back to "0x{hex}" +/// 3. The lookup finds the stored proof +#[tokio::test] +async fn test_self_anchored_chain_proof_roundtrip() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let event_access = EventAccess::try_new(pool).await.unwrap(); + + // Create chain inclusion data as self-anchoring would + let tx_hash = "0xa1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; + let chain_id = "eip155:100"; + let block_hash = "0xdef456abc123def456abc123def456abc123def456abc123def456abc123def4"; + let timestamp = 1704067200u64; + + let chain_data = ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: tx_hash.to_string(), + transaction_input: "0x97ad09eb".to_string() + &"ab".repeat(32), + block_hash: block_hash.to_string(), + timestamp, + }; + + // Persist as self-anchoring does (convert to ChainProof via From impl) + let proof: ChainProof = chain_data.into(); + event_access + .persist_chain_inclusion_proofs(&[proof]) + .await + .unwrap(); + + // Lookup as validation does - directly with the tx_hash string + let found = event_access + .get_chain_proof(chain_id, tx_hash) + .await + .unwrap(); + + assert!( + found.is_some(), + "Chain proof should be found for self-anchored event" + ); + let found = found.unwrap(); + assert_eq!(found.chain_id, chain_id); + assert_eq!(found.transaction_hash, tx_hash); + assert_eq!(found.block_hash, block_hash); + assert_eq!(found.timestamp, timestamp as i64); +} + +/// Test the complete discover_chain_proof path for self-anchored events. +/// +/// This simulates the actual validation flow: +/// 1. Self-anchoring creates a proof with tx_hash and stores ChainInclusionData +/// 2. Later, validation calls discover_chain_proof with a TimeEvent +/// 3. discover_chain_proof extracts tx_hash CID from the proof, converts to hex, and looks up +/// 4. 
The lookup succeeds and returns the stored chain proof +#[tokio::test] +async fn test_discover_chain_proof_for_self_anchored_event() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let event_access = EventAccess::try_new(pool).await.unwrap(); + + // Step 1: Simulate what anchor-evm does + // The actual blockchain tx_hash (as returned by the EVM) + let actual_tx_hash = "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + let chain_id = "eip155:100"; + let timestamp = 1704067200u64; + + // ProofBuilder creates a tx_hash CID from the actual tx_hash + let tx_hash_cid = tx_hash_to_cid(actual_tx_hash); + + // Self-anchoring creates and stores ChainInclusionData + let chain_data = ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: actual_tx_hash.to_string(), + transaction_input: "0x97ad09eb".to_string() + &"cd".repeat(32), + block_hash: "0x".to_string() + &"ef".repeat(32), + timestamp, + }; + + let proof: ChainProof = chain_data.into(); + event_access + .persist_chain_inclusion_proofs(&[proof]) + .await + .unwrap(); + + // Step 2: Simulate what discover_chain_proof does + // It receives a TimeEvent with proof.tx_hash() returning the tx_hash_cid + // It converts the CID back to a tx_hash string for lookup: + let lookup_tx_hash = tx_hash_try_from_cid(tx_hash_cid).unwrap().to_string(); + + // Step 3: Perform the lookup + let found = event_access + .get_chain_proof(chain_id, &lookup_tx_hash) + .await + .unwrap(); + + // Step 4: Verify lookup succeeds + assert!( + found.is_some(), + "Chain proof should be found via tx_hash CID round-trip. \ + Original tx_hash: {}, Lookup tx_hash: {}", + actual_tx_hash, + lookup_tx_hash + ); + + let found = found.unwrap(); + assert_eq!(found.chain_id, chain_id); + assert_eq!(found.timestamp, timestamp as i64); + + // Verify the tx_hash formats match + assert_eq!( + actual_tx_hash.to_lowercase(), + lookup_tx_hash.to_lowercase(), + "Self-anchored tx_hash must match validation lookup tx_hash" + ); +} + +/// Test that chain proof upserts are idempotent. +/// +/// This verifies that persisting the same chain proof multiple times +/// (e.g., due to retries) doesn't cause errors or duplicate data. +#[tokio::test] +async fn test_chain_proof_upsert_idempotent() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let event_access = EventAccess::try_new(pool).await.unwrap(); + + let chain_data = ChainInclusionData { + chain_id: "eip155:100".to_string(), + transaction_hash: "0x".to_string() + &"ab".repeat(32), + transaction_input: "0x97ad09eb".to_string() + &"cd".repeat(32), + block_hash: "0x".to_string() + &"ef".repeat(32), + timestamp: 1704067200, + }; + + let proof: ChainProof = chain_data.clone().into(); + + // Persist the same proof multiple times (simulating retries) + for _ in 0..3 { + event_access + .persist_chain_inclusion_proofs(&[proof.clone()]) + .await + .unwrap(); + } + + // Verify we can still look it up + let found = event_access + .get_chain_proof("eip155:100", &("0x".to_string() + &"ab".repeat(32))) + .await + .unwrap(); + + assert!(found.is_some()); + assert_eq!(found.unwrap().timestamp, 1704067200); +} + +/// Test chain proof lookup with different chain IDs. +/// +/// Verifies that chain proofs are correctly scoped by chain_id. 
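+/// The lookup key is effectively the (chain_id, transaction_hash) pair, so the
+/// same transaction hash can coexist under different CAIP-2 chain IDs:
+/// ```ignore
+/// event_access.get_chain_proof("eip155:100", &tx_hash).await?; // timestamp 1000
+/// event_access.get_chain_proof("eip155:137", &tx_hash).await?; // timestamp 2000
+/// ```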
+#[tokio::test] +async fn test_chain_proof_scoped_by_chain_id() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let event_access = EventAccess::try_new(pool).await.unwrap(); + + let tx_hash = "0x".to_string() + &"ab".repeat(32); + + // Store proof for chain 100 + let chain_data_100 = ChainInclusionData { + chain_id: "eip155:100".to_string(), + transaction_hash: tx_hash.clone(), + transaction_input: "0x97ad09eb".to_string() + &"cd".repeat(32), + block_hash: "0x".to_string() + &"ef".repeat(32), + timestamp: 1000, + }; + + // Store proof for chain 137 with same tx_hash + let chain_data_137 = ChainInclusionData { + chain_id: "eip155:137".to_string(), + transaction_hash: tx_hash.clone(), + transaction_input: "0x97ad09eb".to_string() + &"de".repeat(32), + block_hash: "0x".to_string() + &"12".repeat(32), + timestamp: 2000, + }; + + let proof_100: ChainProof = chain_data_100.into(); + let proof_137: ChainProof = chain_data_137.into(); + + event_access + .persist_chain_inclusion_proofs(&[proof_100, proof_137]) + .await + .unwrap(); + + // Lookup for chain 100 should return timestamp 1000 + let found_100 = event_access + .get_chain_proof("eip155:100", &tx_hash) + .await + .unwrap(); + assert_eq!(found_100.unwrap().timestamp, 1000); + + // Lookup for chain 137 should return timestamp 2000 + let found_137 = event_access + .get_chain_proof("eip155:137", &tx_hash) + .await + .unwrap(); + assert_eq!(found_137.unwrap().timestamp, 2000); + + // Lookup for non-existent chain should return None + let found_1 = event_access + .get_chain_proof("eip155:1", &tx_hash) + .await + .unwrap(); + assert!(found_1.is_none()); +} + +/// Test that looking up a non-existent chain proof returns None. +#[tokio::test] +async fn test_chain_proof_not_found() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let event_access = EventAccess::try_new(pool).await.unwrap(); + + let found = event_access + .get_chain_proof("eip155:100", "0xnonexistent") + .await + .unwrap(); + + assert!(found.is_none()); +} + +/// Test that Store::insert_many correctly persists chain inclusion proofs. +/// +/// This tests the full integration through the ceramic_anchor_service::Store trait, +/// verifying that when TimeEventInsertable items have chain_inclusion data, +/// the chain proof is persisted before the events. 
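+///
+/// Note: insert_many extracts a single ChainInclusionData from the batch via
+/// find_map, since every item anchored by one transaction shares one proof:
+/// ```ignore
+/// let chain_inclusion = items.iter().find_map(|i| i.chain_inclusion.clone());
+/// ```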
+#[tokio::test] +async fn test_store_insert_many_persists_chain_proof() { + use ceramic_anchor_service::{Store, TimeEventInsertable}; + use ceramic_core::{EventId, Network, NodeKey, StreamId}; + use ceramic_event::unvalidated; + + use crate::{EventService, UndeliveredEventReview}; + + let pool = SqlitePool::connect_in_memory().await.unwrap(); + + // Create EventService (which implements Store) + let service = EventService::try_new(pool.clone(), UndeliveredEventReview::Skip, false, vec![]) + .await + .unwrap(); + + // Create test data: a TimeEvent with chain_inclusion + let tx_hash = "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + let chain_id = "eip155:100"; + let timestamp = 1704067200u64; + + // Create tx_hash CID (simulating what ProofBuilder does) + let tx_hash_cid = tx_hash_to_cid(tx_hash); + + // Create a minimal time event + let init_cid = super::deterministic_cid(b"test init cid"); + let prev_cid = super::deterministic_cid(b"test prev cid"); + + let time_event = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx(chain_id.to_string(), tx_hash_cid, "f(bytes32)".to_string()) + .with_prev(prev_cid) + .build() + .expect("test time event should build"); + + let time_event_cid = time_event.to_cid().expect("time event should encode"); + + // Create a model StreamId for the EventId + let model = StreamId::document(super::random_cid()); + let event_id = EventId::builder() + .with_network(&Network::DevUnstable) + .with_sep("model", &model.to_vec()) + .with_controller(super::CONTROLLER) + .with_init(&init_cid) + .with_event(&time_event_cid) + .build(); + + // Create ChainInclusionData (simulating what anchor-evm produces) + let chain_data = ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: tx_hash.to_string(), + transaction_input: "0x97ad09eb".to_string() + &"cd".repeat(32), + block_hash: "0x".to_string() + &"ef".repeat(32), + timestamp, + }; + + let insertable = TimeEventInsertable { + event_id, + cid: time_event_cid, + event: time_event, + chain_inclusion: Some(chain_data), + }; + + // Call Store::insert_many + Store::insert_many(&service, vec![insertable], NodeKey::random().id()) + .await + .unwrap(); + + // Verify the chain proof was persisted + let event_access = EventAccess::try_new(pool).await.unwrap(); + let found = event_access + .get_chain_proof(chain_id, tx_hash) + .await + .unwrap(); + + assert!( + found.is_some(), + "Chain proof should be persisted by Store::insert_many" + ); + let found = found.unwrap(); + assert_eq!(found.chain_id, chain_id); + assert_eq!(found.transaction_hash, tx_hash); + assert_eq!(found.timestamp, timestamp as i64); +} + +/// Test that Store::insert_many works without chain_inclusion (remote CAS case). 
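+///
+/// For the remote path the transaction manager reports no inclusion data (see
+/// cas_mock.rs above, which returns chain_inclusion: None), so validation later
+/// discovers the proof over RPC instead of reading it from this table.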
+#[tokio::test] +async fn test_store_insert_many_without_chain_inclusion() { + use ceramic_anchor_service::{Store, TimeEventInsertable}; + use ceramic_core::{EventId, Network, NodeKey, StreamId}; + use ceramic_event::unvalidated; + + use crate::{EventService, UndeliveredEventReview}; + + let pool = SqlitePool::connect_in_memory().await.unwrap(); + + let service = EventService::try_new(pool.clone(), UndeliveredEventReview::Skip, false, vec![]) + .await + .unwrap(); + + // Create test data without chain_inclusion (simulating remote CAS) + let tx_hash_cid = + tx_hash_to_cid("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"); + let init_cid = super::deterministic_cid(b"test init cid 2"); + let prev_cid = super::deterministic_cid(b"test prev cid 2"); + + let time_event = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx( + "eip155:1".to_string(), + tx_hash_cid, + "f(bytes32)".to_string(), + ) + .with_prev(prev_cid) + .build() + .expect("test time event should build"); + + let time_event_cid = time_event.to_cid().expect("time event should encode"); + + let model = StreamId::document(super::random_cid()); + let event_id = EventId::builder() + .with_network(&Network::DevUnstable) + .with_sep("model", &model.to_vec()) + .with_controller(super::CONTROLLER) + .with_init(&init_cid) + .with_event(&time_event_cid) + .build(); + + let insertable = TimeEventInsertable { + event_id, + cid: time_event_cid, + event: time_event, + chain_inclusion: None, // No chain inclusion for remote CAS + }; + + // Should succeed without error even without chain_inclusion + Store::insert_many(&service, vec![insertable], NodeKey::random().id()) + .await + .unwrap(); + + // Verify no chain proof was persisted + let event_access = EventAccess::try_new(pool).await.unwrap(); + let found = event_access + .get_chain_proof( + "eip155:1", + "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", + ) + .await + .unwrap(); + + assert!( + found.is_none(), + "No chain proof should be persisted when chain_inclusion is None" + ); +} + +/// Test that verifies ordering invariant: chain proofs and events both exist after Store::insert_many. +/// +/// The implementation in event/store.rs persists chain proofs BEFORE events. +/// This ordering is important because: +/// 1. If the process crashes after proof persistence but before event insertion, +/// we have an orphaned proof (harmless - just extra data). +/// 2. If the order were reversed (events first), a crash would leave events +/// without chain proofs, causing validation failures. +/// +/// This test verifies both artifacts exist after successful insertion, which +/// implicitly validates that the proof persistence completed (since we can query it). 
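+///
+/// In sketch form, the crash window between the two writes looks like:
+/// ```ignore
+/// persist_chain_inclusion_proofs(&[proof]).await?; // crash after => orphaned proof (harmless)
+/// event_access.insert_many(items.iter()).await?;   // crash before => no events, safe to retry
+/// ```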
+#[tokio::test] +async fn test_store_insert_many_ordering_invariant() { + use ceramic_anchor_service::{Store, TimeEventInsertable}; + use ceramic_core::{EventId, Network, NodeKey, StreamId}; + use ceramic_event::unvalidated; + + use crate::{EventService, UndeliveredEventReview}; + + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let service = EventService::try_new(pool.clone(), UndeliveredEventReview::Skip, false, vec![]) + .await + .unwrap(); + + // Create multiple time events in a batch + let tx_hash = "0x1111111111111111111111111111111111111111111111111111111111111111"; + let chain_id = "eip155:100"; + let timestamp = 1704067200u64; + let tx_hash_cid = tx_hash_to_cid(tx_hash); + + let mut insertables = Vec::new(); + let mut event_cids = Vec::new(); + + // Create 3 time events in the same batch (sharing the same chain proof) + for i in 0..3 { + let init_cid = super::deterministic_cid(format!("ordering test init {}", i).as_bytes()); + let prev_cid = super::deterministic_cid(format!("ordering test prev {}", i).as_bytes()); + + let time_event = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx(chain_id.to_string(), tx_hash_cid, "f(bytes32)".to_string()) + .with_prev(prev_cid) + .build() + .expect("test time event should build"); + + let time_event_cid = time_event.to_cid().expect("time event should encode"); + event_cids.push(time_event_cid); + + let model = StreamId::document(super::random_cid()); + let event_id = EventId::builder() + .with_network(&Network::DevUnstable) + .with_sep("model", &model.to_vec()) + .with_controller(super::CONTROLLER) + .with_init(&init_cid) + .with_event(&time_event_cid) + .build(); + + // All items in a batch share the same chain_inclusion (cloned from TimeEventBatch). + // Store::insert_many uses find_map to extract any one of them (they're all identical). + let chain_inclusion = Some(ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: tx_hash.to_string(), + transaction_input: "0x97ad09eb".to_string() + &"11".repeat(32), + block_hash: "0x".to_string() + &"22".repeat(32), + timestamp, + }); + + insertables.push(TimeEventInsertable { + event_id, + cid: time_event_cid, + event: time_event, + chain_inclusion, + }); + } + + // Insert the batch + Store::insert_many(&service, insertables, NodeKey::random().id()) + .await + .unwrap(); + + // Verify the chain proof exists (proves proof persistence completed) + let event_access = EventAccess::try_new(pool.clone()).await.unwrap(); + let found_proof = event_access + .get_chain_proof(chain_id, tx_hash) + .await + .unwrap(); + + assert!( + found_proof.is_some(), + "Chain proof must exist after Store::insert_many completes" + ); + + // Verify all events exist (proves event persistence completed) + for (i, event_cid) in event_cids.iter().enumerate() { + let (exists, _delivered) = event_access.deliverable_by_cid(event_cid).await.unwrap(); + assert!( + exists, + "Event {} with CID {} must exist after Store::insert_many completes", + i, event_cid + ); + } +} + +/// Test that EventService::discover_chain_proof correctly retrieves chain proofs for self-anchored events. +/// +/// This tests the actual production code path used by the pipeline's ConclusionFeed, +/// verifying that: +/// 1. Chain proof is stored via Store::insert_many (self-anchoring path) +/// 2. discover_chain_proof extracts tx_hash from TimeEvent.proof().tx_hash() +/// 3. discover_chain_proof converts the CID to hex and looks up the proof +/// 4. 
The returned proof matches what was stored +#[tokio::test] +async fn test_discover_chain_proof_direct() { + use ceramic_anchor_service::{Store, TimeEventInsertable}; + use ceramic_core::{EventId, Network, NodeKey, StreamId}; + use ceramic_event::unvalidated; + + use crate::{EventService, UndeliveredEventReview}; + + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let service = EventService::try_new(pool.clone(), UndeliveredEventReview::Skip, false, vec![]) + .await + .unwrap(); + + // Set up test data + let tx_hash = "0xfeedfeedfeedfeedfeedfeedfeedfeedfeedfeedfeedfeedfeedfeedfeedfeed"; + let chain_id = "eip155:100"; + let timestamp = 1704067200u64; + let block_hash = "0x".to_string() + &"aa".repeat(32); + let tx_input = "0x97ad09eb".to_string() + &"bb".repeat(32); + + // Create tx_hash CID (simulating what ProofBuilder does) + let tx_hash_cid = tx_hash_to_cid(tx_hash); + + // Create a time event with this tx_hash CID + let init_cid = super::deterministic_cid(b"discover_chain_proof test init"); + let prev_cid = super::deterministic_cid(b"discover_chain_proof test prev"); + + // Create a time event for insertion + let time_event_for_insert = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx(chain_id.to_string(), tx_hash_cid, "f(bytes32)".to_string()) + .with_prev(prev_cid) + .build() + .expect("test time event should build"); + + let time_event_cid = time_event_for_insert + .to_cid() + .expect("time event should encode"); + + // Create EventId and ChainInclusionData + let model = StreamId::document(super::random_cid()); + let event_id = EventId::builder() + .with_network(&Network::DevUnstable) + .with_sep("model", &model.to_vec()) + .with_controller(super::CONTROLLER) + .with_init(&init_cid) + .with_event(&time_event_cid) + .build(); + + let chain_data = ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: tx_hash.to_string(), + transaction_input: tx_input.clone(), + block_hash: block_hash.clone(), + timestamp, + }; + + let insertable = TimeEventInsertable { + event_id, + cid: time_event_cid, + event: time_event_for_insert, + chain_inclusion: Some(chain_data), + }; + + // Insert via Store::insert_many (the self-anchoring path) + Store::insert_many(&service, vec![insertable], NodeKey::random().id()) + .await + .unwrap(); + + // Create an identical time event for the lookup (TimeEvent doesn't impl Clone) + let time_event_for_lookup = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx(chain_id.to_string(), tx_hash_cid, "f(bytes32)".to_string()) + .with_prev(prev_cid) + .build() + .expect("test time event should build"); + + // Now call discover_chain_proof directly on the EventService + let discovered = service.discover_chain_proof(&time_event_for_lookup).await; + + assert!( + discovered.is_ok(), + "discover_chain_proof should succeed for self-anchored event: {:?}", + discovered.err() + ); + + let proof = discovered.unwrap(); + assert_eq!(proof.chain_id, chain_id); + assert_eq!(proof.transaction_hash, tx_hash); + assert_eq!(proof.transaction_input, tx_input); + assert_eq!(proof.block_hash, block_hash); + assert_eq!(proof.timestamp, timestamp as i64); +} + +/// Test that discover_chain_proof falls back to RPC when proof not in DB, +/// and returns NoChainProvider error when no RPC provider is configured. 
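+///
+/// The expected error shape, using the eth_rpc::Error variants exercised in this diff:
+/// ```ignore
+/// match service.discover_chain_proof(&event).await {
+///     Err(eth_rpc::Error::NoChainProvider(chain)) => assert_eq!(chain.to_string(), "eip155:100"),
+///     other => panic!("unexpected: {:?}", other),
+/// }
+/// ```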
+#[tokio::test] +async fn test_discover_chain_proof_fallback_no_provider() { + use ceramic_event::unvalidated; + + use crate::{EventService, UndeliveredEventReview}; + + let pool = SqlitePool::connect_in_memory().await.unwrap(); + // Service created with no RPC providers + let service = EventService::try_new(pool.clone(), UndeliveredEventReview::Skip, false, vec![]) + .await + .unwrap(); + + // Create a time event for a non-existent chain proof + let tx_hash_cid = + tx_hash_to_cid("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); + let init_cid = super::deterministic_cid(b"discover not found test init"); + let prev_cid = super::deterministic_cid(b"discover not found test prev"); + + let time_event = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx( + "eip155:100".to_string(), + tx_hash_cid, + "f(bytes32)".to_string(), + ) + .with_prev(prev_cid) + .build() + .expect("test time event should build"); + + // Call discover_chain_proof - should fail because proof not in DB and no RPC provider + let result = service.discover_chain_proof(&time_event).await; + + assert!( + result.is_err(), + "discover_chain_proof should fail when no RPC provider configured" + ); + + // Verify it's a NoChainProvider error (RPC fallback fails without provider) + match result.unwrap_err() { + crate::eth_rpc::Error::NoChainProvider(chain_id) => { + assert_eq!(chain_id.to_string(), "eip155:100"); + } + other => panic!("Expected NoChainProvider error, got: {:?}", other), + } +} + +/// Test that self-anchored events produce correct ConclusionTime events via ConclusionFeed. +/// +/// This is the critical end-to-end test that verifies: +/// 1. Self-anchored TimeEvent is inserted via Store::insert_many with chain_inclusion +/// 2. Chain proof is persisted correctly +/// 3. ConclusionFeed's conclusion_events_since returns the event with correct TimeProof +/// 4. The TimeProof contains the correct before timestamp and chain_id +/// +/// This exercises the production code path used by the pipeline/concluder. 
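+///
+/// The key mapping checked here: TimeProof.before comes from the persisted block
+/// timestamp and TimeProof.chain_id from the stored CAIP-2 string:
+/// ```ignore
+/// assert_eq!(time_conclusion.time_proof.before, chain_data.timestamp);
+/// assert_eq!(time_conclusion.time_proof.chain_id, chain_data.chain_id);
+/// ```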
+#[tokio::test] +async fn test_self_anchored_events_in_conclusion_feed() { + use ceramic_anchor_service::{Store, TimeEventInsertable}; + use ceramic_core::{EventId, Network, NodeKey}; + use ceramic_event::unvalidated; + use ceramic_pipeline::ConclusionFeed; + + use crate::{EventService, UndeliveredEventReview}; + + let pool = SqlitePool::connect_in_memory().await.unwrap(); + + // Create EventService WITHOUT chain providers (self-anchoring doesn't use RPC) + let service = EventService::try_new(pool.clone(), UndeliveredEventReview::Skip, false, vec![]) + .await + .unwrap(); + + // Set up test data + let tx_hash = "0xfacefacefacefacefacefacefacefacefacefacefacefacefacefacefaceface"; + let chain_id = "eip155:100"; + let timestamp = 1704067200u64; // 2024-01-01 00:00:00 UTC + + // Create an init event first + // Use the test helpers which generate properly typed events + // Get init + data events using the helper (get_events returns init + 3 data events) + let events = super::get_events().await; + let init_event_id = events[0].key.clone(); + let init_cid = init_event_id.cid().unwrap(); + let data_event_id = events[1].key.clone(); + let data_cid = data_event_id.cid().unwrap(); + + // Insert init and data events via recon Store (simulating normal event flow) + recon::Store::insert_many(&service, &events[..2], NodeKey::random().id()) + .await + .unwrap(); + + // Now create a TimeEvent via the self-anchoring path + let tx_hash_cid = tx_hash_to_cid(tx_hash); + + let time_event = unvalidated::Builder::time() + .with_id(init_cid) + .with_tx(chain_id.to_string(), tx_hash_cid, "f(bytes32)".to_string()) + .with_prev(data_cid) + .build() + .expect("test time event should build"); + + let time_event_cid = time_event.to_cid().expect("time event should encode"); + + // Build EventId using the same sep key bytes from the original event + // (the events from get_events() use "model" sep with the stream's model CID) + let time_event_id = EventId::builder() + .with_network(&Network::DevUnstable) + .with_sep( + "model", + init_event_id.separator().expect("events have separator"), + ) + .with_controller(super::CONTROLLER) + .with_init(&init_cid) + .with_event(&time_event_cid) + .build(); + + // Create ChainInclusionData (simulating what anchor-evm produces) + let chain_data = ChainInclusionData { + chain_id: chain_id.to_string(), + transaction_hash: tx_hash.to_string(), + transaction_input: "0x97ad09eb".to_string() + &"fa".repeat(32), + block_hash: "0x".to_string() + &"ce".repeat(32), + timestamp, + }; + + let insertable = TimeEventInsertable { + event_id: time_event_id, + cid: time_event_cid, + event: time_event, + chain_inclusion: Some(chain_data), + }; + + // Insert via Store::insert_many (the self-anchoring path) + Store::insert_many(&service, vec![insertable], NodeKey::random().id()) + .await + .unwrap(); + + // Now verify via ConclusionFeed + let conclusion_events = service.conclusion_events_since(0, 10).await.unwrap(); + + // Find the Time event in the results + let time_conclusion = conclusion_events + .iter() + .find_map(|e| match e { + ceramic_pipeline::ConclusionEvent::Time(t) => Some(t), + _ => None, + }) + .expect("Should have a Time conclusion event"); + + // Verify the TimeProof has correct values from our ChainInclusionData + assert_eq!( + time_conclusion.time_proof.chain_id, chain_id, + "TimeProof chain_id should match self-anchored chain_id" + ); + assert_eq!( + time_conclusion.time_proof.before, timestamp, + "TimeProof.before timestamp should match self-anchored timestamp" + ); + + // Verify the 
+    // Verify the event CID matches
+    assert_eq!(
+        time_conclusion.event_cid, time_event_cid,
+        "Conclusion event CID should match the inserted time event"
+    );
+
+    // Verify the stream info is correct
+    assert_eq!(
+        time_conclusion.init.stream_cid, init_cid,
+        "Stream CID should match the init event"
+    );
+}
diff --git a/tests/networks/basic-rust.yaml b/tests/networks/basic-rust.yaml
index af91fb098..9ec881edc 100644
--- a/tests/networks/basic-rust.yaml
+++ b/tests/networks/basic-rust.yaml
@@ -16,6 +16,7 @@ spec:
     CERAMIC_ONE_EVM_RPC_URL: "http://ganache:8545"
     CERAMIC_ONE_EVM_PRIVATE_KEY: "0000000000000000000000000000000000000000000000000000000000000001"
     CERAMIC_ONE_EVM_CHAIN_ID: "1337"
+    CERAMIC_ONE_ANCHOR_INTERVAL: "20"
     CERAMIC_ONE_EVM_CONTRACT_ADDRESS: "0x231055A0852D67C7107Ad0d0DFeab60278fE6AdC"
     CERAMIC_ONE_FLIGHT_SQL_BIND_ADDRESS: "0.0.0.0:5102"
   resourceLimits:
diff --git a/tests/suite/src/__tests__/correctness/fast/self-anchor-integration.test.ts b/tests/suite/src/__tests__/correctness/fast/self-anchor-integration.test.ts
new file mode 100644
index 000000000..19370f2b8
--- /dev/null
+++ b/tests/suite/src/__tests__/correctness/fast/self-anchor-integration.test.ts
@@ -0,0 +1,271 @@
+import { beforeAll, describe, expect, test, jest } from '@jest/globals'
+import {
+  type ClientOptions,
+  type FlightSqlClient,
+  createFlightSqlClient,
+} from '@ceramic-sdk/flight-sql-client'
+import { CeramicClient } from '@ceramic-sdk/http-client'
+import type { StreamID } from '@ceramic-sdk/identifiers'
+import { ModelClient } from '@ceramic-sdk/model-client'
+import { ModelInstanceClient } from '@ceramic-sdk/model-instance-client'
+import type { ModelDefinition } from '@ceramic-sdk/model-protocol'
+import { tableFromIPC } from 'apache-arrow'
+import { randomDID } from '../../../utils/didHelper'
+import { waitForEventState } from '../../../utils/rustCeramicHelpers'
+import { urlsToEndpoint, utilities } from '../../../utils/common'
+
+const delayMs = utilities.delayMs
+
+const CeramicUrls = String(process.env.CERAMIC_URLS).split(',')
+const CeramicFlightUrls = String(process.env.CERAMIC_FLIGHT_URLS).split(',')
+const CeramicFlightEndpoints = urlsToEndpoint(CeramicFlightUrls)
+
+const FLIGHT_OPTIONS: ClientOptions = {
+  headers: [],
+  username: undefined,
+  password: undefined,
+  token: undefined,
+  tls: false,
+  host: CeramicFlightEndpoints[0].host,
+  port: CeramicFlightEndpoints[0].port,
+}
+
+// Self-anchoring should complete within 2 minutes (anchor interval + processing time)
+const ANCHOR_TIMEOUT_MS = 2 * 60 * 1000
+const POLL_INTERVAL_MS = 5000
+
+// Expected chain ID for local Ganache network configured in basic-rust.yaml
+const EXPECTED_CHAIN_ID = 'eip155:1337'
+
+const testModel: ModelDefinition = {
+  version: '2.0',
+  name: 'SelfAnchorTestModel',
+  description: 'Model for testing self-anchoring',
+  accountRelation: { type: 'list' },
+  interface: false,
+  implements: [],
+  schema: {
+    type: 'object',
+    properties: {
+      value: { type: 'integer' },
+    },
+    additionalProperties: false,
+  },
+}
+
+/**
+ * Wait for an event to be anchored (chain_id populated) for a given stream.
+ * Polls the event_states table for events that have chain_id set.
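+ * Returns { anchored: true, chainId } as soon as a matching row is found, or
+ * { anchored: false, chainId: null } once timeoutMs elapses. Query errors are
+ * logged and retried, since the table may not be queryable while the node is
+ * still starting up.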
+ */
+async function waitForAnchoredEvent(
+  flightClient: FlightSqlClient,
+  streamCid: string,
+  timeoutMs: number = ANCHOR_TIMEOUT_MS,
+): Promise<{ anchored: boolean; chainId: string | null }> {
+  const startTime = Date.now()
+  while (Date.now() - startTime < timeoutMs) {
+    try {
+      // Query for events that have been anchored (chain_id is not null)
+      const buffer = await flightClient.query(
+        `SELECT chain_id FROM event_states
+         WHERE cid_string(stream_cid) = '${streamCid}'
+         AND chain_id IS NOT NULL
+         LIMIT 1`,
+      )
+
+      const table = tableFromIPC(buffer)
+      if (table.numRows > 0) {
+        const row = table.get(0)
+        const chainId = row?.chain_id as string | null
+        console.log(`Found anchored event for stream with chain_id: ${chainId}`)
+        return { anchored: true, chainId }
+      }
+    } catch (error) {
+      console.log(`Query error (retrying): ${error}`)
+    }
+
+    await delayMs(POLL_INTERVAL_MS)
+  }
+
+  return { anchored: false, chainId: null }
+}
+
+/**
+ * Count anchored events for a given stream.
+ */
+async function countAnchoredEvents(
+  flightClient: FlightSqlClient,
+  streamCid: string,
+): Promise<number> {
+  const buffer = await flightClient.query(
+    `SELECT COUNT(*) as count FROM event_states
+     WHERE cid_string(stream_cid) = '${streamCid}'
+     AND chain_id IS NOT NULL`,
+  )
+
+  const table = tableFromIPC(buffer)
+  const row = table.get(0)
+  return row ? Number(row.count) : 0
+}
+
+describe('self-anchoring integration test', () => {
+  jest.setTimeout(1000 * 60 * 10) // 10 minutes total test timeout
+
+  let flightClient: FlightSqlClient
+  let client: CeramicClient
+  let modelClient: ModelClient
+  let modelInstanceClient: ModelInstanceClient
+  let modelStream: StreamID
+
+  beforeAll(async () => {
+    flightClient = await createFlightSqlClient(FLIGHT_OPTIONS)
+
+    client = new CeramicClient({
+      url: CeramicUrls[0],
+    })
+
+    modelClient = new ModelClient({
+      ceramic: client,
+      did: await randomDID(),
+    })
+
+    modelInstanceClient = new ModelInstanceClient({
+      ceramic: client,
+      did: await randomDID(),
+    })
+
+    // Create test model
+    modelStream = await modelClient.createDefinition(testModel)
+    await waitForEventState(flightClient, modelStream.cid)
+    console.log(`Created test model: ${modelStream.toString()}`)
+  }, 30000)
+
+  test(
+    'document creation triggers self-anchoring',
+    async () => {
+      // Create a document
+      console.log('Creating document...')
+      const documentStream = await modelInstanceClient.createInstance({
+        model: modelStream,
+        content: { value: 42 },
+        shouldIndex: true,
+      })
+
+      // Wait for the init event to be processed
+      await waitForEventState(flightClient, documentStream.commit)
+      console.log(`Document created: ${documentStream.baseID.toString()}`)
+
+      // Get the stream CID for querying
+      const streamCid = documentStream.baseID.cid.toString()
+      console.log(`Waiting for anchored event for stream CID: ${streamCid}`)
+
+      // Wait for anchoring to complete
+      const result = await waitForAnchoredEvent(flightClient, streamCid, ANCHOR_TIMEOUT_MS)
+
+      expect(result.anchored).toBe(true)
+      expect(result.chainId).toBe(EXPECTED_CHAIN_ID)
+      console.log(
+        `Document successfully anchored via self-anchoring with chain_id: ${result.chainId}`,
+      )
+    },
+    ANCHOR_TIMEOUT_MS + 30000,
+  )
+
+  test(
+    'document update triggers self-anchoring',
+    async () => {
+      // Create a document first
+      console.log('Creating document...')
+      const documentStream = await modelInstanceClient.createInstance({
+        model: modelStream,
+        content: { value: 1 },
+        shouldIndex: true,
+      })
+
+      await waitForEventState(flightClient, documentStream.commit)
+      const streamCid = documentStream.baseID.cid.toString()
+
+      // Wait for initial anchor
+      console.log('Waiting for initial anchor...')
+      const initialResult = await waitForAnchoredEvent(flightClient, streamCid, ANCHOR_TIMEOUT_MS)
+      expect(initialResult.anchored).toBe(true)
+      expect(initialResult.chainId).toBe(EXPECTED_CHAIN_ID)
+
+      const initialCount = await countAnchoredEvents(flightClient, streamCid)
+      console.log(`Initial anchored events count: ${initialCount}`)
+
+      // Update the document
+      console.log('Updating document...')
+      const updatedState = await modelInstanceClient.updateDocument({
+        streamID: documentStream.baseID.toString(),
+        newContent: { value: 2 },
+        shouldIndex: true,
+      })
+
+      await waitForEventState(flightClient, updatedState.commitID.commit)
+      console.log('Document updated, waiting for anchor...')
+
+      // Wait for the update to be anchored (should result in more anchored events)
+      const startTime = Date.now()
+      let newCount = initialCount
+      while (Date.now() - startTime < ANCHOR_TIMEOUT_MS) {
+        newCount = await countAnchoredEvents(flightClient, streamCid)
+        if (newCount > initialCount) {
+          break
+        }
+        await delayMs(POLL_INTERVAL_MS)
+      }
+
+      expect(newCount).toBeGreaterThan(initialCount)
+      console.log(`Update anchored. Anchored events count: ${newCount}`)
+    },
+    ANCHOR_TIMEOUT_MS * 2 + 60000,
+  )
+
+  test(
+    'multiple documents are anchored',
+    async () => {
+      // Create multiple documents
+      console.log('Creating 3 documents...')
+      const docs = await Promise.all([
+        modelInstanceClient.createInstance({
+          model: modelStream,
+          content: { value: 10 },
+          shouldIndex: true,
+        }),
+        modelInstanceClient.createInstance({
+          model: modelStream,
+          content: { value: 20 },
+          shouldIndex: true,
+        }),
+        modelInstanceClient.createInstance({
+          model: modelStream,
+          content: { value: 30 },
+          shouldIndex: true,
+        }),
+      ])
+
+      // Wait for all init events to be processed
+      await Promise.all(docs.map((doc) => waitForEventState(flightClient, doc.commit)))
+      console.log('All documents created')
+
+      // Wait for all to be anchored
+      const streamCids = docs.map((doc) => doc.baseID.cid.toString())
+
+      console.log('Waiting for all documents to be anchored...')
+      const anchorResults = await Promise.all(
+        streamCids.map((cid) => waitForAnchoredEvent(flightClient, cid, ANCHOR_TIMEOUT_MS)),
+      )
+
+      // Verify all were anchored with correct chain ID
+      for (let i = 0; i < anchorResults.length; i++) {
+        expect(anchorResults[i].anchored).toBe(true)
+        expect(anchorResults[i].chainId).toBe(EXPECTED_CHAIN_ID)
+        console.log(
+          `Document ${i + 1} anchored: ${docs[i].baseID.toString()} with chain_id: ${anchorResults[i].chainId}`,
+        )
+      }
+    },
+    ANCHOR_TIMEOUT_MS + 60000,
+  )
+})
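Note for reviewers: the `transaction_input` fixture in the Rust unit test above (`"0x97ad09eb".to_string() + &"fa".repeat(32)`) encodes the calldata shape these tests assume: a 4-byte function selector followed by a 32-byte root hash, all hex-encoded with a `0x` prefix. A minimal TypeScript sketch of that decomposition (helper name hypothetical, not part of this change):

```ts
// Hypothetical helper, not part of this PR: splits anchor calldata of the form
// 0x<4-byte selector><32-byte root hash> into its two components.
function splitAnchorCalldata(txInput: string): { selector: string; rootHash: string } {
  const hex = txInput.startsWith('0x') ? txInput.slice(2) : txInput
  // 8 hex chars for the selector + 64 hex chars for the 32-byte hash
  if (hex.length !== 8 + 64) {
    throw new Error(`unexpected calldata length: ${hex.length}`)
  }
  return { selector: `0x${hex.slice(0, 8)}`, rootHash: `0x${hex.slice(8)}` }
}

// e.g. the fixture from the Rust unit test above:
const { selector, rootHash } = splitAnchorCalldata(`0x97ad09eb${'fa'.repeat(32)}`)
// selector === '0x97ad09eb', rootHash === '0x' + 'fa'.repeat(32)
```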