Skip to content

Commit 5c4be1b

Browse files
LeoPatOZ0xNeshi
andauthored
Initial Impl For Event Channeling (#14)
Co-authored-by: 0xNeshi <[email protected]>
1 parent b1fc196 commit 5c4be1b

File tree

12 files changed

+381
-80
lines changed

12 files changed

+381
-80
lines changed

examples/historical_scanning/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
33
use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent};
44
use alloy_node_bindings::Anvil;
55
use async_trait::async_trait;
6-
use event_scanner::{CallbackConfig, EventCallback, EventFilter, ScannerBuilder};
6+
use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder};
77

88
use tokio::time::sleep;
99
use tracing::info;
@@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {
6666

6767
let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url())
6868
.add_event_filter(increase_filter)
69-
.callback_config(CallbackConfig { max_attempts: 3, delay_ms: 200 })
69+
.callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 })
7070
.start_block(0)
7171
.build()
7272
.await?;

examples/simple_counter/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
33
use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent};
44
use alloy_node_bindings::Anvil;
55
use async_trait::async_trait;
6-
use event_scanner::{CallbackConfig, EventCallback, EventFilter, ScannerBuilder};
6+
use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder};
77

88
use tokio::time::sleep;
99
use tracing::info;
@@ -64,7 +64,7 @@ async fn main() -> anyhow::Result<()> {
6464

6565
let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url())
6666
.add_event_filter(increase_filter)
67-
.callback_config(CallbackConfig { max_attempts: 3, delay_ms: 200 })
67+
.callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 })
6868
.build()
6969
.await?;
7070

src/block_scanner.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,10 @@ use alloy::{
2121
const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
2222
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12);
2323
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
24-
const BACK_OFF_MAX_RETRIES: u64 = 5;
2524

2625
// TODO: determine check exact default value
2726
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
2827

29-
// State sync aware retry settings
30-
const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
31-
const STATE_SYNC_MAX_RETRIES: u64 = 12;
32-
3328
#[derive(Debug)]
3429
pub enum BlockScannerError {
3530
ErrEOF,
@@ -202,6 +197,10 @@ where
202197
P: Provider<N>,
203198
N: Network,
204199
{
200+
pub fn provider(&self) -> &P {
201+
&self.provider
202+
}
203+
205204
pub async fn start(&self) -> ReceiverStream<Result<Range<u64>, BlockScannerError>> {
206205
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);
207206

src/builder.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1-
use crate::{
2-
scanner::Scanner,
3-
types::{CallbackConfig, EventFilter},
4-
};
1+
use crate::{FixedRetryConfig, scanner::Scanner, types::EventFilter};
52

63
pub struct ScannerBuilder {
74
rpc_url: String,
85
start_block: Option<u64>,
96
end_block: Option<u64>,
107
max_blocks_per_filter: u64,
118
tracked_events: Vec<EventFilter>,
12-
callback_config: CallbackConfig,
9+
callback_config: FixedRetryConfig,
1310
}
1411

1512
impl ScannerBuilder {
@@ -20,7 +17,7 @@ impl ScannerBuilder {
2017
end_block: None,
2118
max_blocks_per_filter: 1000,
2219
tracked_events: Vec::new(),
23-
callback_config: CallbackConfig::default(),
20+
callback_config: FixedRetryConfig::default(),
2421
}
2522
}
2623

@@ -55,7 +52,7 @@ impl ScannerBuilder {
5552
}
5653

5754
#[must_use]
58-
pub fn callback_config(mut self, cfg: CallbackConfig) -> Self {
55+
pub fn callback_config(mut self, cfg: FixedRetryConfig) -> Self {
5956
self.callback_config = cfg;
6057
self
6158
}
@@ -81,7 +78,9 @@ impl ScannerBuilder {
8178
#[cfg(test)]
8279
mod tests {
8380
use super::*;
84-
use crate::callback::EventCallback;
81+
use crate::{
82+
FixedRetryConfig, callback::EventCallback, callback_strategy::BACK_OFF_MAX_RETRIES,
83+
};
8584
use alloy::{primitives::address, rpc::types::Log};
8685
use async_trait::async_trait;
8786
use std::sync::Arc;
@@ -141,9 +140,9 @@ mod tests {
141140
fn test_builder_callback_config() {
142141
let max_attempts = 5;
143142
let delay_ms = 500;
144-
let config = CallbackConfig { max_attempts, delay_ms };
143+
let config = FixedRetryConfig { max_attempts, delay_ms };
145144

146-
let builder = ScannerBuilder::new(WS_URL).callback_config(config.clone());
145+
let builder = ScannerBuilder::new(WS_URL).callback_config(config);
147146

148147
assert_eq!(builder.callback_config.max_attempts, max_attempts);
149148
assert_eq!(builder.callback_config.delay_ms, delay_ms);
@@ -153,7 +152,7 @@ mod tests {
153152
fn test_builder_default_callback_config() {
154153
let builder = ScannerBuilder::new(WS_URL);
155154

156-
assert_eq!(builder.callback_config.max_attempts, 3);
155+
assert_eq!(builder.callback_config.max_attempts, BACK_OFF_MAX_RETRIES);
157156
assert_eq!(builder.callback_config.delay_ms, 200);
158157
}
159158

@@ -260,15 +259,15 @@ mod tests {
260259

261260
let max_attempts = 5;
262261
let delay_ms = 500;
263-
let config = CallbackConfig { max_attempts, delay_ms };
262+
let config = FixedRetryConfig { max_attempts, delay_ms };
264263

265264
let max_blocks_per_filter = 2000;
266265
let builder = ScannerBuilder::new(WS_URL)
267266
.start_block(start_block)
268267
.end_block(end_block)
269268
.max_blocks_per_filter(max_blocks_per_filter)
270269
.add_event_filter(filter.clone())
271-
.callback_config(config.clone());
270+
.callback_config(config);
272271

273272
assert_eq!(builder.start_block, Some(start_block));
274273
assert_eq!(builder.end_block, Some(end_block));

src/callback.rs renamed to src/callback/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use alloy::rpc::types::Log;
22
use async_trait::async_trait;
33

4+
pub mod strategy;
5+
46
#[async_trait]
57
pub trait EventCallback {
68
/// Called when a matching log is found.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use alloy::rpc::types::Log;
4+
use async_trait::async_trait;
5+
use tracing::warn;
6+
7+
use crate::callback::EventCallback;
8+
9+
use super::CallbackStrategy;
10+
11+
pub const BACK_OFF_MAX_RETRIES: u64 = 5;
12+
pub const BACK_OFF_MAX_DELAY_MS: u64 = 200;
13+
14+
#[derive(Clone, Copy, Debug)]
15+
pub struct FixedRetryConfig {
16+
pub max_attempts: u64,
17+
pub delay_ms: u64,
18+
}
19+
20+
impl Default for FixedRetryConfig {
21+
fn default() -> Self {
22+
Self { max_attempts: BACK_OFF_MAX_RETRIES, delay_ms: BACK_OFF_MAX_DELAY_MS }
23+
}
24+
}
25+
26+
pub struct FixedRetryStrategy {
27+
cfg: FixedRetryConfig,
28+
}
29+
30+
impl FixedRetryStrategy {
31+
#[must_use]
32+
pub fn new(cfg: FixedRetryConfig) -> Self {
33+
Self { cfg }
34+
}
35+
}
36+
37+
#[async_trait]
38+
impl CallbackStrategy for FixedRetryStrategy {
39+
async fn execute(
40+
&self,
41+
callback: &Arc<dyn EventCallback + Send + Sync>,
42+
log: &Log,
43+
) -> anyhow::Result<()> {
44+
match callback.on_event(log).await {
45+
Ok(()) => Ok(()),
46+
Err(mut last_err) => {
47+
let attempts = self.cfg.max_attempts.max(1);
48+
for _ in 1..attempts {
49+
warn!(
50+
delay_ms = self.cfg.delay_ms,
51+
max_attempts = attempts,
52+
"Callback failed: retrying after fixed delay"
53+
);
54+
tokio::time::sleep(Duration::from_millis(self.cfg.delay_ms)).await;
55+
match callback.on_event(log).await {
56+
Ok(()) => return Ok(()),
57+
Err(e) => last_err = e,
58+
}
59+
}
60+
Err(last_err)
61+
}
62+
}
63+
}
64+
}

src/callback/strategy/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use std::sync::Arc;
2+
3+
use alloy::rpc::types::Log;
4+
use async_trait::async_trait;
5+
6+
use crate::callback::EventCallback;
7+
8+
pub mod fixed_retry;
9+
pub mod state_sync_aware;
10+
11+
pub use fixed_retry::{BACK_OFF_MAX_RETRIES, FixedRetryConfig, FixedRetryStrategy};
12+
pub use state_sync_aware::{StateSyncAwareStrategy, StateSyncConfig};
13+
14+
#[async_trait]
15+
pub trait CallbackStrategy: Send + Sync {
16+
async fn execute(
17+
&self,
18+
callback: &Arc<dyn EventCallback + Send + Sync>,
19+
log: &Log,
20+
) -> anyhow::Result<()>;
21+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use std::{cmp, sync::Arc, time::Duration};
2+
3+
use alloy::rpc::types::Log;
4+
use async_trait::async_trait;
5+
use tracing::{info, warn};
6+
7+
use crate::{FixedRetryConfig, callback::EventCallback};
8+
9+
use super::{CallbackStrategy, fixed_retry::FixedRetryStrategy};
10+
11+
pub const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
12+
pub const STATE_SYNC_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(120);
13+
pub const STATE_SYNC_RETRY_MAX_ELAPSED: Duration = Duration::from_secs(600);
14+
pub const STATE_SYNC_RETRY_MULTIPLIER: f64 = 1.5;
15+
16+
#[derive(Clone, Copy, Debug)]
17+
pub struct StateSyncConfig {
18+
pub initial_interval: Duration,
19+
pub max_interval: Duration,
20+
pub max_elapsed: Duration,
21+
pub multiplier: f64,
22+
}
23+
24+
impl Default for StateSyncConfig {
25+
fn default() -> Self {
26+
Self {
27+
initial_interval: STATE_SYNC_RETRY_INTERVAL,
28+
max_interval: STATE_SYNC_RETRY_MAX_INTERVAL,
29+
max_elapsed: STATE_SYNC_RETRY_MAX_ELAPSED,
30+
multiplier: STATE_SYNC_RETRY_MULTIPLIER,
31+
}
32+
}
33+
}
34+
35+
pub struct StateSyncAwareStrategy {
36+
inner: FixedRetryStrategy,
37+
cfg: StateSyncConfig,
38+
}
39+
40+
impl Default for StateSyncAwareStrategy {
41+
fn default() -> Self {
42+
Self::new()
43+
}
44+
}
45+
46+
impl StateSyncAwareStrategy {
47+
#[must_use]
48+
pub fn new() -> Self {
49+
Self {
50+
inner: FixedRetryStrategy::new(FixedRetryConfig::default()),
51+
cfg: StateSyncConfig::default(),
52+
}
53+
}
54+
55+
#[must_use]
56+
pub fn with_state_sync_config(mut self, cfg: StateSyncConfig) -> Self {
57+
self.cfg = cfg;
58+
self
59+
}
60+
61+
#[must_use]
62+
pub fn with_fixed_retry_config(mut self, cfg: super::fixed_retry::FixedRetryConfig) -> Self {
63+
self.inner = FixedRetryStrategy::new(cfg);
64+
self
65+
}
66+
}
67+
68+
#[async_trait]
69+
impl CallbackStrategy for StateSyncAwareStrategy {
70+
async fn execute(
71+
&self,
72+
callback: &Arc<dyn EventCallback + Send + Sync>,
73+
log: &Log,
74+
) -> anyhow::Result<()> {
75+
match callback.on_event(log).await {
76+
Ok(()) => Ok(()),
77+
Err(first_err) => {
78+
if is_missing_trie_node_error(&first_err) {
79+
// state sync aware retry path
80+
let mut delay = self.cfg.initial_interval;
81+
let start = tokio::time::Instant::now();
82+
info!(initial_interval = ?self.cfg.initial_interval, max_interval = ?self.cfg.max_interval,
83+
max_elapsed = ?self.cfg.max_elapsed, "Starting state-sync aware retry");
84+
let mut last_err: anyhow::Error = first_err;
85+
loop {
86+
if start.elapsed() >= self.cfg.max_elapsed {
87+
return Err(last_err);
88+
}
89+
tokio::time::sleep(delay).await;
90+
match callback.on_event(log).await {
91+
Ok(()) => return Ok(()),
92+
Err(e) => {
93+
last_err = e;
94+
let next_secs = delay.as_secs_f64() * self.cfg.multiplier;
95+
let next = Duration::from_secs_f64(next_secs);
96+
delay = cmp::min(self.cfg.max_interval, next);
97+
let elapsed = start.elapsed();
98+
warn!(next_delay = ?delay, elapsed = ?elapsed, error = %last_err,
99+
"State-sync retry operation failed: will retry");
100+
}
101+
}
102+
}
103+
} else {
104+
// Fixed retry for regular errors
105+
self.inner.execute(callback, log).await
106+
}
107+
}
108+
}
109+
}
110+
}
111+
112+
fn is_missing_trie_node_error(err: &anyhow::Error) -> bool {
113+
let s = err.to_string().to_lowercase();
114+
s.contains("missing trie node") && s.contains("state") && s.contains("not available")
115+
}

0 commit comments

Comments
 (0)