diff --git a/Cargo.lock b/Cargo.lock index 4dc8abc86..f5203e33a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6126,6 +6126,7 @@ dependencies = [ "katana-gateway-types", "katana-metrics", "katana-pool", + "katana-pool-api", "katana-primitives", "katana-provider-api", "katana-rpc-api", @@ -6249,18 +6250,22 @@ dependencies = [ "katana-gas-price-oracle", "katana-gateway-client", "katana-gateway-server", + "katana-gateway-types", "katana-messaging", "katana-metrics", "katana-pipeline", "katana-pool", + "katana-pool-api", "katana-primitives", "katana-provider", "katana-rpc-api", "katana-rpc-client", "katana-rpc-server", + "katana-rpc-types", "katana-stage", "katana-starknet", "katana-tasks", + "parking_lot", "serde", "strum 0.25.0", "strum_macros 0.25.3", @@ -6513,6 +6518,7 @@ dependencies = [ "derive_more 0.99.20", "flate2", "katana-genesis", + "katana-pool-api", "katana-primitives", "katana-trie", "rstest 0.18.2", diff --git a/crates/chain-spec/src/full_node.rs b/crates/chain-spec/src/full_node.rs new file mode 100644 index 000000000..3beec702c --- /dev/null +++ b/crates/chain-spec/src/full_node.rs @@ -0,0 +1,65 @@ +use katana_genesis::Genesis; +use katana_primitives::chain::ChainId; +use lazy_static::lazy_static; + +use crate::{FeeContracts, SettlementLayer}; + +/// The full node chain specification. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChainSpec { + /// The network chain id. + pub id: ChainId, + + /// The chain's genesis states. + pub genesis: Genesis, + + /// The chain fee token contract. + pub fee_contracts: FeeContracts, + + /// The chain's settlement layer configurations (if any). + pub settlement: Option, +} + +////////////////////////////////////////////////////////////// +// ChainSpec implementations +////////////////////////////////////////////////////////////// + +impl ChainSpec { + /// Creates a new [`ChainSpec`] for Starknet mainnet. + pub fn mainnet() -> Self { + MAINNET.clone() + } + + /// Creates a new [`ChainSpec`] for Starknet sepolia testnet. + pub fn sepolia() -> Self { + SEPOLIA.clone() + } +} + +////////////////////////////////////////////////////////////// +// Predefined ChainSpec instances +////////////////////////////////////////////////////////////// + +lazy_static! { + /// Starknet mainnet chain specification. + pub static ref MAINNET: ChainSpec = ChainSpec { + id: ChainId::MAINNET, + genesis: Genesis::default(), + fee_contracts: FeeContracts { + eth: katana_genesis::constant::DEFAULT_ETH_FEE_TOKEN_ADDRESS, + strk: katana_genesis::constant::DEFAULT_STRK_FEE_TOKEN_ADDRESS, + }, + settlement: None, + }; + + /// Starknet sepolia testnet chain specification. + pub static ref SEPOLIA: ChainSpec = ChainSpec { + id: ChainId::SEPOLIA, + genesis: Genesis::default(), + fee_contracts: FeeContracts { + eth: katana_genesis::constant::DEFAULT_ETH_FEE_TOKEN_ADDRESS, + strk: katana_genesis::constant::DEFAULT_STRK_FEE_TOKEN_ADDRESS, + }, + settlement: None, + }; +} diff --git a/crates/chain-spec/src/lib.rs b/crates/chain-spec/src/lib.rs index ed405c414..6a68a7aec 100644 --- a/crates/chain-spec/src/lib.rs +++ b/crates/chain-spec/src/lib.rs @@ -6,12 +6,14 @@ use serde::{Deserialize, Serialize}; use url::Url; pub mod dev; +pub mod full_node; pub mod rollup; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ChainSpec { Dev(dev::ChainSpec), Rollup(rollup::ChainSpec), + FullNode(full_node::ChainSpec), } ////////////////////////////////////////////////////////////// @@ -24,10 +26,21 @@ impl ChainSpec { Self::Dev(dev::DEV.clone()) } + /// Creates a new [`ChainSpec`] for Starknet mainnet. + pub fn mainnet() -> Self { + Self::FullNode(full_node::ChainSpec::mainnet()) + } + + /// Creates a new [`ChainSpec`] for Starknet sepolia testnet. + pub fn sepolia() -> Self { + Self::FullNode(full_node::ChainSpec::sepolia()) + } + pub fn id(&self) -> ChainId { match self { Self::Dev(spec) => spec.id, Self::Rollup(spec) => spec.id, + Self::FullNode(spec) => spec.id, } } @@ -35,6 +48,7 @@ impl ChainSpec { match self { Self::Dev(spec) => &spec.genesis, Self::Rollup(spec) => &spec.genesis, + Self::FullNode(spec) => &spec.genesis, } } @@ -42,6 +56,7 @@ impl ChainSpec { match self { Self::Dev(spec) => spec.settlement.as_ref(), Self::Rollup(spec) => Some(&spec.settlement), + Self::FullNode(spec) => spec.settlement.as_ref(), } } @@ -49,6 +64,7 @@ impl ChainSpec { match self { Self::Dev(spec) => &spec.fee_contracts, Self::Rollup(spec) => &spec.fee_contracts, + Self::FullNode(spec) => &spec.fee_contracts, } } } @@ -65,6 +81,12 @@ impl From for ChainSpec { } } +impl From for ChainSpec { + fn from(spec: full_node::ChainSpec) -> Self { + Self::FullNode(spec) + } +} + impl Default for ChainSpec { fn default() -> Self { Self::dev() diff --git a/crates/cli/src/full.rs b/crates/cli/src/full.rs index 13488c494..1fb1ddcd3 100644 --- a/crates/cli/src/full.rs +++ b/crates/cli/src/full.rs @@ -1,11 +1,19 @@ use std::path::PathBuf; -use anyhow::{anyhow, Result}; +use anyhow::{Context, Result}; pub use clap::Parser; +use katana_node::config::db::DbConfig; +use katana_node::config::metrics::MetricsConfig; +use katana_node::config::rpc::RpcConfig; +use katana_node::full; +use katana_node::full::Network; use serde::{Deserialize, Serialize}; +use tracing::info; use crate::options::*; +pub(crate) const LOG_TARGET: &str = "katana::cli::full"; + #[derive(Parser, Debug, Serialize, Deserialize, Default, Clone, PartialEq)] #[command(next_help_heading = "Full node options")] pub struct FullNodeArgs { @@ -19,7 +27,19 @@ pub struct FullNodeArgs { /// previously initialized Katana database. #[arg(long)] #[arg(value_name = "PATH")] - pub db_dir: Option, + pub db_dir: PathBuf, + + #[arg(long = "eth.rpc")] + #[arg(value_name = "PATH")] + pub eth_rpc_url: String, + + #[arg(long)] + pub network: Network, + + /// Gateway API key for accessing the sequencer gateway. + #[arg(long)] + #[arg(value_name = "KEY")] + pub gateway_api_key: Option, #[command(flatten)] pub logging: LoggingOptions, @@ -42,6 +62,102 @@ pub struct FullNodeArgs { impl FullNodeArgs { pub async fn execute(&self) -> Result<()> { - Err(anyhow!("Full node is not implemented yet!")) + // Initialize logging with tracer + let tracer_config = self.tracer_config(); + katana_tracing::init(self.logging.log_format, tracer_config).await?; + self.start_node().await + } + + async fn start_node(&self) -> Result<()> { + // Build the node + let config = self.config()?; + let node = full::Node::build(config).context("failed to build full node")?; + + if !self.silent { + info!(target: LOG_TARGET, "Starting full node"); + } + + // Launch the node + let handle = node.launch().await.context("failed to launch full node")?; + + // Wait until an OS signal (ie SIGINT, SIGTERM) is received or the node is shutdown. + tokio::select! { + _ = katana_utils::wait_shutdown_signals() => { + // Gracefully shutdown the node before exiting + handle.stop().await?; + }, + + _ = handle.stopped() => { } + } + + info!("Shutting down."); + + Ok(()) + } + + fn config(&self) -> Result { + let db = self.db_config(); + let rpc = self.rpc_config()?; + let metrics = self.metrics_config(); + + Ok(full::Config { + db, + rpc, + metrics, + network: self.network, + eth_rpc_url: self.eth_rpc_url.clone(), + gateway_api_key: self.gateway_api_key.clone(), + }) + } + + fn db_config(&self) -> DbConfig { + DbConfig { dir: Some(self.db_dir.clone()) } + } + + fn rpc_config(&self) -> Result { + #[cfg(feature = "server")] + { + use std::time::Duration; + + let cors_origins = self.server.http_cors_origins.clone(); + + Ok(RpcConfig { + apis: Default::default(), + port: self.server.http_port, + addr: self.server.http_addr, + max_connections: self.server.max_connections, + max_concurrent_estimate_fee_requests: None, + max_request_body_size: None, + max_response_body_size: None, + timeout: self.server.timeout.map(Duration::from_secs), + cors_origins, + #[cfg(feature = "explorer")] + explorer: self.explorer.explorer, + max_event_page_size: Some(self.server.max_event_page_size), + max_proof_keys: Some(self.server.max_proof_keys), + max_call_gas: Some(self.server.max_call_gas), + }) + } + + #[cfg(not(feature = "server"))] + { + Ok(RpcConfig::default()) + } + } + + fn metrics_config(&self) -> Option { + #[cfg(feature = "server")] + if self.metrics.metrics { + Some(MetricsConfig { addr: self.metrics.metrics_addr, port: self.metrics.metrics_port }) + } else { + None + } + + #[cfg(not(feature = "server"))] + None + } + + fn tracer_config(&self) -> Option { + self.tracer.config() } } diff --git a/crates/cli/src/utils.rs b/crates/cli/src/utils.rs index 94aa22f2b..5004bd772 100644 --- a/crates/cli/src/utils.rs +++ b/crates/cli/src/utils.rs @@ -130,6 +130,8 @@ PREDEPLOYED CONTRACTS cs.fee_contracts.strk, DEFAULT_LEGACY_ERC20_CLASS_HASH, ); } + + ChainSpec::FullNode(..) => {} } println!( diff --git a/crates/core/src/backend/mod.rs b/crates/core/src/backend/mod.rs index a26234241..2d9b7c2ab 100644 --- a/crates/core/src/backend/mod.rs +++ b/crates/core/src/backend/mod.rs @@ -76,6 +76,11 @@ impl Backend { match self.chain_spec.as_ref() { ChainSpec::Dev(cs) => self.init_dev_genesis(cs), ChainSpec::Rollup(cs) => self.init_rollup_genesis(cs), + ChainSpec::FullNode(_) => { + // Full nodes sync from the network, so we skip genesis initialization + info!("Full node mode: genesis initialization skipped, will sync from network"); + Ok(()) + } } } diff --git a/crates/gateway/gateway-server/Cargo.toml b/crates/gateway/gateway-server/Cargo.toml index f75d09ff3..8a6d22ff8 100644 --- a/crates/gateway/gateway-server/Cargo.toml +++ b/crates/gateway/gateway-server/Cargo.toml @@ -7,12 +7,13 @@ version.workspace = true [dependencies] katana-gateway-types.workspace = true +katana-primitives.workspace = true katana-executor.workspace = true katana-core.workspace = true -katana-primitives.workspace = true katana-metrics.workspace = true katana-pool.workspace = true katana-rpc-server.workspace = true +katana-pool-api.workspace = true katana-rpc-api.workspace = true katana-provider-api.workspace = true serde-utils.workspace = true diff --git a/crates/gateway/gateway-server/src/handlers.rs b/crates/gateway/gateway-server/src/handlers.rs index 869df8fd0..3bf6c572c 100644 --- a/crates/gateway/gateway-server/src/handlers.rs +++ b/crates/gateway/gateway-server/src/handlers.rs @@ -7,7 +7,7 @@ use katana_gateway_types::{ Block, ConfirmedReceipt, ConfirmedTransaction, ContractClass, ErrorCode, GatewayError, ReceiptBody, StateUpdate, StateUpdateWithBlock, }; -use katana_pool::TxPool; +use katana_pool_api::TransactionPool; use katana_primitives::block::{BlockHash, BlockIdOrTag, BlockNumber}; use katana_primitives::class::{ClassHash, CompiledClass, ContractClassCompilationError}; use katana_provider_api::block::{BlockIdReader, BlockProvider, BlockStatusProvider}; @@ -20,11 +20,14 @@ use starknet::core::types::ResourcePrice; /// Shared application state containing the backend #[derive(Clone)] -pub struct AppState { - pub api: StarknetApi>, +pub struct AppState { + pub api: StarknetApi>, } -impl AppState { +impl

AppState

+where + P: TransactionPool + Send + Sync + 'static, +{ // TODO(kariy): support preconfirmed blocks async fn get_block(&self, id: BlockIdOrTag) -> Result, ApiError> { self.api @@ -154,10 +157,13 @@ pub async fn health() -> Json { /// Handler for `/feeder_gateway/get_block` endpoint /// /// Returns block information for the specified block. -pub async fn get_block( - State(state): State, +pub async fn get_block

( + State(state): State>, Query(params): Query, -) -> Result, ApiError> { +) -> Result, ApiError> +where + P: TransactionPool + Send + Sync + 'static, +{ let block_id = params.block_id()?; let block = state.get_block(block_id).await?.unwrap(); Ok(Json(block)) @@ -175,10 +181,13 @@ pub enum GetStateUpdateResponse { /// Handler for `/feeder_gateway/get_state_update` endpoint /// /// Returns state update information for the specified block. -pub async fn get_state_update( - State(state): State, +pub async fn get_state_update

( + State(state): State>, Query(params): Query, -) -> Result, ApiError> { +) -> Result, ApiError> +where + P: TransactionPool + Send + Sync + 'static, +{ let include_block = params.include_block; let block_id = params.block_query.block_id()?; @@ -197,10 +206,13 @@ pub async fn get_state_update( /// Handler for `/feeder_gateway/get_class_by_hash` endpoint /// /// Returns the contract class definition for a given class hash. -pub async fn get_class_by_hash( - State(state): State, +pub async fn get_class_by_hash

( + State(state): State>, Query(params): Query, -) -> Result, ApiError> { +) -> Result, ApiError> +where + P: TransactionPool + Send + Sync + 'static, +{ let class_hash = params.class_hash; let block_id = params.block_query.block_id()?; let class = state.api.class_at_hash(block_id, class_hash).await?; @@ -210,10 +222,13 @@ pub async fn get_class_by_hash( /// Handler for `/feeder_gateway/get_compiled_class_by_class_hash` endpoint /// /// Returns the compiled (CASM) contract class for a given class hash. -pub async fn get_compiled_class_by_class_hash( - State(state): State, +pub async fn get_compiled_class_by_class_hash

( + State(state): State>, Query(params): Query, -) -> Result, ApiError> { +) -> Result, ApiError> +where + P: TransactionPool + Send + Sync + 'static, +{ let class_hash = params.class_hash; let block_id = params.block_query.block_id()?; diff --git a/crates/gateway/gateway-server/src/lib.rs b/crates/gateway/gateway-server/src/lib.rs index b60961e48..4283bde3f 100644 --- a/crates/gateway/gateway-server/src/lib.rs +++ b/crates/gateway/gateway-server/src/lib.rs @@ -6,6 +6,7 @@ use axum::routing::get; use axum::Router; use katana_core::service::block_producer::BlockProducer; use katana_executor::implementation::blockifier::BlockifierFactory; +use katana_pool_api::TransactionPool; use katana_rpc_server::cors::Cors; use katana_rpc_server::starknet::StarknetApi; use tokio::net::TcpListener; @@ -69,20 +70,21 @@ impl GatewayServerHandle { /// The feeder gateway server. #[derive(Debug)] -pub struct GatewayServer { +pub struct GatewayServer { timeout: Duration, cors: Option, health_check: bool, metered: bool, - starknet_api: StarknetApi>, + starknet_api: StarknetApi>, } -impl GatewayServer { +impl GatewayServer +where + Pool: TransactionPool + Clone + Send + Sync + 'static, +{ /// Create a new feeder gateway server. - pub fn new( - starknet_api: StarknetApi>, - ) -> Self { + pub fn new(starknet_api: StarknetApi>) -> Self { Self { timeout: DEFAULT_GATEWAY_TIMEOUT, cors: None, diff --git a/crates/gateway/gateway-types/src/lib.rs b/crates/gateway/gateway-types/src/lib.rs index d2d91043a..d35daaaea 100644 --- a/crates/gateway/gateway-types/src/lib.rs +++ b/crates/gateway/gateway-types/src/lib.rs @@ -19,7 +19,7 @@ //! - [`DeployAccountTxV3`]: Uses the custom DA mode and resource bounds //! - [`L1HandlerTx`]: Optional `nonce` field -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use katana_primitives::block::{BlockHash, BlockNumber}; pub use katana_primitives::class::CasmContractClass; @@ -180,17 +180,6 @@ pub struct ConfirmedStateUpdate { pub state_diff: StateDiff, } -// todo(kariy): merge the serialization of gateway into the rpc types -#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] -pub struct StateDiff { - pub storage_diffs: BTreeMap>, - pub deployed_contracts: Vec, - pub old_declared_contracts: Vec, - pub declared_classes: Vec, - pub nonces: BTreeMap, - pub replaced_classes: Vec, -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StorageDiff { pub key: StorageKey, @@ -213,6 +202,105 @@ fn default_l2_gas_price() -> ResourcePrice { ResourcePrice { price_in_fri: Felt::from(1), price_in_wei: Felt::from(1) } } +// todo(kariy): merge the serialization of gateway into the rpc types +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct StateDiff { + pub storage_diffs: BTreeMap>, + pub deployed_contracts: Vec, + pub old_declared_contracts: Vec, + pub declared_classes: Vec, + pub nonces: BTreeMap, + pub replaced_classes: Vec, +} + +impl StateDiff { + /// Returns a new [`StateDiff`] that contains all updates from `self` and `other`, + /// preferring the values from `other` when both diffs touch the same entry. + pub fn merge(mut self, other: StateDiff) -> StateDiff { + let StateDiff { + storage_diffs, + deployed_contracts, + old_declared_contracts, + declared_classes, + nonces, + replaced_classes, + } = other; + + Self::merge_storage_diffs(&mut self.storage_diffs, storage_diffs); + Self::merge_deployed_contracts(&mut self.deployed_contracts, deployed_contracts); + Self::merge_deployed_contracts(&mut self.replaced_classes, replaced_classes); + Self::merge_declared_classes(&mut self.declared_classes, declared_classes); + Self::merge_old_declared_contracts( + &mut self.old_declared_contracts, + old_declared_contracts, + ); + self.nonces.extend(nonces); + + self + } + + fn merge_storage_diffs( + target: &mut BTreeMap>, + updates: BTreeMap>, + ) { + for (address, diffs) in updates { + let entry = target.entry(address).or_default(); + let mut index_by_key: BTreeMap = + entry.iter().enumerate().map(|(idx, diff)| (diff.key, idx)).collect(); + + for diff in diffs { + if let Some(idx) = index_by_key.get(&diff.key).copied() { + entry[idx] = diff; + } else { + index_by_key.insert(diff.key, entry.len()); + entry.push(diff); + } + } + } + } + + fn merge_deployed_contracts( + target: &mut Vec, + incoming: Vec, + ) { + let mut index_by_address: BTreeMap = + target.iter().enumerate().map(|(idx, contract)| (contract.address, idx)).collect(); + + for contract in incoming { + if let Some(idx) = index_by_address.get(&contract.address).copied() { + target[idx] = contract; + } else { + index_by_address.insert(contract.address, target.len()); + target.push(contract); + } + } + } + + fn merge_declared_classes(target: &mut Vec, incoming: Vec) { + let mut index_by_hash: BTreeMap = + target.iter().enumerate().map(|(idx, contract)| (contract.class_hash, idx)).collect(); + + for declared in incoming { + if let Some(idx) = index_by_hash.get(&declared.class_hash).copied() { + target[idx] = declared; + } else { + index_by_hash.insert(declared.class_hash, target.len()); + target.push(declared); + } + } + } + + fn merge_old_declared_contracts(target: &mut Vec, incoming: Vec) { + let mut seen: BTreeSet = target.iter().copied().collect(); + + for class_hash in incoming { + if seen.insert(class_hash) { + target.push(class_hash); + } + } + } +} + impl<'de> Deserialize<'de> for StateUpdate { fn deserialize>(deserializer: D) -> Result { struct __Visitor; diff --git a/crates/gateway/gateway-types/src/receipt.rs b/crates/gateway/gateway-types/src/receipt.rs index 6a9394df9..693c59787 100644 --- a/crates/gateway/gateway-types/src/receipt.rs +++ b/crates/gateway/gateway-types/src/receipt.rs @@ -58,3 +58,379 @@ pub struct L1ToL2Message { pub payload: Vec, pub nonce: Option, } + +//////////////////////////////////////////////////////////////////////////////// +// Conversion to Katana RPC types. +//////////////////////////////////////////////////////////////////////////////// + +impl From for katana_rpc_types::ExecutionResult { + fn from(value: ExecutionStatus) -> Self { + match value { + ExecutionStatus::Succeeded => katana_rpc_types::ExecutionResult::Succeeded, + ExecutionStatus::Reverted => { + // When converting from gateway ExecutionStatus::Reverted, we don't have the + // revert reason here. The caller should use the revert_error field from + // ReceiptBody if available. + katana_rpc_types::ExecutionResult::Reverted { + reason: String::from("Transaction reverted"), + } + } + } + } +} + +impl From for katana_rpc_types::ExecutionResources { + fn from(value: ExecutionResources) -> Self { + let gas = value.total_gas_consumed.unwrap_or_default(); + katana_rpc_types::ExecutionResources { + l1_gas: gas.l1_gas, + l1_data_gas: gas.l1_data_gas, + l2_gas: gas.l2_gas, + } + } +} + +impl ReceiptBody { + /// Convert the receipt body to an RPC execution result. + /// + /// This uses the `execution_status` field if available, otherwise falls back to checking + /// the `revert_error` field. If `revert_error` is present, the result is `Reverted`. + pub fn to_execution_result(&self) -> katana_rpc_types::ExecutionResult { + if let Some(revert_error) = &self.revert_error { + katana_rpc_types::ExecutionResult::Reverted { reason: revert_error.clone() } + } else if let Some(status) = &self.execution_status { + match status { + ExecutionStatus::Succeeded => katana_rpc_types::ExecutionResult::Succeeded, + ExecutionStatus::Reverted => { + // Reverted status without error message + katana_rpc_types::ExecutionResult::Reverted { + reason: String::from("Transaction reverted"), + } + } + } + } else { + // If no status is provided, assume success + katana_rpc_types::ExecutionResult::Succeeded + } + } + + /// Convert to an RPC FeePayment with the given price unit. + pub fn to_fee_payment( + &self, + unit: katana_primitives::fee::PriceUnit, + ) -> katana_rpc_types::FeePayment { + katana_rpc_types::FeePayment { amount: self.actual_fee, unit } + } +} + +impl ConfirmedReceipt { + /// Create an RPC Invoke receipt from this gateway receipt. + /// + /// # Arguments + /// * `finality_status` - The finality status of the transaction + /// * `fee_unit` - The price unit for the fee + pub fn to_rpc_invoke_receipt( + self, + finality_status: katana_primitives::block::FinalityStatus, + fee_unit: katana_primitives::fee::PriceUnit, + ) -> katana_rpc_types::RpcInvokeTxReceipt { + let execution_result = self.body.to_execution_result(); + let actual_fee = self.body.to_fee_payment(fee_unit); + + katana_rpc_types::RpcInvokeTxReceipt { + actual_fee, + finality_status, + messages_sent: self.body.l2_to_l1_messages, + events: self.body.events, + execution_resources: self.body.execution_resources.unwrap_or_default().into(), + execution_result, + } + } + + /// Create an RPC Declare receipt from this gateway receipt. + /// + /// # Arguments + /// * `finality_status` - The finality status of the transaction + /// * `fee_unit` - The price unit for the fee + pub fn to_rpc_declare_receipt( + self, + finality_status: katana_primitives::block::FinalityStatus, + fee_unit: katana_primitives::fee::PriceUnit, + ) -> katana_rpc_types::RpcDeclareTxReceipt { + let execution_result = self.body.to_execution_result(); + let actual_fee = self.body.to_fee_payment(fee_unit); + + katana_rpc_types::RpcDeclareTxReceipt { + actual_fee, + finality_status, + messages_sent: self.body.l2_to_l1_messages, + events: self.body.events, + execution_resources: self.body.execution_resources.unwrap_or_default().into(), + execution_result, + } + } + + /// Create an RPC Deploy receipt from this gateway receipt. + /// + /// # Arguments + /// * `finality_status` - The finality status of the transaction + /// * `fee_unit` - The price unit for the fee + /// * `contract_address` - The deployed contract address + pub fn to_rpc_deploy_receipt( + self, + finality_status: katana_primitives::block::FinalityStatus, + fee_unit: katana_primitives::fee::PriceUnit, + contract_address: ContractAddress, + ) -> katana_rpc_types::RpcDeployTxReceipt { + let execution_result = self.body.to_execution_result(); + let actual_fee = self.body.to_fee_payment(fee_unit); + + katana_rpc_types::RpcDeployTxReceipt { + actual_fee, + finality_status, + messages_sent: self.body.l2_to_l1_messages, + events: self.body.events, + execution_resources: self.body.execution_resources.unwrap_or_default().into(), + contract_address, + execution_result, + } + } + + /// Create an RPC DeployAccount receipt from this gateway receipt. + /// + /// # Arguments + /// * `finality_status` - The finality status of the transaction + /// * `fee_unit` - The price unit for the fee + /// * `contract_address` - The deployed account contract address + pub fn to_rpc_deploy_account_receipt( + self, + finality_status: katana_primitives::block::FinalityStatus, + fee_unit: katana_primitives::fee::PriceUnit, + contract_address: ContractAddress, + ) -> katana_rpc_types::RpcDeployAccountTxReceipt { + let execution_result = self.body.to_execution_result(); + let actual_fee = self.body.to_fee_payment(fee_unit); + + katana_rpc_types::RpcDeployAccountTxReceipt { + actual_fee, + finality_status, + messages_sent: self.body.l2_to_l1_messages, + events: self.body.events, + execution_resources: self.body.execution_resources.unwrap_or_default().into(), + contract_address, + execution_result, + } + } + + /// Create an RPC L1Handler receipt from this gateway receipt. + /// + /// # Arguments + /// * `finality_status` - The finality status of the transaction + /// * `fee_unit` - The price unit for the fee + /// * `message_hash` - The L1 to L2 message hash + pub fn to_rpc_l1_handler_receipt( + self, + finality_status: katana_primitives::block::FinalityStatus, + fee_unit: katana_primitives::fee::PriceUnit, + message_hash: katana_primitives::B256, + ) -> katana_rpc_types::RpcL1HandlerTxReceipt { + let execution_result = self.body.to_execution_result(); + let actual_fee = self.body.to_fee_payment(fee_unit); + + katana_rpc_types::RpcL1HandlerTxReceipt { + actual_fee, + finality_status, + messages_sent: self.body.l2_to_l1_messages, + events: self.body.events, + execution_resources: self.body.execution_resources.unwrap_or_default().into(), + message_hash, + execution_result, + } + } +} + +impl Default for ExecutionResources { + fn default() -> Self { + Self { + vm_resources: Default::default(), + data_availability: None, + total_gas_consumed: Some(Default::default()), + } + } +} + +#[cfg(test)] +mod tests { + use katana_primitives::block::FinalityStatus; + use katana_primitives::fee::PriceUnit; + use katana_primitives::receipt::GasUsed; + use katana_primitives::{address, felt}; + + use super::*; + + fn create_test_receipt_body() -> ReceiptBody { + ReceiptBody { + execution_resources: Some(ExecutionResources { + vm_resources: Default::default(), + data_availability: None, + total_gas_consumed: Some(GasUsed { l1_gas: 100, l1_data_gas: 50, l2_gas: 200 }), + }), + l1_to_l2_consumed_message: None, + l2_to_l1_messages: vec![], + events: vec![], + actual_fee: felt!("0x1234"), + execution_status: Some(ExecutionStatus::Succeeded), + revert_error: None, + } + } + + #[test] + fn test_execution_status_to_execution_result() { + let succeeded: katana_rpc_types::ExecutionResult = ExecutionStatus::Succeeded.into(); + assert_eq!(succeeded, katana_rpc_types::ExecutionResult::Succeeded); + + let reverted: katana_rpc_types::ExecutionResult = ExecutionStatus::Reverted.into(); + match reverted { + katana_rpc_types::ExecutionResult::Reverted { reason } => { + assert_eq!(reason, "Transaction reverted"); + } + _ => panic!("Expected Reverted result"), + } + } + + #[test] + fn test_execution_resources_conversion() { + let gateway_resources = ExecutionResources { + vm_resources: Default::default(), + data_availability: None, + total_gas_consumed: Some(GasUsed { l1_gas: 100, l1_data_gas: 50, l2_gas: 200 }), + }; + + let rpc_resources: katana_rpc_types::ExecutionResources = gateway_resources.into(); + assert_eq!(rpc_resources.l1_gas, 100); + assert_eq!(rpc_resources.l1_data_gas, 50); + assert_eq!(rpc_resources.l2_gas, 200); + } + + #[test] + fn test_receipt_body_to_execution_result_with_revert_error() { + let body = ReceiptBody { + execution_resources: None, + l1_to_l2_consumed_message: None, + l2_to_l1_messages: vec![], + events: vec![], + actual_fee: felt!("0x0"), + execution_status: Some(ExecutionStatus::Succeeded), + revert_error: Some("Out of gas".to_string()), + }; + + let result = body.to_execution_result(); + match result { + katana_rpc_types::ExecutionResult::Reverted { reason } => { + assert_eq!(reason, "Out of gas"); + } + _ => panic!("Expected Reverted result"), + } + } + + #[test] + fn test_receipt_body_to_execution_result_succeeded() { + let body = create_test_receipt_body(); + let result = body.to_execution_result(); + assert_eq!(result, katana_rpc_types::ExecutionResult::Succeeded); + } + + #[test] + fn test_to_rpc_invoke_receipt() { + let gateway_receipt = ConfirmedReceipt { + transaction_hash: felt!("0xabc"), + transaction_index: 5, + body: create_test_receipt_body(), + }; + + let rpc_receipt = gateway_receipt + .clone() + .to_rpc_invoke_receipt(FinalityStatus::AcceptedOnL2, PriceUnit::Wei); + + assert_eq!(rpc_receipt.actual_fee.amount, felt!("0x1234")); + assert_eq!(rpc_receipt.actual_fee.unit, PriceUnit::Wei); + assert_eq!(rpc_receipt.finality_status, FinalityStatus::AcceptedOnL2); + assert_eq!(rpc_receipt.execution_resources.l1_gas, 100); + assert_eq!(rpc_receipt.execution_resources.l2_gas, 200); + assert_eq!(rpc_receipt.execution_result, katana_rpc_types::ExecutionResult::Succeeded); + } + + #[test] + fn test_to_rpc_declare_receipt() { + let gateway_receipt = ConfirmedReceipt { + transaction_hash: felt!("0xdef"), + transaction_index: 10, + body: create_test_receipt_body(), + }; + + let rpc_receipt = gateway_receipt + .clone() + .to_rpc_declare_receipt(FinalityStatus::AcceptedOnL1, PriceUnit::Fri); + + assert_eq!(rpc_receipt.actual_fee.amount, felt!("0x1234")); + assert_eq!(rpc_receipt.actual_fee.unit, PriceUnit::Fri); + assert_eq!(rpc_receipt.finality_status, FinalityStatus::AcceptedOnL1); + } + + #[test] + fn test_to_rpc_deploy_receipt() { + let gateway_receipt = ConfirmedReceipt { + transaction_hash: felt!("0x123"), + transaction_index: 1, + body: create_test_receipt_body(), + }; + + let contract_address = address!("0x456"); + let rpc_receipt = gateway_receipt.clone().to_rpc_deploy_receipt( + FinalityStatus::AcceptedOnL2, + PriceUnit::Wei, + contract_address, + ); + + assert_eq!(rpc_receipt.contract_address, contract_address); + assert_eq!(rpc_receipt.actual_fee.amount, felt!("0x1234")); + } + + #[test] + fn test_to_rpc_deploy_account_receipt() { + let gateway_receipt = ConfirmedReceipt { + transaction_hash: felt!("0x789"), + transaction_index: 2, + body: create_test_receipt_body(), + }; + + let contract_address = address!("0xabc"); + let rpc_receipt = gateway_receipt.clone().to_rpc_deploy_account_receipt( + FinalityStatus::AcceptedOnL2, + PriceUnit::Wei, + contract_address, + ); + + assert_eq!(rpc_receipt.contract_address, contract_address); + assert_eq!(rpc_receipt.execution_result, katana_rpc_types::ExecutionResult::Succeeded); + } + + #[test] + fn test_to_rpc_l1_handler_receipt() { + let gateway_receipt = ConfirmedReceipt { + transaction_hash: felt!("0xfff"), + transaction_index: 3, + body: create_test_receipt_body(), + }; + + let message_hash = katana_primitives::B256::from([1u8; 32]); + let rpc_receipt = gateway_receipt.clone().to_rpc_l1_handler_receipt( + FinalityStatus::AcceptedOnL2, + PriceUnit::Wei, + message_hash, + ); + + assert_eq!(rpc_receipt.message_hash, message_hash); + assert_eq!(rpc_receipt.actual_fee.amount, felt!("0x1234")); + } +} diff --git a/crates/gateway/gateway-types/src/transaction.rs b/crates/gateway/gateway-types/src/transaction.rs index ae24a88ef..a3c78dc55 100644 --- a/crates/gateway/gateway-types/src/transaction.rs +++ b/crates/gateway/gateway-types/src/transaction.rs @@ -310,6 +310,19 @@ pub enum TypedTransaction { DeployAccount(DeployAccountTx), } +impl TypedTransaction { + /// Returns the type of the transaction. + pub fn r#type(&self) -> TxType { + match self { + TypedTransaction::Deploy(_) => TxType::Deploy, + TypedTransaction::Declare(_) => TxType::Declare, + TypedTransaction::L1Handler(_) => TxType::L1Handler, + TypedTransaction::InvokeFunction(_) => TxType::Invoke, + TypedTransaction::DeployAccount(_) => TxType::DeployAccount, + } + } +} + /// Invoke transaction enum with version-specific variants #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "version")] @@ -490,27 +503,94 @@ impl<'de> Deserialize<'de> for DataAvailabilityMode { } } +fn deserialize_resource_bounds_mapping<'de, D: Deserializer<'de>>( + deserializer: D, +) -> Result { + #[derive(Deserialize)] + struct FeederGatewayResourceBounds { + #[serde(rename = "L1_GAS")] + l1_gas: ResourceBounds, + #[serde(rename = "L2_GAS")] + l2_gas: ResourceBounds, + #[serde(rename = "L1_DATA_GAS")] + l1_data_gas: Option, + } + + let bounds = FeederGatewayResourceBounds::deserialize(deserializer)?; + + if let Some(l1_data_gas) = bounds.l1_data_gas { + Ok(ResourceBoundsMapping::All(AllResourceBoundsMapping { + l1_gas: bounds.l1_gas, + l2_gas: bounds.l2_gas, + l1_data_gas, + })) + } else { + Ok(ResourceBoundsMapping::L1Gas(L1GasResourceBoundsMapping { + l1_gas: bounds.l1_gas, + l2_gas: bounds.l2_gas, + })) + } +} + +fn serialize_resource_bounds_mapping( + bounds: &ResourceBoundsMapping, + serializer: S, +) -> Result { + #[derive(Serialize)] + struct FeederGatewayResourceBounds<'a> { + #[serde(rename = "L1_GAS")] + l1_gas: &'a ResourceBounds, + #[serde(rename = "L2_GAS")] + l2_gas: &'a ResourceBounds, + #[serde(rename = "L1_DATA_GAS")] + l1_data_gas: Option<&'a ResourceBounds>, + } + + let feeder_bounds = match bounds { + ResourceBoundsMapping::All(all_bounds) => FeederGatewayResourceBounds { + l1_gas: &all_bounds.l1_gas, + l2_gas: &all_bounds.l2_gas, + l1_data_gas: Some(&all_bounds.l1_data_gas), + }, + ResourceBoundsMapping::L1Gas(l1_gas_bounds) => FeederGatewayResourceBounds { + l1_gas: &l1_gas_bounds.l1_gas, + l2_gas: &l1_gas_bounds.l2_gas, + l1_data_gas: None, + }, + }; + + feeder_bounds.serialize(serializer) +} + +//////////////////////////////////////////////////////////////////////////////// +// Conversion to katana-primitives types +//////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, thiserror::Error)] pub enum TxTryFromError { #[error("unsupported transaction version; type: {r#type:?}, version: {version:#x}")] UnsupportedVersion { r#type: TxType, version: Felt }, } -// -- Conversion to Katana primitive types. - impl TryFrom for TxWithHash { type Error = TxTryFromError; fn try_from(tx: ConfirmedTransaction) -> Result { - let transaction = match tx.transaction { + Ok(TxWithHash { hash: tx.transaction_hash, transaction: tx.transaction.try_into()? }) + } +} + +impl TryFrom for Tx { + type Error = TxTryFromError; + + fn try_from(tx: TypedTransaction) -> Result { + Ok(match tx { TypedTransaction::Deploy(tx) => Tx::Deploy(tx), - TypedTransaction::Declare(tx) => Tx::Declare(tx.try_into()?), TypedTransaction::L1Handler(tx) => Tx::L1Handler(tx.into()), + TypedTransaction::Declare(tx) => Tx::Declare(tx.try_into()?), TypedTransaction::InvokeFunction(tx) => Tx::Invoke(tx.try_into()?), TypedTransaction::DeployAccount(tx) => Tx::DeployAccount(tx.try_into()?), - }; - - Ok(TxWithHash { hash: tx.transaction_hash, transaction }) + }) } } @@ -659,63 +739,142 @@ impl From for katana_primitives::da::DataAvailabilityMode } } -fn deserialize_resource_bounds_mapping<'de, D: Deserializer<'de>>( - deserializer: D, -) -> Result { - #[derive(Deserialize)] - struct FeederGatewayResourceBounds { - #[serde(rename = "L1_GAS")] - l1_gas: ResourceBounds, - #[serde(rename = "L2_GAS")] - l2_gas: ResourceBounds, - #[serde(rename = "L1_DATA_GAS")] - l1_data_gas: Option, +//////////////////////////////////////////////////////////////////////////////// +// Conversion to katana-rpc-types types +//////////////////////////////////////////////////////////////////////////////// + +impl From for katana_rpc_types::RpcTxWithHash { + fn from(value: ConfirmedTransaction) -> Self { + Self { transaction_hash: value.transaction_hash, transaction: value.transaction.into() } } +} - let bounds = FeederGatewayResourceBounds::deserialize(deserializer)?; +impl From for katana_rpc_types::RpcTx { + fn from(value: TypedTransaction) -> Self { + match value { + TypedTransaction::Deploy(tx) => { + katana_rpc_types::RpcTx::Deploy(katana_rpc_types::RpcDeployTx { + version: tx.version, + class_hash: tx.class_hash, + constructor_calldata: tx.constructor_calldata, + contract_address_salt: tx.contract_address_salt, + }) + } + TypedTransaction::L1Handler(tx) => katana_rpc_types::RpcTx::L1Handler(tx.into()), + TypedTransaction::Declare(tx) => katana_rpc_types::RpcTx::Declare(tx.into()), + TypedTransaction::InvokeFunction(tx) => katana_rpc_types::RpcTx::Invoke(tx.into()), + TypedTransaction::DeployAccount(tx) => { + katana_rpc_types::RpcTx::DeployAccount(tx.into()) + } + } + } +} - if let Some(l1_data_gas) = bounds.l1_data_gas { - Ok(ResourceBoundsMapping::All(AllResourceBoundsMapping { - l1_gas: bounds.l1_gas, - l2_gas: bounds.l2_gas, - l1_data_gas, - })) - } else { - Ok(ResourceBoundsMapping::L1Gas(L1GasResourceBoundsMapping { - l1_gas: bounds.l1_gas, - l2_gas: bounds.l2_gas, - })) +impl From for katana_rpc_types::RpcInvokeTx { + fn from(value: InvokeTx) -> Self { + match value { + InvokeTx::V0(tx) => katana_rpc_types::RpcInvokeTx::V0(tx), + InvokeTx::V1(tx) => katana_rpc_types::RpcInvokeTx::V1(tx), + InvokeTx::V3(tx) => katana_rpc_types::RpcInvokeTx::V3(tx.into()), + } } } -fn serialize_resource_bounds_mapping( - bounds: &ResourceBoundsMapping, - serializer: S, -) -> Result { - #[derive(Serialize)] - struct FeederGatewayResourceBounds<'a> { - #[serde(rename = "L1_GAS")] - l1_gas: &'a ResourceBounds, - #[serde(rename = "L2_GAS")] - l2_gas: &'a ResourceBounds, - #[serde(rename = "L1_DATA_GAS")] - l1_data_gas: Option<&'a ResourceBounds>, +impl From for katana_rpc_types::RpcInvokeTxV3 { + fn from(value: InvokeTxV3) -> Self { + Self { + sender_address: value.sender_address, + calldata: value.calldata, + signature: value.signature, + nonce: value.nonce, + resource_bounds: value.resource_bounds, + tip: value.tip, + paymaster_data: value.paymaster_data, + account_deployment_data: value.account_deployment_data, + nonce_data_availability_mode: value.nonce_data_availability_mode.into(), + fee_data_availability_mode: value.fee_data_availability_mode.into(), + } } +} - let feeder_bounds = match bounds { - ResourceBoundsMapping::All(all_bounds) => FeederGatewayResourceBounds { - l1_gas: &all_bounds.l1_gas, - l2_gas: &all_bounds.l2_gas, - l1_data_gas: Some(&all_bounds.l1_data_gas), - }, - ResourceBoundsMapping::L1Gas(l1_gas_bounds) => FeederGatewayResourceBounds { - l1_gas: &l1_gas_bounds.l1_gas, - l2_gas: &l1_gas_bounds.l2_gas, - l1_data_gas: None, - }, - }; +impl From for katana_rpc_types::RpcDeclareTx { + fn from(value: DeclareTx) -> Self { + match value { + DeclareTx::V0(tx) => katana_rpc_types::RpcDeclareTx::V0(tx), + DeclareTx::V1(tx) => katana_rpc_types::RpcDeclareTx::V1(tx), + DeclareTx::V2(tx) => katana_rpc_types::RpcDeclareTx::V2(tx), + DeclareTx::V3(tx) => katana_rpc_types::RpcDeclareTx::V3(tx.into()), + } + } +} - feeder_bounds.serialize(serializer) +impl From for katana_rpc_types::RpcDeclareTxV3 { + fn from(value: DeclareTxV3) -> Self { + Self { + sender_address: value.sender_address, + compiled_class_hash: value.compiled_class_hash, + signature: value.signature, + nonce: value.nonce, + class_hash: value.class_hash, + resource_bounds: value.resource_bounds, + tip: value.tip, + paymaster_data: value.paymaster_data, + account_deployment_data: value.account_deployment_data, + nonce_data_availability_mode: value.nonce_data_availability_mode.into(), + fee_data_availability_mode: value.fee_data_availability_mode.into(), + } + } +} + +impl From for katana_rpc_types::RpcDeployAccountTx { + fn from(value: DeployAccountTx) -> Self { + match value { + DeployAccountTx::V1(tx) => katana_rpc_types::RpcDeployAccountTx::V1(tx.into()), + DeployAccountTx::V3(tx) => katana_rpc_types::RpcDeployAccountTx::V3(tx.into()), + } + } +} + +impl From for katana_rpc_types::RpcDeployAccountTxV1 { + fn from(value: DeployAccountTxV1) -> Self { + Self { + max_fee: value.max_fee, + signature: value.signature, + nonce: value.nonce, + contract_address_salt: value.contract_address_salt, + constructor_calldata: value.constructor_calldata, + class_hash: value.class_hash, + } + } +} + +impl From for katana_rpc_types::RpcDeployAccountTxV3 { + fn from(value: DeployAccountTxV3) -> Self { + Self { + signature: value.signature, + nonce: value.nonce, + contract_address_salt: value.contract_address_salt, + constructor_calldata: value.constructor_calldata, + class_hash: value.class_hash, + resource_bounds: value.resource_bounds, + tip: value.tip, + paymaster_data: value.paymaster_data, + nonce_data_availability_mode: value.nonce_data_availability_mode.into(), + fee_data_availability_mode: value.fee_data_availability_mode.into(), + } + } +} + +impl From for katana_rpc_types::RpcL1HandlerTx { + fn from(value: L1HandlerTx) -> Self { + Self { + version: value.version, + nonce: value.nonce.unwrap_or_default(), + contract_address: value.contract_address, + entry_point_selector: value.entry_point_selector, + calldata: value.calldata, + } + } } #[cfg(test)] diff --git a/crates/gateway/gateway-types/tests/types.rs b/crates/gateway/gateway-types/tests/types.rs index bab464518..8f5194104 100644 --- a/crates/gateway/gateway-types/tests/types.rs +++ b/crates/gateway/gateway-types/tests/types.rs @@ -95,6 +95,105 @@ fn state_diff_empty_conversion() { assert!(state_updates.replaced_classes.is_empty()); } +#[test] +fn state_diff_merge_merges_entries() { + let contract_a = address!("0x1"); + let contract_b = address!("0x2"); + + let mut base_storage = BTreeMap::new(); + base_storage.insert( + contract_a, + vec![ + StorageDiff { key: felt!("0x10"), value: felt!("0x100") }, + StorageDiff { key: felt!("0x11"), value: felt!("0x101") }, + ], + ); + + let deployed_a = DeployedContract { address: address!("0x300"), class_hash: felt!("0xaaa") }; + let base = StateDiff { + storage_diffs: base_storage, + deployed_contracts: vec![deployed_a.clone()], + old_declared_contracts: vec![felt!("0x400")], + declared_classes: vec![DeclaredContract { + class_hash: felt!("0x500"), + compiled_class_hash: felt!("0x501"), + }], + nonces: BTreeMap::from([(contract_a, felt!("0x1"))]), + replaced_classes: vec![DeployedContract { + address: address!("0x350"), + class_hash: felt!("0x900"), + }], + }; + + let mut other_storage = BTreeMap::new(); + other_storage.insert( + contract_a, + vec![ + StorageDiff { key: felt!("0x11"), value: felt!("0x202") }, + StorageDiff { key: felt!("0x12"), value: felt!("0x203") }, + ], + ); + other_storage + .insert(contract_b, vec![StorageDiff { key: felt!("0x20"), value: felt!("0x204") }]); + + let other = StateDiff { + storage_diffs: other_storage, + deployed_contracts: vec![ + DeployedContract { address: deployed_a.address, class_hash: felt!("0xbbb") }, + DeployedContract { address: address!("0x301"), class_hash: felt!("0xccc") }, + ], + old_declared_contracts: vec![felt!("0x400"), felt!("0x401")], + declared_classes: vec![ + DeclaredContract { class_hash: felt!("0x500"), compiled_class_hash: felt!("0x999") }, + DeclaredContract { class_hash: felt!("0x502"), compiled_class_hash: felt!("0x503") }, + ], + nonces: BTreeMap::from([(contract_a, felt!("0x5")), (contract_b, felt!("0x6"))]), + replaced_classes: vec![ + DeployedContract { address: address!("0x350"), class_hash: felt!("0x901") }, + DeployedContract { address: address!("0x351"), class_hash: felt!("0x902") }, + ], + }; + + let merged = base.merge(other); + + // storage diff merge + let merged_storage_a = merged.storage_diffs.get(&contract_a).expect("storage for contract A"); + assert_eq!(merged_storage_a.len(), 3); + assert_eq!(merged_storage_a[1].value, felt!("0x202")); + assert_eq!(merged_storage_a[2].key, felt!("0x12")); + assert_eq!( + merged.storage_diffs.get(&contract_b).expect("storage for contract B")[0].value, + felt!("0x204") + ); + + // deployed contracts updated + let deployed_by_addr: BTreeMap<_, _> = + merged.deployed_contracts.iter().map(|c| (c.address, c.class_hash)).collect(); + assert_eq!(deployed_by_addr.get(&deployed_a.address), Some(&felt!("0xbbb"))); + assert_eq!(deployed_by_addr.get(&address!("0x301")), Some(&felt!("0xccc"))); + + // replaced contracts updated + let replaced_by_addr: BTreeMap<_, _> = + merged.replaced_classes.iter().map(|c| (c.address, c.class_hash)).collect(); + assert_eq!(replaced_by_addr.get(&address!("0x350")), Some(&felt!("0x901"))); + assert_eq!(replaced_by_addr.get(&address!("0x351")), Some(&felt!("0x902"))); + + // declared classes merged + let declared_by_hash: BTreeMap<_, _> = + merged.declared_classes.iter().map(|c| (c.class_hash, c.compiled_class_hash)).collect(); + assert_eq!(declared_by_hash.get(&felt!("0x500")), Some(&felt!("0x999"))); + assert_eq!(declared_by_hash.get(&felt!("0x502")), Some(&felt!("0x503"))); + + // deprecated declared classes deduplicated + assert!(merged.old_declared_contracts.contains(&felt!("0x400"))); + assert!(merged.old_declared_contracts.contains(&felt!("0x401"))); + assert_eq!(merged.old_declared_contracts.iter().filter(|&&h| h == felt!("0x400")).count(), 1); + + // nonces override and extend + assert_eq!(merged.nonces.get(&contract_a), Some(&felt!("0x5"))); + assert_eq!(merged.nonces.get(&contract_b), Some(&felt!("0x6"))); +} + #[test] fn receipt_serde_succeeded() { let json = json!({ diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index e64e86ff9..115d8f78a 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -12,15 +12,18 @@ katana-db.workspace = true katana-executor.workspace = true katana-gateway-server.workspace = true katana-gateway-client.workspace = true +katana-gateway-types.workspace = true katana-gas-price-oracle.workspace = true katana-messaging.workspace = true katana-metrics.workspace = true katana-pipeline.workspace = true katana-pool.workspace = true +katana-pool-api.workspace = true katana-primitives.workspace = true katana-provider.workspace = true -katana-rpc-server = { workspace = true } +katana-rpc-server.workspace = true katana-rpc-api.workspace = true +katana-rpc-types.workspace = true katana-rpc-client.workspace = true katana-stage.workspace = true katana-tasks.workspace = true @@ -30,6 +33,7 @@ futures.workspace = true http.workspace = true jsonrpsee.workspace = true serde.workspace = true +parking_lot.workspace = true thiserror.workspace = true tracing.workspace = true url.workspace = true diff --git a/crates/node/src/full/mod.rs b/crates/node/src/full/mod.rs index 786dcc14c..a96aceb0e 100644 --- a/crates/node/src/full/mod.rs +++ b/crates/node/src/full/mod.rs @@ -3,35 +3,58 @@ use std::future::IntoFuture; use std::sync::Arc; +use alloy_provider::RootProvider; use anyhow::Result; +use http::header::CONTENT_TYPE; +use http::Method; +use jsonrpsee::RpcModule; +use katana_chain_spec::ChainSpec; +use katana_executor::ExecutionFlags; +use katana_gas_price_oracle::GasPriceOracle; use katana_gateway_client::Client as SequencerGateway; use katana_metrics::exporters::prometheus::PrometheusRecorder; use katana_metrics::{Report, Server as MetricsServer}; use katana_pipeline::{Pipeline, PipelineHandle}; -use katana_pool::ordering::FiFo; -use katana_pool::pool::Pool; -use katana_pool::validation::NoopValidator; -use katana_primitives::transaction::ExecutableTxWithHash; +use katana_pool::ordering::TipOrdering; use katana_provider::providers::db::DbProvider; +use katana_provider::BlockchainProvider; +use katana_rpc_api::starknet::{StarknetApiServer, StarknetTraceApiServer, StarknetWriteApiServer}; +use katana_rpc_server::cors::Cors; +use katana_rpc_server::starknet::{StarknetApi, StarknetApiConfig}; +use katana_rpc_server::{RpcServer, RpcServerHandle}; use katana_stage::blocks::BatchBlockDownloader; -use katana_stage::{Blocks, Classes}; +use katana_stage::{Blocks, Classes, StateTrie}; use katana_tasks::TaskManager; use tracing::{error, info}; use crate::config::db::DbConfig; use crate::config::metrics::MetricsConfig; +use crate::full::pending::PreconfStateFactory; mod exit; +mod pending; +mod pool; pub mod tip_watcher; use exit::NodeStoppedFuture; use tip_watcher::ChainTipWatcher; -type TxPool = - Pool, FiFo>; - -#[derive(Debug, Clone)] +use crate::config::rpc::{RpcConfig, RpcModuleKind}; +use crate::full::pool::{FullNodePool, GatewayProxyValidator}; + +#[derive( + Debug, + Copy, + Clone, + serde::Serialize, + serde::Deserialize, + PartialEq, + Default, + strum::Display, + strum::EnumString, +)] pub enum Network { + #[default] Mainnet, Sepolia, } @@ -39,6 +62,7 @@ pub enum Network { #[derive(Debug)] pub struct Config { pub db: DbConfig, + pub rpc: RpcConfig, pub metrics: Option, pub gateway_api_key: Option, pub eth_rpc_url: String, @@ -48,10 +72,13 @@ pub struct Config { #[derive(Debug)] pub struct Node { pub db: katana_db::Db, - pub pool: TxPool, + pub pool: FullNodePool, pub config: Arc, pub task_manager: TaskManager, pub pipeline: Pipeline, + pub rpc_server: RpcServer, + pub gateway_client: SequencerGateway, + pub chain_tip_watcher: ChainTipWatcher, } impl Node { @@ -65,6 +92,7 @@ impl Node { // -- build task manager let task_manager = TaskManager::current(); + let task_spawner = task_manager.task_spawner(); // -- build db and storage provider @@ -75,26 +103,137 @@ impl Node { let provider = DbProvider::new(db.clone()); + // --- build gateway client + + let gateway_client = match config.network { + Network::Mainnet => SequencerGateway::mainnet(), + Network::Sepolia => SequencerGateway::sepolia(), + }; + + let gateway_client = if let Some(ref key) = config.gateway_api_key { + gateway_client.with_api_key(key.clone()) + } else { + gateway_client + }; + // --- build transaction pool - let pool = TxPool::new(NoopValidator::new(), FiFo::new()); + let validator = GatewayProxyValidator::new(gateway_client.clone()); + let pool = FullNodePool::new(validator, TipOrdering::new()); // --- build pipeline - let fgw = if let Some(ref key) = config.gateway_api_key { - SequencerGateway::sepolia().with_api_key(key.clone()) - } else { - SequencerGateway::sepolia() + let (mut pipeline, pipeline_handle) = Pipeline::new(provider.clone(), 50); + let block_downloader = BatchBlockDownloader::new_gateway(gateway_client.clone(), 8); + pipeline.add_stage(Blocks::new(provider.clone(), block_downloader)); + pipeline.add_stage(Classes::new(provider.clone(), gateway_client.clone(), 8)); + pipeline.add_stage(StateTrie::new(provider.clone())); + + // -- + + let core_contract = match config.network { + Network::Mainnet => { + katana_starknet::StarknetCore::new_http_mainnet(&config.eth_rpc_url)? + } + Network::Sepolia => { + katana_starknet::StarknetCore::new_http_sepolia(&config.eth_rpc_url)? + } }; - let (mut pipeline, _) = Pipeline::new(provider.clone(), 64); - let block_downloader = BatchBlockDownloader::new_gateway(fgw.clone(), 3); - pipeline.add_stage(Blocks::new(provider.clone(), block_downloader)); - pipeline.add_stage(Classes::new(provider, fgw.clone(), 3)); + let chain_tip_watcher = ChainTipWatcher::new(core_contract); + + let preconf_factory = PreconfStateFactory::new( + provider.clone(), + gateway_client.clone(), + pipeline_handle.subscribe_blocks(), + chain_tip_watcher.subscribe(), + ); + + // --- build rpc server + + let mut rpc_modules = RpcModule::new(()); + + let cors = Cors::new() + .allow_origins(config.rpc.cors_origins.clone()) + // Allow `POST` when accessing the resource + .allow_methods([Method::POST, Method::GET]) + .allow_headers([CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]); - let node = Node { pool, config: Arc::new(config), task_manager, pipeline, db }; + // // --- build starknet api + + let starknet_api_cfg = StarknetApiConfig { + max_event_page_size: config.rpc.max_event_page_size, + max_proof_keys: config.rpc.max_proof_keys, + max_call_gas: config.rpc.max_call_gas, + max_concurrent_estimate_fee_requests: config.rpc.max_concurrent_estimate_fee_requests, + simulation_flags: ExecutionFlags::default(), + versioned_constant_overrides: None, + #[cfg(feature = "cartridge")] + paymaster: None, + }; + + let chain_spec = match config.network { + Network::Mainnet => ChainSpec::mainnet(), + Network::Sepolia => ChainSpec::sepolia(), + }; + + let starknet_api = StarknetApi::new( + Arc::new(chain_spec), + BlockchainProvider::new(Box::new(provider.clone())), + pool.clone(), + task_spawner.clone(), + preconf_factory, + GasPriceOracle::create_for_testing(), + starknet_api_cfg, + ); + + if config.rpc.apis.contains(&RpcModuleKind::Starknet) { + #[cfg(feature = "explorer")] + if config.rpc.explorer { + use katana_rpc_api::starknet_ext::StarknetApiExtServer; + rpc_modules.merge(StarknetApiExtServer::into_rpc(starknet_api.clone()))?; + } - Ok(node) + rpc_modules.merge(StarknetApiServer::into_rpc(starknet_api.clone()))?; + rpc_modules.merge(StarknetWriteApiServer::into_rpc(starknet_api.clone()))?; + rpc_modules.merge(StarknetTraceApiServer::into_rpc(starknet_api.clone()))?; + } + + #[allow(unused_mut)] + let mut rpc_server = + RpcServer::new().metrics(true).health_check(true).cors(cors).module(rpc_modules)?; + + #[cfg(feature = "explorer")] + { + rpc_server = rpc_server.explorer(config.rpc.explorer); + } + + if let Some(timeout) = config.rpc.timeout { + rpc_server = rpc_server.timeout(timeout); + }; + + if let Some(max_connections) = config.rpc.max_connections { + rpc_server = rpc_server.max_connections(max_connections); + } + + if let Some(max_request_body_size) = config.rpc.max_request_body_size { + rpc_server = rpc_server.max_request_body_size(max_request_body_size); + } + + if let Some(max_response_body_size) = config.rpc.max_response_body_size { + rpc_server = rpc_server.max_response_body_size(max_response_body_size); + } + + Ok(Node { + db, + pool, + pipeline, + rpc_server, + task_manager, + gateway_client, + chain_tip_watcher, + config: Arc::new(config), + }) } pub async fn launch(self) -> Result { @@ -111,18 +250,7 @@ impl Node { let pipeline_handle = self.pipeline.handle(); - let core_contract = match self.config.network { - Network::Mainnet => { - katana_starknet::StarknetCore::new_http_mainnet(&self.config.eth_rpc_url).await? - } - Network::Sepolia => { - katana_starknet::StarknetCore::new_http_sepolia(&self.config.eth_rpc_url).await? - } - }; - - let tip_watcher = ChainTipWatcher::new(core_contract); - - let mut tip_subscription = tip_watcher.subscribe(); + let mut tip_subscription = self.chain_tip_watcher.subscribe(); let pipeline_handle_clone = pipeline_handle.clone(); self.task_manager @@ -136,7 +264,7 @@ impl Node { .build_task() .graceful_shutdown() .name("Chain tip watcher") - .spawn(tip_watcher.into_future()); + .spawn(self.chain_tip_watcher.into_future()); // spawn a task for updating the pipeline's tip based on chain tip changes self.task_manager.task_spawner().spawn(async move { @@ -151,12 +279,16 @@ impl Node { } }); + // --- start the rpc server + + let rpc = self.rpc_server.start(self.config.rpc.socket_addr()).await?; + Ok(LaunchedNode { db: self.db, - pipeline_handle, - pool: self.pool, config: self.config, task_manager: self.task_manager, + pipeline: pipeline_handle, + rpc, }) } } @@ -164,15 +296,20 @@ impl Node { #[derive(Debug)] pub struct LaunchedNode { pub db: katana_db::Db, - pub pool: TxPool, pub task_manager: TaskManager, pub config: Arc, - pub pipeline_handle: PipelineHandle, + pub rpc: RpcServerHandle, + pub pipeline: PipelineHandle, } impl LaunchedNode { pub async fn stop(&self) -> Result<()> { + self.rpc.stop()?; + self.pipeline.stop(); + + self.pipeline.stopped().await; self.task_manager.shutdown().await; + Ok(()) } diff --git a/crates/node/src/full/pending/mod.rs b/crates/node/src/full/pending/mod.rs new file mode 100644 index 000000000..db98e91bd --- /dev/null +++ b/crates/node/src/full/pending/mod.rs @@ -0,0 +1,194 @@ +use std::sync::Arc; +use std::time::Duration; + +use katana_gateway_client::Client; +use katana_gateway_types::{ConfirmedTransaction, ErrorCode, PreConfirmedBlock, StateDiff}; +use katana_pipeline::PipelineBlockSubscription; +use katana_primitives::block::BlockNumber; +use katana_primitives::state::StateUpdates; +use katana_provider::api::state::StateFactoryProvider; +use parking_lot::Mutex; +use tracing::error; + +use crate::full::pending::state::PreconfStateProvider; +use crate::full::tip_watcher::TipSubscription; + +mod provider; +pub mod state; + +#[derive(Debug)] +pub struct PreconfStateFactory { + // from pipeline + latest_synced_block: PipelineBlockSubscription, + gateway_client: Client, + provider: P, + + // shared state + shared_preconf_block: SharedPreconfBlockData, +} + +impl PreconfStateFactory

{ + pub fn new( + state_factory_provider: P, + gateway_client: Client, + latest_synced_block: PipelineBlockSubscription, + tip_subscription: TipSubscription, + ) -> Self { + let shared_preconf_block = SharedPreconfBlockData::default(); + + let mut worker = PreconfBlockWatcher { + interval: DEFAULT_INTERVAL, + latest_block: tip_subscription, + gateway_client: gateway_client.clone(), + latest_synced_block: latest_synced_block.clone(), + shared_preconf_block: shared_preconf_block.clone(), + }; + + tokio::spawn(async move { worker.run().await }); + + Self { + gateway_client, + latest_synced_block, + shared_preconf_block, + provider: state_factory_provider, + } + } + + pub fn state(&self) -> PreconfStateProvider { + let latest_block_num = self.latest_synced_block.block().unwrap(); + let base = self.provider.historical(latest_block_num.into()).unwrap().unwrap(); + + let preconf_block = self.shared_preconf_block.inner.lock(); + let preconf_block_id = preconf_block.as_ref().map(|b| b.preconf_block_id); + let preconf_state_updates = preconf_block.as_ref().map(|b| b.preconf_state_updates.clone()); + + PreconfStateProvider { + base, + preconf_block_id, + preconf_state_updates, + gateway: self.gateway_client.clone(), + } + } + + pub fn state_updates(&self) -> Option { + self.shared_preconf_block + .inner + .lock() + .as_ref() + .map(|preconf_data| preconf_data.preconf_state_updates.clone()) + } + + pub fn block(&self) -> Option { + self.shared_preconf_block + .inner + .lock() + .as_ref() + .map(|preconf_data| preconf_data.preconf_block.clone()) + } + + pub fn transactions(&self) -> Option> { + self.shared_preconf_block + .inner + .lock() + .as_ref() + .map(|preconf_data| preconf_data.preconf_block.transactions.clone()) + } +} + +#[derive(Debug, Default, Clone)] +struct SharedPreconfBlockData { + inner: Arc>>, +} + +#[derive(Debug)] +struct PreconfBlockData { + preconf_block_id: BlockNumber, + preconf_block: PreConfirmedBlock, + preconf_state_updates: StateUpdates, +} + +const DEFAULT_INTERVAL: Duration = Duration::from_millis(500); + +struct PreconfBlockWatcher { + interval: Duration, + gateway_client: Client, + + // from pipeline + latest_synced_block: PipelineBlockSubscription, + // from tip watcher (actual tip of the chain) + latest_block: TipSubscription, + + // shared state + shared_preconf_block: SharedPreconfBlockData, +} + +impl PreconfBlockWatcher { + async fn run(&mut self) { + let mut current_preconf_block_num = + self.latest_synced_block.block().map(|b| b + 1).unwrap_or(0); + + loop { + if current_preconf_block_num >= self.latest_block.tip() { + match self.gateway_client.get_preconfirmed_block(current_preconf_block_num).await { + Ok(preconf_block) => { + let preconf_state_diff: StateUpdates = preconf_block + .transaction_state_diffs + .clone() + .into_iter() + .fold(StateDiff::default(), |acc, diff| { + if let Some(diff) = diff { + acc.merge(diff) + } else { + acc + } + }) + .into(); + + // update shared state + let mut shared_data_lock = self.shared_preconf_block.inner.lock(); + if let Some(block) = shared_data_lock.as_mut() { + block.preconf_block = preconf_block; + block.preconf_block_id = current_preconf_block_num; + block.preconf_state_updates = preconf_state_diff; + } else { + *shared_data_lock = Some(PreconfBlockData { + preconf_block, + preconf_state_updates: preconf_state_diff, + preconf_block_id: current_preconf_block_num, + }) + } + } + + // this could either be because the latest block is still not synced to the + // chain's tip, in which case we just skip to the next + // iteration. + Err(katana_gateway_client::Error::Sequencer(error)) + if error.code == ErrorCode::BlockNotFound => + { + continue + } + + Err(err) => panic!("{err}"), + } + } + + tokio::select! { + biased; + + res = self.latest_synced_block.changed() => { + if let Err(err) = res { + error!(error = ?err, "Error receiving latest block number."); + break; + } + + let latest_synced_block_num = self.latest_synced_block.block().unwrap(); + current_preconf_block_num = latest_synced_block_num + 1; + } + + _ = tokio::time::sleep(self.interval) => { + current_preconf_block_num += 1; + } + } + } + } +} diff --git a/crates/node/src/full/pending/provider.rs b/crates/node/src/full/pending/provider.rs new file mode 100644 index 000000000..bae82c04e --- /dev/null +++ b/crates/node/src/full/pending/provider.rs @@ -0,0 +1,207 @@ +use std::fmt::Debug; + +use katana_gateway_types::TxTryFromError; +use katana_primitives::block::FinalityStatus; +use katana_primitives::fee::PriceUnit; +use katana_primitives::transaction::{TxHash, TxNumber, TxType, TxWithHash}; +use katana_primitives::Felt; +use katana_provider::api::state::{StateFactoryProvider, StateProvider}; +use katana_rpc_server::starknet::{PendingBlockProvider, StarknetApiResult}; +use katana_rpc_types::{ + PreConfirmedStateUpdate, ReceiptBlockInfo, RpcTxReceipt, RpcTxWithHash, TxReceiptWithBlockInfo, +}; + +use crate::full::pending::PreconfStateFactory; + +impl

PendingBlockProvider for PreconfStateFactory

+where + P: StateFactoryProvider + Debug + 'static, +{ + fn get_pending_block_with_txs( + &self, + ) -> StarknetApiResult> { + if let Some(block) = self.block() { + let transactions = block + .transactions + .clone() + .into_iter() + .map(|tx| Ok(RpcTxWithHash::from(TxWithHash::try_from(tx)?))) + .collect::, TxTryFromError>>() + .unwrap(); + + Ok(Some(katana_rpc_types::PreConfirmedBlockWithTxs { + transactions, + block_number: 0, + l1_da_mode: block.l1_da_mode, + l1_gas_price: block.l1_gas_price, + l2_gas_price: block.l2_gas_price, + l1_data_gas_price: block.l1_data_gas_price, + sequencer_address: block.sequencer_address, + starknet_version: block.starknet_version, + timestamp: block.timestamp, + })) + } else { + Ok(None) + } + } + + fn get_pending_block_with_receipts( + &self, + ) -> StarknetApiResult> { + if let Some(block) = self.block() { + Ok(Some(katana_rpc_types::PreConfirmedBlockWithReceipts { + transactions: Vec::new(), + block_number: 0, + l1_da_mode: block.l1_da_mode, + l1_gas_price: block.l1_gas_price, + l2_gas_price: block.l2_gas_price, + l1_data_gas_price: block.l1_data_gas_price, + sequencer_address: block.sequencer_address, + starknet_version: block.starknet_version, + timestamp: block.timestamp, + })) + } else { + Ok(None) + } + } + + fn get_pending_block_with_tx_hashes( + &self, + ) -> StarknetApiResult> { + if let Some(block) = self.block() { + let transactions = block + .transactions + .clone() + .into_iter() + .map(|tx| tx.transaction_hash) + .collect::>(); + + Ok(Some(katana_rpc_types::PreConfirmedBlockWithTxHashes { + transactions, + block_number: 0, + l1_da_mode: block.l1_da_mode, + l1_gas_price: block.l1_gas_price, + l2_gas_price: block.l2_gas_price, + l1_data_gas_price: block.l1_data_gas_price, + sequencer_address: block.sequencer_address, + starknet_version: block.starknet_version, + timestamp: block.timestamp, + })) + } else { + Ok(None) + } + } + + fn get_pending_receipt( + &self, + hash: TxHash, + ) -> StarknetApiResult> { + if let Some(preconf_block) = self.block() { + let receipt = preconf_block + .transaction_receipts + .iter() + .zip(preconf_block.transactions) + .filter_map(|(receipt, tx)| { + if let Some(receipt) = receipt { + Some((receipt.clone(), tx.transaction.r#type())) + } else { + None + } + }) + .find(|(receipt, ..)| receipt.transaction_hash == hash); + + let Some((receipt, r#type)) = receipt else { return Ok(None) }; + + let status = FinalityStatus::PreConfirmed; + let transaction_hash = receipt.transaction_hash; + let block = ReceiptBlockInfo::PreConfirmed { block_number: 0 }; + + let receipt = match r#type { + TxType::Invoke => { + RpcTxReceipt::Invoke(receipt.to_rpc_invoke_receipt(status, PriceUnit::Fri)) + } + + TxType::Declare => { + RpcTxReceipt::Declare(receipt.to_rpc_declare_receipt(status, PriceUnit::Fri)) + } + + TxType::Deploy => RpcTxReceipt::Deploy(receipt.to_rpc_deploy_receipt( + status, + PriceUnit::Fri, + Default::default(), + )), + + TxType::L1Handler => RpcTxReceipt::L1Handler(receipt.to_rpc_l1_handler_receipt( + status, + PriceUnit::Fri, + Default::default(), + )), + + TxType::DeployAccount => { + RpcTxReceipt::DeployAccount(receipt.to_rpc_deploy_account_receipt( + status, + PriceUnit::Fri, + Default::default(), + )) + } + }; + + Ok(Some(TxReceiptWithBlockInfo { transaction_hash, receipt, block })) + } else { + Ok(None) + } + } + + fn get_pending_state_update( + &self, + ) -> StarknetApiResult> { + if let Some(state_diff) = self.state_updates() { + Ok(Some(PreConfirmedStateUpdate { + old_root: Felt::ZERO, + state_diff: state_diff.into(), + })) + } else { + Ok(None) + } + } + + fn get_pending_transaction( + &self, + hash: TxHash, + ) -> StarknetApiResult> { + if let Some(preconf_transactions) = self.transactions() { + let transaction = preconf_transactions + .iter() + .find(|tx| tx.transaction_hash == hash) + .cloned() + .map(RpcTxWithHash::from); + + Ok(transaction) + } else { + Ok(None) + } + } + + fn get_pending_transaction_by_index( + &self, + index: TxNumber, + ) -> StarknetApiResult> { + if let Some(preconf_transactions) = self.transactions() { + Ok(preconf_transactions.get(index as usize).cloned().map(RpcTxWithHash::from)) + } else { + Ok(None) + } + } + + fn pending_state(&self) -> StarknetApiResult>> { + Ok(Some(Box::new(self.state()))) + } + + fn get_pending_trace( + &self, + hash: TxHash, + ) -> StarknetApiResult> { + let _ = hash; + Ok(None) + } +} diff --git a/crates/node/src/full/pending/state.rs b/crates/node/src/full/pending/state.rs new file mode 100644 index 000000000..bc3ba33eb --- /dev/null +++ b/crates/node/src/full/pending/state.rs @@ -0,0 +1,126 @@ +use katana_gateway_types::{ErrorCode, GatewayError}; +use katana_primitives::block::BlockNumber; +use katana_primitives::class::{ClassHash, CompiledClassHash, ContractClass}; +use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue}; +use katana_primitives::state::StateUpdates; +use katana_provider::api::contract::ContractClassProvider; +use katana_provider::api::state::{StateProofProvider, StateProvider, StateRootProvider}; +use katana_provider::{ProviderError, ProviderResult}; +use katana_rpc_types::ConversionError; +use tokio::runtime; + +#[allow(unused)] +pub struct PreconfStateProvider { + pub base: Box, + pub preconf_block_id: Option, + pub preconf_state_updates: Option, + pub gateway: katana_gateway_client::Client, +} + +impl StateProvider for PreconfStateProvider { + fn nonce(&self, address: ContractAddress) -> ProviderResult> { + if let Some(nonce) = self + .preconf_state_updates + .as_ref() + .and_then(|updates| updates.nonce_updates.get(&address)) + { + return Ok(Some(*nonce)); + } + + self.base.nonce(address) + } + + fn storage( + &self, + address: ContractAddress, + storage_key: StorageKey, + ) -> ProviderResult> { + if let Some(contract_storage) = self + .preconf_state_updates + .as_ref() + .and_then(|updates| updates.storage_updates.get(&address)) + { + if let Some(value) = contract_storage.get(&storage_key) { + return Ok(Some(*value)); + } + } + + self.base.storage(address, storage_key) + } + + fn class_hash_of_contract( + &self, + address: ContractAddress, + ) -> ProviderResult> { + if let Some(class_hash) = self + .preconf_state_updates + .as_ref() + .and_then(|updates| updates.replaced_classes.get(&address)) + { + return Ok(Some(*class_hash)); + } + + if let Some(class_hash) = self + .preconf_state_updates + .as_ref() + .and_then(|updates| updates.deployed_contracts.get(&address)) + { + return Ok(Some(*class_hash)); + } + + self.base.class_hash_of_contract(address) + } +} + +impl ContractClassProvider for PreconfStateProvider { + fn class(&self, hash: ClassHash) -> ProviderResult> { + if let Some(class) = self.base.class(hash)? { + return Ok(Some(class)); + } + + let result = runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(self.gateway.get_class(hash, katana_gateway_types::BlockId::Pending)); + + match result { + Ok(class) => { + let class = class + .try_into() + .map_err(|e: ConversionError| ProviderError::Other(e.to_string()))?; + Ok(Some(class)) + } + + Err(error) => { + if let katana_gateway_client::Error::Sequencer(GatewayError { + code: ErrorCode::UndeclaredClass, + .. + }) = error + { + Ok(None) + } else { + Err(ProviderError::Other(error.to_string())) + } + } + } + } + + fn compiled_class_hash_of_class_hash( + &self, + hash: ClassHash, + ) -> ProviderResult> { + if let Some(compiled_hash) = self + .preconf_state_updates + .as_ref() + .and_then(|updates| updates.declared_classes.get(&hash)) + { + return Ok(Some(*compiled_hash)); + } + + // Fallback to the base provider + self.base.compiled_class_hash_of_class_hash(hash) + } +} + +impl StateRootProvider for PreconfStateProvider {} +impl StateProofProvider for PreconfStateProvider {} diff --git a/crates/node/src/full/pool.rs b/crates/node/src/full/pool.rs new file mode 100644 index 000000000..aca0b1862 --- /dev/null +++ b/crates/node/src/full/pool.rs @@ -0,0 +1,58 @@ +use std::future::Future; + +use katana_pool::ordering::TipOrdering; +use katana_pool::pool::Pool; +use katana_pool_api::validation::{ + Error as ValidationError, ValidationOutcome, ValidationResult, Validator, +}; +use katana_rpc_types::{BroadcastedTx, BroadcastedTxWithChainId}; + +pub type FullNodePool = + Pool>; + +/// This is an implementation of the [`Validator`] trait that proxies incoming transactions to a +/// Starknet sequencer via the gateway endpoint. +/// +/// Any transaction validation is performed by the Starknet sequencer. +#[derive(Debug)] +pub struct GatewayProxyValidator { + gateway_client: katana_gateway_client::Client, +} + +impl GatewayProxyValidator { + pub fn new(gateway_client: katana_gateway_client::Client) -> Self { + Self { gateway_client } + } +} + +impl Validator for GatewayProxyValidator { + type Transaction = BroadcastedTxWithChainId; + + fn validate( + &self, + tx: Self::Transaction, + ) -> impl Future> + Send { + let gateway_client = self.gateway_client.clone(); + + async move { + let hash = tx.calculate_hash(); + + let result = match tx.tx.clone() { + BroadcastedTx::Invoke(inner_tx) => { + gateway_client.add_invoke_transaction(inner_tx.into()).await.map(|_| ()) + } + BroadcastedTx::Declare(inner_tx) => { + gateway_client.add_declare_transaction(inner_tx.into()).await.map(|_| ()) + } + BroadcastedTx::DeployAccount(inner_tx) => { + gateway_client.add_deploy_account_transaction(inner_tx.into()).await.map(|_| ()) + } + }; + + match result { + Ok(_) => ValidationResult::Ok(ValidationOutcome::Valid(tx)), + Err(e) => ValidationResult::Err(ValidationError::new(hash, Box::new(e))), + } + } + } +} diff --git a/crates/node/src/full/tip_watcher.rs b/crates/node/src/full/tip_watcher.rs index 345fe4b41..2b11a7eb2 100644 --- a/crates/node/src/full/tip_watcher.rs +++ b/crates/node/src/full/tip_watcher.rs @@ -42,7 +42,7 @@ impl ChainTipWatcher

{ pub async fn run(&self) -> Result<()> { let interval_in_secs = self.watch_interval.as_secs(); - info!(target: "node", interval = %interval_in_secs, "Chain tip watcher started."); + info!(interval = %interval_in_secs, "Chain tip watcher started."); let mut prev_tip: BlockNumber = 0; diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 75869b727..f7122c2ca 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -67,7 +67,7 @@ pub struct Node { task_manager: TaskManager, backend: Arc>, block_producer: BlockProducer, - gateway_server: Option, + gateway_server: Option>, } impl Node { diff --git a/crates/pool/pool/Cargo.toml b/crates/pool/pool/Cargo.toml index 938147c11..b705bc2ae 100644 --- a/crates/pool/pool/Cargo.toml +++ b/crates/pool/pool/Cargo.toml @@ -8,10 +8,10 @@ version.workspace = true [dependencies] katana-executor.workspace = true +katana-chain-spec.workspace = true katana-pool-api.workspace = true katana-primitives.workspace = true katana-provider.workspace = true -katana-chain-spec.workspace = true futures.workspace = true parking_lot.workspace = true diff --git a/crates/pool/pool/src/pool.rs b/crates/pool/pool/src/pool.rs index 3f60f2c53..682856bf4 100644 --- a/crates/pool/pool/src/pool.rs +++ b/crates/pool/pool/src/pool.rs @@ -141,47 +141,47 @@ where async move { match pool.inner.validator.validate(tx).await { - Ok(outcome) => { - match outcome { - ValidationOutcome::Valid(tx) => { - // get the priority of the validated tx - let priority = pool.inner.ordering.priority(&tx); - let tx = PendingTx::new(id, tx, priority); - - // insert the tx in the pool - pool.inner.transactions.write().insert(tx.clone()); - trace!(target: "pool", "Transaction added to the pool"); - - pool.notify(tx); - - Ok(hash) - } - - // TODO: create a small cache for rejected transactions to respect the rpc spec - // `getTransactionStatus` - ValidationOutcome::Invalid { error, .. } => { - warn!(target: "pool", %error, "Invalid transaction."); - Err(PoolError::InvalidTransaction(Box::new(error))) - } - - // return as error for now but ideally we should kept the tx in a separate - // queue and revalidate it when the parent tx is added to the pool - ValidationOutcome::Dependent { tx, tx_nonce, current_nonce } => { - trace!(target: "pool", %tx_nonce, %current_nonce, "Dependent transaction."); - let err = InvalidTransactionError::InvalidNonce { - address: tx.sender(), - current_nonce, - tx_nonce, - }; - Err(PoolError::InvalidTransaction(Box::new(err))) - } - } - } - - Err(error) => { - error!(target: "pool", %error, "Failed to validate transaction."); - Err(PoolError::Internal(error.error)) - } + Ok(outcome) => { + match outcome { + ValidationOutcome::Valid(tx) => { + // get the priority of the validated tx + let priority = pool.inner.ordering.priority(&tx); + let tx = PendingTx::new(id, tx, priority); + + // insert the tx in the pool + pool.inner.transactions.write().insert(tx.clone()); + trace!(target: "pool", "Transaction added to the pool"); + + pool.notify(tx); + + Ok(hash) + } + + // TODO: create a small cache for rejected transactions to respect the rpc spec + // `getTransactionStatus` + ValidationOutcome::Invalid { error, .. } => { + warn!(target: "pool", %error, "Invalid transaction."); + Err(PoolError::InvalidTransaction(Box::new(error))) + } + + // return as error for now but ideally we should kept the tx in a separate + // queue and revalidate it when the parent tx is added to the pool + ValidationOutcome::Dependent { tx, tx_nonce, current_nonce } => { + trace!(target: "pool", %tx_nonce, %current_nonce, "Dependent transaction."); + let err = InvalidTransactionError::InvalidNonce { + address: tx.sender(), + current_nonce, + tx_nonce, + }; + Err(PoolError::InvalidTransaction(Box::new(err))) + } + } + } + + Err(error) => { + error!(target: "pool", %error, "Failed to validate transaction."); + Err(PoolError::Internal(error.error)) + } } } .instrument(tracing::trace_span!(target: "pool", "pool_add", tx_hash = format!("{hash:#x}"))) diff --git a/crates/rpc/rpc-server/Cargo.toml b/crates/rpc/rpc-server/Cargo.toml index 10f5f6a27..a638db1ce 100644 --- a/crates/rpc/rpc-server/Cargo.toml +++ b/crates/rpc/rpc-server/Cargo.toml @@ -8,6 +8,7 @@ version.workspace = true [dependencies] katana-core.workspace = true +katana-chain-spec.workspace = true katana-executor.workspace = true katana-explorer = { workspace = true, features = [ "jsonrpsee" ], optional = true } katana-metrics.workspace = true @@ -21,7 +22,6 @@ katana-rpc-types.workspace = true katana-rpc-types-builder.workspace = true katana-tasks.workspace = true katana-gas-price-oracle.workspace = true -katana-chain-spec.workspace = true katana-tracing.workspace = true anyhow.workspace = true diff --git a/crates/rpc/rpc-server/src/utils/events.rs b/crates/rpc/rpc-server/src/utils/events.rs index a36e70e44..9c753aee3 100644 --- a/crates/rpc/rpc-server/src/utils/events.rs +++ b/crates/rpc/rpc-server/src/utils/events.rs @@ -85,6 +85,7 @@ impl PartialCursor { } } +#[allow(unused)] pub fn fetch_pending_events( pending_block: &PreConfirmedBlockWithReceipts, filter: &Filter, diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index f570a53aa..e454c69eb 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -9,6 +9,7 @@ version.workspace = true [dependencies] katana-primitives = { workspace = true, features = [ "serde" ] } katana-genesis.workspace = true +katana-pool-api.workspace = true katana-trie.workspace = true serde-utils.workspace = true diff --git a/crates/rpc/rpc-types/src/broadcasted.rs b/crates/rpc/rpc-types/src/broadcasted.rs index 7d416aae6..03bfe7910 100644 --- a/crates/rpc/rpc-types/src/broadcasted.rs +++ b/crates/rpc/rpc-types/src/broadcasted.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use katana_pool_api::PoolTransaction; use katana_primitives::chain::ChainId; use katana_primitives::class::{ ClassHash, CompiledClassHash, ComputeClassHashError, ContractClass, SierraContractClass, @@ -391,6 +392,41 @@ impl BroadcastedTxWithChainId { } } +impl PoolTransaction for BroadcastedTxWithChainId { + fn hash(&self) -> TxHash { + self.calculate_hash() + } + + fn nonce(&self) -> Nonce { + match &self.tx { + BroadcastedTx::Invoke(tx) => tx.nonce, + BroadcastedTx::Declare(tx) => tx.nonce, + BroadcastedTx::DeployAccount(tx) => tx.nonce, + } + } + + fn sender(&self) -> ContractAddress { + match &self.tx { + BroadcastedTx::Invoke(tx) => tx.sender_address, + BroadcastedTx::Declare(tx) => tx.sender_address, + BroadcastedTx::DeployAccount(tx) => tx.contract_address(), + } + } + + fn max_fee(&self) -> u128 { + // V3 transactions don't have max_fee, they use resource bounds instead + 0 + } + + fn tip(&self) -> u64 { + match &self.tx { + BroadcastedTx::Invoke(tx) => tx.tip.into(), + BroadcastedTx::Declare(tx) => tx.tip.into(), + BroadcastedTx::DeployAccount(tx) => tx.tip.into(), + } + } +} + /// A broadcasted transaction. #[derive(Debug, Clone, Serialize)] #[serde(untagged)] diff --git a/crates/starknet/src/lib.rs b/crates/starknet/src/lib.rs index 40bd3165e..7242481b5 100644 --- a/crates/starknet/src/lib.rs +++ b/crates/starknet/src/lib.rs @@ -14,7 +14,8 @@ use alloy_network::Ethereum; use alloy_primitives::Address; -use alloy_provider::{Provider, RootProvider}; +use alloy_provider::Provider; +pub use alloy_provider::RootProvider; use alloy_rpc_types_eth::{Filter, FilterBlockOption, FilterSet, Log, Topic}; use alloy_sol_types::{sol, SolEvent}; use anyhow::Result; @@ -225,7 +226,7 @@ impl StarknetCore> { /// /// * `rpc_url` - The HTTP URL of the Ethereum RPC endpoint /// * `contract_address` - The address of the Starknet Core Contract - pub async fn new_http(rpc_url: impl AsRef, contract_address: Address) -> Result { + pub fn new_http(rpc_url: impl AsRef, contract_address: Address) -> Result { let provider = RootProvider::::new_http(reqwest::Url::parse(rpc_url.as_ref())?); Ok(Self::new(provider, contract_address)) } @@ -236,7 +237,7 @@ impl StarknetCore> { /// # Arguments /// /// * `rpc_url` - The HTTP URL of the Ethereum RPC endpoint - pub async fn new_http_mainnet(rpc_url: impl AsRef) -> Result { + pub fn new_http_mainnet(rpc_url: impl AsRef) -> Result { let provider = RootProvider::::new_http(reqwest::Url::parse(rpc_url.as_ref())?); Ok(Self::new_mainnet(provider)) } @@ -247,7 +248,7 @@ impl StarknetCore> { /// # Arguments /// /// * `rpc_url` - The HTTP URL of the Ethereum RPC endpoint - pub async fn new_http_sepolia(rpc_url: impl AsRef) -> Result { + pub fn new_http_sepolia(rpc_url: impl AsRef) -> Result { let provider = RootProvider::::new_http(reqwest::Url::parse(rpc_url.as_ref())?); Ok(Self::new_sepolia(provider)) } diff --git a/crates/storage/provider/provider/Cargo.toml b/crates/storage/provider/provider/Cargo.toml index 55a064eb4..f148826bc 100644 --- a/crates/storage/provider/provider/Cargo.toml +++ b/crates/storage/provider/provider/Cargo.toml @@ -19,6 +19,7 @@ katana-trie.workspace = true anyhow.workspace = true auto_impl.workspace = true bitvec.workspace = true +lazy_static.workspace = true parking_lot.workspace = true starknet.workspace = true starknet-types-core.workspace = true diff --git a/crates/sync/pipeline/src/lib.rs b/crates/sync/pipeline/src/lib.rs index ea184b82a..c8f1741ce 100644 --- a/crates/sync/pipeline/src/lib.rs +++ b/crates/sync/pipeline/src/lib.rs @@ -117,6 +117,7 @@ enum PipelineCommand { /// This subscription receives notifications whenever the pipeline completes processing /// a block through all stages. The block number represents the highest block that has /// been successfully processed by all pipeline stages for a given batch. +#[derive(Clone)] pub struct PipelineBlockSubscription { rx: watch::Receiver>, } diff --git a/crates/sync/stage/src/classes.rs b/crates/sync/stage/src/classes.rs index 6f9e27afa..c4e7d5213 100644 --- a/crates/sync/stage/src/classes.rs +++ b/crates/sync/stage/src/classes.rs @@ -12,7 +12,7 @@ use katana_provider::api::state_update::StateUpdateProvider; use katana_provider::api::ProviderError; use katana_rpc_types::class::ConversionError; use rayon::prelude::*; -use tracing::{debug, error}; +use tracing::{debug, error, info_span, Instrument}; use super::{Stage, StageExecutionInput, StageExecutionOutput, StageResult}; use crate::downloader::{BatchDownloader, Downloader, DownloaderResult}; @@ -130,11 +130,16 @@ where Box::pin(async move { let declared_class_hashes = self.get_declared_classes(input.from(), input.to())?; - if !declared_class_hashes.is_empty() { + if declared_class_hashes.is_empty() { + debug!(from = %input.from(), to = %input.to(), "No classes declared within the block range"); + } else { + let total_classes = declared_class_hashes.len(); + // fetch the classes artifacts let class_artifacts = self .downloader .download(declared_class_hashes.clone()) + .instrument(info_span!(target: "stage", "classes.download", %total_classes)) .await .map_err(Error::Gateway)?; diff --git a/crates/sync/stage/src/trie.rs b/crates/sync/stage/src/trie.rs index 95a24cd36..19e1fe7ec 100644 --- a/crates/sync/stage/src/trie.rs +++ b/crates/sync/stage/src/trie.rs @@ -44,12 +44,13 @@ where let span = debug_span!("state_trie.compute_state_root", %block_number); let _enter = span.enter(); - let expected_state_root = self + let header = self .provider .header(block_number.into())? - .map(|header| header.state_root) .ok_or(Error::MissingBlockHeader(block_number))?; + let expected_state_root = header.state_root; + let state_update = self .provider .state_update(block_number.into())? diff --git a/crates/sync/stage/tests/trie.rs b/crates/sync/stage/tests/trie.rs index f2a238247..d2352162e 100644 --- a/crates/sync/stage/tests/trie.rs +++ b/crates/sync/stage/tests/trie.rs @@ -14,7 +14,6 @@ use katana_stage::trie::StateTrie; use katana_stage::{Stage, StageExecutionInput}; use rstest::rstest; use starknet::macros::short_string; -use starknet::providers::sequencer::models::state_update; use starknet_types_core::hash::{Poseidon, StarkHash}; /// Mock provider implementation for testing StateTrie stage. diff --git a/crates/tracing/src/lib.rs b/crates/tracing/src/lib.rs index 3ce83f193..a8505a880 100644 --- a/crates/tracing/src/lib.rs +++ b/crates/tracing/src/lib.rs @@ -44,10 +44,11 @@ pub enum Error { } pub async fn init(format: LogFormat, telemetry_config: Option) -> Result<(), Error> { - const DEFAULT_LOG_FILTER: &str = - "katana_db::mdbx=trace,cairo_native::compiler=off,pipeline=debug,stage=debug,tasks=debug,\ - executor=trace,forking::backend=trace,blockifier=off,jsonrpsee_server=off,hyper=off,\ - messaging=debug,node=error,explorer=info,rpc=trace,pool=trace,info"; + const DEFAULT_LOG_FILTER: &str = "katana_db::mdbx=trace,cairo_native::compiler=off,\ + pipeline=debug,stage=debug,tasks=debug,executor=trace,\ + forking::backend=trace,blockifier=off,jsonrpsee_server=off,\ + hyper=off,messaging=debug,node=error,explorer=info,\ + rpc=trace,pool=trace,katana_stage::downloader=trace,info"; let default_filter = EnvFilter::try_new(DEFAULT_LOG_FILTER); let filter = EnvFilter::try_from_default_env().or(default_filter)?; diff --git a/crates/trie/src/lib.rs b/crates/trie/src/lib.rs index bd2a183ba..1e49be803 100644 --- a/crates/trie/src/lib.rs +++ b/crates/trie/src/lib.rs @@ -1,5 +1,6 @@ use bitvec::view::AsBits; pub use bonsai::{BitVec, MultiProof, Path, ProofNode}; +pub use bonsai_trie::databases::HashMapDb; use bonsai_trie::BonsaiStorage; pub use bonsai_trie::{BonsaiDatabase, BonsaiPersistentDatabase, BonsaiStorageConfig}; use katana_primitives::class::ClassHash;