Skip to content

Commit 2b7f504

Browse files
committed
fix: integrate latest into scanner api refactor
1 parent 69d336c commit 2b7f504

File tree

11 files changed

+258
-217
lines changed

11 files changed

+258
-217
lines changed

examples/latest_events_scanning/main.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{
4-
EventFilter,
5-
event_scanner::{EventScanner, EventScannerMessage},
6-
};
7-
3+
use event_scanner::{EventFilter, EventScanner, EventScannerMessage};
84
use tokio_stream::StreamExt;
95
use tracing::{error, info};
106
use tracing_subscriber::EnvFilter;
117

128
sol! {
139
#[allow(missing_docs)]
14-
#[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")]
15-
contract Counter {
10+
#[sol(rpc,
11+
bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033"
12+
)] contract Counter {
1613
uint256 public count;
1714

1815
event CountIncreased(uint256 newCount);
@@ -51,31 +48,29 @@ async fn main() -> anyhow::Result<()> {
5148
.with_contract_address(*contract_address)
5249
.with_event(Counter::CountIncreased::SIGNATURE);
5350

54-
let mut client = EventScanner::new().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
51+
let mut scanner =
52+
EventScanner::latest().count(5).connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
5553

56-
let mut stream = client.create_event_stream(increase_filter);
54+
let mut stream = scanner.create_event_stream(increase_filter);
5755

58-
for _ in 0..10 {
56+
for _ in 0..8 {
5957
_ = counter_contract.increase().send().await?;
6058
}
6159

62-
tokio::spawn(async move {
63-
client.scan_latest(5).await.expect("failed to start scanner");
64-
});
60+
scanner.start().await?;
6561

66-
// only the last 5 events will be streamed
6762
while let Some(message) = stream.next().await {
6863
match message {
6964
EventScannerMessage::Data(logs) => {
7065
for log in logs {
71-
info!("Callback successfully executed with event {:?}", log.inner.data);
66+
info!("Received event: {:?}", log.inner.data);
7267
}
7368
}
7469
EventScannerMessage::Error(e) => {
7570
error!("Received error: {}", e);
7671
}
7772
EventScannerMessage::Status(info) => {
78-
info!("Received info: {:?}", info);
73+
info!("Received status: {:?}", info);
7974
}
8075
}
8176
}

src/event_lib/modes/latest.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use super::{BaseConfig, BaseConfigBuilder};
2121
pub struct LatestScannerConfig {
2222
base: BaseConfig,
2323
// Defatuls to 1
24-
count: u64,
25-
// Defaults to Earliest
26-
from_block: BlockNumberOrTag,
24+
count: usize,
2725
// Defaults to Latest
26+
from_block: BlockNumberOrTag,
27+
// Defaults to Earliest
2828
to_block: BlockNumberOrTag,
2929
// Defaults to 0
3030
block_confirmations: u64,
@@ -49,8 +49,8 @@ impl LatestScannerConfig {
4949
Self {
5050
base: BaseConfig::new(),
5151
count: 1,
52-
from_block: BlockNumberOrTag::Earliest,
53-
to_block: BlockNumberOrTag::Latest,
52+
from_block: BlockNumberOrTag::Latest,
53+
to_block: BlockNumberOrTag::Earliest,
5454
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
5555
switch_to_live: false,
5656
}
@@ -63,7 +63,7 @@ impl LatestScannerConfig {
6363
}
6464

6565
#[must_use]
66-
pub fn count(mut self, count: u64) -> Self {
66+
pub fn count(mut self, count: usize) -> Self {
6767
self.count = count;
6868
self
6969
}
@@ -189,7 +189,9 @@ impl<N: Network> LatestEventScanner<N> {
189189
/// * `EventScannerMessage::ServiceShutdown` - if the service is already shutting down.
190190
#[allow(clippy::unused_async)]
191191
pub async fn start(self) -> Result<(), EventScannerError> {
192-
unimplemented!()
192+
self.inner
193+
.stream_latest(self.config.count, self.config.from_block, self.config.to_block)
194+
.await
193195
}
194196
}
195197

@@ -202,8 +204,8 @@ mod tests {
202204
let config = LatestScannerConfig::new();
203205

204206
assert_eq!(config.count, 1);
205-
assert!(matches!(config.from_block, BlockNumberOrTag::Earliest));
206-
assert!(matches!(config.to_block, BlockNumberOrTag::Latest));
207+
assert!(matches!(config.from_block, BlockNumberOrTag::Latest));
208+
assert!(matches!(config.to_block, BlockNumberOrTag::Earliest));
207209
assert_eq!(config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
208210
assert!(!config.switch_to_live);
209211
}

src/event_lib/scanner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl<N: Network> EventScannerService<N> {
183183
/// * Returns `EventScannerError` if the scanner fails to start or fetching logs fails.
184184
///
185185
/// [`ScannerStatus::ReorgDetected`]: crate::types::ScannerStatus::ReorgDetected
186-
pub async fn scan_latest<T: Into<BlockNumberOrTag>>(
186+
pub async fn stream_latest<T: Into<BlockNumberOrTag>>(
187187
self,
188188
count: usize,
189189
start_height: T,

tests/common.rs

Lines changed: 92 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use alloy::{
1010
sol_types::SolEvent,
1111
};
1212
use alloy_node_bindings::{Anvil, AnvilInstance};
13-
use event_scanner::{EventFilter, EventScanner, EventScannerMessage, LiveEventScanner};
13+
use event_scanner::{
14+
EventFilter, EventScanner, EventScannerMessage, HistoricEventScanner, LiveEventScanner,
15+
SyncEventScanner, event_lib::modes::LatestEventScanner,
16+
};
1417
use tokio_stream::wrappers::ReceiverStream;
1518

1619
// Shared test contract used across integration tests
@@ -42,23 +45,32 @@ sol! {
4245
}
4346
}
4447

45-
pub struct LiveScannerSetup<P>
48+
pub struct ScannerSetup<S, P>
4649
where
4750
P: Provider<Ethereum> + Clone,
4851
{
4952
pub provider: RootProvider,
5053
pub contract: TestCounter::TestCounterInstance<Arc<P>>,
51-
pub scanner: LiveEventScanner<Ethereum>,
54+
pub scanner: S,
5255
pub stream: ReceiverStream<EventScannerMessage>,
5356
pub anvil: AnvilInstance,
5457
}
5558

59+
pub type LiveScannerSetup<P> = ScannerSetup<LiveEventScanner<Ethereum>, P>;
60+
pub type HistoricScannerSetup<P> = ScannerSetup<HistoricEventScanner<Ethereum>, P>;
61+
pub type SyncScannerSetup<P> = ScannerSetup<SyncEventScanner<Ethereum>, P>;
62+
pub type LatestScannerSetup<P> = ScannerSetup<LatestEventScanner<Ethereum>, P>;
63+
5664
#[allow(clippy::missing_errors_doc)]
57-
pub async fn setup_live_scanner(
65+
pub async fn setup_common(
5866
block_interval: Option<f64>,
5967
filter: Option<EventFilter>,
60-
confirmations: u64,
61-
) -> anyhow::Result<LiveScannerSetup<impl Provider<Ethereum> + Clone>> {
68+
) -> anyhow::Result<(
69+
AnvilInstance,
70+
RootProvider,
71+
TestCounter::TestCounterInstance<Arc<RootProvider>>,
72+
EventFilter,
73+
)> {
6274
let anvil = spawn_anvil(block_interval)?;
6375
let provider = build_provider(&anvil).await?;
6476
let contract = deploy_counter(Arc::new(provider.clone())).await?;
@@ -69,15 +81,87 @@ pub async fn setup_live_scanner(
6981

7082
let filter = filter.unwrap_or(default_filter);
7183

84+
Ok((anvil, provider, contract, filter))
85+
}
86+
87+
#[allow(clippy::missing_errors_doc)]
88+
pub async fn setup_live_scanner(
89+
block_interval: Option<f64>,
90+
filter: Option<EventFilter>,
91+
confirmations: u64,
92+
) -> anyhow::Result<LiveScannerSetup<impl Provider<Ethereum> + Clone>> {
93+
let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
94+
7295
let mut scanner = EventScanner::live()
7396
.block_confirmations(confirmations)
7497
.connect_ws(anvil.ws_endpoint_url())
7598
.await?;
7699

77100
let stream = scanner.create_event_stream(filter);
78101

79-
// return anvil otherwise it doesnt live long enough...
80-
Ok(LiveScannerSetup { provider, contract, scanner, stream, anvil })
102+
Ok(ScannerSetup { provider, contract, scanner, stream, anvil })
103+
}
104+
105+
#[allow(clippy::missing_errors_doc)]
106+
pub async fn setup_sync_scanner(
107+
block_interval: Option<f64>,
108+
filter: Option<EventFilter>,
109+
confirmations: u64,
110+
) -> anyhow::Result<SyncScannerSetup<impl Provider<Ethereum> + Clone>> {
111+
let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
112+
113+
let mut scanner = EventScanner::sync()
114+
.block_confirmations(confirmations)
115+
.connect_ws(anvil.ws_endpoint_url())
116+
.await?;
117+
118+
let stream = scanner.create_event_stream(filter);
119+
120+
Ok(ScannerSetup { provider, contract, scanner, stream, anvil })
121+
}
122+
123+
#[allow(clippy::missing_errors_doc)]
124+
pub async fn setup_historic_scanner(
125+
block_interval: Option<f64>,
126+
filter: Option<EventFilter>,
127+
from: BlockNumberOrTag,
128+
to: BlockNumberOrTag,
129+
) -> anyhow::Result<HistoricScannerSetup<impl Provider<Ethereum> + Clone>> {
130+
let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
131+
132+
let mut scanner = EventScanner::historic()
133+
.from_block(from)
134+
.to_block(to)
135+
.connect_ws(anvil.ws_endpoint_url())
136+
.await?;
137+
138+
let stream = scanner.create_event_stream(filter);
139+
140+
Ok(ScannerSetup { provider, contract, scanner, stream, anvil })
141+
}
142+
143+
#[allow(clippy::missing_errors_doc)]
144+
pub async fn setup_latest_scanner(
145+
block_interval: Option<f64>,
146+
filter: Option<EventFilter>,
147+
count: usize,
148+
from: Option<BlockNumberOrTag>,
149+
to: Option<BlockNumberOrTag>,
150+
) -> anyhow::Result<LatestScannerSetup<impl Provider<Ethereum> + Clone>> {
151+
let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
152+
let mut builder = EventScanner::latest().count(count);
153+
if let Some(f) = from {
154+
builder = builder.from_block(f);
155+
}
156+
if let Some(t) = to {
157+
builder = builder.to_block(t);
158+
}
159+
160+
let mut scanner = builder.connect_ws(anvil.ws_endpoint_url()).await?;
161+
162+
let stream = scanner.create_event_stream(filter);
163+
164+
Ok(ScannerSetup { provider, contract, scanner, stream, anvil })
81165
}
82166

83167
#[allow(clippy::missing_errors_doc)]

tests/historic_mode/basic.rs

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,41 +6,31 @@ use std::{
66
time::Duration,
77
};
88

9-
use alloy::{network::Ethereum, sol_types::SolEvent};
10-
use event_scanner::{EventFilter, EventScanner, EventScannerMessage};
9+
use event_scanner::EventScannerMessage;
1110
use tokio::time::timeout;
1211
use tokio_stream::StreamExt;
1312

14-
use crate::common::{TestCounter, build_provider, deploy_counter, spawn_anvil};
13+
use crate::common::{TestCounter, setup_historic_scanner};
1514

1615
#[tokio::test]
1716
async fn processes_events_within_specified_historical_range() -> anyhow::Result<()> {
18-
let anvil = spawn_anvil(Some(0.1))?;
19-
let provider = build_provider(&anvil).await?;
20-
let contract = deploy_counter(provider.clone()).await?;
21-
let contract_address = *contract.address();
22-
23-
let filter = EventFilter::new()
24-
.with_contract_address(contract_address)
25-
.with_event(TestCounter::CountIncreased::SIGNATURE);
26-
27-
let receipt = contract.increase().send().await?.get_receipt().await?;
28-
let start_block = receipt.block_number.expect("receipt should contain block number");
29-
let mut end_block = 0;
30-
17+
let setup = setup_historic_scanner(
18+
Some(0.1),
19+
None,
20+
alloy::eips::BlockNumberOrTag::Earliest,
21+
alloy::eips::BlockNumberOrTag::Latest,
22+
)
23+
.await?;
24+
25+
let contract = setup.contract.clone();
3126
let expected_event_count = 4;
3227

33-
for _ in 1..expected_event_count {
34-
let receipt = contract.increase().send().await?.get_receipt().await?;
35-
end_block = receipt.block_number.expect("receipt should contain block number");
28+
for _ in 0..expected_event_count {
29+
contract.increase().send().await?.watch().await?;
3630
}
3731

38-
let mut scanner = EventScanner::historic()
39-
.from_block(start_block)
40-
.to_block(end_block)
41-
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
42-
.await?;
43-
let mut stream = scanner.create_event_stream(filter).take(expected_event_count);
32+
let scanner = setup.scanner;
33+
let mut stream = setup.stream.take(expected_event_count);
4434

4535
tokio::spawn(async move { scanner.run().await });
4636

tests/historic_to_live/basic.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,22 @@
1-
use alloy::{network::Ethereum, primitives::U256, sol_types::SolEvent};
2-
use event_scanner::{EventFilter, EventScanner, assert_next, types::ScannerStatus};
1+
use alloy::primitives::U256;
2+
use event_scanner::{assert_next, types::ScannerStatus};
33

4-
use crate::common::{TestCounter, build_provider, deploy_counter, spawn_anvil};
4+
use crate::common::{TestCounter, setup_sync_scanner};
55

66
#[tokio::test]
77
async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
8-
let anvil = spawn_anvil(Some(0.1))?;
9-
let provider = build_provider(&anvil).await?;
10-
let contract = deploy_counter(provider).await?;
11-
let contract_address = *contract.address();
8+
let setup = setup_sync_scanner(Some(0.1), None, 0).await?;
9+
let contract = setup.contract.clone();
1210

1311
let historical_events = 3;
1412
let live_events = 2;
1513

16-
let receipt = contract.increase().send().await?.get_receipt().await?;
17-
let first_historical_block =
18-
receipt.block_number.expect("historical receipt should contain block number");
19-
20-
for _ in 1..historical_events {
14+
for _ in 0..historical_events {
2115
contract.increase().send().await?.watch().await?;
2216
}
2317

24-
let filter = EventFilter::new()
25-
.with_contract_address(contract_address)
26-
.with_event(TestCounter::CountIncreased::SIGNATURE);
27-
28-
let mut scanner = EventScanner::sync()
29-
.from_block(first_historical_block)
30-
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
31-
.await?;
32-
33-
let mut stream = scanner.create_event_stream(filter);
18+
let scanner = setup.scanner;
19+
let mut stream = setup.stream;
3420

3521
tokio::spawn(async move { scanner.start().await });
3622

0 commit comments

Comments
 (0)