Skip to content

Commit 2cc4fbc

Browse files
authored
Merge branch 'main' into tracing-fixes
2 parents 5ccbc34 + 1e89606 commit 2cc4fbc

File tree

17 files changed

+251
-96
lines changed

17 files changed

+251
-96
lines changed

src/block_range_scanner.rs

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
9292
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
9393
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9494

95-
pub const MAX_BUFFERED_MESSAGES: usize = 50000;
95+
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
9696

9797
// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
9898
// is considered final)
@@ -120,10 +120,11 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
120120
}
121121
}
122122

123-
#[derive(Clone, Debug)]
123+
#[derive(Clone)]
124124
pub struct BlockRangeScanner {
125125
pub max_block_range: u64,
126126
pub past_blocks_storage_capacity: RingBufferCapacity,
127+
pub buffer_capacity: usize,
127128
}
128129

129130
impl Default for BlockRangeScanner {
@@ -138,6 +139,7 @@ impl BlockRangeScanner {
138139
Self {
139140
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
140141
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
142+
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
141143
}
142144
}
143145

@@ -156,6 +158,12 @@ impl BlockRangeScanner {
156158
self
157159
}
158160

161+
#[must_use]
162+
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
163+
self.buffer_capacity = buffer_capacity;
164+
self
165+
}
166+
159167
/// Connects to an existing provider
160168
///
161169
/// # Errors
@@ -165,20 +173,27 @@ impl BlockRangeScanner {
165173
self,
166174
provider: impl IntoRobustProvider<N>,
167175
) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
176+
if self.max_block_range == 0 {
177+
return Err(ScannerError::InvalidMaxBlockRange);
178+
}
179+
if self.buffer_capacity == 0 {
180+
return Err(ScannerError::InvalidBufferCapacity);
181+
}
168182
let provider = provider.into_robust_provider().await?;
169183
Ok(ConnectedBlockRangeScanner {
170184
provider,
171185
max_block_range: self.max_block_range,
172186
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
187+
buffer_capacity: self.buffer_capacity,
173188
})
174189
}
175190
}
176191

177-
#[derive(Debug)]
178192
pub struct ConnectedBlockRangeScanner<N: Network> {
179193
provider: RobustProvider<N>,
180194
max_block_range: u64,
181195
past_blocks_storage_capacity: RingBufferCapacity,
196+
buffer_capacity: usize,
182197
}
183198

184199
impl<N: Network> ConnectedBlockRangeScanner<N> {
@@ -188,6 +203,12 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
188203
&self.provider
189204
}
190205

206+
/// Returns the stream buffer capacity.
207+
#[must_use]
208+
pub fn buffer_capacity(&self) -> usize {
209+
self.buffer_capacity
210+
}
211+
191212
/// Starts the subscription service and returns a client for sending commands.
192213
///
193214
/// # Errors
@@ -202,7 +223,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
202223
tokio::spawn(async move {
203224
service.run().await;
204225
});
205-
Ok(BlockRangeScannerClient::new(cmd_tx))
226+
Ok(BlockRangeScannerClient::new(cmd_tx, self.buffer_capacity))
206227
}
207228
}
208229

@@ -584,6 +605,7 @@ impl<N: Network> Service<N> {
584605

585606
pub struct BlockRangeScannerClient {
586607
command_sender: mpsc::Sender<Command>,
608+
buffer_capacity: usize,
587609
}
588610

589611
impl BlockRangeScannerClient {
@@ -592,9 +614,10 @@ impl BlockRangeScannerClient {
592614
/// # Arguments
593615
///
594616
/// * `command_sender` - The sender for sending commands to the subscription service.
617+
/// * `buffer_capacity` - The capacity for buffering messages in the stream.
595618
#[must_use]
596-
pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
597-
Self { command_sender }
619+
pub fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
620+
Self { command_sender, buffer_capacity }
598621
}
599622

600623
/// Streams live blocks starting from the latest block.
@@ -610,7 +633,7 @@ impl BlockRangeScannerClient {
610633
&self,
611634
block_confirmations: u64,
612635
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
613-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
636+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
614637
let (response_tx, response_rx) = oneshot::channel();
615638

616639
let command = Command::StreamLive {
@@ -641,7 +664,7 @@ impl BlockRangeScannerClient {
641664
start_id: impl Into<BlockId>,
642665
end_id: impl Into<BlockId>,
643666
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
644-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
667+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
645668
let (response_tx, response_rx) = oneshot::channel();
646669

647670
let command = Command::StreamHistorical {
@@ -673,7 +696,7 @@ impl BlockRangeScannerClient {
673696
start_id: impl Into<BlockId>,
674697
block_confirmations: u64,
675698
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
676-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
699+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
677700
let (response_tx, response_rx) = oneshot::channel();
678701

679702
let command = Command::StreamFrom {
@@ -732,7 +755,7 @@ impl BlockRangeScannerClient {
732755
start_id: impl Into<BlockId>,
733756
end_id: impl Into<BlockId>,
734757
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
735-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
758+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
736759
let (response_tx, response_rx) = oneshot::channel();
737760

738761
let command = Command::Rewind {
@@ -753,23 +776,28 @@ impl BlockRangeScannerClient {
753776
#[cfg(test)]
754777
mod tests {
755778
use super::*;
756-
use alloy::eips::{BlockId, BlockNumberOrTag};
779+
use alloy::{
780+
eips::{BlockId, BlockNumberOrTag},
781+
network::Ethereum,
782+
providers::{RootProvider, mock::Asserter},
783+
rpc::client::RpcClient,
784+
};
757785
use tokio::sync::mpsc;
758786

759787
#[test]
760788
fn block_range_scanner_defaults_match_constants() {
761789
let scanner = BlockRangeScanner::new();
762790

763791
assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
792+
assert_eq!(scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
764793
}
765794

766795
#[test]
767796
fn builder_methods_update_configuration() {
768-
let max_block_range = 42;
797+
let scanner = BlockRangeScanner::new().max_block_range(42).buffer_capacity(33);
769798

770-
let scanner = BlockRangeScanner::new().max_block_range(max_block_range);
771-
772-
assert_eq!(scanner.max_block_range, max_block_range);
799+
assert_eq!(scanner.max_block_range, 42);
800+
assert_eq!(scanner.buffer_capacity, 33);
773801
}
774802

775803
#[tokio::test]
@@ -783,4 +811,20 @@ mod tests {
783811
Some(Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4)))))
784812
));
785813
}
814+
815+
#[tokio::test]
816+
async fn returns_error_with_zero_buffer_capacity() {
817+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
818+
let result = BlockRangeScanner::new().buffer_capacity(0).connect(provider).await;
819+
820+
assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
821+
}
822+
823+
#[tokio::test]
824+
async fn returns_error_with_zero_max_block_range() {
825+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
826+
let result = BlockRangeScanner::new().max_block_range(0).connect(provider).await;
827+
828+
assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange)));
829+
}
786830
}

src/block_range_scanner/reorg_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313

1414
use super::ring_buffer::RingBuffer;
1515

16-
#[derive(Clone, Debug)]
16+
#[derive(Clone)]
1717
pub(crate) struct ReorgHandler<N: Network = Ethereum> {
1818
provider: RobustProvider<N>,
1919
buffer: RingBuffer<BlockHash>,

src/block_range_scanner/ring_buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ macro_rules! impl_from_unsigned {
2020

2121
impl_from_unsigned!(RingBufferCapacity; u8, u16, u32, usize);
2222

23-
#[derive(Clone, Debug)]
23+
#[derive(Clone)]
2424
pub(crate) struct RingBuffer<T> {
2525
inner: VecDeque<T>,
2626
capacity: RingBufferCapacity,

src/block_range_scanner/sync_handler.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@ use crate::{
1111
};
1212

1313
/// Represents the initial state when starting a sync operation
14-
#[derive(Debug)]
1514
enum SyncState {
1615
/// Start block is already at or beyond the confirmed tip - go straight to live
1716
AlreadyLive { start_block: BlockNumber },
1817
/// Start block is behind - need to catch up first, then go live
1918
NeedsCatchup { start_block: BlockNumber, confirmed_tip: BlockNumber },
2019
}
2120

22-
#[derive(Debug)]
2321
pub(crate) struct SyncHandler<N: Network> {
2422
provider: RobustProvider<N>,
2523
max_block_range: u64,

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub enum ScannerError {
3131
#[error("Max block range must be greater than 0")]
3232
InvalidMaxBlockRange,
3333

34+
#[error("Stream buffer capacity must be greater than 0")]
35+
InvalidBufferCapacity,
36+
3437
#[error("Max concurrent fetches must be greater than 0")]
3538
InvalidMaxConcurrentFetches,
3639

src/event_scanner/listener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::event_scanner::{EventScannerResult, filter::EventFilter};
22
use tokio::sync::mpsc::Sender;
33

4-
#[derive(Clone, Debug)]
4+
#[derive(Clone)]
55
pub(crate) struct EventListener {
66
pub filter: EventFilter,
77
pub sender: Sender<EventScannerResult>,

src/event_scanner/scanner/common.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::ops::RangeInclusive;
22

33
use crate::{
44
Message, Notification, ScannerError, ScannerMessage,
5-
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
5+
block_range_scanner::BlockScannerResult,
66
event_scanner::{filter::EventFilter, listener::EventListener},
77
robust_provider::{RobustProvider, provider::Error as RobustProviderError},
88
types::TryStream,
@@ -55,8 +55,9 @@ pub(crate) async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResul
5555
listeners: &[EventListener],
5656
mode: ConsumerMode,
5757
max_concurrent_fetches: usize,
58+
buffer_capacity: usize,
5859
) {
59-
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(MAX_BUFFERED_MESSAGES);
60+
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(buffer_capacity);
6061

6162
let consumers = match mode {
6263
ConsumerMode::Stream => spawn_log_consumers_in_stream_mode(

src/event_scanner/scanner/historic.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl<N: Network> EventScanner<Historic, N> {
144144
let max_concurrent_fetches = self.config.max_concurrent_fetches;
145145
let provider = self.block_range_scanner.provider().clone();
146146
let listeners = self.listeners.clone();
147+
let buffer_capacity = self.buffer_capacity();
147148

148149
tokio::spawn(async move {
149150
handle_stream(
@@ -152,6 +153,7 @@ impl<N: Network> EventScanner<Historic, N> {
152153
&listeners,
153154
ConsumerMode::Stream,
154155
max_concurrent_fetches,
156+
buffer_capacity,
155157
)
156158
.await;
157159
});
@@ -162,7 +164,10 @@ impl<N: Network> EventScanner<Historic, N> {
162164

163165
#[cfg(test)]
164166
mod tests {
165-
use crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES;
167+
use crate::{
168+
DEFAULT_STREAM_BUFFER_CAPACITY, block_range_scanner::DEFAULT_MAX_BLOCK_RANGE,
169+
event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES,
170+
};
166171

167172
use super::*;
168173
use alloy::{
@@ -180,12 +185,14 @@ mod tests {
180185
.to_block(200)
181186
.max_block_range(50)
182187
.max_concurrent_fetches(10)
188+
.buffer_capacity(33)
183189
.from_block(100);
184190

185191
assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(100).into());
186192
assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
187193
assert_eq!(builder.config.max_concurrent_fetches, 10);
188194
assert_eq!(builder.block_range_scanner.max_block_range, 50);
195+
assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
189196
}
190197

191198
#[test]
@@ -195,6 +202,8 @@ mod tests {
195202
assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
196203
assert_eq!(builder.config.to_block, BlockNumberOrTag::Latest.into());
197204
assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
205+
assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
206+
assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
198207
}
199208

200209
#[test]
@@ -208,12 +217,15 @@ mod tests {
208217
.to_block(100)
209218
.to_block(200)
210219
.max_concurrent_fetches(10)
211-
.max_concurrent_fetches(20);
220+
.max_concurrent_fetches(20)
221+
.buffer_capacity(20)
222+
.buffer_capacity(40);
212223

213224
assert_eq!(builder.block_range_scanner.max_block_range, 105);
214225
assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(2).into());
215226
assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
216227
assert_eq!(builder.config.max_concurrent_fetches, 20);
228+
assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
217229
}
218230

219231
#[tokio::test]
@@ -293,6 +305,14 @@ mod tests {
293305
}
294306
}
295307

308+
#[tokio::test]
309+
async fn returns_error_with_zero_buffer_capacity() {
310+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
311+
let result = EventScannerBuilder::historic().buffer_capacity(0).connect(provider).await;
312+
313+
assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
314+
}
315+
296316
#[tokio::test]
297317
async fn returns_error_with_zero_max_concurrent_fetches() {
298318
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));

0 commit comments

Comments
 (0)