@@ -178,7 +178,6 @@ impl EthereumAdapter {
178
178
addresses : Vec < H160 > ,
179
179
) -> Result < Vec < Trace > , Error > {
180
180
let eth = self . clone ( ) ;
181
- let logger = logger. to_owned ( ) ;
182
181
183
182
retry ( "trace_filter RPC call" , & logger)
184
183
. limit ( * REQUEST_RETRIES )
@@ -196,54 +195,59 @@ impl EthereumAdapter {
196
195
. build ( ) ,
197
196
} ;
198
197
198
+ let eth = eth. cheap_clone ( ) ;
199
199
let logger_for_triggers = logger. clone ( ) ;
200
200
let logger_for_error = logger. clone ( ) ;
201
201
let start = Instant :: now ( ) ;
202
202
let subgraph_metrics = subgraph_metrics. clone ( ) ;
203
203
let provider_metrics = eth. metrics . clone ( ) ;
204
204
let provider = self . provider . clone ( ) ;
205
- Box :: pin ( eth. web3 . trace ( ) . filter ( trace_filter) )
206
- . compat ( )
207
- . map ( move |traces| {
208
- if traces. len ( ) > 0 {
209
- if to == from {
210
- debug ! (
211
- logger_for_triggers,
212
- "Received {} traces for block {}" ,
213
- traces. len( ) ,
214
- to
215
- ) ;
216
- } else {
217
- debug ! (
218
- logger_for_triggers,
219
- "Received {} traces for blocks [{}, {}]" ,
220
- traces. len( ) ,
221
- from,
222
- to
223
- ) ;
205
+
206
+ async move {
207
+ let result = eth
208
+ . web3
209
+ . trace ( )
210
+ . filter ( trace_filter)
211
+ . await
212
+ . map ( move |traces| {
213
+ if traces. len ( ) > 0 {
214
+ if to == from {
215
+ debug ! (
216
+ logger_for_triggers,
217
+ "Received {} traces for block {}" ,
218
+ traces. len( ) ,
219
+ to
220
+ ) ;
221
+ } else {
222
+ debug ! (
223
+ logger_for_triggers,
224
+ "Received {} traces for blocks [{}, {}]" ,
225
+ traces. len( ) ,
226
+ from,
227
+ to
228
+ ) ;
229
+ }
224
230
}
225
- }
226
- traces
227
- } )
228
- . from_err ( )
229
- . then ( move |result| {
230
- let elapsed = start. elapsed ( ) . as_secs_f64 ( ) ;
231
- provider_metrics. observe_request ( elapsed, "trace_filter" , & provider) ;
232
- subgraph_metrics. observe_request ( elapsed, "trace_filter" , & provider) ;
233
- if result. is_err ( ) {
234
- provider_metrics. add_error ( "trace_filter" , & provider) ;
235
- subgraph_metrics. add_error ( "trace_filter" , & provider) ;
236
- debug ! (
237
- logger_for_error,
238
- "Error querying traces error = {:?} from = {:?} to = {:?}" ,
239
- result,
240
- from,
241
- to
242
- ) ;
243
- }
244
- result
245
- } )
246
- . compat ( )
231
+ traces
232
+ } )
233
+ . map_err ( Error :: from) ;
234
+
235
+ let elapsed = start. elapsed ( ) . as_secs_f64 ( ) ;
236
+ provider_metrics. observe_request ( elapsed, "trace_filter" , & provider) ;
237
+ subgraph_metrics. observe_request ( elapsed, "trace_filter" , & provider) ;
238
+ if result. is_err ( ) {
239
+ provider_metrics. add_error ( "trace_filter" , & provider) ;
240
+ subgraph_metrics. add_error ( "trace_filter" , & provider) ;
241
+ debug ! (
242
+ logger_for_error,
243
+ "Error querying traces error = {:?} from = {:?} to = {:?}" ,
244
+ result,
245
+ from,
246
+ to
247
+ ) ;
248
+ }
249
+ result
250
+ }
247
251
} )
248
252
. map_err ( move |e| {
249
253
e. into_inner ( ) . unwrap_or_else ( move || {
0 commit comments