diff --git a/src/block_scanner.rs b/src/block_scanner.rs index 4f120dbe..820485af 100644 --- a/src/block_scanner.rs +++ b/src/block_scanner.rs @@ -165,12 +165,8 @@ impl BlockScannerBuilder { where P: Provider, { - let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch); - BlockScanner { provider, - sender, - receiver, current: Header::default(), is_end: false, blocks_read_per_epoch: self.blocks_read_per_epoch, @@ -189,8 +185,6 @@ impl BlockScannerBuilder { // with the awareness of reorganization. pub struct BlockScanner, N: Network> { provider: P, - sender: Sender>, - receiver: Receiver>, blocks_read_per_epoch: usize, start_height: BlockNumberOrTag, end_height: BlockNumberOrTag, @@ -208,14 +202,16 @@ where P: Provider, N: Network, { - pub async fn start(self) -> ReceiverStream> { - let receiver_stream = ReceiverStream::new(self.receiver); + pub async fn start(&self) -> ReceiverStream> { + let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch); + + let receiver_stream = ReceiverStream::new(receiver); future::ready(()).await; - tokio::spawn(async move { - if self.sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {} - }); + tokio::spawn( + async move { if sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {} }, + ); receiver_stream } @@ -293,25 +289,4 @@ mod tests { other => panic!("expected first stream item to be ErrEOF, got: {other:?}"), } } - - #[tokio::test] - async fn test_channel_buffer_is_equal_to_blocks_read_per_epoch() { - let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil"); - let ws = WsConnect::new(anvil.ws_endpoint_url()); - - let mut builder = BlockScannerBuilder::::new(); - builder.with_blocks_read_per_epoch(5); - - let scanner = builder.connect_ws(ws).await.expect("failed to connect ws"); - - for _ in 0..scanner.blocks_read_per_epoch { - scanner - .sender - .try_send(Err(BlockScannerError::ErrContinue)) - .expect("channel should not be full yet"); - } - - let res = scanner.sender.try_send(Err(BlockScannerError::ErrContinue)); - assert!(matches!(res, Err(tokio::sync::mpsc::error::TrySendError::Full(_)))); - } } diff --git a/src/event_scanner.rs b/src/event_scanner.rs index e1e6f393..7fe826d3 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -172,3 +172,5 @@ impl, N: Network> EventScanner { todo!() } } + +// TODO: implement max channel buffer size test