Skip to content

Commit 81b3aba

Browse files
LeoPatOZ0xNeshi
andauthored
Accessing event stream now depends on scanner.start() being Invoked (#220)
Co-authored-by: 0xNeshi <[email protected]>
1 parent 09c3c1d commit 81b3aba

File tree

26 files changed

+310
-146
lines changed

26 files changed

+310
-146
lines changed

README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,13 @@ async fn run_scanner(
8585
.contract_address(contract)
8686
.event(MyContract::SomeEvent::SIGNATURE);
8787

88-
let mut stream = scanner.subscribe(filter);
88+
let subscription = scanner.subscribe(filter);
8989

90-
// Start the scanner
91-
scanner.start().await?;
90+
// Start the scanner and get the proof
91+
let proof = scanner.start().await?;
92+
93+
// Access the stream using the proof
94+
let mut stream = subscription.stream(&proof);
9295

9396
// Process messages from the stream
9497
while let Some(message) = stream.next().await {
@@ -174,7 +177,7 @@ let scanner = EventScannerBuilder::sync()
174177
.await?;
175178
```
176179

177-
Invoking `scanner.start()` starts the scanner in the specified mode.
180+
Invoking `scanner.start()` starts the scanner in the specified mode and returns a `StartProof` that must be passed to `subscription.stream()` to access the event stream. This compile-time guarantee ensures the scanner is started before attempting to read events.
178181

179182
### Defining Event Filters
180183

examples/historical_scanning/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,12 @@ async fn main() -> anyhow::Result<()> {
6363

6464
let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?;
6565

66-
let mut stream = scanner.subscribe(increase_filter);
66+
let subscription = scanner.subscribe(increase_filter);
6767

68-
scanner.start().await.expect("failed to start scanner");
68+
let proof = scanner.start().await.expect("failed to start scanner");
69+
70+
// Access the stream using the proof (proves scanner is started)
71+
let mut stream = subscription.stream(&proof);
6972

7073
while let Some(message) = stream.next().await {
7174
match message {

examples/latest_events_scanning/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,16 @@ async fn main() -> anyhow::Result<()> {
6060

6161
let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider).await?;
6262

63-
let mut stream = scanner.subscribe(increase_filter);
63+
let subscription = scanner.subscribe(increase_filter);
6464

6565
for _ in 0..8 {
6666
_ = counter_contract.increase().send().await?;
6767
}
6868

69-
scanner.start().await?;
69+
let proof = scanner.start().await?;
70+
71+
// Access the stream using the proof (proves scanner is started)
72+
let mut stream = subscription.stream(&proof);
7073

7174
while let Some(message) = stream.next().await {
7275
match message {

examples/live_scanning/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,12 @@ async fn main() -> anyhow::Result<()> {
6161

6262
let mut scanner = EventScannerBuilder::live().connect(robust_provider).await?;
6363

64-
let mut stream = scanner.subscribe(increase_filter);
64+
let subscription = scanner.subscribe(increase_filter);
6565

66-
scanner.start().await.expect("failed to start scanner");
66+
let proof = scanner.start().await.expect("failed to start scanner");
67+
68+
// Access the stream using the proof (proves scanner is started)
69+
let mut stream = subscription.stream(&proof);
6770

6871
_ = counter_contract.increase().send().await?;
6972

examples/sync_from_block_scanning/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,13 @@ async fn main() -> anyhow::Result<()> {
6969

7070
let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider).await?;
7171

72-
let mut stream = scanner.subscribe(increase_filter);
72+
let subscription = scanner.subscribe(increase_filter);
7373

7474
info!("Starting sync scanner...");
75-
scanner.start().await.expect("failed to start scanner");
75+
let proof = scanner.start().await.expect("failed to start scanner");
76+
77+
// Access the stream using the proof (proves scanner is started)
78+
let mut stream = subscription.stream(&proof);
7679

7780
info!("Creating live events...");
7881
for i in 0..2 {

examples/sync_from_latest_scanning/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,16 @@ async fn main() -> anyhow::Result<()> {
6161

6262
let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider).await?;
6363

64-
let mut stream = client.subscribe(increase_filter);
64+
let subscription = client.subscribe(increase_filter);
6565

6666
for _ in 0..10 {
6767
_ = counter_contract.increase().send().await?;
6868
}
6969

70-
client.start().await.expect("failed to start scanner");
70+
let proof = client.start().await.expect("failed to start scanner");
71+
72+
// Access the stream using the proof (proves scanner is started)
73+
let mut stream = subscription.stream(&proof);
7174

7275
// emit some events for live mode to pick up
7376
_ = counter_contract.increase().send().await?;

src/event_scanner/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ mod scanner;
1616
pub use filter::EventFilter;
1717
pub use message::{EventScannerResult, Message};
1818
pub use scanner::{
19-
DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, Historic, LatestEvents,
20-
Live, SyncFromBlock, SyncFromLatestEvents, block_range_handler,
19+
DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, EventSubscription, Historic,
20+
LatestEvents, Live, StartProof, SyncFromBlock, SyncFromLatestEvents, block_range_handler,
2121
};

src/event_scanner/scanner/historic.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use alloy::{
77
use super::block_range_handler::StreamHandler;
88
use crate::{
99
EventScannerBuilder, ScannerError,
10-
event_scanner::scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler},
10+
event_scanner::{
11+
StartProof,
12+
scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler},
13+
},
1114
robust_provider::IntoRobustProvider,
1215
};
1316

@@ -133,7 +136,7 @@ impl<N: Network> EventScanner<Historic, N> {
133136
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
134137
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
135138
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
136-
pub async fn start(self) -> Result<(), ScannerError> {
139+
pub async fn start(self) -> Result<StartProof, ScannerError> {
137140
info!(
138141
from_block = ?self.config.from_block,
139142
to_block = ?self.config.to_block,
@@ -159,7 +162,7 @@ impl<N: Network> EventScanner<Historic, N> {
159162
handler.handle(stream).await;
160163
});
161164

162-
Ok(())
165+
Ok(StartProof::new())
163166
}
164167
}
165168

src/event_scanner/scanner/latest.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use alloy::{
77
use crate::{
88
EventScannerBuilder, ScannerError,
99
event_scanner::{
10-
EventScanner, LatestEvents,
10+
EventScanner, LatestEvents, StartProof,
1111
scanner::block_range_handler::{BlockRangeHandler, LatestEventsHandler},
1212
},
1313
robust_provider::IntoRobustProvider,
@@ -146,7 +146,15 @@ impl<N: Network> EventScanner<LatestEvents, N> {
146146
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
147147
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
148148
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
149-
pub async fn start(self) -> Result<(), ScannerError> {
149+
pub async fn start(self) -> Result<StartProof, ScannerError> {
150+
info!(
151+
from_block = ?self.config.from_block,
152+
to_block = ?self.config.to_block,
153+
count = ?self.config.count,
154+
listener_count = self.listeners.len(),
155+
"Starting EventScanner in LatestEvents mode"
156+
);
157+
150158
let stream = self
151159
.block_range_scanner
152160
.stream_rewind(self.config.from_block, self.config.to_block)
@@ -166,7 +174,7 @@ impl<N: Network> EventScanner<LatestEvents, N> {
166174
handler.handle(stream).await;
167175
});
168176

169-
Ok(())
177+
Ok(StartProof::new())
170178
}
171179
}
172180

src/event_scanner/scanner/live.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use alloy::network::Network;
33
use crate::{
44
EventScannerBuilder, ScannerError,
55
event_scanner::{
6-
EventScanner,
6+
EventScanner, StartProof,
77
scanner::{
88
Live,
99
block_range_handler::{BlockRangeHandler, StreamHandler},
@@ -72,7 +72,7 @@ impl<N: Network> EventScanner<Live, N> {
7272
///
7373
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
7474
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
75-
pub async fn start(self) -> Result<(), ScannerError> {
75+
pub async fn start(self) -> Result<StartProof, ScannerError> {
7676
info!(
7777
block_confirmations = self.config.block_confirmations,
7878
listener_count = self.listeners.len(),
@@ -93,7 +93,7 @@ impl<N: Network> EventScanner<Live, N> {
9393
handler.handle(stream).await;
9494
});
9595

96-
Ok(())
96+
Ok(StartProof::new())
9797
}
9898
}
9999

0 commit comments

Comments
 (0)