Skip to content

Commit c6941d0

Browse files
committed
add latest
1 parent 5ab0b17 commit c6941d0

File tree

8 files changed

+273
-156
lines changed

8 files changed

+273
-156
lines changed

src/event_lib/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub mod scanner;
66
pub use filter::EventFilter;
77
pub use listener::EventListener;
88
pub use modes::{
9-
EventScanner, HistoricModeConfig, HistoricModeScanner, LiveModeConfig, LiveModeScanner,
10-
SyncModeConfig, SyncModeScanner,
9+
EventScanner, HistoricEventScanner, HistoricScannerConfig, LiveEventScanner, LiveScannerConfig,
10+
SyncEventScanner, SyncScannerConfig,
1111
};
1212
pub use scanner::{EventScannerError, EventScannerMessage, EventScannerService};

src/event_lib/modes/historic.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,26 @@ use crate::event_lib::{
1414

1515
use super::{BaseConfig, BaseConfigBuilder};
1616

17-
pub struct HistoricModeConfig {
17+
pub struct HistoricScannerConfig {
1818
base: BaseConfig,
19+
// Defaults to Earliest
1920
from_block: BlockNumberOrTag,
21+
// Defaults to Latest
2022
to_block: BlockNumberOrTag,
2123
}
2224

23-
pub struct HistoricModeScanner<N: Network> {
24-
config: HistoricModeConfig,
25+
pub struct HistoricEventScanner<N: Network> {
26+
config: HistoricScannerConfig,
2527
inner: EventScannerService<N>,
2628
}
2729

28-
impl BaseConfigBuilder for HistoricModeConfig {
30+
impl BaseConfigBuilder for HistoricScannerConfig {
2931
fn base_mut(&mut self) -> &mut BaseConfig {
3032
&mut self.base
3133
}
3234
}
3335

34-
impl HistoricModeConfig {
36+
impl HistoricScannerConfig {
3537
pub(super) fn new() -> Self {
3638
Self {
3739
base: BaseConfig::new(),
@@ -60,11 +62,11 @@ impl HistoricModeConfig {
6062
pub async fn connect_ws<N: Network>(
6163
self,
6264
ws_url: Url,
63-
) -> TransportResult<HistoricModeScanner<N>> {
64-
let HistoricModeConfig { base, from_block, to_block } = self;
65+
) -> TransportResult<HistoricEventScanner<N>> {
66+
let HistoricScannerConfig { base, from_block, to_block } = self;
6567
let brs = base.block_range_scanner.connect_ws::<N>(ws_url).await?;
66-
let mode = HistoricModeConfig { base, from_block, to_block };
67-
Ok(HistoricModeScanner { config: mode, inner: EventScannerService::from_config(brs) })
68+
let config = HistoricScannerConfig { base, from_block, to_block };
69+
Ok(HistoricEventScanner { config, inner: EventScannerService::from_config(brs) })
6870
}
6971

7072
/// Connects to the provider via IPC
@@ -75,11 +77,11 @@ impl HistoricModeConfig {
7577
pub async fn connect_ipc<N: Network>(
7678
self,
7779
ipc_path: String,
78-
) -> TransportResult<HistoricModeScanner<N>> {
79-
let HistoricModeConfig { base, from_block, to_block } = self;
80+
) -> TransportResult<HistoricEventScanner<N>> {
81+
let HistoricScannerConfig { base, from_block, to_block } = self;
8082
let brs = base.block_range_scanner.connect_ipc::<N>(ipc_path).await?;
81-
let mode = HistoricModeConfig { base, from_block, to_block };
82-
Ok(HistoricModeScanner { config: mode, inner: EventScannerService::from_config(brs) })
83+
let config = HistoricScannerConfig { base, from_block, to_block };
84+
Ok(HistoricEventScanner { config, inner: EventScannerService::from_config(brs) })
8385
}
8486

8587
/// Connects to an existing provider
@@ -90,15 +92,15 @@ impl HistoricModeConfig {
9092
pub fn connect_provider<N: Network>(
9193
self,
9294
provider: RootProvider<N>,
93-
) -> TransportResult<HistoricModeScanner<N>> {
94-
let HistoricModeConfig { base, from_block, to_block } = self;
95+
) -> TransportResult<HistoricEventScanner<N>> {
96+
let HistoricScannerConfig { base, from_block, to_block } = self;
9597
let brs = base.block_range_scanner.connect_provider::<N>(provider)?;
96-
let mode = HistoricModeConfig { base, from_block, to_block };
97-
Ok(HistoricModeScanner { config: mode, inner: EventScannerService::from_config(brs) })
98+
let config = HistoricScannerConfig { base, from_block, to_block };
99+
Ok(HistoricEventScanner { config, inner: EventScannerService::from_config(brs) })
98100
}
99101
}
100102

101-
impl<N: Network> HistoricModeScanner<N> {
103+
impl<N: Network> HistoricEventScanner<N> {
102104
pub fn create_event_stream(
103105
&mut self,
104106
filter: EventFilter,

src/event_lib/modes/latest.rs

Lines changed: 193 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,193 @@
1-
// use alloy::{
2-
// network::Network,
3-
// providers::RootProvider,
4-
// transports::{TransportResult, http::reqwest::Url},
5-
// };
6-
//
7-
// use tokio_stream::wrappers::ReceiverStream;
8-
//
9-
// use crate::event_lib::{
10-
// filter::EventFilter,
11-
// scanner::{EventScannerError, EventScannerMessage, EventScannerService},
12-
// };
13-
//
14-
// use super::{BaseConfig, BaseConfigBuilder};
15-
//
16-
// pub struct LatestMode {
17-
// base: BaseConfig,
18-
// count: u64,
19-
// }
20-
//
21-
// pub struct ConnectedLatestMode<N: Network> {
22-
// mode: LatestMode,
23-
// inner: EventScannerService<N>,
24-
// }
25-
//
26-
// impl BaseConfigBuilder for LatestMode {
27-
// fn base_mut(&mut self) -> &mut BaseConfig {
28-
// &mut self.base
29-
// }
30-
// }
31-
//
32-
// impl LatestMode {
33-
// pub(super) fn new() -> Self {
34-
// Self { base: BaseConfig::new(), count: 1 }
35-
// }
36-
//
37-
// pub async fn connect_ws<N: Network>(
38-
// self,
39-
// ws_url: Url,
40-
// ) -> TransportResult<ConnectedLatestMode<N>> {
41-
// let LatestMode { base, count } = self;
42-
// let brs = base.block_range_scanner.connect_ws::<N>(ws_url).await?;
43-
// let mode = LatestMode { base, count };
44-
// Ok(ConnectedLatestMode { mode, inner: EventScannerService::from_connected(brs) })
45-
// }
46-
//
47-
// pub async fn connect_ipc<N: Network>(
48-
// self,
49-
// ipc_path: String,
50-
// ) -> TransportResult<ConnectedLatestMode<N>> {
51-
// let LatestMode { base, count } = self;
52-
// let brs = base.block_range_scanner.connect_ipc::<N>(ipc_path).await?;
53-
// let mode = LatestMode { base, count };
54-
// Ok(ConnectedLatestMode { mode, inner: EventScannerService::from_connected(brs) })
55-
// }
56-
//
57-
// pub fn connect<N: Network>(
58-
// self,
59-
// provider: RootProvider<N>,
60-
// ) -> TransportResult<ConnectedLatestMode<N>> {
61-
// let LatestMode { base, count } = self;
62-
// let brs = base.block_range_scanner.connect_provider::<N>(provider)?;
63-
// let mode = LatestMode { base, count };
64-
// Ok(ConnectedLatestMode { mode, inner: EventScannerService::from_connected(brs) })
65-
// }
66-
// }
67-
//
68-
// impl<N: Network> ConnectedLatestMode<N> {
69-
// pub fn create_event_stream(
70-
// &mut self,
71-
// filter: EventFilter,
72-
// ) -> ReceiverStream<EventScannerMessage> {
73-
// self.inner.create_event_stream(filter)
74-
// }
75-
//
76-
// pub async fn stream(self) -> Result<(), EventScannerError> {
77-
// // For now, map Latest to live stream (count unused)
78-
// self.inner.stream_live(None).await
79-
// }
80-
// }
1+
use alloy::{
2+
eips::BlockNumberOrTag,
3+
network::Network,
4+
providers::RootProvider,
5+
transports::{TransportResult, http::reqwest::Url},
6+
};
7+
8+
use tokio_stream::wrappers::ReceiverStream;
9+
10+
use crate::{
11+
block_range_scanner::DEFAULT_BLOCK_CONFIRMATIONS,
12+
event_lib::{
13+
filter::EventFilter,
14+
scanner::{EventScannerError, EventScannerMessage, EventScannerService},
15+
},
16+
};
17+
18+
use super::{BaseConfig, BaseConfigBuilder};
19+
20+
pub struct LatestScannerConfig {
21+
base: BaseConfig,
22+
// Defatuls to 1
23+
count: u64,
24+
// Defaults to Earliest
25+
from_block: BlockNumberOrTag,
26+
// Defaults to Latest
27+
to_block: BlockNumberOrTag,
28+
// Defaults to 0
29+
block_confirmations: u64,
30+
// Defaults to false
31+
switch_to_live: bool,
32+
}
33+
34+
pub struct LatestEventScanner<N: Network> {
35+
#[allow(dead_code)]
36+
config: LatestScannerConfig,
37+
inner: EventScannerService<N>,
38+
}
39+
40+
impl BaseConfigBuilder for LatestScannerConfig {
41+
fn base_mut(&mut self) -> &mut BaseConfig {
42+
&mut self.base
43+
}
44+
}
45+
46+
impl LatestScannerConfig {
47+
pub(super) fn new() -> Self {
48+
Self {
49+
base: BaseConfig::new(),
50+
count: 1,
51+
from_block: BlockNumberOrTag::Earliest,
52+
to_block: BlockNumberOrTag::Latest,
53+
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
54+
switch_to_live: false,
55+
}
56+
}
57+
58+
#[must_use]
59+
pub fn block_confirmations(mut self, count: u64) -> Self {
60+
self.block_confirmations = count;
61+
self
62+
}
63+
64+
#[must_use]
65+
pub fn count(mut self, count: u64) -> Self {
66+
self.count = count;
67+
self
68+
}
69+
70+
#[must_use]
71+
pub fn from_block(mut self, block: impl Into<BlockNumberOrTag>) -> Self {
72+
self.from_block = block.into();
73+
self
74+
}
75+
76+
#[must_use]
77+
pub fn to_block(mut self, block: impl Into<BlockNumberOrTag>) -> Self {
78+
self.to_block = block.into();
79+
self
80+
}
81+
82+
#[must_use]
83+
pub fn then_live(mut self) -> Self {
84+
self.switch_to_live = true;
85+
self
86+
}
87+
88+
/// Connects to the provider via IPC
89+
///
90+
/// # Errors
91+
///
92+
/// Returns an error if the connection fails
93+
pub async fn connect_ws<N: Network>(
94+
self,
95+
ws_url: Url,
96+
) -> TransportResult<LatestEventScanner<N>> {
97+
let LatestScannerConfig {
98+
base,
99+
count,
100+
from_block,
101+
to_block,
102+
block_confirmations,
103+
switch_to_live,
104+
} = self;
105+
let brs = base.block_range_scanner.connect_ws::<N>(ws_url).await?;
106+
let config = LatestScannerConfig {
107+
base,
108+
count,
109+
from_block,
110+
to_block,
111+
block_confirmations,
112+
switch_to_live,
113+
};
114+
Ok(LatestEventScanner { config, inner: EventScannerService::from_config(brs) })
115+
}
116+
117+
/// Connects to the provider via IPC
118+
///
119+
/// # Errors
120+
///
121+
/// Returns an error if the connection fails
122+
pub async fn connect_ipc<N: Network>(
123+
self,
124+
ipc_path: String,
125+
) -> TransportResult<LatestEventScanner<N>> {
126+
let LatestScannerConfig {
127+
base,
128+
count,
129+
from_block,
130+
to_block,
131+
block_confirmations,
132+
switch_to_live,
133+
} = self;
134+
let brs = base.block_range_scanner.connect_ipc::<N>(ipc_path).await?;
135+
let config = LatestScannerConfig {
136+
base,
137+
count,
138+
from_block,
139+
to_block,
140+
block_confirmations,
141+
switch_to_live,
142+
};
143+
Ok(LatestEventScanner { config, inner: EventScannerService::from_config(brs) })
144+
}
145+
146+
/// Connects to an existing provider
147+
///
148+
/// # Errors
149+
///
150+
/// Returns an error if the connection fails
151+
pub fn connect_provider<N: Network>(
152+
self,
153+
provider: RootProvider<N>,
154+
) -> TransportResult<LatestEventScanner<N>> {
155+
let LatestScannerConfig {
156+
base,
157+
count,
158+
from_block,
159+
to_block,
160+
block_confirmations,
161+
switch_to_live,
162+
} = self;
163+
let brs = base.block_range_scanner.connect_provider::<N>(provider)?;
164+
let config = LatestScannerConfig {
165+
base,
166+
count,
167+
from_block,
168+
to_block,
169+
block_confirmations,
170+
switch_to_live,
171+
};
172+
Ok(LatestEventScanner { config, inner: EventScannerService::from_config(brs) })
173+
}
174+
}
175+
176+
impl<N: Network> LatestEventScanner<N> {
177+
pub fn create_event_stream(
178+
&mut self,
179+
filter: EventFilter,
180+
) -> ReceiverStream<EventScannerMessage> {
181+
self.inner.create_event_stream(filter)
182+
}
183+
184+
/// WARN: unimplemented - will call stream latest
185+
///
186+
/// # Errors
187+
///
188+
/// * `EventScannerMessage::ServiceShutdown` - if the service is already shutting down.
189+
#[allow(clippy::unused_async)]
190+
pub async fn stream(self) -> Result<(), EventScannerError> {
191+
unimplemented!()
192+
}
193+
}

0 commit comments

Comments
 (0)