Skip to content

Commit 69f5e79

Browse files
authored
Refactor brs to remove command based design (#274)
1 parent 9b8e077 commit 69f5e79

File tree

8 files changed

+200
-425
lines changed

8 files changed

+200
-425
lines changed

src/block_range_scanner.rs

Lines changed: 125 additions & 341 deletions
Large diffs are not rendered by default.

src/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ pub enum ScannerError {
2020
#[error("RPC error: {0}")]
2121
RpcError(Arc<RpcError<TransportErrorKind>>),
2222

23-
/// The internal service has shut down and can no longer process commands.
24-
#[error("Service is shutting down")]
25-
ServiceShutdown,
26-
2723
/// A requested block (by number, hash or tag) could not be retrieved.
2824
#[error("Block not found, Block Id: {0}")]
2925
BlockNotFound(BlockId),

src/event_scanner/scanner/historic.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,14 @@ impl<N: Network> EventScanner<Historic, N> {
130130
///
131131
/// # Errors
132132
///
133-
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
134133
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
135134
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
136135
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
137-
pub async fn start(self) -> Result<(), ScannerError> {
138-
let client = self.block_range_scanner.run()?;
139-
let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?;
136+
pub async fn start(mut self) -> Result<(), ScannerError> {
137+
let stream = self
138+
.block_range_scanner
139+
.stream_historical(self.config.from_block, self.config.to_block)
140+
.await?;
140141

141142
let max_concurrent_fetches = self.config.max_concurrent_fetches;
142143
let provider = self.block_range_scanner.provider().clone();

src/event_scanner/scanner/latest.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,14 @@ impl<N: Network> EventScanner<LatestEvents, N> {
141141
///
142142
/// # Errors
143143
///
144-
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
145144
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
146145
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
147146
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
148-
pub async fn start(self) -> Result<(), ScannerError> {
149-
let client = self.block_range_scanner.run()?;
150-
let stream = client.rewind(self.config.from_block, self.config.to_block).await?;
147+
pub async fn start(mut self) -> Result<(), ScannerError> {
148+
let stream = self
149+
.block_range_scanner
150+
.stream_rewind(self.config.from_block, self.config.to_block)
151+
.await?;
151152

152153
let max_concurrent_fetches = self.config.max_concurrent_fetches;
153154
let provider = self.block_range_scanner.provider().clone();

src/event_scanner/scanner/live.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,10 @@ impl<N: Network> EventScanner<Live, N> {
6565
///
6666
/// # Errors
6767
///
68-
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
6968
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
7069
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
71-
pub async fn start(self) -> Result<(), ScannerError> {
72-
let client = self.block_range_scanner.run()?;
73-
let stream = client.stream_live(self.config.block_confirmations).await?;
74-
70+
pub async fn start(mut self) -> Result<(), ScannerError> {
71+
let stream = self.block_range_scanner.stream_live(self.config.block_confirmations).await?;
7572
let max_concurrent_fetches = self.config.max_concurrent_fetches;
7673
let provider = self.block_range_scanner.provider().clone();
7774
let listeners = self.listeners.clone();

src/event_scanner/scanner/sync/from_block.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ impl<N: Network> EventScanner<SyncFromBlock, N> {
7272
///
7373
/// # Errors
7474
///
75-
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
7675
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
7776
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
7877
/// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved.
7978
pub async fn start(self) -> Result<(), ScannerError> {
80-
let client = self.block_range_scanner.run()?;
81-
let stream =
82-
client.stream_from(self.config.from_block, self.config.block_confirmations).await?;
79+
let stream = self
80+
.block_range_scanner
81+
.stream_from(self.config.from_block, self.config.block_confirmations)
82+
.await?;
8383

8484
let max_concurrent_fetches = self.config.max_concurrent_fetches;
8585
let provider = self.block_range_scanner.provider().clone();

src/event_scanner/scanner/sync/from_latest.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,10 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
7171
///
7272
/// # Errors
7373
///
74-
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
7574
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
7675
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
7776
#[allow(clippy::missing_panics_doc)]
78-
pub async fn start(self) -> Result<(), ScannerError> {
77+
pub async fn start(mut self) -> Result<(), ScannerError> {
7978
let count = self.config.count;
8079
let provider = self.block_range_scanner.provider().clone();
8180
let listeners = self.listeners.clone();
@@ -84,16 +83,17 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
8483

8584
info!(count = count, "Starting scanner, mode: fetch latest events and switch to live");
8685

87-
let client = self.block_range_scanner.run()?;
88-
8986
// Fetch the latest block number.
9087
// This is used to determine the starting point for the rewind stream and the live
9188
// stream. We do this before starting the streams to avoid a race condition
9289
// where the latest block changes while we're setting up the streams.
9390
let latest_block = provider.get_block_number().await?;
9491

9592
// Setup rewind and live streams to run in parallel.
96-
let rewind_stream = client.rewind(latest_block, BlockNumberOrTag::Earliest).await?;
93+
let rewind_stream = self
94+
.block_range_scanner
95+
.stream_rewind(latest_block, BlockNumberOrTag::Earliest)
96+
.await?;
9797

9898
// Start streaming...
9999
tokio::spawn(async move {
@@ -114,17 +114,20 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
114114
// We actually rely on the sync mode for the live stream, as more blocks could have been
115115
// minted while the scanner was collecting the latest `count` events.
116116
// Note: Sync mode will notify the client when it switches to live streaming.
117-
let sync_stream =
118-
match client.stream_from(latest_block + 1, self.config.block_confirmations).await {
119-
Ok(stream) => stream,
120-
Err(e) => {
121-
error!(error = %e, "Error during sync mode setup");
122-
for listener in listeners {
123-
_ = listener.sender.try_stream(e.clone()).await;
124-
}
125-
return;
117+
let sync_stream = match self
118+
.block_range_scanner
119+
.stream_from(latest_block + 1, self.config.block_confirmations)
120+
.await
121+
{
122+
Ok(stream) => stream,
123+
Err(e) => {
124+
error!(error = %e, "Error during sync mode setup");
125+
for listener in listeners {
126+
_ = listener.sender.try_stream(e.clone()).await;
126127
}
127-
};
128+
return;
129+
}
130+
};
128131

129132
// Start the live (sync) stream.
130133
handle_stream(

0 commit comments

Comments
 (0)