Skip to content

Commit 9c35781

Browse files
committed
feat: Split sync scanners
1 parent 9db2569 commit 9c35781

File tree

7 files changed

+241
-221
lines changed

7 files changed

+241
-221
lines changed

examples/sync_scanning/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ async fn main() -> anyhow::Result<()> {
5757
info!("Historical event {} created", i + 1);
5858
}
5959

60-
let mut scanner = EventScanner::sync().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
60+
let mut scanner =
61+
EventScanner::sync().from_block(0).connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
6162

6263
let mut stream = scanner.subscribe(increase_filter);
6364

src/event_scanner/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ pub mod modes;
88
pub use filter::EventFilter;
99
pub use message::Message;
1010
pub use modes::{
11-
EventScanner, HistoricEventScanner, LatestEventScanner, LiveEventScanner, SyncEventScanner,
12-
SyncFromLatestEventScanner,
11+
EventScanner, HistoricEventScanner, LatestEventScanner, LiveEventScanner,
12+
SyncFromBlockEventScanner, SyncFromLatestEventScanner,
1313
};

src/event_scanner/modes/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ pub use historic::{HistoricEventScanner, HistoricScannerBuilder};
88
pub use latest::{LatestEventScanner, LatestScannerBuilder};
99
pub use live::{LiveEventScanner, LiveScannerBuilder};
1010
pub use sync::{
11-
SyncEventScanner, SyncScannerBuilder,
11+
SyncScannerBuilder,
12+
from_block::{SyncFromBlockEventScanner, SyncFromBlockEventScannerBuilder},
1213
from_latest::{SyncFromLatestEventScanner, SyncFromLatestScannerBuilder},
1314
};
1415

src/event_scanner/modes/sync.rs

Lines changed: 10 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,19 @@
1-
use alloy::{
2-
eips::BlockNumberOrTag,
3-
network::Network,
4-
providers::RootProvider,
5-
transports::{TransportResult, http::reqwest::Url},
6-
};
1+
use alloy::eips::BlockNumberOrTag;
72

8-
use tokio::sync::mpsc;
9-
use tokio_stream::wrappers::ReceiverStream;
10-
11-
use crate::{
12-
ScannerError,
13-
block_range_scanner::{
14-
BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS,
15-
MAX_BUFFERED_MESSAGES,
16-
},
17-
event_scanner::{
18-
filter::EventFilter,
19-
listener::EventListener,
20-
message::Message,
21-
modes::common::{ConsumerMode, handle_stream},
22-
},
23-
};
3+
use crate::event_scanner::modes::common::{ConsumerMode, handle_stream};
244

5+
pub(crate) mod from_block;
256
pub(crate) mod from_latest;
267

8+
use from_block::SyncFromBlockEventScannerBuilder;
279
use from_latest::SyncFromLatestScannerBuilder;
2810

29-
pub struct SyncScannerBuilder {
30-
block_range_scanner: BlockRangeScanner,
31-
from_block: BlockNumberOrTag,
32-
block_confirmations: u64,
33-
}
34-
35-
pub struct SyncEventScanner<N: Network> {
36-
config: SyncScannerBuilder,
37-
block_range_scanner: ConnectedBlockRangeScanner<N>,
38-
listeners: Vec<EventListener>,
39-
}
11+
pub struct SyncScannerBuilder;
4012

4113
impl SyncScannerBuilder {
4214
#[must_use]
4315
pub(crate) fn new() -> Self {
44-
Self {
45-
block_range_scanner: BlockRangeScanner::new(),
46-
from_block: BlockNumberOrTag::Earliest,
47-
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
48-
}
16+
Self
4917
}
5018

5119
/// Scans the latest `count` matching events per registered listener, then automatically
@@ -143,186 +111,14 @@ impl SyncScannerBuilder {
143111
/// [`ScannerStatus::SwitchingToLive`]: crate::types::ScannerStatus::SwitchingToLive
144112
#[must_use]
145113
pub fn from_latest(self, count: usize) -> SyncFromLatestScannerBuilder {
146-
let _ = self;
147114
SyncFromLatestScannerBuilder::new(count)
148115
}
149116

150117
#[must_use]
151-
pub fn max_block_range(mut self, max_block_range: u64) -> Self {
152-
self.block_range_scanner.max_block_range = max_block_range;
153-
self
154-
}
155-
156-
#[must_use]
157-
pub fn from_block(mut self, block: impl Into<BlockNumberOrTag>) -> Self {
158-
self.from_block = block.into();
159-
self
160-
}
161-
162-
#[must_use]
163-
pub fn block_confirmations(mut self, count: u64) -> Self {
164-
self.block_confirmations = count;
165-
self
166-
}
167-
168-
/// Connects to the provider via WebSocket.
169-
///
170-
/// Final builder method: consumes the builder and returns the built [`SyncEventScanner`].
171-
///
172-
/// # Errors
173-
///
174-
/// Returns an error if the connection fails
175-
pub async fn connect_ws<N: Network>(self, ws_url: Url) -> TransportResult<SyncEventScanner<N>> {
176-
let block_range_scanner = self.block_range_scanner.connect_ws::<N>(ws_url).await?;
177-
Ok(SyncEventScanner { config: self, block_range_scanner, listeners: Vec::new() })
178-
}
179-
180-
/// Connects to the provider via IPC.
181-
///
182-
/// Final builder method: consumes the builder and returns the built [`SyncEventScanner`].
183-
///
184-
/// # Errors
185-
///
186-
/// Returns an error if the connection fails
187-
pub async fn connect_ipc<N: Network>(
118+
pub fn from_block(
188119
self,
189-
ipc_path: String,
190-
) -> TransportResult<SyncEventScanner<N>> {
191-
let block_range_scanner = self.block_range_scanner.connect_ipc::<N>(ipc_path).await?;
192-
Ok(SyncEventScanner { config: self, block_range_scanner, listeners: Vec::new() })
193-
}
194-
195-
/// Connects to an existing provider.
196-
///
197-
/// Final builder method: consumes the builder and returns the built [`SyncEventScanner`].
198-
///
199-
/// # Errors
200-
///
201-
/// Returns an error if the connection fails
202-
#[must_use]
203-
pub fn connect<N: Network>(self, provider: RootProvider<N>) -> SyncEventScanner<N> {
204-
let block_range_scanner = self.block_range_scanner.connect::<N>(provider);
205-
SyncEventScanner { config: self, block_range_scanner, listeners: Vec::new() }
206-
}
207-
}
208-
209-
impl<N: Network> SyncEventScanner<N> {
210-
#[must_use]
211-
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
212-
let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
213-
self.listeners.push(EventListener { filter, sender });
214-
ReceiverStream::new(receiver)
215-
}
216-
217-
/// Starts the scanner in sync (historical → live) mode.
218-
///
219-
/// Streams from `from_block` up to the current confirmed tip using the configured
220-
/// `block_confirmations`, then continues streaming new confirmed ranges live.
221-
///
222-
/// # Reorg behavior
223-
///
224-
/// - In live mode, emits [`ScannerStatus::ReorgDetected`] and adjusts the next confirmed range
225-
/// using `block_confirmations` to re-emit the confirmed portion.
226-
///
227-
/// # Errors
228-
///
229-
/// - `EventScannerMessage::ServiceShutdown` if the service is already shutting down.
230-
pub async fn start(self) -> Result<(), ScannerError> {
231-
let client = self.block_range_scanner.run()?;
232-
let stream =
233-
client.stream_from(self.config.from_block, self.config.block_confirmations).await?;
234-
235-
let provider = self.block_range_scanner.provider().clone();
236-
let listeners = self.listeners.clone();
237-
238-
tokio::spawn(async move {
239-
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await;
240-
});
241-
Ok(())
242-
}
243-
}
244-
245-
#[cfg(test)]
246-
mod tests {
247-
use super::*;
248-
use alloy::{network::Ethereum, rpc::client::RpcClient, transports::mock::Asserter};
249-
250-
#[test]
251-
fn test_sync_scanner_config_defaults() {
252-
let config = SyncScannerBuilder::new();
253-
254-
assert!(matches!(config.from_block, BlockNumberOrTag::Earliest));
255-
assert_eq!(config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
256-
}
257-
258-
#[test]
259-
fn test_sync_scanner_builder_pattern() {
260-
let config = SyncScannerBuilder::new()
261-
.max_block_range(25)
262-
.block_confirmations(5)
263-
.from_block(BlockNumberOrTag::Number(50));
264-
265-
assert_eq!(config.block_range_scanner.max_block_range, 25);
266-
assert_eq!(config.block_confirmations, 5);
267-
assert!(matches!(config.from_block, BlockNumberOrTag::Number(50)));
268-
}
269-
270-
#[test]
271-
fn test_sync_scanner_builder_with_different_block_types() {
272-
let config = SyncScannerBuilder::new()
273-
.from_block(BlockNumberOrTag::Earliest)
274-
.block_confirmations(20)
275-
.max_block_range(100);
276-
277-
assert!(matches!(config.from_block, BlockNumberOrTag::Earliest));
278-
assert_eq!(config.block_confirmations, 20);
279-
assert_eq!(config.block_range_scanner.max_block_range, 100);
280-
}
281-
282-
#[test]
283-
fn test_sync_scanner_builder_with_zero_confirmations() {
284-
let config =
285-
SyncScannerBuilder::new().from_block(0).block_confirmations(0).max_block_range(75);
286-
287-
assert!(matches!(config.from_block, BlockNumberOrTag::Number(0)));
288-
assert_eq!(config.block_confirmations, 0);
289-
assert_eq!(config.block_range_scanner.max_block_range, 75);
290-
}
291-
292-
#[test]
293-
fn test_sync_scanner_builder_last_call_wins() {
294-
let config = SyncScannerBuilder::new()
295-
.max_block_range(25)
296-
.max_block_range(55)
297-
.max_block_range(105)
298-
.from_block(1)
299-
.from_block(2)
300-
.block_confirmations(5)
301-
.block_confirmations(7);
302-
303-
assert_eq!(config.block_range_scanner.max_block_range, 105);
304-
assert!(matches!(config.from_block, BlockNumberOrTag::Number(2)));
305-
assert_eq!(config.block_confirmations, 7);
306-
}
307-
308-
#[test]
309-
fn test_sync_event_stream_listeners_vector_updates() {
310-
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
311-
let mut scanner = SyncScannerBuilder::new().connect::<Ethereum>(provider);
312-
assert_eq!(scanner.listeners.len(), 0);
313-
let _stream1 = scanner.subscribe(EventFilter::new());
314-
assert_eq!(scanner.listeners.len(), 1);
315-
let _stream2 = scanner.subscribe(EventFilter::new());
316-
let _stream3 = scanner.subscribe(EventFilter::new());
317-
assert_eq!(scanner.listeners.len(), 3);
318-
}
319-
320-
#[test]
321-
fn test_sync_event_stream_channel_capacity() {
322-
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
323-
let mut scanner = SyncScannerBuilder::new().connect::<Ethereum>(provider);
324-
let _stream = scanner.subscribe(EventFilter::new());
325-
let sender = &scanner.listeners[0].sender;
326-
assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES);
120+
block: impl Into<BlockNumberOrTag>,
121+
) -> SyncFromBlockEventScannerBuilder {
122+
SyncFromBlockEventScannerBuilder::new(block.into())
327123
}
328124
}

0 commit comments

Comments
 (0)