Skip to content

Commit 9d28c26

Browse files
committed
Merge branch 'main' into start-typestate
2 parents ffe2219 + c3c3ce4 commit 9d28c26

File tree

1 file changed

+122
-118
lines changed

1 file changed

+122
-118
lines changed

src/event_scanner/scanner/common.rs

Lines changed: 122 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::ops::RangeInclusive;
33
use crate::{
44
Notification, ScannerMessage,
55
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
6-
event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener},
6+
event_scanner::{filter::EventFilter, listener::EventListener},
77
robust_provider::{RobustProvider, provider::Error as RobustProviderError},
88
types::TryStream,
99
};
@@ -12,10 +12,7 @@ use alloy::{
1212
rpc::types::{Filter, Log},
1313
};
1414
use tokio::{
15-
sync::{
16-
broadcast::{self, Sender, error::RecvError},
17-
mpsc,
18-
},
15+
sync::broadcast::{self, Sender, error::RecvError},
1916
task::JoinSet,
2017
};
2118
use tokio_stream::{Stream, StreamExt};
@@ -57,7 +54,12 @@ pub async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResult> + Un
5754
) {
5855
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(MAX_BUFFERED_MESSAGES);
5956

60-
let consumers = spawn_log_consumers(provider, listeners, &range_tx, mode);
57+
let consumers = match mode {
58+
ConsumerMode::Stream => spawn_log_consumers_in_stream_mode(provider, listeners, &range_tx),
59+
ConsumerMode::CollectLatest { count } => {
60+
spawn_log_consumers_in_collection_mode(provider, listeners, &range_tx, count)
61+
}
62+
};
6163

6264
while let Some(message) = stream.next().await {
6365
if let Err(err) = range_tx.send(message) {
@@ -74,11 +76,73 @@ pub async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResult> + Un
7476
}
7577

7678
#[must_use]
77-
pub fn spawn_log_consumers<N: Network>(
79+
pub fn spawn_log_consumers_in_stream_mode<N: Network>(
7880
provider: &RobustProvider<N>,
7981
listeners: &[EventListener],
8082
range_tx: &Sender<BlockScannerResult>,
81-
mode: ConsumerMode,
83+
) -> JoinSet<()> {
84+
listeners.iter().cloned().fold(JoinSet::new(), |mut set, listener| {
85+
let EventListener { filter, sender } = listener;
86+
87+
let provider = provider.clone();
88+
let base_filter = Filter::from(&filter);
89+
let mut range_rx = range_tx.subscribe();
90+
91+
set.spawn(async move {
92+
loop {
93+
match range_rx.recv().await {
94+
Ok(message) => match message {
95+
Ok(ScannerMessage::Data(range)) => {
96+
match get_logs(range, &filter, &base_filter, &provider).await {
97+
Ok(logs) => {
98+
if logs.is_empty() {
99+
continue;
100+
}
101+
102+
if !sender.try_stream(logs).await {
103+
return;
104+
}
105+
}
106+
Err(e) => {
107+
error!(error = ?e, "Received error message");
108+
if !sender.try_stream(e).await {
109+
return;
110+
}
111+
}
112+
}
113+
}
114+
Ok(ScannerMessage::Notification(notification)) => {
115+
info!(notification = ?notification, "Received notification");
116+
if !sender.try_stream(notification).await {
117+
return;
118+
}
119+
}
120+
Err(e) => {
121+
error!(error = ?e, "Received error message");
122+
if !sender.try_stream(e).await {
123+
return;
124+
}
125+
}
126+
},
127+
Err(RecvError::Closed) => {
128+
info!("No block ranges to receive, dropping receiver.");
129+
break;
130+
}
131+
Err(RecvError::Lagged(_)) => {}
132+
}
133+
}
134+
});
135+
136+
set
137+
})
138+
}
139+
140+
#[must_use]
141+
pub fn spawn_log_consumers_in_collection_mode<N: Network>(
142+
provider: &RobustProvider<N>,
143+
listeners: &[EventListener],
144+
range_tx: &Sender<BlockScannerResult>,
145+
count: usize,
82146
) -> JoinSet<()> {
83147
listeners.iter().cloned().fold(JoinSet::new(), |mut set, listener| {
84148
let EventListener { filter, sender } = listener;
@@ -89,28 +153,51 @@ pub fn spawn_log_consumers<N: Network>(
89153

90154
set.spawn(async move {
91155
// Only used for CollectLatest
92-
let mut collected: Vec<Log> = match mode {
93-
ConsumerMode::CollectLatest { count } => Vec::with_capacity(count),
94-
ConsumerMode::Stream => Vec::new(),
95-
};
156+
let mut collected = Vec::with_capacity(count);
96157

97158
loop {
98159
match range_rx.recv().await {
99-
Ok(message) => {
100-
if !handle_block_range_message(
101-
message,
102-
&filter,
103-
&base_filter,
104-
&provider,
105-
&sender,
106-
mode,
107-
&mut collected,
108-
)
109-
.await
110-
{
111-
break;
160+
Ok(message) => match message {
161+
Ok(ScannerMessage::Data(range)) => {
162+
match get_logs(range, &filter, &base_filter, &provider).await {
163+
Ok(logs) => {
164+
if logs.is_empty() {
165+
continue;
166+
}
167+
168+
let take = count.saturating_sub(collected.len());
169+
// if we have enough logs, break
170+
if take == 0 {
171+
break;
172+
}
173+
// take latest within this range
174+
collected.extend(logs.into_iter().rev().take(take));
175+
// if we have enough logs, break
176+
if collected.len() == count {
177+
break;
178+
}
179+
}
180+
Err(e) => {
181+
error!(error = ?e, "Received error message");
182+
if !sender.try_stream(e).await {
183+
return; // channel closed
184+
}
185+
}
186+
}
112187
}
113-
}
188+
Ok(ScannerMessage::Notification(notification)) => {
189+
info!(notification = ?notification, "Received notification");
190+
if !sender.try_stream(notification).await {
191+
return;
192+
}
193+
}
194+
Err(e) => {
195+
error!(error = ?e, "Received error message");
196+
if !sender.try_stream(e).await {
197+
return;
198+
}
199+
}
200+
},
114201
Err(RecvError::Closed) => {
115202
info!("No block ranges to receive, dropping receiver.");
116203
break;
@@ -119,19 +206,17 @@ pub fn spawn_log_consumers<N: Network>(
119206
}
120207
}
121208

122-
if let ConsumerMode::CollectLatest { .. } = mode {
123-
if collected.is_empty() {
124-
info!("No logs found");
125-
_ = sender.try_stream(Notification::NoPastLogsFound).await;
126-
return;
127-
}
209+
if collected.is_empty() {
210+
info!("No logs found");
211+
_ = sender.try_stream(Notification::NoPastLogsFound).await;
212+
return;
213+
}
128214

129-
info!(count = collected.len(), "Logs found");
130-
collected.reverse(); // restore chronological order
215+
info!(count = collected.len(), "Logs found");
216+
collected.reverse(); // restore chronological order
131217

132-
info!("Sending collected logs to consumer");
133-
_ = sender.try_stream(collected).await;
134-
}
218+
info!("Sending collected logs to consumer");
219+
_ = sender.try_stream(collected).await;
135220
});
136221

137222
set
@@ -173,84 +258,3 @@ async fn get_logs<N: Network>(
173258
}
174259
}
175260
}
176-
177-
#[must_use]
178-
async fn handle_block_range_message<N: Network>(
179-
message: BlockScannerResult,
180-
filter: &EventFilter,
181-
base_filter: &Filter,
182-
provider: &RobustProvider<N>,
183-
sender: &mpsc::Sender<EventScannerResult>,
184-
mode: ConsumerMode,
185-
collected: &mut Vec<Log>,
186-
) -> bool {
187-
match message {
188-
Ok(ScannerMessage::Data(range)) => {
189-
if !handle_block_range(range, filter, base_filter, provider, sender, mode, collected)
190-
.await
191-
{
192-
return false;
193-
}
194-
}
195-
Ok(ScannerMessage::Notification(notification)) => {
196-
info!(notification = ?notification, "Received notification");
197-
if !sender.try_stream(notification).await {
198-
return false;
199-
}
200-
}
201-
Err(e) => {
202-
error!(error = ?e, "Received error message");
203-
if !sender.try_stream(e).await {
204-
return false;
205-
}
206-
}
207-
}
208-
true
209-
}
210-
211-
#[must_use]
212-
async fn handle_block_range<N: Network>(
213-
range: RangeInclusive<u64>,
214-
filter: &EventFilter,
215-
base_filter: &Filter,
216-
provider: &RobustProvider<N>,
217-
sender: &mpsc::Sender<EventScannerResult>,
218-
mode: ConsumerMode,
219-
collected: &mut Vec<Log>,
220-
) -> bool {
221-
match get_logs(range, filter, base_filter, provider).await {
222-
Ok(logs) => {
223-
if logs.is_empty() {
224-
return true;
225-
}
226-
227-
match mode {
228-
ConsumerMode::Stream => {
229-
if !sender.try_stream(logs).await {
230-
return false;
231-
}
232-
}
233-
ConsumerMode::CollectLatest { count } => {
234-
let take = count.saturating_sub(collected.len());
235-
// if we have enough logs, break
236-
if take == 0 {
237-
return false;
238-
}
239-
// take latest within this range
240-
collected.extend(logs.into_iter().rev().take(take));
241-
// if we have enough logs, break
242-
if collected.len() == count {
243-
return false;
244-
}
245-
}
246-
}
247-
}
248-
Err(e) => {
249-
error!(error = ?e, "Received error message");
250-
if !sender.try_stream(e).await {
251-
return false;
252-
}
253-
}
254-
}
255-
true
256-
}

0 commit comments

Comments
 (0)