Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/historical_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent};
use alloy_node_bindings::Anvil;
use async_trait::async_trait;
use event_scanner::{CallbackConfig, EventCallback, EventFilter, ScannerBuilder};
use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder};

use tokio::time::sleep;
use tracing::info;
Expand Down Expand Up @@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {

let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url())
.add_event_filter(increase_filter)
.callback_config(CallbackConfig { max_attempts: 3, delay_ms: 200 })
.callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 })
.start_block(0)
.build()
.await?;
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_counter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent};
use alloy_node_bindings::Anvil;
use async_trait::async_trait;
use event_scanner::{CallbackConfig, EventCallback, EventFilter, ScannerBuilder};
use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder};

use tokio::time::sleep;
use tracing::info;
Expand Down Expand Up @@ -64,7 +64,7 @@ async fn main() -> anyhow::Result<()> {

let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url())
.add_event_filter(increase_filter)
.callback_config(CallbackConfig { max_attempts: 3, delay_ms: 200 })
.callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 })
.build()
.await?;

Expand Down
9 changes: 4 additions & 5 deletions src/block_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@ use alloy::{
const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12);
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
const BACK_OFF_MAX_RETRIES: u64 = 5;

// TODO: determine check exact default value
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;

// State sync aware retry settings
const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const STATE_SYNC_MAX_RETRIES: u64 = 12;

#[derive(Debug)]
pub enum BlockScannerError {
ErrEOF,
Expand Down Expand Up @@ -202,6 +197,10 @@ where
P: Provider<N>,
N: Network,
{
pub fn provider(&self) -> &P {
&self.provider
}

pub async fn start(&self) -> ReceiverStream<Result<Range<u64>, BlockScannerError>> {
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);

Expand Down
25 changes: 12 additions & 13 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use crate::{
scanner::Scanner,
types::{CallbackConfig, EventFilter},
};
use crate::{FixedRetryConfig, scanner::Scanner, types::EventFilter};

pub struct ScannerBuilder {
rpc_url: String,
start_block: Option<u64>,
end_block: Option<u64>,
max_blocks_per_filter: u64,
tracked_events: Vec<EventFilter>,
callback_config: CallbackConfig,
callback_config: FixedRetryConfig,
}

impl ScannerBuilder {
Expand All @@ -20,7 +17,7 @@ impl ScannerBuilder {
end_block: None,
max_blocks_per_filter: 1000,
tracked_events: Vec::new(),
callback_config: CallbackConfig::default(),
callback_config: FixedRetryConfig::default(),
}
}

Expand Down Expand Up @@ -55,7 +52,7 @@ impl ScannerBuilder {
}

#[must_use]
pub fn callback_config(mut self, cfg: CallbackConfig) -> Self {
pub fn callback_config(mut self, cfg: FixedRetryConfig) -> Self {
self.callback_config = cfg;
self
}
Expand All @@ -81,7 +78,9 @@ impl ScannerBuilder {
#[cfg(test)]
mod tests {
use super::*;
use crate::callback::EventCallback;
use crate::{
FixedRetryConfig, callback::EventCallback, callback_strategy::BACK_OFF_MAX_RETRIES,
};
use alloy::{primitives::address, rpc::types::Log};
use async_trait::async_trait;
use std::sync::Arc;
Expand Down Expand Up @@ -141,9 +140,9 @@ mod tests {
fn test_builder_callback_config() {
let max_attempts = 5;
let delay_ms = 500;
let config = CallbackConfig { max_attempts, delay_ms };
let config = FixedRetryConfig { max_attempts, delay_ms };

let builder = ScannerBuilder::new(WS_URL).callback_config(config.clone());
let builder = ScannerBuilder::new(WS_URL).callback_config(config);

assert_eq!(builder.callback_config.max_attempts, max_attempts);
assert_eq!(builder.callback_config.delay_ms, delay_ms);
Expand All @@ -153,7 +152,7 @@ mod tests {
fn test_builder_default_callback_config() {
let builder = ScannerBuilder::new(WS_URL);

assert_eq!(builder.callback_config.max_attempts, 3);
assert_eq!(builder.callback_config.max_attempts, BACK_OFF_MAX_RETRIES);
assert_eq!(builder.callback_config.delay_ms, 200);
}

Expand Down Expand Up @@ -260,15 +259,15 @@ mod tests {

let max_attempts = 5;
let delay_ms = 500;
let config = CallbackConfig { max_attempts, delay_ms };
let config = FixedRetryConfig { max_attempts, delay_ms };

let max_blocks_per_filter = 2000;
let builder = ScannerBuilder::new(WS_URL)
.start_block(start_block)
.end_block(end_block)
.max_blocks_per_filter(max_blocks_per_filter)
.add_event_filter(filter.clone())
.callback_config(config.clone());
.callback_config(config);

assert_eq!(builder.start_block, Some(start_block));
assert_eq!(builder.end_block, Some(end_block));
Expand Down
2 changes: 2 additions & 0 deletions src/callback.rs → src/callback/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use alloy::rpc::types::Log;
use async_trait::async_trait;

pub mod strategy;

#[async_trait]
pub trait EventCallback {
/// Called when a matching log is found.
Expand Down
64 changes: 64 additions & 0 deletions src/callback/strategy/fixed_retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::{sync::Arc, time::Duration};

use alloy::rpc::types::Log;
use async_trait::async_trait;
use tracing::warn;

use crate::callback::EventCallback;

use super::CallbackStrategy;

pub const BACK_OFF_MAX_RETRIES: u64 = 5;
pub const BACK_OFF_MAX_DELAY_MS: u64 = 200;

#[derive(Clone, Copy, Debug)]
pub struct FixedRetryConfig {
pub max_attempts: u64,
pub delay_ms: u64,
}

impl Default for FixedRetryConfig {
fn default() -> Self {
Self { max_attempts: BACK_OFF_MAX_RETRIES, delay_ms: BACK_OFF_MAX_DELAY_MS }
}
}

pub struct FixedRetryStrategy {
cfg: FixedRetryConfig,
}

impl FixedRetryStrategy {
#[must_use]
pub fn new(cfg: FixedRetryConfig) -> Self {
Self { cfg }
}
}

#[async_trait]
impl CallbackStrategy for FixedRetryStrategy {
async fn execute(
&self,
callback: &Arc<dyn EventCallback + Send + Sync>,
log: &Log,
) -> anyhow::Result<()> {
match callback.on_event(log).await {
Ok(()) => Ok(()),
Err(mut last_err) => {
let attempts = self.cfg.max_attempts.max(1);
for _ in 1..attempts {
warn!(
delay_ms = self.cfg.delay_ms,
max_attempts = attempts,
"Callback failed: retrying after fixed delay"
);
tokio::time::sleep(Duration::from_millis(self.cfg.delay_ms)).await;
match callback.on_event(log).await {
Ok(()) => return Ok(()),
Err(e) => last_err = e,
}
}
Err(last_err)
}
}
}
}
21 changes: 21 additions & 0 deletions src/callback/strategy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::sync::Arc;

use alloy::rpc::types::Log;
use async_trait::async_trait;

use crate::callback::EventCallback;

pub mod fixed_retry;
pub mod state_sync_aware;

pub use fixed_retry::{BACK_OFF_MAX_RETRIES, FixedRetryConfig, FixedRetryStrategy};
pub use state_sync_aware::{StateSyncAwareStrategy, StateSyncConfig};

#[async_trait]
pub trait CallbackStrategy: Send + Sync {
async fn execute(
&self,
callback: &Arc<dyn EventCallback + Send + Sync>,
log: &Log,
) -> anyhow::Result<()>;
}
115 changes: 115 additions & 0 deletions src/callback/strategy/state_sync_aware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::{cmp, sync::Arc, time::Duration};

use alloy::rpc::types::Log;
use async_trait::async_trait;
use tracing::{info, warn};

use crate::{FixedRetryConfig, callback::EventCallback};

use super::{CallbackStrategy, fixed_retry::FixedRetryStrategy};

pub const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
pub const STATE_SYNC_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(120);
pub const STATE_SYNC_RETRY_MAX_ELAPSED: Duration = Duration::from_secs(600);
pub const STATE_SYNC_RETRY_MULTIPLIER: f64 = 1.5;

#[derive(Clone, Copy, Debug)]
pub struct StateSyncConfig {
pub initial_interval: Duration,
pub max_interval: Duration,
pub max_elapsed: Duration,
pub multiplier: f64,
}

impl Default for StateSyncConfig {
fn default() -> Self {
Self {
initial_interval: STATE_SYNC_RETRY_INTERVAL,
max_interval: STATE_SYNC_RETRY_MAX_INTERVAL,
max_elapsed: STATE_SYNC_RETRY_MAX_ELAPSED,
multiplier: STATE_SYNC_RETRY_MULTIPLIER,
}
}
}

pub struct StateSyncAwareStrategy {
inner: FixedRetryStrategy,
cfg: StateSyncConfig,
}

impl Default for StateSyncAwareStrategy {
fn default() -> Self {
Self::new()
}
}

impl StateSyncAwareStrategy {
#[must_use]
pub fn new() -> Self {
Self {
inner: FixedRetryStrategy::new(FixedRetryConfig::default()),
cfg: StateSyncConfig::default(),
}
}

#[must_use]
pub fn with_state_sync_config(mut self, cfg: StateSyncConfig) -> Self {
self.cfg = cfg;
self
}

#[must_use]
pub fn with_fixed_retry_config(mut self, cfg: super::fixed_retry::FixedRetryConfig) -> Self {
self.inner = FixedRetryStrategy::new(cfg);
self
}
}

#[async_trait]
impl CallbackStrategy for StateSyncAwareStrategy {
async fn execute(
&self,
callback: &Arc<dyn EventCallback + Send + Sync>,
log: &Log,
) -> anyhow::Result<()> {
match callback.on_event(log).await {
Ok(()) => Ok(()),
Err(first_err) => {
if is_missing_trie_node_error(&first_err) {
// state sync aware retry path
let mut delay = self.cfg.initial_interval;
let start = tokio::time::Instant::now();
info!(initial_interval = ?self.cfg.initial_interval, max_interval = ?self.cfg.max_interval,
max_elapsed = ?self.cfg.max_elapsed, "Starting state-sync aware retry");
let mut last_err: anyhow::Error = first_err;
loop {
if start.elapsed() >= self.cfg.max_elapsed {
return Err(last_err);
}
tokio::time::sleep(delay).await;
match callback.on_event(log).await {
Ok(()) => return Ok(()),
Err(e) => {
last_err = e;
let next_secs = delay.as_secs_f64() * self.cfg.multiplier;
let next = Duration::from_secs_f64(next_secs);
delay = cmp::min(self.cfg.max_interval, next);
let elapsed = start.elapsed();
warn!(next_delay = ?delay, elapsed = ?elapsed, error = %last_err,
"State-sync retry operation failed: will retry");
}
}
}
} else {
// Fixed retry for regular errors
self.inner.execute(callback, log).await
}
}
}
}
}

fn is_missing_trie_node_error(err: &anyhow::Error) -> bool {
let s = err.to_string().to_lowercase();
s.contains("missing trie node") && s.contains("state") && s.contains("not available")
}
Loading