@@ -13,12 +13,11 @@ use event_scanner::{
1313#[ tokio:: test]
1414async fn live_mode_processes_all_blocks_respecting_block_confirmations ( ) -> anyhow:: Result < ( ) > {
1515 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
16- let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
16+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
1717
1818 // --- Zero block confirmations -> stream immediately ---
1919
20- let client =
21- BlockRangeScanner :: new ( ) . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) ) . await ?. run ( ) ?;
20+ let client = BlockRangeScanner :: new ( ) . connect :: < Ethereum > ( provider. root ( ) . clone ( ) ) . run ( ) ?;
2221
2322 let mut stream = client. stream_live ( 0 ) . await ?;
2423
@@ -60,30 +59,24 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh
6059#[ tokio:: test]
6160async fn stream_from_latest_starts_at_tip_not_confirmed ( ) -> anyhow:: Result < ( ) > {
6261 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
62+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
6363
64- let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
65- provider. anvil_mine ( Some ( 20 ) , None ) . await ?;
66-
67- let block_confirmations = 5 ;
64+ let client = BlockRangeScanner :: new ( ) . connect :: < Ethereum > ( provider. root ( ) . clone ( ) ) . run ( ) ?;
6865
69- let client =
70- BlockRangeScanner :: new ( ) . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) ) . await ?. run ( ) ?;
66+ provider. anvil_mine ( Some ( 20 ) , None ) . await ?;
7167
72- let stream = client. stream_from ( BlockNumberOrTag :: Latest , block_confirmations ) . await ?;
68+ let stream = client. stream_from ( BlockNumberOrTag :: Latest , 5 ) . await ?;
7369
7470 let stream = assert_empty ! ( stream) ;
7571
7672 provider. anvil_mine ( Some ( 4 ) , None ) . await ?;
77-
7873 let mut stream = assert_empty ! ( stream) ;
7974
8075 provider. anvil_mine ( Some ( 1 ) , None ) . await ?;
81-
8276 assert_next ! ( stream, 20 ..=20 ) ;
8377 let mut stream = assert_empty ! ( stream) ;
8478
8579 provider. anvil_mine ( Some ( 1 ) , None ) . await ?;
86-
8780 assert_next ! ( stream, 21 ..=21 ) ;
8881 assert_empty ! ( stream) ;
8982
@@ -94,6 +87,7 @@ async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()>
9487async fn continuous_blocks_if_reorg_less_than_block_confirmation ( ) -> anyhow:: Result < ( ) > {
9588 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
9689 let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
90+
9791 let client = BlockRangeScanner :: new ( ) . connect :: < Ethereum > ( provider. root ( ) . clone ( ) ) . run ( ) ?;
9892
9993 let mut stream = client. stream_live ( 5 ) . await ?;
@@ -133,8 +127,8 @@ async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Re
133127async fn shallow_block_confirmation_does_not_mitigate_reorg ( ) -> anyhow:: Result < ( ) > {
134128 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
135129 let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
136- let client =
137- BlockRangeScanner :: new ( ) . connect_ws :: < Ethereum > ( anvil . ws_endpoint_url ( ) ) . await ? . run ( ) ?;
130+
131+ let client = BlockRangeScanner :: new ( ) . connect :: < Ethereum > ( provider . root ( ) . clone ( ) ) . run ( ) ?;
138132
139133 let mut stream = client. stream_live ( 3 ) . await ?;
140134
@@ -178,19 +172,18 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<
178172}
179173
180174#[ tokio:: test]
181- #[ ignore = "too flaky, un-ignore once a full local node is used : https://github.com/OpenZeppelin/Event-Scanner/issues/109 " ]
175+ #[ ignore = "historical currently has no reorg logic : https://github.com/OpenZeppelin/Event-Scanner/issues/56 " ]
182176async fn historical_emits_correction_range_when_reorg_below_end ( ) -> anyhow:: Result < ( ) > {
183177 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
184- let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
178+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
185179
186180 provider. anvil_mine ( Some ( 120 ) , None ) . await ?;
187181
188182 let end_num = 110 ;
189183
190184 let client = BlockRangeScanner :: new ( )
191185 . max_block_range ( 30 )
192- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
193- . await ?
186+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
194187 . run ( ) ?;
195188
196189 let mut stream = client
@@ -213,7 +206,7 @@ async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Res
213206}
214207
215208#[ tokio:: test]
216- #[ ignore = "too flaky, un-ignore once a full local node is used : https://github.com/OpenZeppelin/Event-Scanner/issues/109 " ]
209+ #[ ignore = "historical currently has no reorg logic : https://github.com/OpenZeppelin/Event-Scanner/issues/56 " ]
217210async fn historical_emits_correction_range_when_end_num_reorgs ( ) -> anyhow:: Result < ( ) > {
218211 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
219212 let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
@@ -224,8 +217,7 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu
224217
225218 let client = BlockRangeScanner :: new ( )
226219 . max_block_range ( 30 )
227- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
228- . await ?
220+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
229221 . run ( ) ?;
230222
231223 let mut stream = client
@@ -252,15 +244,13 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu
252244#[ tokio:: test]
253245async fn historic_mode_respects_blocks_read_per_epoch ( ) -> anyhow:: Result < ( ) > {
254246 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
255-
256247 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
257248
258249 provider. anvil_mine ( Some ( 100 ) , None ) . await ?;
259250
260251 let client = BlockRangeScanner :: new ( )
261252 . max_block_range ( 5 )
262- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
263- . await ?
253+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
264254 . run ( ) ?;
265255
266256 // ranges where each batch is of max blocks per epoch size
@@ -290,8 +280,7 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
290280 // range where blocks per epoch is larger than the number of blocks on chain
291281 let client = BlockRangeScanner :: new ( )
292282 . max_block_range ( 200 )
293- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
294- . await ?
283+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
295284 . run ( ) ?;
296285
297286 let mut stream = client. stream_historical ( 0 , 20 ) . await ?;
@@ -308,14 +297,13 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
308297#[ tokio:: test]
309298async fn historic_mode_normalises_start_and_end_block ( ) -> anyhow:: Result < ( ) > {
310299 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
311-
312300 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
301+
313302 provider. anvil_mine ( Some ( 11 ) , None ) . await ?;
314303
315304 let client = BlockRangeScanner :: new ( )
316305 . max_block_range ( 5 )
317- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
318- . await ?
306+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
319307 . run ( ) ?;
320308
321309 let mut stream = client. stream_historical ( 10 , 0 ) . await ?;
@@ -324,21 +312,25 @@ async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> {
324312 assert_next ! ( stream, 10 ..=10 ) ;
325313 assert_closed ! ( stream) ;
326314
315+ let mut stream = client. stream_historical ( 0 , 10 ) . await ?;
316+ assert_next ! ( stream, 0 ..=4 ) ;
317+ assert_next ! ( stream, 5 ..=9 ) ;
318+ assert_next ! ( stream, 10 ..=10 ) ;
319+ assert_closed ! ( stream) ;
320+
327321 Ok ( ( ) )
328322}
329323
330324#[ tokio:: test]
331325async fn rewind_single_batch_when_epoch_larger_than_range ( ) -> anyhow:: Result < ( ) > {
332326 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
333-
334327 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
335328
336329 provider. anvil_mine ( Some ( 150 ) , None ) . await ?;
337330
338331 let client = BlockRangeScanner :: new ( )
339332 . max_block_range ( 100 )
340- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
341- . await ?
333+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
342334 . run ( ) ?;
343335
344336 let mut stream = client. rewind ( 100 , 150 ) . await ?;
@@ -353,15 +345,13 @@ async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<()
353345#[ tokio:: test]
354346async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse ( ) -> anyhow:: Result < ( ) > {
355347 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
356-
357348 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
358349
359350 provider. anvil_mine ( Some ( 15 ) , None ) . await ?;
360351
361352 let client = BlockRangeScanner :: new ( )
362353 . max_block_range ( 5 )
363- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
364- . await ?
354+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
365355 . run ( ) ?;
366356
367357 let mut stream = client. rewind ( 0 , 14 ) . await ?;
@@ -378,15 +368,13 @@ async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> any
378368#[ tokio:: test]
379369async fn rewind_with_remainder_trims_first_batch_to_stream_start ( ) -> anyhow:: Result < ( ) > {
380370 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
381-
382371 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
383372
384373 provider. anvil_mine ( Some ( 15 ) , None ) . await ?;
385374
386375 let client = BlockRangeScanner :: new ( )
387376 . max_block_range ( 4 )
388- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
389- . await ?
377+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
390378 . run ( ) ?;
391379
392380 let mut stream = client. rewind ( 3 , 12 ) . await ?;
@@ -403,15 +391,13 @@ async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Re
403391#[ tokio:: test]
404392async fn rewind_single_block_range ( ) -> anyhow:: Result < ( ) > {
405393 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
406-
407394 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
408395
409396 provider. anvil_mine ( Some ( 15 ) , None ) . await ?;
410397
411398 let client = BlockRangeScanner :: new ( )
412399 . max_block_range ( 5 )
413- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
414- . await ?
400+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
415401 . run ( ) ?;
416402
417403 let mut stream = client. rewind ( 7 , 7 ) . await ?;
@@ -425,15 +411,13 @@ async fn rewind_single_block_range() -> anyhow::Result<()> {
425411#[ tokio:: test]
426412async fn rewind_epoch_of_one_sends_each_block_in_reverse_order ( ) -> anyhow:: Result < ( ) > {
427413 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
428-
429414 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
430415
431416 provider. anvil_mine ( Some ( 15 ) , None ) . await ?;
432417
433418 let client = BlockRangeScanner :: new ( )
434419 . max_block_range ( 1 )
435- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
436- . await ?
420+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
437421 . run ( ) ?;
438422
439423 let mut stream = client. rewind ( 5 , 8 ) . await ?;
@@ -451,15 +435,14 @@ async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Resu
451435#[ tokio:: test]
452436async fn command_rewind_defaults_latest_to_earliest_batches_correctly ( ) -> anyhow:: Result < ( ) > {
453437 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
454-
455438 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
439+
456440 // Mine 20 blocks, so the total number of blocks is 21 (including 0th block)
457441 provider. anvil_mine ( Some ( 20 ) , None ) . await ?;
458442
459443 let client = BlockRangeScanner :: new ( )
460444 . max_block_range ( 7 )
461- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
462- . await ?
445+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
463446 . run ( ) ?;
464447
465448 let mut stream = client. rewind ( BlockNumberOrTag :: Earliest , BlockNumberOrTag :: Latest ) . await ?;
@@ -475,15 +458,14 @@ async fn command_rewind_defaults_latest_to_earliest_batches_correctly() -> anyho
475458#[ tokio:: test]
476459async fn command_rewind_handles_start_and_end_in_any_order ( ) -> anyhow:: Result < ( ) > {
477460 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
478-
479461 let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
462+
480463 // Ensure blocks at 3 and 15 exist
481464 provider. anvil_mine ( Some ( 16 ) , None ) . await ?;
482465
483466 let client = BlockRangeScanner :: new ( )
484467 . max_block_range ( 5 )
485- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
486- . await ?
468+ . connect :: < Ethereum > ( provider. root ( ) . clone ( ) )
487469 . run ( ) ?;
488470
489471 let mut stream = client. rewind ( 15 , 3 ) . await ?;
0 commit comments