Skip to content

Commit 3961e75

Browse files
committed
event channeling
1 parent c058ad9 commit 3961e75

File tree

2 files changed

+125
-3
lines changed

2 files changed

+125
-3
lines changed

src/block_scanner.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ where
186186
P: Provider<N>,
187187
N: Network,
188188
{
189+
pub fn provider(&self) -> &P {
190+
&self.provider
191+
}
192+
189193
pub async fn start(self) -> ReceiverStream<Result<N::BlockResponse, BlockScannerError>> {
190194
let receiver_stream = ReceiverStream::new(self.receiver);
191195

src/event_scanner.rs

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use crate::{
44
block_scanner::{BlockScanner, BlockScannerBuilder, OnBlocksFunc},
@@ -8,9 +8,14 @@ use alloy::{
88
eips::BlockNumberOrTag,
99
network::Network,
1010
providers::{Provider, RootProvider},
11-
rpc::client::RpcClient,
11+
rpc::{
12+
client::RpcClient,
13+
types::{Filter, Log},
14+
},
1215
transports::TransportError,
1316
};
17+
use tokio::sync::mpsc;
18+
use tracing::{error, info, warn};
1419

1520
pub struct EventScannerBuilder<N: Network> {
1621
block_scanner: BlockScannerBuilder<N>,
@@ -137,6 +142,119 @@ pub struct EventScanner<P: Provider<N>, N: Network> {
137142

138143
impl<P: Provider<N>, N: Network> EventScanner<P, N> {
139144
pub async fn start(&mut self) -> anyhow::Result<()> {
140-
todo!()
145+
let mut event_channels: HashMap<String, mpsc::Sender<Log>> = HashMap::new();
146+
147+
for filter in &self.tracked_events {
148+
let event_name = filter.event.clone();
149+
if event_channels.contains_key(&event_name) {
150+
continue;
151+
}
152+
let (sender, mut receiver) = mpsc::channel::<Log>(1024); // TODO: configurable buffer size / smaller buffer ?
153+
let cfg = self.callback_config.clone();
154+
let event_name_clone = event_name.clone();
155+
let callback = filter.callback.clone();
156+
tokio::spawn(async move {
157+
while let Some(log) = receiver.recv().await {
158+
if let Err(e) = invoke_with_retry_static(&callback, &log, &cfg).await {
159+
error!(
160+
event = %event_name_clone,
161+
at_block = &log.block_number,
162+
error = %e,
163+
"failed to invoke callback after retries"
164+
);
165+
}
166+
}
167+
});
168+
event_channels.insert(event_name, sender);
169+
}
170+
171+
// TODO: replace with blockstream
172+
let from_block: u64 = 0;
173+
let to_block: u64 = 0;
174+
175+
info!(from_block, to_block, "processing placeholder block range");
176+
self.process_block_range(from_block, to_block, &event_channels).await?;
177+
178+
Ok(())
179+
}
180+
181+
async fn process_block_range(
182+
&self,
183+
from_block: u64,
184+
to_block: u64,
185+
event_channels: &HashMap<String, mpsc::Sender<Log>>,
186+
) -> anyhow::Result<()> {
187+
for event_filter in &self.tracked_events {
188+
let filter = Filter::new()
189+
.address(event_filter.contract_address)
190+
.event(event_filter.event.as_str())
191+
.from_block(from_block)
192+
.to_block(to_block);
193+
194+
match self.block_scanner.provider().get_logs(&filter).await {
195+
Ok(logs) => {
196+
if logs.is_empty() {
197+
continue;
198+
}
199+
info!(
200+
contract = ?event_filter.contract_address,
201+
event = %event_filter.event,
202+
log_count = logs.len(),
203+
from_block,
204+
to_block,
205+
"found logs for event in block range"
206+
);
207+
208+
if let Some(sender) = event_channels.get(&event_filter.event) {
209+
for log in logs {
210+
if let Err(e) = sender.send(log.clone()).await {
211+
warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing");
212+
}
213+
}
214+
} else {
215+
warn!(event = %event_filter.event, "no channel found for event type");
216+
}
217+
}
218+
Err(e) => {
219+
error!(
220+
contract = ?event_filter.contract_address,
221+
event = %event_filter.event,
222+
error = %e,
223+
from_block,
224+
to_block,
225+
"failed to get logs for block range"
226+
);
227+
}
228+
}
229+
}
230+
231+
Ok(())
232+
}
233+
}
234+
235+
async fn invoke_with_retry_static(
236+
callback: &Arc<dyn crate::callback::EventCallback + Send + Sync>,
237+
log: &Log,
238+
config: &CallbackConfig,
239+
) -> anyhow::Result<()> {
240+
let attempts = config.max_attempts.max(1);
241+
let mut last_err: Option<anyhow::Error> = None;
242+
for attempt in 1..=attempts {
243+
match callback.on_event(log).await {
244+
Ok(_) => return Ok(()),
245+
Err(e) => {
246+
last_err = Some(e);
247+
if attempt < attempts {
248+
warn!(
249+
attempt,
250+
max_attempts = attempts,
251+
"callback failed; retrying after fixed delay"
252+
);
253+
tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
254+
continue;
255+
}
256+
}
257+
}
141258
}
259+
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("callback failed with unknown error")))
142260
}

0 commit comments

Comments
 (0)