Skip to content

Commit 82fb1f9

Browse files
committed
chain/ethereum: Refactor calls_in_block_range to eliminate Unpin errors and use futures03-ready closures
1 parent db00339 commit 82fb1f9

File tree

1 file changed

+39
-38
lines changed

1 file changed

+39
-38
lines changed

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ impl EthereumAdapter {
369369
from: BlockNumber,
370370
to: BlockNumber,
371371
addresses: Vec<H160>,
372-
) -> impl Stream<Item = Trace, Error = Error> + Send {
372+
) -> impl futures03::Stream<Item = Result<Trace, Error>> + Send {
373373
if from > to {
374374
panic!(
375375
"Can not produce a call stream on a backwards block range: from = {}, to = {}",
@@ -385,34 +385,37 @@ impl EthereumAdapter {
385385

386386
let eth = self;
387387
let logger = logger.clone();
388-
stream::unfold(from, move |start| {
389-
if start > to {
390-
return None;
391-
}
392-
let end = (start + step_size - 1).min(to);
393-
let new_start = end + 1;
394-
if start == end {
395-
debug!(logger, "Requesting traces for block {}", start);
396-
} else {
397-
debug!(logger, "Requesting traces for blocks [{}, {}]", start, end);
388+
389+
futures03::stream::try_unfold(from, move |start| {
390+
let eth = eth.clone();
391+
let logger = logger.clone();
392+
let subgraph_metrics = subgraph_metrics.clone();
393+
let addresses = addresses.clone();
394+
395+
async move {
396+
if start > to {
397+
return Ok::<Option<_>, Error>(None);
398+
}
399+
400+
let end = (start + step_size - 1).min(to);
401+
let new_start = end + 1;
402+
403+
if start == end {
404+
debug!(logger, "Requesting traces for block {}", start);
405+
} else {
406+
debug!(logger, "Requesting traces for blocks [{}, {}]", start, end);
407+
}
408+
409+
let traces = eth
410+
.traces(logger, subgraph_metrics, start, end, addresses)
411+
.await?;
412+
Ok(Some((
413+
futures03::stream::iter(traces.into_iter().map(|t| Ok::<_, Error>(t))),
414+
new_start,
415+
)))
398416
}
399-
Some(graph::futures01::future::ok((
400-
eth.clone()
401-
.traces(
402-
logger.cheap_clone(),
403-
subgraph_metrics.clone(),
404-
start,
405-
end,
406-
addresses.clone(),
407-
)
408-
.boxed()
409-
.compat(),
410-
new_start,
411-
)))
412417
})
413-
.buffered(ENV_VARS.block_batch_size)
414-
.map(stream::iter_ok)
415-
.flatten()
418+
.try_flatten()
416419
}
417420

418421
fn log_stream(
@@ -899,14 +902,13 @@ impl EthereumAdapter {
899902

900903
Box::new(
901904
eth.trace_stream(logger, subgraph_metrics, from, to, addresses)
902-
.filter_map(|trace| EthereumCall::try_from_trace(&trace))
903-
.filter(move |call| {
904-
// `trace_filter` can only filter by calls `to` an address and
905-
// a block range. Since subgraphs are subscribing to calls
906-
// for a specific contract function an additional filter needs
907-
// to be applied
908-
call_filter.matches(call)
909-
}),
905+
.try_filter_map(move |trace| {
906+
let maybe_call = EthereumCall::try_from_trace(&trace)
907+
.filter(|call| call_filter.matches(call));
908+
futures03::future::ready(Ok(maybe_call))
909+
})
910+
.boxed()
911+
.compat(),
910912
)
911913
}
912914

@@ -989,16 +991,15 @@ impl EthereumAdapter {
989991
) -> Result<Vec<EthereumCall>, Error> {
990992
let eth = self.clone();
991993
let addresses = Vec::new();
992-
let traces = eth
994+
let traces: Vec<Trace> = eth
993995
.trace_stream(
994996
logger,
995997
subgraph_metrics.clone(),
996998
block_number,
997999
block_number,
9981000
addresses,
9991001
)
1000-
.collect()
1001-
.compat()
1002+
.try_collect()
10021003
.await?;
10031004

10041005
// `trace_stream` returns all of the traces for the block, and this

0 commit comments

Comments
 (0)