1
+ use std:: collections:: VecDeque ;
1
2
use std:: path:: Path ;
2
3
3
4
use async_trait:: async_trait;
@@ -20,7 +21,7 @@ impl DumbBlockScanner {
20
21
}
21
22
}
22
23
23
- /// Update blocks returned by `parse `
24
+ /// Update blocks returned used the streamer constructed by `scan `
24
25
pub async fn update_blocks ( & self , new_blocks : Vec < ScannedBlock > ) {
25
26
let mut blocks = self . blocks . write ( ) . await ;
26
27
* blocks = new_blocks;
@@ -35,8 +36,88 @@ impl BlockScanner for DumbBlockScanner {
35
36
_from_immutable : Option < ImmutableFileNumber > ,
36
37
_until_immutable : ImmutableFileNumber ,
37
38
) -> StdResult < Box < dyn BlockStreamer > > {
38
- // let iter = self.blocks.read().await.clone().into_iter();
39
- // Ok(Box::new(iter))
40
- todo ! ( )
39
+ let blocks = self . blocks . read ( ) . await . clone ( ) ;
40
+ Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ blocks] ) ) )
41
+ }
42
+ }
43
+
44
+ /// Dumb block streamer
45
+ pub struct DumbBlockStreamer {
46
+ blocks : VecDeque < Vec < ScannedBlock > > ,
47
+ }
48
+
49
+ impl DumbBlockStreamer {
50
+ /// Factory - the resulting streamer can be polled one time for each list of blocks given
51
+ pub fn new ( blocks : Vec < Vec < ScannedBlock > > ) -> Self {
52
+ Self {
53
+ blocks : VecDeque :: from ( blocks) ,
54
+ }
55
+ }
56
+ }
57
+
58
+ #[ async_trait]
59
+ impl BlockStreamer for DumbBlockStreamer {
60
+ async fn poll_next ( & mut self ) -> StdResult < Option < Vec < ScannedBlock > > > {
61
+ Ok ( self . blocks . pop_front ( ) )
62
+ }
63
+ }
64
+
65
+ #[ cfg( test) ]
66
+ mod tests {
67
+ use super :: * ;
68
+
69
+ #[ tokio:: test]
70
+ async fn polling_without_set_of_block_return_none ( ) {
71
+ let mut streamer = DumbBlockStreamer :: new ( vec ! [ ] ) ;
72
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
73
+ assert_eq ! ( blocks, None ) ;
74
+ }
75
+
76
+ #[ tokio:: test]
77
+ async fn polling_with_one_set_of_block_returns_some_once ( ) {
78
+ let expected_blocks = vec ! [ ScannedBlock :: new( "hash-1" , 1 , 10 , 20 , Vec :: <& str >:: new( ) ) ] ;
79
+ let mut streamer = DumbBlockStreamer :: new ( vec ! [ expected_blocks. clone( ) ] ) ;
80
+
81
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
82
+ assert_eq ! ( blocks, Some ( expected_blocks) ) ;
83
+
84
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
85
+ assert_eq ! ( blocks, None ) ;
86
+ }
87
+
88
+ #[ tokio:: test]
89
+ async fn polling_with_multiple_sets_of_blocks_returns_some_once ( ) {
90
+ let expected_blocks = vec ! [
91
+ vec![ ScannedBlock :: new( "hash-1" , 1 , 10 , 20 , Vec :: <& str >:: new( ) ) ] ,
92
+ vec![
93
+ ScannedBlock :: new( "hash-2" , 2 , 11 , 21 , Vec :: <& str >:: new( ) ) ,
94
+ ScannedBlock :: new( "hash-3" , 3 , 12 , 22 , Vec :: <& str >:: new( ) ) ,
95
+ ] ,
96
+ vec![ ScannedBlock :: new( "hash-4" , 4 , 13 , 23 , Vec :: <& str >:: new( ) ) ] ,
97
+ ] ;
98
+ let mut streamer = DumbBlockStreamer :: new ( expected_blocks. clone ( ) ) ;
99
+
100
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
101
+ assert_eq ! ( blocks, Some ( expected_blocks[ 0 ] . clone( ) ) ) ;
102
+
103
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
104
+ assert_eq ! ( blocks, Some ( expected_blocks[ 1 ] . clone( ) ) ) ;
105
+
106
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
107
+ assert_eq ! ( blocks, Some ( expected_blocks[ 2 ] . clone( ) ) ) ;
108
+
109
+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
110
+ assert_eq ! ( blocks, None ) ;
111
+ }
112
+
113
+ #[ tokio:: test]
114
+ async fn dumb_scanned_construct_a_streamer_based_on_its_stored_blocks ( ) {
115
+ let expected_blocks = vec ! [ ScannedBlock :: new( "hash-1" , 1 , 10 , 20 , Vec :: <& str >:: new( ) ) ] ;
116
+
117
+ let scanner = DumbBlockScanner :: new ( expected_blocks. clone ( ) ) ;
118
+ let mut streamer = scanner. scan ( Path :: new ( "dummy" ) , None , 1 ) . await . unwrap ( ) ;
119
+
120
+ let blocks = streamer. poll_all ( ) . await . unwrap ( ) ;
121
+ assert_eq ! ( blocks, expected_blocks) ;
41
122
}
42
123
}
0 commit comments