@@ -7,100 +7,288 @@ pub use self::{
7
7
} ;
8
8
use crate :: {
9
9
btsieve:: { BlockByHash , LatestBlock , MatchingTransactions , ReceiptByHash } ,
10
- ethereum:: TransactionAndReceipt ,
10
+ ethereum:: { Block , Transaction , TransactionAndReceipt , TransactionReceipt , H256 , U256 } ,
11
11
} ;
12
- use futures_core:: { compat:: Future01CompatExt , TryFutureExt } ;
13
- use std:: ops:: Add ;
12
+ use futures_core:: { compat:: Future01CompatExt , future :: join , FutureExt , TryFutureExt } ;
13
+ use std:: { collections :: HashSet , fmt :: Debug , ops:: Add } ;
14
14
use tokio:: {
15
15
prelude:: { stream, Stream } ,
16
16
timer:: Delay ,
17
17
} ;
18
18
19
- impl < C > MatchingTransactions < TransactionPattern > for C
19
+ impl < C , E > MatchingTransactions < TransactionPattern > for C
20
20
where
21
- C : LatestBlock < Block = Option < crate :: ethereum :: Block < crate :: ethereum :: Transaction > > >
22
- + BlockByHash < Block = Option < crate :: ethereum :: Block < crate :: ethereum :: Transaction > > >
23
- + ReceiptByHash <
24
- Receipt = Option < crate :: ethereum :: TransactionReceipt > ,
25
- TransactionHash = crate :: ethereum :: H256 ,
26
- > + Clone ,
21
+ C : LatestBlock < Block = Option < Block < Transaction > > , Error = E >
22
+ + BlockByHash < Block = Option < Block < Transaction > > , BlockHash = H256 , Error = E >
23
+ + ReceiptByHash < Receipt = Option < TransactionReceipt > , TransactionHash = H256 , Error = E >
24
+ + tokio :: executor :: Executor
25
+ + Clone ,
26
+ E : Debug + Send + ' static ,
27
27
{
28
28
type Transaction = TransactionAndReceipt ;
29
29
30
30
fn matching_transactions (
31
31
& self ,
32
32
pattern : TransactionPattern ,
33
- _timestamp : Option < u32 > ,
33
+ reference_timestamp : Option < u32 > ,
34
34
) -> Box < dyn Stream < Item = Self :: Transaction , Error = ( ) > + Send > {
35
- let matching_transaction = Box :: pin ( matching_transaction ( self . clone ( ) , pattern) ) . compat ( ) ;
36
- Box :: new ( stream:: futures_unordered ( vec ! [ matching_transaction] ) )
37
- }
38
- }
35
+ let ( block_queue, next_block) = async_std:: sync:: channel ( 1 ) ;
36
+ let ( find_parent_queue, next_find_parent) = async_std:: sync:: channel ( 5 ) ;
37
+ let ( look_in_the_past_queue, next_look_in_the_past) = async_std:: sync:: channel ( 5 ) ;
39
38
40
- async fn matching_transaction < C > (
41
- mut blockchain_connector : C ,
42
- pattern : TransactionPattern ,
43
- ) -> Result < TransactionAndReceipt , ( ) >
44
- where
45
- C : LatestBlock < Block = Option < crate :: ethereum:: Block < crate :: ethereum:: Transaction > > >
46
- + BlockByHash < Block = Option < crate :: ethereum:: Block < crate :: ethereum:: Transaction > > >
47
- + ReceiptByHash <
48
- Receipt = Option < crate :: ethereum:: TransactionReceipt > ,
49
- TransactionHash = crate :: ethereum:: H256 ,
50
- > + Clone ,
51
- {
52
- loop {
53
- // Delay so that we don't overload the CPU in the event that
54
- // latest_block() and block_by_hash() resolve quickly.
55
- Delay :: new ( std:: time:: Instant :: now ( ) . add ( std:: time:: Duration :: from_secs ( 1 ) ) )
56
- . compat ( )
57
- . await
58
- . unwrap_or_else ( |e| log:: warn!( "Failed to wait for delay: {:?}" , e) ) ;
59
-
60
- let latest_block = match blockchain_connector. latest_block ( ) . compat ( ) . await {
61
- Ok ( Some ( block) ) => block,
62
- Ok ( None ) => {
63
- log:: warn!( "Could not get latest block" ) ;
64
- continue ;
39
+ let reference_timestamp = reference_timestamp. map ( U256 :: from) ;
40
+
41
+ spawn ( self . clone ( ) , {
42
+ let mut connector = self . clone ( ) ;
43
+ let block_queue = block_queue. clone ( ) ;
44
+ let find_parent_queue = find_parent_queue. clone ( ) ;
45
+ let look_in_the_past_queue = look_in_the_past_queue. clone ( ) ;
46
+
47
+ async move {
48
+ let mut sent_blockhashes: HashSet < H256 > = HashSet :: new ( ) ;
49
+
50
+ loop {
51
+ Delay :: new ( std:: time:: Instant :: now ( ) . add ( std:: time:: Duration :: from_secs ( 1 ) ) )
52
+ . compat ( )
53
+ . await
54
+ . unwrap ( ) ;
55
+
56
+ match connector. latest_block ( ) . compat ( ) . await {
57
+ Ok ( Some ( block) ) if block. hash . is_some ( ) => {
58
+ let blockhash = block. hash . expect ( "cannot fail" ) ;
59
+
60
+ if !sent_blockhashes. contains ( & blockhash) {
61
+ sent_blockhashes. insert ( blockhash) ;
62
+
63
+ join (
64
+ block_queue. send ( block. clone ( ) ) ,
65
+ find_parent_queue. send ( ( blockhash, block. parent_hash ) ) ,
66
+ )
67
+ . await ;
68
+
69
+ if sent_blockhashes. len ( ) == 1 {
70
+ look_in_the_past_queue. send ( block. parent_hash ) . await
71
+ } ;
72
+ }
73
+ }
74
+ Ok ( Some ( _) ) => {
75
+ log:: warn!( "Ignoring block without blockhash" ) ;
76
+ }
77
+ Ok ( None ) => {
78
+ log:: warn!( "Could not get latest block" ) ;
79
+ }
80
+ Err ( e) => {
81
+ log:: warn!( "Could not get latest block: {:?}" , e) ;
82
+ }
83
+ } ;
84
+ }
65
85
}
66
- Err ( e) => {
67
- log:: warn!( "Could not get latest block: {:?}" , e) ;
68
- continue ;
86
+ } ) ;
87
+
88
+ let ( fetch_block_by_hash_queue, next_hash) = async_std:: sync:: channel ( 5 ) ;
89
+
90
+ spawn ( self . clone ( ) , {
91
+ let connector = self . clone ( ) ;
92
+ let block_queue = block_queue. clone ( ) ;
93
+ let fetch_block_by_hash_queue = fetch_block_by_hash_queue. clone ( ) ;
94
+
95
+ async move {
96
+ loop {
97
+ match next_hash. recv ( ) . await {
98
+ Some ( blockhash) => {
99
+ match connector. block_by_hash ( blockhash) . compat ( ) . await {
100
+ Ok ( Some ( block) ) => {
101
+ join (
102
+ block_queue. send ( block. clone ( ) ) ,
103
+ find_parent_queue. send ( ( blockhash, block. parent_hash ) ) ,
104
+ )
105
+ . await ;
106
+ }
107
+ Ok ( None ) => {
108
+ log:: warn!( "Block with hash {} does not exist" , blockhash) ;
109
+ }
110
+ Err ( e) => {
111
+ log:: warn!(
112
+ "Could not get block with hash {}: {:?}" ,
113
+ blockhash,
114
+ e
115
+ ) ;
116
+
117
+ fetch_block_by_hash_queue. send ( blockhash) . await
118
+ }
119
+ } ;
120
+ }
121
+ None => unreachable ! ( "sender cannot be dropped" ) ,
122
+ }
123
+ }
69
124
}
70
- } ;
125
+ } ) ;
126
+
127
+ spawn ( self . clone ( ) , {
128
+ let fetch_block_by_hash_queue = fetch_block_by_hash_queue. clone ( ) ;
129
+
130
+ async move {
131
+ let mut prev_blockhashes: HashSet < H256 > = HashSet :: new ( ) ;
132
+
133
+ loop {
134
+ match next_find_parent. recv ( ) . await {
135
+ Some ( ( blockhash, parent_blockhash) ) => {
136
+ prev_blockhashes. insert ( blockhash) ;
71
137
72
- if pattern. can_skip_block ( & latest_block) {
73
- continue ;
74
- }
75
-
76
- for transaction in latest_block. transactions . iter ( ) {
77
- let result = blockchain_connector
78
- . receipt_by_hash ( transaction. hash )
79
- . compat ( )
80
- . await ;
81
-
82
- let receipt = match result {
83
- Ok ( Some ( receipt) ) => receipt,
84
- Ok ( None ) => {
85
- log:: warn!( "Could not get transaction receipt" ) ;
86
- continue ;
138
+ if !prev_blockhashes. contains ( & parent_blockhash)
139
+ && prev_blockhashes. len ( ) > 1
140
+ {
141
+ fetch_block_by_hash_queue. send ( parent_blockhash) . await
142
+ }
143
+ }
144
+ None => unreachable ! ( "senders cannot be dropped" ) ,
145
+ }
87
146
}
88
- Err ( e) => {
89
- log:: warn!(
90
- "Could not retrieve transaction receipt for {}: {:?}" ,
91
- transaction. hash,
92
- e
93
- ) ;
94
- continue ;
147
+ }
148
+ } ) ;
149
+
150
+ spawn ( self . clone ( ) , {
151
+ let connector = self . clone ( ) ;
152
+ let block_queue = block_queue. clone ( ) ;
153
+ let look_in_the_past_queue = look_in_the_past_queue. clone ( ) ;
154
+
155
+ async move {
156
+ loop {
157
+ match next_look_in_the_past. recv ( ) . await {
158
+ Some ( parent_blockhash) => {
159
+ match connector. block_by_hash ( parent_blockhash) . compat ( ) . await {
160
+ Ok ( Some ( block) ) => {
161
+ let younger_than_reference_timestamp = reference_timestamp
162
+ . map ( |reference_timestamp| {
163
+ reference_timestamp <= block. timestamp
164
+ } )
165
+ . unwrap_or ( false ) ;
166
+ if younger_than_reference_timestamp {
167
+ join (
168
+ block_queue. send ( block. clone ( ) ) ,
169
+ look_in_the_past_queue. send ( block. parent_hash ) ,
170
+ )
171
+ . await ;
172
+ }
173
+ }
174
+ Ok ( None ) => {
175
+ log:: warn!(
176
+ "Block with hash {} does not exist" ,
177
+ parent_blockhash
178
+ ) ;
179
+ }
180
+ Err ( e) => {
181
+ log:: warn!(
182
+ "Could not get block with hash {}: {:?}" ,
183
+ parent_blockhash,
184
+ e
185
+ ) ;
186
+
187
+ look_in_the_past_queue. send ( parent_blockhash) . await
188
+ }
189
+ }
190
+ }
191
+ None => unreachable ! ( "senders cannot be dropped" ) ,
192
+ }
193
+ }
194
+ }
195
+ } ) ;
196
+
197
+ let ( matching_transaction_queue, matching_transaction) = async_std:: sync:: channel ( 1 ) ;
198
+
199
+ spawn ( self . clone ( ) , {
200
+ let connector = self . clone ( ) ;
201
+ let matching_transaction_queue = matching_transaction_queue. clone ( ) ;
202
+
203
+ async move {
204
+ loop {
205
+ match next_block. recv ( ) . await {
206
+ Some ( block) => {
207
+ let needs_receipt = pattern. needs_receipts ( & block) ;
208
+
209
+ for transaction in block. transactions . into_iter ( ) {
210
+ if needs_receipt {
211
+ let result =
212
+ connector. receipt_by_hash ( transaction. hash ) . compat ( ) . await ;
213
+
214
+ let receipt = match result {
215
+ Ok ( Some ( receipt) ) => receipt,
216
+ Ok ( None ) => {
217
+ log:: warn!( "Could not get transaction receipt" ) ;
218
+ continue ;
219
+ }
220
+ Err ( e) => {
221
+ log:: warn!(
222
+ "Could not retrieve transaction receipt for {}: {:?}" ,
223
+ transaction. hash,
224
+ e
225
+ ) ;
226
+ continue ;
227
+ }
228
+ } ;
229
+
230
+ if pattern. matches ( & transaction, Some ( & receipt) ) {
231
+ matching_transaction_queue
232
+ . send ( TransactionAndReceipt {
233
+ transaction,
234
+ receipt,
235
+ } )
236
+ . await ;
237
+ }
238
+ } else if pattern. matches ( & transaction, None ) {
239
+ let result =
240
+ connector. receipt_by_hash ( transaction. hash ) . compat ( ) . await ;
241
+
242
+ let receipt = match result {
243
+ Ok ( Some ( receipt) ) => receipt,
244
+ Ok ( None ) => {
245
+ log:: warn!( "Could not get transaction receipt for matching transaction" ) ;
246
+ continue ;
247
+ }
248
+ Err ( e) => {
249
+ log:: warn!(
250
+ "Could not retrieve transaction receipt for matching transaction {}: {:?}" ,
251
+ transaction. hash,
252
+ e
253
+ ) ;
254
+ continue ;
255
+ }
256
+ } ;
257
+
258
+ matching_transaction_queue
259
+ . send ( TransactionAndReceipt {
260
+ transaction,
261
+ receipt,
262
+ } )
263
+ . await ;
264
+ }
265
+ }
266
+ }
267
+ None => unreachable ! ( "senders cannot be dropped" ) ,
268
+ }
95
269
}
96
- } ;
97
-
98
- if pattern. matches ( transaction, & receipt) {
99
- return Ok ( TransactionAndReceipt {
100
- transaction : transaction. clone ( ) ,
101
- receipt,
102
- } ) ;
103
- } ;
104
- }
270
+ }
271
+ } ) ;
272
+
273
+ let matching_transaction = async move {
274
+ matching_transaction
275
+ . recv ( )
276
+ . await
277
+ . expect ( "sender cannot be dropped" )
278
+ } ;
279
+
280
+ Box :: new ( stream:: futures_unordered ( vec ! [ matching_transaction
281
+ . unit_error( )
282
+ . boxed( )
283
+ . compat( ) ] ) )
105
284
}
106
285
}
286
+
287
+ fn spawn (
288
+ mut executor : impl tokio:: executor:: Executor ,
289
+ future : impl std:: future:: Future < Output = ( ) > + Send + ' static + Sized ,
290
+ ) {
291
+ executor
292
+ . spawn ( Box :: new ( future. unit_error ( ) . boxed ( ) . compat ( ) ) )
293
+ . unwrap ( )
294
+ }
0 commit comments