Skip to content

Commit da5f618

Browse files
authored
block scanner ::start accepts ref to self (#16)
1 parent ccc6dd0 commit da5f618

File tree

2 files changed

+9
-32
lines changed

2 files changed

+9
-32
lines changed

src/block_scanner.rs

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,8 @@ impl<N: Network> BlockScannerBuilder<N> {
165165
where
166166
P: Provider<N>,
167167
{
168-
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);
169-
170168
BlockScanner {
171169
provider,
172-
sender,
173-
receiver,
174170
current: Header::default(),
175171
is_end: false,
176172
blocks_read_per_epoch: self.blocks_read_per_epoch,
@@ -189,8 +185,6 @@ impl<N: Network> BlockScannerBuilder<N> {
189185
// with the awareness of reorganization.
190186
pub struct BlockScanner<P: Provider<N>, N: Network> {
191187
provider: P,
192-
sender: Sender<Result<N::BlockResponse, BlockScannerError>>,
193-
receiver: Receiver<Result<N::BlockResponse, BlockScannerError>>,
194188
blocks_read_per_epoch: usize,
195189
start_height: BlockNumberOrTag,
196190
end_height: BlockNumberOrTag,
@@ -208,14 +202,16 @@ where
208202
P: Provider<N>,
209203
N: Network,
210204
{
211-
pub async fn start(self) -> ReceiverStream<Result<N::BlockResponse, BlockScannerError>> {
212-
let receiver_stream = ReceiverStream::new(self.receiver);
205+
pub async fn start(&self) -> ReceiverStream<Result<N::BlockResponse, BlockScannerError>> {
206+
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);
207+
208+
let receiver_stream = ReceiverStream::new(receiver);
213209

214210
future::ready(()).await;
215211

216-
tokio::spawn(async move {
217-
if self.sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {}
218-
});
212+
tokio::spawn(
213+
async move { if sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {} },
214+
);
219215

220216
receiver_stream
221217
}
@@ -293,25 +289,4 @@ mod tests {
293289
other => panic!("expected first stream item to be ErrEOF, got: {other:?}"),
294290
}
295291
}
296-
297-
#[tokio::test]
298-
async fn test_channel_buffer_is_equal_to_blocks_read_per_epoch() {
299-
let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil");
300-
let ws = WsConnect::new(anvil.ws_endpoint_url());
301-
302-
let mut builder = BlockScannerBuilder::<Ethereum>::new();
303-
builder.with_blocks_read_per_epoch(5);
304-
305-
let scanner = builder.connect_ws(ws).await.expect("failed to connect ws");
306-
307-
for _ in 0..scanner.blocks_read_per_epoch {
308-
scanner
309-
.sender
310-
.try_send(Err(BlockScannerError::ErrContinue))
311-
.expect("channel should not be full yet");
312-
}
313-
314-
let res = scanner.sender.try_send(Err(BlockScannerError::ErrContinue));
315-
assert!(matches!(res, Err(tokio::sync::mpsc::error::TrySendError::Full(_))));
316-
}
317292
}

src/event_scanner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,5 @@ impl<P: Provider<N>, N: Network> EventScanner<P, N> {
172172
todo!()
173173
}
174174
}
175+
176+
// TODO: implement max channel buffer size test

0 commit comments

Comments
 (0)