Skip to content

Commit 70bcb9f

Browse files
LeoPatOZ0xNeshi
andauthored
Add parallel event fetching (#62)
Co-authored-by: Nenad <[email protected]>
1 parent 487ac13 commit 70bcb9f

File tree

2 files changed

+58
-148
lines changed

2 files changed

+58
-148
lines changed

src/block_range_scanner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ pub struct ConnectedBlockRangeScanner<N: Network> {
282282
impl<N: Network> ConnectedBlockRangeScanner<N> {
283283
/// Returns the underlying Provider.
284284
#[must_use]
285-
pub fn provider(&self) -> &impl Provider<N> {
285+
pub fn provider(&self) -> &RootProvider<N> {
286286
&self.provider
287287
}
288288

src/event_scanner.rs

Lines changed: 57 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::sync::Arc;
22

33
use crate::{
44
block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner},
55
callback::strategy::{CallbackStrategy, StateSyncAwareStrategy},
66
event_filter::EventFilter,
77
};
88
use alloy::{
9-
eips::BlockNumberOrTag,
10-
network::Network,
11-
primitives::Address,
12-
providers::Provider,
13-
rpc::types::{Filter, Log},
9+
eips::BlockNumberOrTag, network::Network, providers::Provider, rpc::types::Filter,
1410
transports::http::reqwest::Url,
1511
};
16-
use tokio::sync::mpsc::{self, Receiver};
12+
use tokio::sync::broadcast::{self, error::RecvError};
1713
use tokio_stream::StreamExt;
18-
use tracing::{error, info, warn};
14+
use tracing::{error, info};
1915

2016
pub struct EventScannerBuilder {
2117
block_range_scanner: BlockRangeScanner,
@@ -132,12 +128,6 @@ pub struct EventScanner<N: Network> {
132128
callback_strategy: Arc<dyn CallbackStrategy>,
133129
}
134130

135-
#[derive(Hash, Eq, PartialEq)]
136-
struct EventIdentifier {
137-
contract_address: Option<Address>,
138-
event: Option<String>,
139-
}
140-
141131
impl<N: Network> EventScanner<N> {
142132
/// Starts the scanner
143133
///
@@ -149,30 +139,6 @@ impl<N: Network> EventScanner<N> {
149139
start_height: BlockNumberOrTag,
150140
end_height: Option<BlockNumberOrTag>,
151141
) -> anyhow::Result<()> {
152-
let mut event_channels: HashMap<EventIdentifier, mpsc::Sender<Log>> = HashMap::new();
153-
154-
for filter in &self.tracked_events {
155-
let unique_event = EventIdentifier {
156-
contract_address: filter.contract_address,
157-
event: filter.event.clone(),
158-
};
159-
160-
if event_channels.contains_key(&unique_event) {
161-
continue;
162-
}
163-
164-
let (sender, receiver) = mpsc::channel::<Log>(1024);
165-
166-
let callback = filter.callback.clone();
167-
let strategy = self.callback_strategy.clone();
168-
let event_name = filter.event.clone().unwrap_or_else(|| "all events".to_string());
169-
Self::spawn_event_callback_task_executors(receiver, callback, strategy, event_name);
170-
171-
event_channels.insert(unique_event, sender);
172-
}
173-
174-
// TODO: Once we have commands in the event scanner we can
175-
// use them to start the different subscription
176142
let client = self.block_range_scanner.run()?;
177143
let mut stream = if let Some(end_height) = end_height {
178144
client.stream_historical(start_height, end_height).await?
@@ -182,125 +148,69 @@ impl<N: Network> EventScanner<N> {
182148
client.stream_from(start_height).await?
183149
};
184150

185-
while let Some(range) = stream.next().await {
186-
match range {
187-
Ok(range) => {
188-
let from_block = *range.start();
189-
let to_block = *range.end();
190-
info!(from_block, to_block, "processing block range");
191-
self.process_block_range(from_block, to_block, &event_channels).await?;
192-
}
193-
Err(e) => {
194-
error!(error = %e, "failed to get block range");
195-
}
196-
}
197-
}
198-
199-
Ok(())
200-
}
151+
let (range_tx, _) = broadcast::channel::<(u64, u64)>(1024);
201152

202-
/// Spawns background tasks that drive callback execution for an event type.
203-
fn spawn_event_callback_task_executors(
204-
mut receiver: Receiver<Log>,
205-
callback: Arc<dyn crate::callback::EventCallback + Send + Sync>,
206-
strategy: Arc<dyn CallbackStrategy>,
207-
event_name: String,
208-
) {
209-
tokio::spawn(async move {
210-
while let Some(log) = receiver.recv().await {
211-
if let Err(e) = strategy.execute(&callback, &log).await {
212-
error!(
213-
event = %event_name,
214-
at_block = &log.block_number,
215-
error = %e,
216-
"failed to invoke callback after retries"
217-
);
218-
}
219-
}
220-
});
221-
}
222-
223-
/// Fetches logs for the supplied inclusive block range [`from_block..=to_block`] and forwards
224-
/// them to the callback channels.
225-
async fn process_block_range(
226-
&self,
227-
from_block: u64,
228-
to_block: u64,
229-
event_channels: &HashMap<EventIdentifier, mpsc::Sender<Log>>,
230-
) -> anyhow::Result<()> {
231-
for event_filter in &self.tracked_events {
232-
let mut filter = Filter::new().from_block(from_block).to_block(to_block);
233-
234-
// Add contract address filter if specified
235-
if let Some(contract_address) = event_filter.contract_address {
236-
filter = filter.address(contract_address);
237-
}
238-
239-
// Add event signature filter if specified
240-
if let Some(event_signature) = &event_filter.event {
241-
filter = filter.event(event_signature.as_str());
242-
}
153+
for filter in &self.tracked_events {
154+
let provider = self.block_range_scanner.provider().clone();
155+
let mut sub = range_tx.subscribe();
156+
let filter = filter.clone();
157+
let strategy = self.callback_strategy.clone();
243158

244-
match self.block_range_scanner.provider().get_logs(&filter).await {
245-
Ok(logs) => {
246-
if logs.is_empty() {
247-
continue;
248-
}
249-
let contract_display = event_filter
250-
.contract_address
251-
.map_or_else(|| "all contracts".to_string(), |addr| format!("{addr:?}"));
252-
let event_display = event_filter.event.as_deref().map_or("all events", |s| s);
159+
tokio::spawn(async move {
160+
loop {
161+
match sub.recv().await {
162+
Ok((from_block, to_block)) => {
163+
let mut log_filter =
164+
Filter::new().from_block(from_block).to_block(to_block);
253165

254-
info!(
255-
contract = %contract_display,
256-
event = %event_display,
257-
log_count = logs.len(),
258-
from_block,
259-
to_block,
260-
"found logs for event in block range"
261-
);
166+
if let Some(contract_address) = filter.contract_address {
167+
log_filter = log_filter.address(contract_address);
168+
}
262169

263-
let event_identifier = EventIdentifier {
264-
contract_address: event_filter.contract_address,
265-
event: event_filter.event.clone(),
266-
};
170+
if let Some(ref event_signature) = filter.event {
171+
log_filter = log_filter.event(event_signature.as_str());
172+
}
267173

268-
if let Some(sender) = event_channels.get(&event_identifier) {
269-
for log in logs {
270-
if let Err(e) = sender.send(log).await {
271-
let contract_display = event_filter.contract_address.map_or_else(
272-
|| "all contracts".to_string(),
273-
|addr| format!("{addr:?}"),
274-
);
275-
let event_display =
276-
event_filter.event.as_deref().map_or("all events", |s| s);
277-
warn!(contract = %contract_display, event = %event_display, error = %e, "failed to enqueue log for processing");
174+
match provider.get_logs(&log_filter).await {
175+
Ok(logs) => {
176+
if logs.is_empty() {
177+
continue;
178+
}
179+
info!(contract = ?filter.contract_address, event = ?filter.event, log_count = logs.len(), from_block, to_block, "found logs for event in block range");
180+
for log in logs {
181+
if let Err(e) =
182+
strategy.execute(&filter.callback, &log).await
183+
{
184+
error!(event = ?filter.event, at_block = &log.block_number, error = %e, "failed to invoke callback after retries");
185+
}
186+
}
187+
}
188+
Err(e) => {
189+
error!(contract = ?filter.contract_address, event = ?filter.event, error = %e, from_block, to_block, "failed to get logs for block range");
190+
}
278191
}
279192
}
280-
} else {
281-
let contract_display = event_filter.contract_address.map_or_else(
282-
|| "all contracts".to_string(),
283-
|addr| format!("{addr:?}"),
284-
);
285-
let event_display =
286-
event_filter.event.as_deref().map_or("all events", |s| s);
287-
warn!(contract = %contract_display, event = %event_display, "no channel found for event type");
193+
// TODO: What happens if the broadcast channel is closed?
194+
Err(RecvError::Closed) => break,
195+
Err(RecvError::Lagged(_)) => {}
288196
}
289197
}
290-
Err(e) => {
291-
let contract_display = event_filter
292-
.contract_address
293-
.map_or_else(|| "all contracts".to_string(), |addr| format!("{addr:?}"));
294-
let event_display = event_filter.event.as_deref().map_or("all events", |s| s);
198+
});
199+
}
295200

296-
error!(
297-
contract = %contract_display,
298-
event = %event_display,
299-
error = %e,
300-
from_block,
301-
to_block,
302-
"failed to get logs for block range"
303-
);
201+
while let Some(range) = stream.next().await {
202+
match range {
203+
Ok(range) => {
204+
let from_block = *range.start();
205+
let to_block = *range.end();
206+
info!(from_block, to_block, "processing block range");
207+
if let Err(e) = range_tx.send((from_block, to_block)) {
208+
error!(error = %e, "failed to send block range to broadcast channel");
209+
break;
210+
}
211+
}
212+
Err(e) => {
213+
error!(error = %e, "failed to get block range");
304214
}
305215
}
306216
}

0 commit comments

Comments
 (0)