Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
sync_test.sh

openapi-generator-cli.jar
.env
.env*
ceramic_cicddb.sqlite*
event-svc/model_error_counts.csv
/.history
87 changes: 80 additions & 7 deletions anchor-evm/src/evm_transaction_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use alloy::{
hex,
network::EthereumWallet,
primitives::{Address, FixedBytes, U256},
providers::{Provider, ProviderBuilder},
Expand All @@ -9,7 +10,9 @@ use alloy::{
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use ceramic_anchor_service::{DetachedTimeEvent, MerkleNodes, RootTimeEvent, TransactionManager};
use ceramic_anchor_service::{
ChainInclusionData, DetachedTimeEvent, MerkleNodes, RootTimeEvent, TransactionManager,
};
use ceramic_core::{Cid, SerializeExt};
use tokio::time::{interval, sleep};
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -82,6 +85,18 @@ pub struct EvmTransactionManager {
config: EvmConfig,
}

/// Result of submitting and confirming an anchor transaction
struct AnchorResult {
/// The transaction hash (0x-prefixed)
tx_hash: String,
/// The block hash containing the transaction
block_hash: String,
/// The block timestamp (Unix timestamp in seconds)
timestamp: u64,
/// The transaction input data (0x-prefixed function selector + hash)
tx_input: String,
}

impl EvmTransactionManager {
/// Create a new EVM transaction manager
pub async fn new(config: EvmConfig) -> Result<Self> {
Expand Down Expand Up @@ -134,7 +149,7 @@ impl EvmTransactionManager {
}

/// Submit an anchor transaction and wait for confirmation with retry logic
async fn submit_and_wait(&self, root_cid: Cid) -> Result<String> {
async fn submit_and_wait(&self, root_cid: Cid) -> Result<AnchorResult> {
info!(
"Anchoring root CID: {} on chain {}",
root_cid, self.config.chain_id
Expand Down Expand Up @@ -255,7 +270,29 @@ impl EvmTransactionManager {
let gas_used = starting_balance.saturating_sub(ending_balance);
info!("Total gas cost: {} wei", gas_used);
}
return Ok(tx_hash);

// Get block hash from receipt
let block_hash = receipt
.block_hash
.ok_or_else(|| anyhow!("Transaction receipt missing block hash"))?;

// Fetch block to get timestamp (false = don't include full transactions)
let block = provider
.get_block_by_number(block_number.into(), false)
.await?
.ok_or_else(|| anyhow!("Block {} not found", block_number))?;

// Construct tx_input: function selector (0x97ad09eb) + 32-byte hash
let root_hash = Self::cid_to_bytes32(&root_cid)?;
let tx_input =
format!("0x97ad09eb{}", hex::encode(root_hash.as_slice()));

return Ok(AnchorResult {
tx_hash,
block_hash: format!("0x{:x}", block_hash),
timestamp: block.header.timestamp,
tx_input,
});
}
Err(e) => {
warn!("Transaction confirmation failed: {}", e);
Expand Down Expand Up @@ -287,7 +324,7 @@ impl EvmTransactionManager {
|| error_str.contains("replacement transaction underpriced"))
{
for prev_tx in previous_tx_hashes.iter().rev() {
if let Ok(Some(_)) = provider
if let Ok(Some(prev_receipt)) = provider
.get_transaction_receipt(prev_tx.parse().unwrap_or_default())
.await
{
Expand All @@ -297,7 +334,32 @@ impl EvmTransactionManager {
{
info!("Ending wallet balance: {} wei", ending_balance);
}
return Ok(prev_tx.clone());

// Get block info from the previous receipt
let block_hash = prev_receipt.block_hash.ok_or_else(|| {
anyhow!("Previous transaction receipt missing block hash")
})?;
let block_number = prev_receipt.block_number.ok_or_else(|| {
anyhow!("Previous transaction receipt missing block number")
})?;

// Fetch block to get timestamp (false = don't include full transactions)
let block = provider
.get_block_by_number(block_number.into(), false)
.await?
.ok_or_else(|| anyhow!("Block {} not found", block_number))?;

// Construct tx_input from root_cid
let root_hash = Self::cid_to_bytes32(&root_cid)?;
let tx_input =
format!("0x97ad09eb{}", hex::encode(root_hash.as_slice()));

return Ok(AnchorResult {
tx_hash: prev_tx.clone(),
block_hash: format!("0x{:x}", block_hash),
timestamp: block.header.timestamp,
tx_input,
});
}
}
}
Expand Down Expand Up @@ -428,10 +490,11 @@ impl EvmTransactionManager {
impl TransactionManager for EvmTransactionManager {
async fn anchor_root(&self, root: Cid) -> Result<RootTimeEvent> {
// Submit transaction and wait for confirmation
let tx_hash = self.submit_and_wait(root).await?;
let anchor_result = self.submit_and_wait(root).await?;

// Build anchor proof from transaction details
let proof = ProofBuilder::build_proof(self.config.chain_id, tx_hash, root)?;
let proof =
ProofBuilder::build_proof(self.config.chain_id, anchor_result.tx_hash.clone(), root)?;
let proof_cid = proof.to_cid()?;

// Create detached time event
Expand All @@ -441,12 +504,22 @@ impl TransactionManager for EvmTransactionManager {
proof: proof_cid,
};

// Create chain inclusion data for persistence
let chain_inclusion = ChainInclusionData {
chain_id: format!("eip155:{}", self.config.chain_id),
transaction_hash: anchor_result.tx_hash,
transaction_input: anchor_result.tx_input,
block_hash: anchor_result.block_hash,
timestamp: anchor_result.timestamp,
};

// Return root time event with no additional remote Merkle nodes
// (all nodes are local since we built the entire tree)
Ok(RootTimeEvent {
proof,
detached_time_event,
remote_merkle_nodes: MerkleNodes::default(),
chain_inclusion: Some(chain_inclusion),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions anchor-remote/src/cas_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ async fn parse_anchor_response(anchor_response: String) -> Result<CasResponsePar
proof: proof.expect("proof should be present"),
detached_time_event: detached_time_event.expect("detached time event should be present"),
remote_merkle_nodes,
chain_inclusion: None,
})))
}

Expand Down
1 change: 1 addition & 0 deletions anchor-remote/src/test-data/anchor_response.test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ Receipt {
"Cid(bafyreigab6skaymtdwkpee4yzkhhglqzl7kbd2wrv24r2uqk3iol44gfga): [Some(Cid(bafyreia776z4jdg5zgycivcpr3q6lcu6llfowkrljkmq3bex2k5hkzat54)), Some(Cid(bagcqcera24qtqvh7yjbm72iqov56tmiubltdfn2rwjgcokkxjbovbih6x24q))]",
"Cid(bafyreigay6frqlwu7dzhf4vch3oxeavkciyogkosbqrttljzgraghl3mo4): [Some(Cid(bafyreigab6skaymtdwkpee4yzkhhglqzl7kbd2wrv24r2uqk3iol44gfga)), Some(Cid(bafyreid6fo2bz67eza23ulv3l6d7uwu4nd5rmsdx3clijnelebkqscdiqa))]",
],
chain_inclusion: None,
}
19 changes: 18 additions & 1 deletion anchor-service/src/anchor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use tracing::info;
use ceramic_core::{EventId, SerializeExt};
use ceramic_event::unvalidated::{AnchorProof, ProofEdge, RawTimeEvent, TimeEvent};

use crate::transaction_manager::ChainInclusionData;

/// AnchorRequest for a Data Event on a Stream
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct AnchorRequest {
Expand Down Expand Up @@ -101,6 +103,8 @@ pub struct TimeEventBatch {
pub proof: AnchorProof,
/// The Time Events
pub raw_time_events: RawTimeEvents,
/// Chain inclusion data for self-anchored events (None for remote CAS)
pub chain_inclusion: Option<ChainInclusionData>,
}

impl std::fmt::Debug for TimeEventBatch {
Expand All @@ -109,6 +113,7 @@ impl std::fmt::Debug for TimeEventBatch {
.field("merkle_nodes", &self.merkle_nodes)
.field("proof", &self.proof)
.field("raw_time_events", &self.raw_time_events)
.field("chain_inclusion", &self.chain_inclusion)
.finish()
}
}
Expand All @@ -125,12 +130,19 @@ impl TimeEventBatch {
merkle_nodes,
proof,
raw_time_events,
chain_inclusion,
} = self;
let events = raw_time_events
.events
.into_iter()
.map(|(anchor_request, time_event)| {
Self::build_time_event_insertable(&proof, &merkle_nodes, time_event, anchor_request)
Self::build_time_event_insertable(
&proof,
&merkle_nodes,
time_event,
anchor_request,
chain_inclusion.clone(),
)
})
.collect::<Result<Vec<_>>>()?;
Ok(events)
Expand All @@ -142,6 +154,7 @@ impl TimeEventBatch {
merkle_nodes: &MerkleNodes,
time_event: RawTimeEvent,
anchor_request: AnchorRequest,
chain_inclusion: Option<ChainInclusionData>,
) -> Result<TimeEventInsertable> {
let time_event_cid = time_event.to_cid().context(format!(
"could not serialize time event for {} with batch proof {}",
Expand Down Expand Up @@ -180,6 +193,7 @@ impl TimeEventBatch {
))?,
cid: time_event_cid,
event: TimeEvent::new(time_event.clone(), proof.clone(), blocks_in_path),
chain_inclusion,
})
}

Expand Down Expand Up @@ -223,6 +237,8 @@ pub struct TimeEventInsertable {
pub cid: Cid,
/// The parsed structure containing the Time Event
pub event: TimeEvent,
/// Chain inclusion data for self-anchored events (None for remote CAS)
pub chain_inclusion: Option<ChainInclusionData>,
}

impl std::fmt::Debug for TimeEventInsertable {
Expand All @@ -231,6 +247,7 @@ impl std::fmt::Debug for TimeEventInsertable {
.field("event_id", &self.event_id.to_string())
.field("cid", &self.cid.to_string())
.field("event", &self.event)
.field("chain_inclusion", &self.chain_inclusion)
.finish()
}
}
2 changes: 2 additions & 0 deletions anchor-service/src/anchor_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,15 @@ impl AnchorService {
proof,
detached_time_event,
mut remote_merkle_nodes,
chain_inclusion,
} = self.tx_manager.anchor_root(root_cid).await?;
let time_events = build_time_events(anchor_requests, &detached_time_event, count)?;
remote_merkle_nodes.extend(local_merkle_nodes);
Ok(TimeEventBatch {
merkle_nodes: remote_merkle_nodes,
proof,
raw_time_events: time_events,
chain_inclusion,
})
}

Expand Down
1 change: 1 addition & 0 deletions anchor-service/src/cas_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl TransactionManager for MockCas {
proof,
},
remote_merkle_nodes: Default::default(),
chain_inclusion: None,
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion anchor-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ mod transaction_manager;
pub use anchor::{AnchorRequest, MerkleNode, MerkleNodes, TimeEventBatch, TimeEventInsertable};
pub use anchor_batch::{AnchorService, Store};
pub use cas_mock::{MockAnchorEventService, MockCas};
pub use transaction_manager::{DetachedTimeEvent, RootTimeEvent, TransactionManager};
pub use transaction_manager::{
ChainInclusionData, DetachedTimeEvent, RootTimeEvent, TransactionManager,
};
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,5 @@ TimeEventBatch {
},
),
],
chain_inclusion: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ TimeEventBatch {
},
),
],
chain_inclusion: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,5 @@ TimeEventBatch {
},
),
],
chain_inclusion: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,5 @@ TimeEventBatch {
},
),
],
chain_inclusion: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,5 @@ TimeEventBatch {
},
),
],
chain_inclusion: None,
}
Loading
Loading