Skip to content

Commit b2b6141

Browse files
committed
feat: add rewind command
1 parent c58deac commit b2b6141

File tree

2 files changed

+79
-6
lines changed

2 files changed

+79
-6
lines changed

src/block_range_scanner.rs

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
//! }
6666
//! ```
6767
68-
use std::{ops::RangeInclusive, sync::Arc};
68+
use std::{cmp::Ordering, ops::RangeInclusive, sync::Arc};
6969

7070
use tokio::sync::{mpsc, oneshot};
7171
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
@@ -88,7 +88,7 @@ use alloy::{
8888
use thiserror::Error;
8989
use tracing::{debug, error, info, warn};
9090

91-
pub const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
91+
pub const DEFAULT_BLOCKS_READ_PER_EPOCH: u32 = 1000;
9292
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
9393
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9494
// const BACK_OFF_MAX_RETRIES: u64 = 5;
@@ -133,6 +133,9 @@ pub enum BlockRangeScannerError {
133133

134134
#[error("WebSocket connection failed after {0} attempts")]
135135
WebSocketConnectionFailed(usize),
136+
137+
#[error("Block {0} not found")]
138+
BlockNotFound(BlockNumberOrTag),
136139
}
137140

138141
impl From<reqwest::Error> for BlockRangeScannerError {
@@ -170,6 +173,12 @@ pub enum Command {
170173
start_height: BlockNumberOrTag,
171174
response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
172175
},
176+
Rewind {
177+
sender: mpsc::Sender<BlockRangeMessage>,
178+
start_height: Option<BlockNumberOrTag>,
179+
end_height: Option<BlockNumberOrTag>,
180+
response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
181+
},
173182
Unsubscribe {
174183
response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
175184
},
@@ -192,7 +201,7 @@ impl BlockHashAndNumber {
192201

193202
#[derive(Clone)]
194203
struct Config {
195-
blocks_read_per_epoch: usize,
204+
blocks_read_per_epoch: u32,
196205
reorg_rewind_depth: u64,
197206
#[allow(
198207
dead_code,
@@ -202,7 +211,7 @@ struct Config {
202211
}
203212

204213
pub struct BlockRangeScanner {
205-
blocks_read_per_epoch: usize,
214+
blocks_read_per_epoch: u32,
206215
reorg_rewind_depth: u64,
207216
block_confirmations: u64,
208217
}
@@ -224,7 +233,7 @@ impl BlockRangeScanner {
224233
}
225234

226235
#[must_use]
227-
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
236+
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: u32) -> Self {
228237
self.blocks_read_per_epoch = blocks_read_per_epoch;
229238
self
230239
}
@@ -390,6 +399,15 @@ impl<N: Network> Service<N> {
390399
let result = self.handle_sync(start_height).await;
391400
let _ = response.send(result);
392401
}
402+
Command::Rewind { sender, start_height, end_height, response } => {
403+
self.ensure_no_subscriber()?;
404+
self.subscriber = Some(sender);
405+
let start_height = start_height.unwrap_or(BlockNumberOrTag::Latest);
406+
let end_height = end_height.unwrap_or(BlockNumberOrTag::Earliest);
407+
info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
408+
let result = self.handle_rewind(start_height, end_height).await;
409+
let _ = response.send(result);
410+
}
393411
Command::Unsubscribe { response } => {
394412
self.handle_unsubscribe();
395413
let _ = response.send(Ok(()));
@@ -403,6 +421,61 @@ impl<N: Network> Service<N> {
403421
Ok(())
404422
}
405423

424+
async fn handle_rewind(
425+
&mut self,
426+
start_height: BlockNumberOrTag,
427+
end_height: BlockNumberOrTag,
428+
) -> Result<(), BlockRangeScannerError> {
429+
let start_block = self
430+
.provider
431+
.get_block_by_number(start_height)
432+
.await?
433+
.ok_or(BlockRangeScannerError::BlockNotFound(start_height))?
434+
.header()
435+
.number();
436+
let end_block = self
437+
.provider
438+
.get_block_by_number(end_height)
439+
.await?
440+
.ok_or(BlockRangeScannerError::BlockNotFound(end_height))?
441+
.header()
442+
.number();
443+
444+
let block_range = match start_block.cmp(&end_block) {
445+
Ordering::Greater => end_block..=start_block,
446+
_ => start_block..=end_block,
447+
};
448+
449+
self.stream_rewind(block_range).await;
450+
451+
_ = self.subscriber.take();
452+
453+
Ok(())
454+
}
455+
456+
async fn stream_rewind(&mut self, block_range: RangeInclusive<BlockNumber>) {
457+
let mut batch_count = 0;
458+
let blocks_read_per_epoch = self.config.blocks_read_per_epoch;
459+
460+
// we're iterating in reverse
461+
let stream_end = *block_range.start();
462+
// SAFETY: u32 can always be cast as usize
463+
let range_iter = block_range.rev().step_by(blocks_read_per_epoch as usize);
464+
465+
for batch_end in range_iter {
466+
let batch_start = (batch_end - blocks_read_per_epoch as u64 + 1).max(stream_end);
467+
468+
self.send_to_subscriber(BlockRangeMessage::Data(batch_start..=batch_end)).await;
469+
470+
batch_count += 1;
471+
if batch_count % 10 == 0 {
472+
debug!(batch_count = batch_count, "Processed rewind batches");
473+
}
474+
}
475+
476+
info!(batch_count = batch_count, "Rewind completed");
477+
}
478+
406479
async fn handle_live(&mut self) -> Result<(), BlockRangeScannerError> {
407480
let Some(sender) = self.subscriber.clone() else {
408481
return Err(BlockRangeScannerError::ServiceShutdown);

src/event_scanner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl EventScanner {
5959

6060
/// Configures how many blocks are read per epoch during a historical sync.
6161
#[must_use]
62-
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
62+
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: u32) -> Self {
6363
self.block_range_scanner =
6464
self.block_range_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
6565
self

0 commit comments

Comments
 (0)