@@ -7,7 +7,7 @@ pub use self::{
7
7
} ;
8
8
use crate :: {
9
9
btsieve:: { BlockByHash , LatestBlock , MatchingTransactions , ReceiptByHash } ,
10
- ethereum:: { Block , Transaction , TransactionAndReceipt , TransactionReceipt , H256 } ,
10
+ ethereum:: { Block , Transaction , TransactionAndReceipt , TransactionReceipt , H256 , U256 } ,
11
11
} ;
12
12
use futures_core:: { compat:: Future01CompatExt , future:: join, FutureExt , TryFutureExt } ;
13
13
use std:: { collections:: HashSet , fmt:: Debug , ops:: Add } ;
@@ -30,15 +30,19 @@ where
30
30
fn matching_transactions (
31
31
& self ,
32
32
pattern : TransactionPattern ,
33
- timestamp : Option < u32 > ,
33
+ reference_timestamp : Option < u32 > ,
34
34
) -> Box < dyn Stream < Item = Self :: Transaction , Error = ( ) > + Send > {
35
35
let ( block_queue, next_block) = async_std:: sync:: channel ( 1 ) ;
36
36
let ( find_parent_queue, next_find_parent) = async_std:: sync:: channel ( 5 ) ;
37
+ let ( look_in_the_past_queue, next_look_in_the_past) = async_std:: sync:: channel ( 5 ) ;
38
+
39
+ let reference_timestamp = reference_timestamp. map ( |timestamp| U256 :: from ( timestamp) ) ;
37
40
38
41
spawn ( self . clone ( ) , {
39
42
let mut connector = self . clone ( ) ;
40
43
let block_queue = block_queue. clone ( ) ;
41
44
let find_parent_queue = find_parent_queue. clone ( ) ;
45
+ let look_in_the_past_queue = look_in_the_past_queue. clone ( ) ;
42
46
43
47
async move {
44
48
let mut sent_blockhashes: HashSet < H256 > = HashSet :: new ( ) ;
61
65
find_parent_queue. send ( ( blockhash, block. parent_hash ) ) ,
62
66
)
63
67
. await ;
68
+
69
+ if sent_blockhashes. len ( ) == 1 {
70
+ look_in_the_past_queue. send ( block. parent_hash ) . await
71
+ } ;
64
72
}
65
73
}
66
74
Ok ( Some ( _) ) => {
@@ -117,9 +125,10 @@ where
117
125
} ) ;
118
126
119
127
spawn ( self . clone ( ) , {
128
+ let fetch_block_by_hash_queue = fetch_block_by_hash_queue. clone ( ) ;
129
+
120
130
async move {
121
131
let mut prev_blockhashes: HashSet < H256 > = HashSet :: new ( ) ;
122
- let fetch_block_by_hash_queue = fetch_block_by_hash_queue. clone ( ) ;
123
132
124
133
loop {
125
134
match next_find_parent. recv ( ) . await {
@@ -138,6 +147,53 @@ where
138
147
}
139
148
} ) ;
140
149
150
+ spawn ( self . clone ( ) , {
151
+ let connector = self . clone ( ) ;
152
+ let block_queue = block_queue. clone ( ) ;
153
+ let look_in_the_past_queue = look_in_the_past_queue. clone ( ) ;
154
+
155
+ async move {
156
+ loop {
157
+ match next_look_in_the_past. recv ( ) . await {
158
+ Some ( parent_blockhash) => {
159
+ match connector. block_by_hash ( parent_blockhash) . compat ( ) . await {
160
+ Ok ( Some ( block) ) => {
161
+ if reference_timestamp
162
+ . map ( |reference_timestamp| {
163
+ reference_timestamp <= block. timestamp
164
+ } )
165
+ . unwrap_or ( false )
166
+ {
167
+ join (
168
+ block_queue. send ( block. clone ( ) ) ,
169
+ look_in_the_past_queue. send ( block. parent_hash ) ,
170
+ )
171
+ . await ;
172
+ }
173
+ }
174
+ Ok ( None ) => {
175
+ log:: warn!(
176
+ "Block with hash {} does not exist" ,
177
+ parent_blockhash
178
+ ) ;
179
+ }
180
+ Err ( e) => {
181
+ log:: warn!(
182
+ "Could not get block with hash {}: {:?}" ,
183
+ parent_blockhash,
184
+ e
185
+ ) ;
186
+
187
+ look_in_the_past_queue. send ( parent_blockhash) . await
188
+ }
189
+ }
190
+ }
191
+ None => unreachable ! ( "senders cannot be dropped" ) ,
192
+ }
193
+ }
194
+ }
195
+ } ) ;
196
+
141
197
let ( matching_transaction_queue, matching_transaction) = async_std:: sync:: channel ( 1 ) ;
142
198
143
199
spawn ( self . clone ( ) , {
0 commit comments