diff --git a/monad-ethcall/src/lib.rs b/monad-ethcall/src/lib.rs index b396786b31..c4303dc4c4 100644 --- a/monad-ethcall/src/lib.rs +++ b/monad-ethcall/src/lib.rs @@ -39,7 +39,30 @@ pub mod bindings { include!(concat!(env!("OUT_DIR"), "/ethcall.rs")); } -pub use bindings::monad_eth_call_pool_config as PoolConfig; +pub use bindings::{ + monad_eth_call_executor_state as ExecutorState, monad_eth_call_pool_config as PoolConfig, +}; + +impl Default for bindings::monad_eth_call_pool_state { + fn default() -> Self { + Self { + queued_count: 0, + num_fibers: 0, + queue_limit: 0, + executing_count: 0, + queue_full_count: 0, + } + } +} + +impl Default for ExecutorState { + fn default() -> Self { + Self { + low_gas_pool_state: Default::default(), + high_gas_pool_state: Default::default(), + } + } +} #[derive(Debug)] pub struct EthCallExecutor { @@ -72,6 +95,10 @@ impl EthCallExecutor { Self { eth_call_executor } } + + pub fn get_state(&self) -> ExecutorState { + unsafe { bindings::monad_eth_call_executor_get_state(self.eth_call_executor) } + } } impl Drop for EthCallExecutor { diff --git a/monad-rpc/src/handlers/eth/call.rs b/monad-rpc/src/handlers/eth/call.rs index a209ad2d7e..dbca138506 100644 --- a/monad-rpc/src/handlers/eth/call.rs +++ b/monad-rpc/src/handlers/eth/call.rs @@ -28,7 +28,9 @@ use alloy_eips::eip7702::SignedAuthorization; use alloy_primitives::{Address, PrimitiveSignature, TxKind, Uint, U256, U64, U8}; use alloy_rpc_types::AccessList; use monad_chain_config::execution_revision::MonadExecutionRevision; -use monad_ethcall::{eth_call, CallResult, EthCallExecutor, MonadTracer, StateOverrideSet}; +use monad_ethcall::{ + eth_call, CallResult, EthCallExecutor, ExecutorState, MonadTracer, StateOverrideSet, +}; use monad_rpc_docs::rpc; use monad_triedb_utils::triedb_env::{ BlockKey, FinalizedBlockKey, ProposedBlockKey, Triedb, TriedbPath, @@ -109,12 +111,6 @@ impl EthCallStatsTracker { } } - pub async fn record_queue_rejection(&self) { - self.stats.queue_rejections.fetch_add(1, Ordering::Relaxed); - self.stats.total_requests.fetch_add(1, Ordering::Relaxed); - self.stats.total_errors.fetch_add(1, Ordering::Relaxed); - } - async fn get_stats(&self) -> (Option, Option, CumulativeStats) { let requests = self.active_requests.lock().await; @@ -828,13 +824,16 @@ pub async fn monad_debug_traceCall( #[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)] #[serde(rename_all = "camelCase")] pub struct EthCallCapacityStats { - pub inactive_executors: usize, - pub queued_requests: usize, + pub inactive_executors: u32, + pub queued_requests: u32, + pub inactive_executors_high: u32, + pub queued_requests_high: u32, pub oldest_request_age_ms: u64, pub average_request_age_ms: u64, pub total_requests: u64, pub total_errors: u64, pub queue_rejections: u64, + pub queue_rejections_high: u64, } /// Returns statistics about eth_call capacity including inactive executors and queued requests @@ -842,26 +841,31 @@ pub struct EthCallCapacityStats { #[tracing::instrument(level = "debug")] #[monad_rpc_docs::rpc(method = "admin_ethCallStatistics")] pub async fn monad_admin_ethCallStatistics( - eth_call_executor_fibers: usize, - total_permits: usize, - available_permits: usize, + executor_state: &ExecutorState, stats_tracker: &EthCallStatsTracker, ) -> JsonRpcResult { - let active_requests = total_permits - available_permits; - - let inactive_executors = eth_call_executor_fibers.saturating_sub(active_requests); - - let queued_requests = active_requests.saturating_sub(eth_call_executor_fibers); + let inactive_executors = executor_state.low_gas_pool_state.num_fibers + - executor_state.low_gas_pool_state.executing_count; + let inactive_executors_high = executor_state.high_gas_pool_state.num_fibers + - executor_state.high_gas_pool_state.executing_count; + + let queued_requests = executor_state.low_gas_pool_state.queued_count; + let queued_requests_high = executor_state.high_gas_pool_state.queued_count; + let queue_rejections = executor_state.low_gas_pool_state.queue_full_count; + let queue_rejections_high = executor_state.high_gas_pool_state.queue_full_count; let (max_age, avg_age, cumulative_stats) = stats_tracker.get_stats().await; Ok(EthCallCapacityStats { inactive_executors, + inactive_executors_high, queued_requests, + queued_requests_high, oldest_request_age_ms: max_age.map(|d| d.as_millis() as u64).unwrap_or(0), average_request_age_ms: avg_age.map(|d| d.as_millis() as u64).unwrap_or(0), total_requests: cumulative_stats.total_requests.load(Ordering::Relaxed), total_errors: cumulative_stats.total_errors.load(Ordering::Relaxed), - queue_rejections: cumulative_stats.queue_rejections.load(Ordering::Relaxed), + queue_rejections, + queue_rejections_high, }) } diff --git a/monad-rpc/src/handlers/mod.rs b/monad-rpc/src/handlers/mod.rs index 1aee0e3d0e..fc6c37fc19 100644 --- a/monad-rpc/src/handlers/mod.rs +++ b/monad-rpc/src/handlers/mod.rs @@ -192,16 +192,15 @@ async fn admin_ethCallStatistics( _params: Value, ) -> Result, JsonRpcError> { if app_state.enable_eth_call_statistics { - let available_permits = app_state.rate_limiter.available_permits(); if let Some(tracker) = &app_state.eth_call_stats_tracker { - monad_admin_ethCallStatistics( - app_state.eth_call_executor_fibers, - app_state.total_permits, - available_permits, - tracker, - ) - .await - .map(serialize_result)? + let executor_state = app_state + .eth_call_executor + .as_ref() + .map(|e| e.get_state()) + .unwrap_or_default(); + monad_admin_ethCallStatistics(&executor_state, tracker) + .await + .map(serialize_result)? } else { Err(JsonRpcError::internal_error( "stats tracking not initialized".into(), @@ -300,11 +299,6 @@ async fn debug_traceCall( let Some(ref eth_call_executor) = app_state.eth_call_executor else { return Err(JsonRpcError::method_not_supported()); }; - // acquire the concurrent requests permit - let _permit = &app_state - .rate_limiter - .try_acquire() - .map_err(|_| JsonRpcError::internal_error("eth_call concurrent requests limit".into()))?; let params = serde_json::from_value(params).invalid_params()?; monad_debug_traceCall( @@ -342,19 +336,6 @@ async fn eth_call( return Err(JsonRpcError::method_not_supported()); }; - // acquire the concurrent requests permit - let _permit = match app_state.rate_limiter.try_acquire() { - Ok(permit) => permit, - Err(_) => { - if let Some(tracker) = &app_state.eth_call_stats_tracker { - tracker.record_queue_rejection().await; - } - return Err(JsonRpcError::internal_error( - "eth_call concurrent requests limit".into(), - )); - } - }; - let params = serde_json::from_value(params).invalid_params()?; if let Some(tracker) = &app_state.eth_call_stats_tracker { @@ -371,8 +352,9 @@ async fn eth_call( .await; if let Some(tracker) = &app_state.eth_call_stats_tracker { - let is_error = result.is_err(); - tracker.record_request_complete(&request_id, is_error).await; + tracker + .record_request_complete(&request_id, result.is_err()) + .await; } result.map(serialize_result)? @@ -642,19 +624,6 @@ async fn eth_estimateGas( return Err(JsonRpcError::method_not_supported()); }; - // acquire the concurrent requests permit - let _permit = match app_state.rate_limiter.try_acquire() { - Ok(permit) => permit, - Err(_) => { - if let Some(tracker) = &app_state.eth_call_stats_tracker { - tracker.record_queue_rejection().await; - } - return Err(JsonRpcError::internal_error( - "eth_estimateGas concurrent requests limit".into(), - )); - } - }; - if let Some(tracker) = &app_state.eth_call_stats_tracker { tracker.record_request_start(&request_id).await; } @@ -670,8 +639,9 @@ async fn eth_estimateGas( .await; if let Some(tracker) = &app_state.eth_call_stats_tracker { - let is_error = result.is_err(); - tracker.record_request_complete(&request_id, is_error).await; + tracker + .record_request_complete(&request_id, result.is_err()) + .await; } result.map(serialize_result)? diff --git a/monad-rpc/src/handlers/resources.rs b/monad-rpc/src/handlers/resources.rs index b57ed86dbb..28d921485b 100644 --- a/monad-rpc/src/handlers/resources.rs +++ b/monad-rpc/src/handlers/resources.rs @@ -23,7 +23,6 @@ use actix_web::{ use monad_archive::prelude::ArchiveReader; use monad_ethcall::EthCallExecutor; use monad_triedb_utils::triedb_env::TriedbEnv; -use tokio::sync::Semaphore; use tracing_actix_web::RootSpanBuilder; use super::eth::call::EthCallStatsTracker; @@ -37,7 +36,6 @@ pub struct MonadRpcResources { pub txpool_bridge_client: EthTxPoolBridgeClient, pub triedb_reader: Option, pub eth_call_executor: Option>, - pub eth_call_executor_fibers: usize, pub eth_call_stats_tracker: Option>, pub archive_reader: Option, pub chain_id: u64, @@ -45,8 +43,6 @@ pub struct MonadRpcResources { pub batch_request_limit: u16, pub max_response_size: u32, pub allow_unprotected_txs: bool, - pub rate_limiter: Arc, - pub total_permits: usize, pub logs_max_block_range: u64, pub eth_call_provider_gas_limit: u64, pub eth_estimate_gas_provider_gas_limit: u64, @@ -63,15 +59,12 @@ impl MonadRpcResources { txpool_bridge_client: EthTxPoolBridgeClient, triedb_reader: Option, eth_call_executor: Option>, - eth_call_executor_fibers: usize, archive_reader: Option, chain_id: u64, chain_state: Option>, batch_request_limit: u16, max_response_size: u32, allow_unprotected_txs: bool, - rate_limiter: Arc, - total_permits: usize, logs_max_block_range: u64, eth_call_provider_gas_limit: u64, eth_estimate_gas_provider_gas_limit: u64, @@ -86,7 +79,6 @@ impl MonadRpcResources { txpool_bridge_client, triedb_reader, eth_call_executor, - eth_call_executor_fibers, eth_call_stats_tracker: if enable_eth_call_statistics { Some(Arc::new(EthCallStatsTracker::default())) } else { @@ -98,8 +90,6 @@ impl MonadRpcResources { batch_request_limit, max_response_size, allow_unprotected_txs, - rate_limiter, - total_permits, logs_max_block_range, eth_call_provider_gas_limit, eth_estimate_gas_provider_gas_limit, diff --git a/monad-rpc/src/main.rs b/monad-rpc/src/main.rs index f3d5a71493..dad13db6ec 100644 --- a/monad-rpc/src/main.rs +++ b/monad-rpc/src/main.rs @@ -39,7 +39,6 @@ use monad_rpc::{ use monad_tracing_timing::TimingsLayer; use monad_triedb_utils::triedb_env::TriedbEnv; use opentelemetry::metrics::MeterProvider; -use tokio::sync::Semaphore; use tracing::{debug, error, info, warn}; use tracing_actix_web::TracingLogger; use tracing_manytrace::{ManytraceLayer, TracingExtension}; @@ -123,11 +122,6 @@ async fn main() -> std::io::Result<()> { }); } - // initialize concurrent requests limiter - let concurrent_requests_limiter = Arc::new(Semaphore::new( - args.eth_call_max_concurrent_requests as usize, - )); - MONAD_RPC_VERSION.map(|v| info!("starting monad-rpc with version {}", v)); // Wait for bft to be in a ready state before starting the RPC server. @@ -343,15 +337,12 @@ async fn main() -> std::io::Result<()> { txpool_bridge_client, triedb_env, eth_call_executor, - args.eth_call_executor_fibers as usize, archive_reader, node_config.chain_id, chain_state, args.batch_request_limit, args.max_response_size, args.allow_unprotected_txs, - concurrent_requests_limiter, - args.eth_call_max_concurrent_requests as usize, args.eth_get_logs_max_block_range, args.eth_call_provider_gas_limit, args.eth_estimate_gas_provider_gas_limit, @@ -464,7 +455,6 @@ mod tests { txpool_bridge_client: EthTxPoolBridgeClient::for_testing(), triedb_reader: None, eth_call_executor: None, - eth_call_executor_fibers: 64, eth_call_stats_tracker: Some(Arc::new(EthCallStatsTracker::default())), archive_reader: None, chain_id: 1337, @@ -472,8 +462,6 @@ mod tests { batch_request_limit: 5, max_response_size: 25_000_000, allow_unprotected_txs: false, - rate_limiter: Arc::new(Semaphore::new(1000)), - total_permits: 1000, logs_max_block_range: 1000, eth_call_provider_gas_limit: u64::MAX, eth_estimate_gas_provider_gas_limit: u64::MAX, diff --git a/monad-rpc/src/websocket/handler.rs b/monad-rpc/src/websocket/handler.rs index ee8e560684..a7edcfe42d 100644 --- a/monad-rpc/src/websocket/handler.rs +++ b/monad-rpc/src/websocket/handler.rs @@ -676,7 +676,6 @@ mod tests { txpool_bridge_client: EthTxPoolBridgeClient::for_testing(), triedb_reader: None, eth_call_executor: None, - eth_call_executor_fibers: 64, eth_call_stats_tracker: Some(Arc::new(EthCallStatsTracker::default())), archive_reader: None, chain_id: 1337, @@ -684,8 +683,6 @@ mod tests { batch_request_limit: 5, max_response_size: 25_000_000, allow_unprotected_txs: false, - rate_limiter: Arc::new(Semaphore::new(1000)), - total_permits: 1000, logs_max_block_range: 1000, eth_call_provider_gas_limit: u64::MAX, eth_estimate_gas_provider_gas_limit: u64::MAX,