Skip to content

Commit 1e89606

Browse files
authored
Make max Stream Capacity Configurable (#266)
1 parent 18922b2 commit 1e89606

File tree

17 files changed

+251
-96
lines changed

17 files changed

+251
-96
lines changed

src/block_range_scanner.rs

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
9393
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
9494
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9595

96-
pub const MAX_BUFFERED_MESSAGES: usize = 50000;
96+
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
9797

9898
// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
9999
// is considered final)
@@ -121,10 +121,11 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
121121
}
122122
}
123123

124-
#[derive(Clone, Debug)]
124+
#[derive(Clone)]
125125
pub struct BlockRangeScanner {
126126
pub max_block_range: u64,
127127
pub past_blocks_storage_capacity: RingBufferCapacity,
128+
pub buffer_capacity: usize,
128129
}
129130

130131
impl Default for BlockRangeScanner {
@@ -139,6 +140,7 @@ impl BlockRangeScanner {
139140
Self {
140141
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
141142
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
143+
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
142144
}
143145
}
144146

@@ -157,6 +159,12 @@ impl BlockRangeScanner {
157159
self
158160
}
159161

162+
#[must_use]
163+
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
164+
self.buffer_capacity = buffer_capacity;
165+
self
166+
}
167+
160168
/// Connects to an existing provider
161169
///
162170
/// # Errors
@@ -166,20 +174,27 @@ impl BlockRangeScanner {
166174
self,
167175
provider: impl IntoRobustProvider<N>,
168176
) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
177+
if self.max_block_range == 0 {
178+
return Err(ScannerError::InvalidMaxBlockRange);
179+
}
180+
if self.buffer_capacity == 0 {
181+
return Err(ScannerError::InvalidBufferCapacity);
182+
}
169183
let provider = provider.into_robust_provider().await?;
170184
Ok(ConnectedBlockRangeScanner {
171185
provider,
172186
max_block_range: self.max_block_range,
173187
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
188+
buffer_capacity: self.buffer_capacity,
174189
})
175190
}
176191
}
177192

178-
#[derive(Debug)]
179193
pub struct ConnectedBlockRangeScanner<N: Network> {
180194
provider: RobustProvider<N>,
181195
max_block_range: u64,
182196
past_blocks_storage_capacity: RingBufferCapacity,
197+
buffer_capacity: usize,
183198
}
184199

185200
impl<N: Network> ConnectedBlockRangeScanner<N> {
@@ -189,6 +204,12 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
189204
&self.provider
190205
}
191206

207+
/// Returns the stream buffer capacity.
208+
#[must_use]
209+
pub fn buffer_capacity(&self) -> usize {
210+
self.buffer_capacity
211+
}
212+
192213
/// Starts the subscription service and returns a client for sending commands.
193214
///
194215
/// # Errors
@@ -203,7 +224,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
203224
tokio::spawn(async move {
204225
service.run().await;
205226
});
206-
Ok(BlockRangeScannerClient::new(cmd_tx))
227+
Ok(BlockRangeScannerClient::new(cmd_tx, self.buffer_capacity))
207228
}
208229
}
209230

@@ -585,6 +606,7 @@ impl<N: Network> Service<N> {
585606

586607
pub struct BlockRangeScannerClient {
587608
command_sender: mpsc::Sender<Command>,
609+
buffer_capacity: usize,
588610
}
589611

590612
impl BlockRangeScannerClient {
@@ -593,9 +615,10 @@ impl BlockRangeScannerClient {
593615
/// # Arguments
594616
///
595617
/// * `command_sender` - The sender for sending commands to the subscription service.
618+
/// * `buffer_capacity` - The capacity for buffering messages in the stream.
596619
#[must_use]
597-
pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
598-
Self { command_sender }
620+
pub fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
621+
Self { command_sender, buffer_capacity }
599622
}
600623

601624
/// Streams live blocks starting from the latest block.
@@ -611,7 +634,7 @@ impl BlockRangeScannerClient {
611634
&self,
612635
block_confirmations: u64,
613636
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
614-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
637+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
615638
let (response_tx, response_rx) = oneshot::channel();
616639

617640
let command = Command::StreamLive {
@@ -642,7 +665,7 @@ impl BlockRangeScannerClient {
642665
start_id: impl Into<BlockId>,
643666
end_id: impl Into<BlockId>,
644667
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
645-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
668+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
646669
let (response_tx, response_rx) = oneshot::channel();
647670

648671
let command = Command::StreamHistorical {
@@ -674,7 +697,7 @@ impl BlockRangeScannerClient {
674697
start_id: impl Into<BlockId>,
675698
block_confirmations: u64,
676699
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
677-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
700+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
678701
let (response_tx, response_rx) = oneshot::channel();
679702

680703
let command = Command::StreamFrom {
@@ -733,7 +756,7 @@ impl BlockRangeScannerClient {
733756
start_id: impl Into<BlockId>,
734757
end_id: impl Into<BlockId>,
735758
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
736-
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
759+
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
737760
let (response_tx, response_rx) = oneshot::channel();
738761

739762
let command = Command::Rewind {
@@ -754,23 +777,28 @@ impl BlockRangeScannerClient {
754777
#[cfg(test)]
755778
mod tests {
756779
use super::*;
757-
use alloy::eips::{BlockId, BlockNumberOrTag};
780+
use alloy::{
781+
eips::{BlockId, BlockNumberOrTag},
782+
network::Ethereum,
783+
providers::{RootProvider, mock::Asserter},
784+
rpc::client::RpcClient,
785+
};
758786
use tokio::sync::mpsc;
759787

760788
#[test]
761789
fn block_range_scanner_defaults_match_constants() {
762790
let scanner = BlockRangeScanner::new();
763791

764792
assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
793+
assert_eq!(scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
765794
}
766795

767796
#[test]
768797
fn builder_methods_update_configuration() {
769-
let max_block_range = 42;
798+
let scanner = BlockRangeScanner::new().max_block_range(42).buffer_capacity(33);
770799

771-
let scanner = BlockRangeScanner::new().max_block_range(max_block_range);
772-
773-
assert_eq!(scanner.max_block_range, max_block_range);
800+
assert_eq!(scanner.max_block_range, 42);
801+
assert_eq!(scanner.buffer_capacity, 33);
774802
}
775803

776804
#[tokio::test]
@@ -784,4 +812,20 @@ mod tests {
784812
Some(Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4)))))
785813
));
786814
}
815+
816+
#[tokio::test]
817+
async fn returns_error_with_zero_buffer_capacity() {
818+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
819+
let result = BlockRangeScanner::new().buffer_capacity(0).connect(provider).await;
820+
821+
assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
822+
}
823+
824+
#[tokio::test]
825+
async fn returns_error_with_zero_max_block_range() {
826+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
827+
let result = BlockRangeScanner::new().max_block_range(0).connect(provider).await;
828+
829+
assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange)));
830+
}
787831
}

src/block_range_scanner/reorg_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414

1515
use super::ring_buffer::RingBuffer;
1616

17-
#[derive(Clone, Debug)]
17+
#[derive(Clone)]
1818
pub(crate) struct ReorgHandler<N: Network = Ethereum> {
1919
provider: RobustProvider<N>,
2020
buffer: RingBuffer<BlockHash>,

src/block_range_scanner/ring_buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ macro_rules! impl_from_unsigned {
2020

2121
impl_from_unsigned!(RingBufferCapacity; u8, u16, u32, usize);
2222

23-
#[derive(Clone, Debug)]
23+
#[derive(Clone)]
2424
pub(crate) struct RingBuffer<T> {
2525
inner: VecDeque<T>,
2626
capacity: RingBufferCapacity,

src/block_range_scanner/sync_handler.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@ use crate::{
1212
};
1313

1414
/// Represents the initial state when starting a sync operation
15-
#[derive(Debug)]
1615
enum SyncState {
1716
/// Start block is already at or beyond the confirmed tip - go straight to live
1817
AlreadyLive { start_block: BlockNumber },
1918
/// Start block is behind - need to catch up first, then go live
2019
NeedsCatchup { start_block: BlockNumber, confirmed_tip: BlockNumber },
2120
}
2221

23-
#[derive(Debug)]
2422
pub(crate) struct SyncHandler<N: Network> {
2523
provider: RobustProvider<N>,
2624
max_block_range: u64,

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub enum ScannerError {
3131
#[error("Max block range must be greater than 0")]
3232
InvalidMaxBlockRange,
3333

34+
#[error("Stream buffer capacity must be greater than 0")]
35+
InvalidBufferCapacity,
36+
3437
#[error("Max concurrent fetches must be greater than 0")]
3538
InvalidMaxConcurrentFetches,
3639

src/event_scanner/listener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::event_scanner::{EventScannerResult, filter::EventFilter};
22
use tokio::sync::mpsc::Sender;
33

4-
#[derive(Clone, Debug)]
4+
#[derive(Clone)]
55
pub(crate) struct EventListener {
66
pub filter: EventFilter,
77
pub sender: Sender<EventScannerResult>,

src/event_scanner/scanner/common.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::ops::RangeInclusive;
22

33
use crate::{
44
Message, Notification, ScannerError, ScannerMessage,
5-
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
5+
block_range_scanner::BlockScannerResult,
66
event_scanner::{filter::EventFilter, listener::EventListener},
77
robust_provider::{RobustProvider, provider::Error as RobustProviderError},
88
types::TryStream,
@@ -56,8 +56,9 @@ pub(crate) async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResul
5656
listeners: &[EventListener],
5757
mode: ConsumerMode,
5858
max_concurrent_fetches: usize,
59+
buffer_capacity: usize,
5960
) {
60-
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(MAX_BUFFERED_MESSAGES);
61+
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(buffer_capacity);
6162

6263
let consumers = match mode {
6364
ConsumerMode::Stream => spawn_log_consumers_in_stream_mode(

src/event_scanner/scanner/historic.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl<N: Network> EventScanner<Historic, N> {
144144
let max_concurrent_fetches = self.config.max_concurrent_fetches;
145145
let provider = self.block_range_scanner.provider().clone();
146146
let listeners = self.listeners.clone();
147+
let buffer_capacity = self.buffer_capacity();
147148

148149
tokio::spawn(async move {
149150
handle_stream(
@@ -152,6 +153,7 @@ impl<N: Network> EventScanner<Historic, N> {
152153
&listeners,
153154
ConsumerMode::Stream,
154155
max_concurrent_fetches,
156+
buffer_capacity,
155157
)
156158
.await;
157159
});
@@ -162,7 +164,10 @@ impl<N: Network> EventScanner<Historic, N> {
162164

163165
#[cfg(test)]
164166
mod tests {
165-
use crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES;
167+
use crate::{
168+
DEFAULT_STREAM_BUFFER_CAPACITY, block_range_scanner::DEFAULT_MAX_BLOCK_RANGE,
169+
event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES,
170+
};
166171

167172
use super::*;
168173
use alloy::{
@@ -180,12 +185,14 @@ mod tests {
180185
.to_block(200)
181186
.max_block_range(50)
182187
.max_concurrent_fetches(10)
188+
.buffer_capacity(33)
183189
.from_block(100);
184190

185191
assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(100).into());
186192
assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
187193
assert_eq!(builder.config.max_concurrent_fetches, 10);
188194
assert_eq!(builder.block_range_scanner.max_block_range, 50);
195+
assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
189196
}
190197

191198
#[test]
@@ -195,6 +202,8 @@ mod tests {
195202
assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
196203
assert_eq!(builder.config.to_block, BlockNumberOrTag::Latest.into());
197204
assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
205+
assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
206+
assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
198207
}
199208

200209
#[test]
@@ -208,12 +217,15 @@ mod tests {
208217
.to_block(100)
209218
.to_block(200)
210219
.max_concurrent_fetches(10)
211-
.max_concurrent_fetches(20);
220+
.max_concurrent_fetches(20)
221+
.buffer_capacity(20)
222+
.buffer_capacity(40);
212223

213224
assert_eq!(builder.block_range_scanner.max_block_range, 105);
214225
assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(2).into());
215226
assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
216227
assert_eq!(builder.config.max_concurrent_fetches, 20);
228+
assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
217229
}
218230

219231
#[tokio::test]
@@ -293,6 +305,14 @@ mod tests {
293305
}
294306
}
295307

308+
#[tokio::test]
309+
async fn returns_error_with_zero_buffer_capacity() {
310+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
311+
let result = EventScannerBuilder::historic().buffer_capacity(0).connect(provider).await;
312+
313+
assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
314+
}
315+
296316
#[tokio::test]
297317
async fn returns_error_with_zero_max_concurrent_fetches() {
298318
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));

0 commit comments

Comments
 (0)