Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion monad-ethcall/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,30 @@ pub mod bindings {
include!(concat!(env!("OUT_DIR"), "/ethcall.rs"));
}

pub use bindings::monad_eth_call_pool_config as PoolConfig;
pub use bindings::{
monad_eth_call_executor_state as ExecutorState, monad_eth_call_pool_config as PoolConfig,
};

impl Default for bindings::monad_eth_call_pool_state {
fn default() -> Self {
Self {
queued_count: 0,
num_fibers: 0,
queue_limit: 0,
executing_count: 0,
queue_full_count: 0,
}
}
}

impl Default for ExecutorState {
fn default() -> Self {
Self {
low_gas_pool_state: Default::default(),
high_gas_pool_state: Default::default(),
}
}
}

#[derive(Debug)]
pub struct EthCallExecutor {
Expand Down Expand Up @@ -72,6 +95,10 @@ impl EthCallExecutor {

Self { eth_call_executor }
}

pub fn get_state(&self) -> ExecutorState {
unsafe { bindings::monad_eth_call_executor_get_state(self.eth_call_executor) }
}
}

impl Drop for EthCallExecutor {
Expand Down
40 changes: 22 additions & 18 deletions monad-rpc/src/handlers/eth/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use alloy_eips::eip7702::SignedAuthorization;
use alloy_primitives::{Address, PrimitiveSignature, TxKind, Uint, U256, U64, U8};
use alloy_rpc_types::AccessList;
use monad_chain_config::execution_revision::MonadExecutionRevision;
use monad_ethcall::{eth_call, CallResult, EthCallExecutor, MonadTracer, StateOverrideSet};
use monad_ethcall::{
eth_call, CallResult, EthCallExecutor, ExecutorState, MonadTracer, StateOverrideSet,
};
use monad_rpc_docs::rpc;
use monad_triedb_utils::triedb_env::{
BlockKey, FinalizedBlockKey, ProposedBlockKey, Triedb, TriedbPath,
Expand Down Expand Up @@ -109,12 +111,6 @@ impl EthCallStatsTracker {
}
}

pub async fn record_queue_rejection(&self) {
self.stats.queue_rejections.fetch_add(1, Ordering::Relaxed);
self.stats.total_requests.fetch_add(1, Ordering::Relaxed);
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
}

async fn get_stats(&self) -> (Option<Duration>, Option<Duration>, CumulativeStats) {
let requests = self.active_requests.lock().await;

Expand Down Expand Up @@ -828,40 +824,48 @@ pub async fn monad_debug_traceCall<T: Triedb + TriedbPath>(
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct EthCallCapacityStats {
pub inactive_executors: usize,
pub queued_requests: usize,
pub inactive_executors: u32,
pub queued_requests: u32,
pub inactive_executors_high: u32,
pub queued_requests_high: u32,
pub oldest_request_age_ms: u64,
pub average_request_age_ms: u64,
pub total_requests: u64,
pub total_errors: u64,
pub queue_rejections: u64,
pub queue_rejections_high: u64,
}

/// Returns statistics about eth_call capacity including inactive executors and queued requests
#[allow(non_snake_case)]
#[tracing::instrument(level = "debug")]
#[monad_rpc_docs::rpc(method = "admin_ethCallStatistics")]
pub async fn monad_admin_ethCallStatistics(
eth_call_executor_fibers: usize,
total_permits: usize,
available_permits: usize,
executor_state: &ExecutorState,
stats_tracker: &EthCallStatsTracker,
) -> JsonRpcResult<EthCallCapacityStats> {
let active_requests = total_permits - available_permits;

let inactive_executors = eth_call_executor_fibers.saturating_sub(active_requests);

let queued_requests = active_requests.saturating_sub(eth_call_executor_fibers);
let inactive_executors = executor_state.low_gas_pool_state.num_fibers
- executor_state.low_gas_pool_state.executing_count;
let inactive_executors_high = executor_state.high_gas_pool_state.num_fibers
- executor_state.high_gas_pool_state.executing_count;

let queued_requests = executor_state.low_gas_pool_state.queued_count;
let queued_requests_high = executor_state.high_gas_pool_state.queued_count;
let queue_rejections = executor_state.low_gas_pool_state.queue_full_count;
let queue_rejections_high = executor_state.high_gas_pool_state.queue_full_count;
let (max_age, avg_age, cumulative_stats) = stats_tracker.get_stats().await;

Ok(EthCallCapacityStats {
inactive_executors,
inactive_executors_high,
queued_requests,
queued_requests_high,
oldest_request_age_ms: max_age.map(|d| d.as_millis() as u64).unwrap_or(0),
average_request_age_ms: avg_age.map(|d| d.as_millis() as u64).unwrap_or(0),
total_requests: cumulative_stats.total_requests.load(Ordering::Relaxed),
total_errors: cumulative_stats.total_errors.load(Ordering::Relaxed),
queue_rejections: cumulative_stats.queue_rejections.load(Ordering::Relaxed),
queue_rejections,
queue_rejections_high,
})
}

Expand Down
58 changes: 14 additions & 44 deletions monad-rpc/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,15 @@ async fn admin_ethCallStatistics(
_params: Value,
) -> Result<Box<RawValue>, JsonRpcError> {
if app_state.enable_eth_call_statistics {
let available_permits = app_state.rate_limiter.available_permits();
if let Some(tracker) = &app_state.eth_call_stats_tracker {
monad_admin_ethCallStatistics(
app_state.eth_call_executor_fibers,
app_state.total_permits,
available_permits,
tracker,
)
.await
.map(serialize_result)?
let executor_state = app_state
.eth_call_executor
.as_ref()
.map(|e| e.get_state())
.unwrap_or_default();
monad_admin_ethCallStatistics(&executor_state, tracker)
.await
.map(serialize_result)?
} else {
Err(JsonRpcError::internal_error(
"stats tracking not initialized".into(),
Expand Down Expand Up @@ -300,11 +299,6 @@ async fn debug_traceCall(
let Some(ref eth_call_executor) = app_state.eth_call_executor else {
return Err(JsonRpcError::method_not_supported());
};
// acquire the concurrent requests permit
let _permit = &app_state
.rate_limiter
.try_acquire()
.map_err(|_| JsonRpcError::internal_error("eth_call concurrent requests limit".into()))?;

let params = serde_json::from_value(params).invalid_params()?;
monad_debug_traceCall(
Expand Down Expand Up @@ -342,19 +336,6 @@ async fn eth_call(
return Err(JsonRpcError::method_not_supported());
};

// acquire the concurrent requests permit
let _permit = match app_state.rate_limiter.try_acquire() {
Ok(permit) => permit,
Err(_) => {
if let Some(tracker) = &app_state.eth_call_stats_tracker {
tracker.record_queue_rejection().await;
}
return Err(JsonRpcError::internal_error(
"eth_call concurrent requests limit".into(),
));
}
};

let params = serde_json::from_value(params).invalid_params()?;

if let Some(tracker) = &app_state.eth_call_stats_tracker {
Expand All @@ -371,8 +352,9 @@ async fn eth_call(
.await;

if let Some(tracker) = &app_state.eth_call_stats_tracker {
let is_error = result.is_err();
tracker.record_request_complete(&request_id, is_error).await;
tracker
.record_request_complete(&request_id, result.is_err())
.await;
}

result.map(serialize_result)?
Expand Down Expand Up @@ -642,19 +624,6 @@ async fn eth_estimateGas(
return Err(JsonRpcError::method_not_supported());
};

// acquire the concurrent requests permit
let _permit = match app_state.rate_limiter.try_acquire() {
Ok(permit) => permit,
Err(_) => {
if let Some(tracker) = &app_state.eth_call_stats_tracker {
tracker.record_queue_rejection().await;
}
return Err(JsonRpcError::internal_error(
"eth_estimateGas concurrent requests limit".into(),
));
}
};

if let Some(tracker) = &app_state.eth_call_stats_tracker {
tracker.record_request_start(&request_id).await;
}
Expand All @@ -670,8 +639,9 @@ async fn eth_estimateGas(
.await;

if let Some(tracker) = &app_state.eth_call_stats_tracker {
let is_error = result.is_err();
tracker.record_request_complete(&request_id, is_error).await;
tracker
.record_request_complete(&request_id, result.is_err())
.await;
}

result.map(serialize_result)?
Expand Down
10 changes: 0 additions & 10 deletions monad-rpc/src/handlers/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use actix_web::{
use monad_archive::prelude::ArchiveReader;
use monad_ethcall::EthCallExecutor;
use monad_triedb_utils::triedb_env::TriedbEnv;
use tokio::sync::Semaphore;
use tracing_actix_web::RootSpanBuilder;

use super::eth::call::EthCallStatsTracker;
Expand All @@ -37,16 +36,13 @@ pub struct MonadRpcResources {
pub txpool_bridge_client: EthTxPoolBridgeClient,
pub triedb_reader: Option<TriedbEnv>,
pub eth_call_executor: Option<Arc<EthCallExecutor>>,
pub eth_call_executor_fibers: usize,
pub eth_call_stats_tracker: Option<Arc<EthCallStatsTracker>>,
pub archive_reader: Option<ArchiveReader>,
pub chain_id: u64,
pub chain_state: Option<ChainState<TriedbEnv>>,
pub batch_request_limit: u16,
pub max_response_size: u32,
pub allow_unprotected_txs: bool,
pub rate_limiter: Arc<Semaphore>,
pub total_permits: usize,
pub logs_max_block_range: u64,
pub eth_call_provider_gas_limit: u64,
pub eth_estimate_gas_provider_gas_limit: u64,
Expand All @@ -63,15 +59,12 @@ impl MonadRpcResources {
txpool_bridge_client: EthTxPoolBridgeClient,
triedb_reader: Option<TriedbEnv>,
eth_call_executor: Option<Arc<EthCallExecutor>>,
eth_call_executor_fibers: usize,
archive_reader: Option<ArchiveReader>,
chain_id: u64,
chain_state: Option<ChainState<TriedbEnv>>,
batch_request_limit: u16,
max_response_size: u32,
allow_unprotected_txs: bool,
rate_limiter: Arc<Semaphore>,
total_permits: usize,
logs_max_block_range: u64,
eth_call_provider_gas_limit: u64,
eth_estimate_gas_provider_gas_limit: u64,
Expand All @@ -86,7 +79,6 @@ impl MonadRpcResources {
txpool_bridge_client,
triedb_reader,
eth_call_executor,
eth_call_executor_fibers,
eth_call_stats_tracker: if enable_eth_call_statistics {
Some(Arc::new(EthCallStatsTracker::default()))
} else {
Expand All @@ -98,8 +90,6 @@ impl MonadRpcResources {
batch_request_limit,
max_response_size,
allow_unprotected_txs,
rate_limiter,
total_permits,
logs_max_block_range,
eth_call_provider_gas_limit,
eth_estimate_gas_provider_gas_limit,
Expand Down
12 changes: 0 additions & 12 deletions monad-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use monad_rpc::{
use monad_tracing_timing::TimingsLayer;
use monad_triedb_utils::triedb_env::TriedbEnv;
use opentelemetry::metrics::MeterProvider;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, warn};
use tracing_actix_web::TracingLogger;
use tracing_manytrace::{ManytraceLayer, TracingExtension};
Expand Down Expand Up @@ -123,11 +122,6 @@ async fn main() -> std::io::Result<()> {
});
}

// initialize concurrent requests limiter
let concurrent_requests_limiter = Arc::new(Semaphore::new(
args.eth_call_max_concurrent_requests as usize,
));

MONAD_RPC_VERSION.map(|v| info!("starting monad-rpc with version {}", v));

// Wait for bft to be in a ready state before starting the RPC server.
Expand Down Expand Up @@ -343,15 +337,12 @@ async fn main() -> std::io::Result<()> {
txpool_bridge_client,
triedb_env,
eth_call_executor,
args.eth_call_executor_fibers as usize,
archive_reader,
node_config.chain_id,
chain_state,
args.batch_request_limit,
args.max_response_size,
args.allow_unprotected_txs,
concurrent_requests_limiter,
args.eth_call_max_concurrent_requests as usize,
args.eth_get_logs_max_block_range,
args.eth_call_provider_gas_limit,
args.eth_estimate_gas_provider_gas_limit,
Expand Down Expand Up @@ -464,16 +455,13 @@ mod tests {
txpool_bridge_client: EthTxPoolBridgeClient::for_testing(),
triedb_reader: None,
eth_call_executor: None,
eth_call_executor_fibers: 64,
eth_call_stats_tracker: Some(Arc::new(EthCallStatsTracker::default())),
archive_reader: None,
chain_id: 1337,
chain_state: None,
batch_request_limit: 5,
max_response_size: 25_000_000,
allow_unprotected_txs: false,
rate_limiter: Arc::new(Semaphore::new(1000)),
total_permits: 1000,
logs_max_block_range: 1000,
eth_call_provider_gas_limit: u64::MAX,
eth_estimate_gas_provider_gas_limit: u64::MAX,
Expand Down
3 changes: 0 additions & 3 deletions monad-rpc/src/websocket/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,16 +676,13 @@ mod tests {
txpool_bridge_client: EthTxPoolBridgeClient::for_testing(),
triedb_reader: None,
eth_call_executor: None,
eth_call_executor_fibers: 64,
eth_call_stats_tracker: Some(Arc::new(EthCallStatsTracker::default())),
archive_reader: None,
chain_id: 1337,
chain_state: None,
batch_request_limit: 5,
max_response_size: 25_000_000,
allow_unprotected_txs: false,
rate_limiter: Arc::new(Semaphore::new(1000)),
total_permits: 1000,
logs_max_block_range: 1000,
eth_call_provider_gas_limit: u64::MAX,
eth_estimate_gas_provider_gas_limit: u64::MAX,
Expand Down
Loading