Skip to content

Commit 46e8c09

Browse files
authored
Merge branch 'main' into improve-logging-robust
2 parents f6fff0c + 87b7362 commit 46e8c09

File tree

19 files changed

+623
-160
lines changed

19 files changed

+623
-160
lines changed

src/block_range_scanner.rs

Lines changed: 70 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,24 @@ use alloy::{
7979
network::{BlockResponse, Network, primitives::HeaderResponse},
8080
primitives::BlockNumber,
8181
};
82-
use tracing::{debug, error, info, warn};
82+
use tracing::{error, info, warn};
8383

8484
mod common;
85+
mod range_iterator;
8586
mod reorg_handler;
8687
mod ring_buffer;
8788
mod sync_handler;
8889

90+
pub(crate) use range_iterator::RangeIterator;
91+
8992
use reorg_handler::ReorgHandler;
9093
pub use ring_buffer::RingBufferCapacity;
9194

9295
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
93-
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
96+
9497
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9598

96-
pub const MAX_BUFFERED_MESSAGES: usize = 50000;
99+
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
97100

98101
// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
99102
// is considered final)
@@ -121,10 +124,11 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
121124
}
122125
}
123126

124-
#[derive(Clone, Debug)]
127+
#[derive(Clone)]
125128
pub struct BlockRangeScanner {
126129
pub max_block_range: u64,
127130
pub past_blocks_storage_capacity: RingBufferCapacity,
131+
pub buffer_capacity: usize,
128132
}
129133

130134
impl Default for BlockRangeScanner {
@@ -139,6 +143,7 @@ impl BlockRangeScanner {
139143
Self {
140144
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
141145
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
146+
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
142147
}
143148
}
144149

@@ -157,6 +162,12 @@ impl BlockRangeScanner {
157162
self
158163
}
159164

165+
#[must_use]
166+
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
167+
self.buffer_capacity = buffer_capacity;
168+
self
169+
}
170+
160171
/// Connects to an existing provider
161172
///
162173
/// # Errors
@@ -166,20 +177,27 @@ impl BlockRangeScanner {
166177
self,
167178
provider: impl IntoRobustProvider<N>,
168179
) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
180+
if self.max_block_range == 0 {
181+
return Err(ScannerError::InvalidMaxBlockRange);
182+
}
183+
if self.buffer_capacity == 0 {
184+
return Err(ScannerError::InvalidBufferCapacity);
185+
}
169186
let provider = provider.into_robust_provider().await?;
170187
Ok(ConnectedBlockRangeScanner {
171188
provider,
172189
max_block_range: self.max_block_range,
173190
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
191+
buffer_capacity: self.buffer_capacity,
174192
})
175193
}
176194
}
177195

178-
#[derive(Debug)]
179196
pub struct ConnectedBlockRangeScanner<N: Network> {
180197
provider: RobustProvider<N>,
181198
max_block_range: u64,
182199
past_blocks_storage_capacity: RingBufferCapacity,
200+
buffer_capacity: usize,
183201
}
184202

185203
impl<N: Network> ConnectedBlockRangeScanner<N> {
@@ -189,6 +207,12 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
189207
&self.provider
190208
}
191209

210+
/// Returns the stream buffer capacity.
211+
#[must_use]
212+
pub fn buffer_capacity(&self) -> usize {
213+
self.buffer_capacity
214+
}
215+
192216
/// Starts the subscription service and returns a client for sending commands.
193217
///
194218
/// # Errors
@@ -203,7 +227,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
203227
tokio::spawn(async move {
204228
service.run().await;
205229
});
206-
Ok(BlockRangeScannerClient::new(cmd_tx))
230+
Ok(BlockRangeScannerClient::new(cmd_tx, self.buffer_capacity))
207231
}
208232
}
209233

@@ -451,8 +475,6 @@ impl<N: Network> Service<N> {
451475
provider: &RobustProvider<N>,
452476
reorg_handler: &mut ReorgHandler<N>,
453477
) {
454-
let mut batch_count = 0;
455-
456478
// for checking whether reorg occurred
457479
let mut tip = from;
458480

@@ -469,29 +491,15 @@ impl<N: Network> Service<N> {
469491
}
470492
};
471493

472-
// we're iterating in reverse
473-
let mut batch_from = from;
474494
let finalized_number = finalized_block.header().number();
475495

476496
// only check reorg if our tip is after the finalized block
477497
let check_reorg = tip.header().number() > finalized_number;
478498

479-
while batch_from >= to {
480-
let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
481-
499+
let mut iter = RangeIterator::reverse(from, to, max_block_range);
500+
for range in &mut iter {
482501
// stream the range regularly, i.e. from smaller block number to greater
483-
if !sender.try_stream(batch_to..=batch_from).await {
484-
break;
485-
}
486-
487-
batch_count += 1;
488-
if batch_count % 10 == 0 {
489-
debug!(batch_count = batch_count, "Processed rewind batches");
490-
}
491-
492-
// check early if end of stream achieved to avoid subtraction overflow when `to
493-
// == 0`
494-
if batch_to == to {
502+
if !sender.try_stream(range).await {
495503
break;
496504
}
497505

@@ -518,11 +526,9 @@ impl<N: Network> Service<N> {
518526
return;
519527
}
520528
}
521-
522-
batch_from = batch_to - 1;
523529
}
524530

525-
info!(batch_count = batch_count, "Rewind completed");
531+
info!(batch_count = iter.batch_count(), "Rewind completed");
526532
}
527533

528534
/// Handles re-scanning of reorged blocks.
@@ -565,18 +571,10 @@ impl<N: Network> Service<N> {
565571
// Re-scan only the affected range (from common_ancestor + 1 up to tip)
566572
let rescan_from = common_ancestor + 1;
567573

568-
let mut rescan_batch_start = rescan_from;
569-
while rescan_batch_start <= tip_number {
570-
let rescan_batch_end = (rescan_batch_start + max_block_range - 1).min(tip_number);
571-
572-
if !sender.try_stream(rescan_batch_start..=rescan_batch_end).await {
574+
for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) {
575+
if !sender.try_stream(batch).await {
573576
return false;
574577
}
575-
576-
if rescan_batch_end == tip_number {
577-
break;
578-
}
579-
rescan_batch_start = rescan_batch_end + 1;
580578
}
581579

582580
true
@@ -585,6 +583,7 @@ impl<N: Network> Service<N> {
585583

586584
pub struct BlockRangeScannerClient {
587585
command_sender: mpsc::Sender<Command>,
586+
buffer_capacity: usize,
588587
}
589588

590589
impl BlockRangeScannerClient {
@@ -593,9 +592,10 @@ impl BlockRangeScannerClient {
593592
/// # Arguments
594593
///
595594
/// * `command_sender` - The sender for sending commands to the subscription service.
595+
/// * `buffer_capacity` - The capacity for buffering messages in the stream.
596596
#[must_use]
597-
pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
598-
Self { command_sender }
597+
pub fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
598+
Self { command_sender, buffer_capacity }
599599
}
600600

601601
/// Streams live blocks starting from the latest block.
@@ -611,7 +611,7 @@ impl BlockRangeScannerClient {
611611
&self,
612612
block_confirmations: u64,
613613
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
614-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
614+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
615615
let (response_tx, response_rx) = oneshot::channel();
616616

617617
let command = Command::StreamLive {
@@ -642,7 +642,7 @@ impl BlockRangeScannerClient {
642642
start_id: impl Into<BlockId>,
643643
end_id: impl Into<BlockId>,
644644
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
645-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
645+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
646646
let (response_tx, response_rx) = oneshot::channel();
647647

648648
let command = Command::StreamHistorical {
@@ -674,7 +674,7 @@ impl BlockRangeScannerClient {
674674
start_id: impl Into<BlockId>,
675675
block_confirmations: u64,
676676
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
677-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
677+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
678678
let (response_tx, response_rx) = oneshot::channel();
679679

680680
let command = Command::StreamFrom {
@@ -733,7 +733,7 @@ impl BlockRangeScannerClient {
733733
start_id: impl Into<BlockId>,
734734
end_id: impl Into<BlockId>,
735735
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
736-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
736+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
737737
let (response_tx, response_rx) = oneshot::channel();
738738

739739
let command = Command::Rewind {
@@ -754,23 +754,28 @@ impl BlockRangeScannerClient {
754754
#[cfg(test)]
755755
mod tests {
756756
use super::*;
757-
use alloy::eips::{BlockId, BlockNumberOrTag};
757+
use alloy::{
758+
eips::{BlockId, BlockNumberOrTag},
759+
network::Ethereum,
760+
providers::{RootProvider, mock::Asserter},
761+
rpc::client::RpcClient,
762+
};
758763
use tokio::sync::mpsc;
759764

760765
#[test]
761766
fn block_range_scanner_defaults_match_constants() {
762767
let scanner = BlockRangeScanner::new();
763768

764769
assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
770+
assert_eq!(scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
765771
}
766772

767773
#[test]
768774
fn builder_methods_update_configuration() {
769-
let max_block_range = 42;
770-
771-
let scanner = BlockRangeScanner::new().max_block_range(max_block_range);
775+
let scanner = BlockRangeScanner::new().max_block_range(42).buffer_capacity(33);
772776

773-
assert_eq!(scanner.max_block_range, max_block_range);
777+
assert_eq!(scanner.max_block_range, 42);
778+
assert_eq!(scanner.buffer_capacity, 33);
774779
}
775780

776781
#[tokio::test]
@@ -784,4 +789,20 @@ mod tests {
784789
Some(Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4)))))
785790
));
786791
}
792+
793+
#[tokio::test]
794+
async fn returns_error_with_zero_buffer_capacity() {
795+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
796+
let result = BlockRangeScanner::new().buffer_capacity(0).connect(provider).await;
797+
798+
assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
799+
}
800+
801+
#[tokio::test]
802+
async fn returns_error_with_zero_max_block_range() {
803+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
804+
let result = BlockRangeScanner::new().max_block_range(0).connect(provider).await;
805+
806+
assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange)));
807+
}
787808
}

0 commit comments

Comments
 (0)