diff --git a/examples/historical_scanning/main.rs b/examples/historical_scanning/main.rs index 9c1db196..db9870b4 100644 --- a/examples/historical_scanning/main.rs +++ b/examples/historical_scanning/main.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; use async_trait::async_trait; -use event_scanner::{CallbackConfig, EventCallback, EventFilter, ScannerBuilder}; +use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder}; use tokio::time::sleep; use tracing::info; @@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> { let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url()) .add_event_filter(increase_filter) - .callback_config(CallbackConfig { max_attempts: 3, delay_ms: 200 }) + .callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 }) .start_block(0) .build() .await?; diff --git a/examples/simple_counter/main.rs b/examples/simple_counter/main.rs index 98c3b2cd..19534edc 100644 --- a/examples/simple_counter/main.rs +++ b/examples/simple_counter/main.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; use async_trait::async_trait; -use event_scanner::{CallbackConfig, EventCallback, EventFilter, ScannerBuilder}; +use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder}; use tokio::time::sleep; use tracing::info; @@ -64,7 +64,7 @@ async fn main() -> anyhow::Result<()> { let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url()) .add_event_filter(increase_filter) - .callback_config(CallbackConfig { max_attempts: 3, delay_ms: 200 }) + .callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 }) .build() .await?; diff --git a/src/block_scanner.rs b/src/block_scanner.rs index 70af1914..b1c9953b 100644 --- a/src/block_scanner.rs +++ b/src/block_scanner.rs @@ -21,15 +21,10 @@ use alloy::{ const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000; const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12); const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; -const BACK_OFF_MAX_RETRIES: u64 = 5; // TODO: determine check exact default value const DEFAULT_REORG_REWIND_DEPTH: u64 = 0; -// State sync aware retry settings -const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30); -const STATE_SYNC_MAX_RETRIES: u64 = 12; - #[derive(Debug)] pub enum BlockScannerError { ErrEOF, diff --git a/src/builder.rs b/src/builder.rs index 258013c4..0b9efefa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,7 +1,4 @@ -use crate::{ - scanner::Scanner, - types::{CallbackConfig, EventFilter}, -}; +use crate::{FixedRetryConfig, scanner::Scanner, types::EventFilter}; pub struct ScannerBuilder { rpc_url: String, @@ -9,7 +6,7 @@ pub struct ScannerBuilder { end_block: Option, max_blocks_per_filter: u64, tracked_events: Vec, - callback_config: CallbackConfig, + callback_config: FixedRetryConfig, } impl ScannerBuilder { @@ -20,7 +17,7 @@ impl ScannerBuilder { end_block: None, max_blocks_per_filter: 1000, tracked_events: Vec::new(), - callback_config: CallbackConfig::default(), + callback_config: FixedRetryConfig::default(), } } @@ -55,7 +52,7 @@ impl ScannerBuilder { } #[must_use] - pub fn callback_config(mut self, cfg: CallbackConfig) -> Self { + pub fn callback_config(mut self, cfg: FixedRetryConfig) -> Self { self.callback_config = cfg; self } @@ -81,7 +78,9 @@ impl ScannerBuilder { #[cfg(test)] mod tests { use super::*; - use crate::callback::EventCallback; + use crate::{ + FixedRetryConfig, callback::EventCallback, callback_strategy::BACK_OFF_MAX_RETRIES, + }; use alloy::{primitives::address, rpc::types::Log}; use async_trait::async_trait; use std::sync::Arc; @@ -141,9 +140,9 @@ mod tests { fn test_builder_callback_config() { let max_attempts = 5; let delay_ms = 500; - let config = CallbackConfig { max_attempts, delay_ms }; + let config = FixedRetryConfig { max_attempts, delay_ms }; - let builder = ScannerBuilder::new(WS_URL).callback_config(config.clone()); + let builder = ScannerBuilder::new(WS_URL).callback_config(config); assert_eq!(builder.callback_config.max_attempts, max_attempts); assert_eq!(builder.callback_config.delay_ms, delay_ms); @@ -153,7 +152,7 @@ mod tests { fn test_builder_default_callback_config() { let builder = ScannerBuilder::new(WS_URL); - assert_eq!(builder.callback_config.max_attempts, 3); + assert_eq!(builder.callback_config.max_attempts, BACK_OFF_MAX_RETRIES); assert_eq!(builder.callback_config.delay_ms, 200); } @@ -260,7 +259,7 @@ mod tests { let max_attempts = 5; let delay_ms = 500; - let config = CallbackConfig { max_attempts, delay_ms }; + let config = FixedRetryConfig { max_attempts, delay_ms }; let max_blocks_per_filter = 2000; let builder = ScannerBuilder::new(WS_URL) @@ -268,7 +267,7 @@ mod tests { .end_block(end_block) .max_blocks_per_filter(max_blocks_per_filter) .add_event_filter(filter.clone()) - .callback_config(config.clone()); + .callback_config(config); assert_eq!(builder.start_block, Some(start_block)); assert_eq!(builder.end_block, Some(end_block)); diff --git a/src/callback.rs b/src/callback/mod.rs similarity index 91% rename from src/callback.rs rename to src/callback/mod.rs index 0946e82a..6adf1eca 100644 --- a/src/callback.rs +++ b/src/callback/mod.rs @@ -1,6 +1,8 @@ use alloy::rpc::types::Log; use async_trait::async_trait; +pub mod strategy; + #[async_trait] pub trait EventCallback { /// Called when a matching log is found. diff --git a/src/callback/strategy/fixed_retry.rs b/src/callback/strategy/fixed_retry.rs new file mode 100644 index 00000000..3b8c86f4 --- /dev/null +++ b/src/callback/strategy/fixed_retry.rs @@ -0,0 +1,64 @@ +use std::{sync::Arc, time::Duration}; + +use alloy::rpc::types::Log; +use async_trait::async_trait; +use tracing::warn; + +use crate::callback::EventCallback; + +use super::CallbackStrategy; + +pub const BACK_OFF_MAX_RETRIES: u64 = 5; +pub const BACK_OFF_MAX_DELAY_MS: u64 = 200; + +#[derive(Clone, Copy, Debug)] +pub struct FixedRetryConfig { + pub max_attempts: u64, + pub delay_ms: u64, +} + +impl Default for FixedRetryConfig { + fn default() -> Self { + Self { max_attempts: BACK_OFF_MAX_RETRIES, delay_ms: BACK_OFF_MAX_DELAY_MS } + } +} + +pub struct FixedRetryStrategy { + cfg: FixedRetryConfig, +} + +impl FixedRetryStrategy { + #[must_use] + pub fn new(cfg: FixedRetryConfig) -> Self { + Self { cfg } + } +} + +#[async_trait] +impl CallbackStrategy for FixedRetryStrategy { + async fn execute( + &self, + callback: &Arc, + log: &Log, + ) -> anyhow::Result<()> { + match callback.on_event(log).await { + Ok(()) => Ok(()), + Err(mut last_err) => { + let attempts = self.cfg.max_attempts.max(1); + for _ in 1..attempts { + warn!( + delay_ms = self.cfg.delay_ms, + max_attempts = attempts, + "Callback failed: retrying after fixed delay" + ); + tokio::time::sleep(Duration::from_millis(self.cfg.delay_ms)).await; + match callback.on_event(log).await { + Ok(()) => return Ok(()), + Err(e) => last_err = e, + } + } + Err(last_err) + } + } + } +} diff --git a/src/callback/strategy/mod.rs b/src/callback/strategy/mod.rs new file mode 100644 index 00000000..2f31710d --- /dev/null +++ b/src/callback/strategy/mod.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use alloy::rpc::types::Log; +use async_trait::async_trait; + +use crate::callback::EventCallback; + +pub mod fixed_retry; +pub mod state_sync_aware; + +pub use fixed_retry::{BACK_OFF_MAX_RETRIES, FixedRetryConfig, FixedRetryStrategy}; +pub use state_sync_aware::{StateSyncAwareStrategy, StateSyncConfig}; + +#[async_trait] +pub trait CallbackStrategy: Send + Sync { + async fn execute( + &self, + callback: &Arc, + log: &Log, + ) -> anyhow::Result<()>; +} diff --git a/src/callback/strategy/state_sync_aware.rs b/src/callback/strategy/state_sync_aware.rs new file mode 100644 index 00000000..b52ac8ac --- /dev/null +++ b/src/callback/strategy/state_sync_aware.rs @@ -0,0 +1,115 @@ +use std::{cmp, sync::Arc, time::Duration}; + +use alloy::rpc::types::Log; +use async_trait::async_trait; +use tracing::{info, warn}; + +use crate::{FixedRetryConfig, callback::EventCallback}; + +use super::{CallbackStrategy, fixed_retry::FixedRetryStrategy}; + +pub const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30); +pub const STATE_SYNC_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(120); +pub const STATE_SYNC_RETRY_MAX_ELAPSED: Duration = Duration::from_secs(600); +pub const STATE_SYNC_RETRY_MULTIPLIER: f64 = 1.5; + +#[derive(Clone, Copy, Debug)] +pub struct StateSyncConfig { + pub initial_interval: Duration, + pub max_interval: Duration, + pub max_elapsed: Duration, + pub multiplier: f64, +} + +impl Default for StateSyncConfig { + fn default() -> Self { + Self { + initial_interval: STATE_SYNC_RETRY_INTERVAL, + max_interval: STATE_SYNC_RETRY_MAX_INTERVAL, + max_elapsed: STATE_SYNC_RETRY_MAX_ELAPSED, + multiplier: STATE_SYNC_RETRY_MULTIPLIER, + } + } +} + +pub struct StateSyncAwareStrategy { + inner: FixedRetryStrategy, + cfg: StateSyncConfig, +} + +impl Default for StateSyncAwareStrategy { + fn default() -> Self { + Self::new() + } +} + +impl StateSyncAwareStrategy { + #[must_use] + pub fn new() -> Self { + Self { + inner: FixedRetryStrategy::new(FixedRetryConfig::default()), + cfg: StateSyncConfig::default(), + } + } + + #[must_use] + pub fn with_state_sync_config(mut self, cfg: StateSyncConfig) -> Self { + self.cfg = cfg; + self + } + + #[must_use] + pub fn with_fixed_retry_config(mut self, cfg: super::fixed_retry::FixedRetryConfig) -> Self { + self.inner = FixedRetryStrategy::new(cfg); + self + } +} + +#[async_trait] +impl CallbackStrategy for StateSyncAwareStrategy { + async fn execute( + &self, + callback: &Arc, + log: &Log, + ) -> anyhow::Result<()> { + match callback.on_event(log).await { + Ok(()) => Ok(()), + Err(first_err) => { + if is_missing_trie_node_error(&first_err) { + // state sync aware retry path + let mut delay = self.cfg.initial_interval; + let start = tokio::time::Instant::now(); + info!(initial_interval = ?self.cfg.initial_interval, max_interval = ?self.cfg.max_interval, + max_elapsed = ?self.cfg.max_elapsed, "Starting state-sync aware retry"); + let mut last_err: anyhow::Error = first_err; + loop { + if start.elapsed() >= self.cfg.max_elapsed { + return Err(last_err); + } + tokio::time::sleep(delay).await; + match callback.on_event(log).await { + Ok(()) => return Ok(()), + Err(e) => { + last_err = e; + let next_secs = delay.as_secs_f64() * self.cfg.multiplier; + let next = Duration::from_secs_f64(next_secs); + delay = cmp::min(self.cfg.max_interval, next); + let elapsed = start.elapsed(); + warn!(next_delay = ?delay, elapsed = ?elapsed, error = %last_err, + "State-sync retry operation failed: will retry"); + } + } + } + } else { + // Fixed retry for regular errors + self.inner.execute(callback, log).await + } + } + } + } +} + +fn is_missing_trie_node_error(err: &anyhow::Error) -> bool { + let s = err.to_string().to_lowercase(); + s.contains("missing trie node") && s.contains("state") && s.contains("not available") +} diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 1d445fa8..6ceb06e6 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -1,9 +1,9 @@ -#![allow(unused)] -use std::{collections::HashMap, future, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use crate::{ block_scanner::{BlockScanner, BlockScannerBuilder, OnBlocksFunc}, - types::{CallbackConfig, EventFilter}, + callback::strategy::{CallbackStrategy, StateSyncAwareStrategy}, + types::EventFilter, }; use alloy::{ eips::BlockNumberOrTag, @@ -16,14 +16,14 @@ use alloy::{ }, transports::TransportError, }; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Receiver}; use tokio_stream::StreamExt; use tracing::{error, info, warn}; pub struct EventScannerBuilder { block_scanner: BlockScannerBuilder, tracked_events: Vec, - callback_config: CallbackConfig, + callback_strategy: Arc, } impl Default for EventScannerBuilder { @@ -38,67 +38,57 @@ impl EventScannerBuilder { Self { block_scanner: BlockScannerBuilder::new(), tracked_events: Vec::new(), - callback_config: CallbackConfig::default(), + callback_strategy: Self::get_default_callback_strategy(), } } - #[must_use] - pub fn with_event_filter(mut self, filter: EventFilter) -> Self { + pub fn with_event_filter(&mut self, filter: EventFilter) -> &mut Self { self.tracked_events.push(filter); self } - #[must_use] - pub fn with_event_filters(mut self, filters: Vec) -> Self { + pub fn with_event_filters(&mut self, filters: Vec) -> &mut Self { self.tracked_events.extend(filters); self } - #[must_use] - pub fn with_callback_config(mut self, cfg: CallbackConfig) -> Self { - self.callback_config = cfg; + pub fn with_callback_strategy(&mut self, strategy: Arc) -> &mut Self { + self.callback_strategy = strategy; self } - #[must_use] pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self { - self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch); + let _ = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch); self } - #[must_use] pub fn with_start_height(&mut self, start_height: BlockNumberOrTag) -> &mut Self { - self.block_scanner.with_start_height(start_height); + let _ = self.block_scanner.with_start_height(start_height); self } - #[must_use] pub fn with_end_height(&mut self, end_height: BlockNumberOrTag) -> &mut Self { - self.block_scanner.with_end_height(end_height); + let _ = self.block_scanner.with_end_height(end_height); self } - #[must_use] pub fn with_on_blocks(&mut self, on_blocks: OnBlocksFunc) -> &mut Self { - self.block_scanner.with_on_blocks(on_blocks); + let _ = self.block_scanner.with_on_blocks(on_blocks); self } - #[must_use] pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { - self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth); + let _ = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth); self } - #[must_use] pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self { - self.block_scanner.with_retry_interval(retry_interval); + let _ = self.block_scanner.with_retry_interval(retry_interval); self } - #[must_use] pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self { - self.block_scanner.with_block_confirmations(block_confirmations); + let _ = self.block_scanner.with_block_confirmations(block_confirmations); self } @@ -115,7 +105,7 @@ impl EventScannerBuilder { Ok(EventScanner { block_scanner, tracked_events: self.tracked_events, - callback_config: self.callback_config, + callback_strategy: self.callback_strategy, }) } @@ -135,7 +125,7 @@ impl EventScannerBuilder { Ok(EventScanner { block_scanner, tracked_events: self.tracked_events, - callback_config: self.callback_config, + callback_strategy: self.callback_strategy, }) } @@ -145,7 +135,7 @@ impl EventScannerBuilder { EventScanner { block_scanner, tracked_events: self.tracked_events, - callback_config: self.callback_config, + callback_strategy: self.callback_strategy, } } @@ -155,15 +145,20 @@ impl EventScannerBuilder { EventScanner { block_scanner, tracked_events: self.tracked_events, - callback_config: self.callback_config, + callback_strategy: self.callback_strategy, } } + + fn get_default_callback_strategy() -> Arc { + let state_sync_aware_strategy = StateSyncAwareStrategy::new(); + Arc::new(state_sync_aware_strategy) + } } pub struct EventScanner, N: Network> { block_scanner: BlockScanner, tracked_events: Vec, - callback_config: CallbackConfig, + callback_strategy: Arc, } impl, N: Network> EventScanner { @@ -177,25 +172,24 @@ impl, N: Network> EventScanner { for filter in &self.tracked_events { let event_name = filter.event.clone(); + if event_channels.contains_key(&event_name) { continue; } - let (sender, mut receiver) = mpsc::channel::(1024); // TODO: configurable buffer size / smaller buffer ? - let cfg = self.callback_config.clone(); + + // TODO: configurable buffer size / smaller buffer ? + let (sender, receiver) = mpsc::channel::(1024); + let event_name_clone = event_name.clone(); let callback = filter.callback.clone(); - tokio::spawn(async move { - while let Some(log) = receiver.recv().await { - if let Err(e) = Self::invoke_with_retry_static(&callback, &log, &cfg).await { - error!( - event = %event_name_clone, - at_block = &log.block_number, - error = %e, - "failed to invoke callback after retries" - ); - } - } - }); + let strategy = self.callback_strategy.clone(); + Self::spawn_event_callback_task_executors( + receiver, + callback, + strategy, + event_name_clone, + ); + event_channels.insert(event_name, sender); } @@ -217,6 +211,26 @@ impl, N: Network> EventScanner { Ok(()) } + fn spawn_event_callback_task_executors( + mut receiver: Receiver, + callback: Arc, + strategy: Arc, + event_name: String, + ) { + tokio::spawn(async move { + while let Some(log) = receiver.recv().await { + if let Err(e) = strategy.execute(&callback, &log).await { + error!( + event = %event_name, + at_block = &log.block_number, + error = %e, + "failed to invoke callback after retries" + ); + } + } + }); + } + async fn process_block_range( &self, from_block: u64, @@ -246,7 +260,7 @@ impl, N: Network> EventScanner { if let Some(sender) = event_channels.get(&event_filter.event) { for log in logs { - if let Err(e) = sender.send(log.clone()).await { + if let Err(e) = sender.send(log).await { warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing"); } } @@ -269,32 +283,4 @@ impl, N: Network> EventScanner { Ok(()) } - - async fn invoke_with_retry_static( - callback: &Arc, - log: &Log, - config: &CallbackConfig, - ) -> anyhow::Result<()> { - let attempts = config.max_attempts.max(1); - let mut last_err: Option = None; - for attempt in 1..=attempts { - match callback.on_event(log).await { - Ok(()) => return Ok(()), - Err(e) => { - last_err = Some(e); - if attempt < attempts { - warn!( - attempt, - max_attempts = attempts, - "callback failed; retrying after fixed delay" - ); - tokio::time::sleep(Duration::from_millis(config.delay_ms)).await; - } - } - } - } - Err(last_err.unwrap_or_else(|| anyhow::anyhow!("callback failed with unknown error"))) - } } - -// TODO: implement max channel buffer size test diff --git a/src/lib.rs b/src/lib.rs index 261bd043..7c4f4845 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,13 @@ pub mod types; pub use crate::{ builder::ScannerBuilder, - callback::EventCallback, + callback::{ + EventCallback, strategy as callback_strategy, + strategy::{ + CallbackStrategy, FixedRetryConfig, FixedRetryStrategy, StateSyncAwareStrategy, + StateSyncConfig, + }, + }, scanner::Scanner, - types::{CallbackConfig, EventFilter}, + types::EventFilter, }; diff --git a/src/scanner.rs b/src/scanner.rs index fa3360fb..d5eb4ec5 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -8,10 +8,7 @@ use tokio::time::{Duration, sleep}; use tokio_stream::StreamExt; use tracing::{error, info, warn}; -use crate::{ - callback::EventCallback, - types::{CallbackConfig, EventFilter}, -}; +use crate::{FixedRetryConfig, callback::EventCallback, types::EventFilter}; enum ProviderType { WebSocket, @@ -25,7 +22,7 @@ pub struct Scanner { end_block: Option, max_blocks_per_filter: u64, tracked_events: Vec, - callback_config: CallbackConfig, + callback_config: FixedRetryConfig, } impl Scanner { @@ -40,7 +37,7 @@ impl Scanner { end_block: Option, max_blocks_per_filter: u64, tracked_events: Vec, - callback_config: CallbackConfig, + callback_config: FixedRetryConfig, ) -> anyhow::Result { let provider = Self::get_provider(&rpc_url).await?; @@ -242,7 +239,7 @@ impl Scanner { async fn invoke_with_retry_static( callback: &Arc, log: &Log, - config: &CallbackConfig, + config: &FixedRetryConfig, ) -> anyhow::Result<()> { let attempts = config.max_attempts.max(1); let mut last_err: Option = None; diff --git a/src/types.rs b/src/types.rs index f97e2aad..d0aaeb30 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,18 +4,6 @@ use alloy::primitives::Address; use crate::callback::EventCallback; -#[derive(Clone, Debug)] -pub struct CallbackConfig { - pub max_attempts: usize, - pub delay_ms: u64, -} - -impl Default for CallbackConfig { - fn default() -> Self { - Self { max_attempts: 3, delay_ms: 200 } - } -} - #[derive(Clone)] pub struct EventFilter { pub contract_address: Address,