Skip to content

Commit db00339

Browse files
committed
chain/ethereum: Migrate load_blocks_rpc from futures01 to futures03
1 parent 6d1fdc6 commit db00339

File tree

1 file changed

+34
-22
lines changed

1 file changed

+34
-22
lines changed

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -665,29 +665,42 @@ impl EthereumAdapter {
665665
&self,
666666
logger: Logger,
667667
ids: Vec<H256>,
668-
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
668+
) -> impl futures03::Stream<Item = Result<Arc<LightEthereumBlock>, Error>> + Send {
669669
let web3 = self.web3.clone();
670670

671-
stream::iter_ok::<_, Error>(ids.into_iter().map(move |hash| {
671+
futures03::stream::iter(ids.into_iter().map(move |hash| {
672672
let web3 = web3.clone();
673-
retry(format!("load block {}", hash), &logger)
674-
.redact_log_urls(true)
675-
.limit(ENV_VARS.request_retries)
676-
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
677-
.run(move || {
678-
Box::pin(web3.eth().block_with_txs(BlockId::Hash(hash)))
679-
.compat()
680-
.from_err::<Error>()
681-
.and_then(move |block| {
682-
block.map(Arc::new).ok_or_else(|| {
683-
anyhow::anyhow!("Ethereum node did not find block {:?}", hash)
684-
})
673+
let logger = logger.clone();
674+
675+
async move {
676+
retry(format!("load block {}", hash), &logger)
677+
.redact_log_urls(true)
678+
.limit(ENV_VARS.request_retries)
679+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
680+
.run(move || {
681+
let web3 = web3.cheap_clone();
682+
async move {
683+
web3.eth()
684+
.block_with_txs(BlockId::Hash(hash))
685+
.await
686+
.map_err(Error::from)
687+
.and_then(|block| {
688+
block.map(Arc::new).ok_or_else(|| {
689+
anyhow::anyhow!(
690+
"Ethereum node did not find block {:?}",
691+
hash
692+
)
693+
})
694+
})
695+
}
696+
})
697+
.await
698+
.map_err(|e| {
699+
e.into_inner().unwrap_or_else(|| {
700+
anyhow::anyhow!("Ethereum node took too long to return block {}", hash)
685701
})
686-
.compat()
687-
})
688-
.boxed()
689-
.compat()
690-
.from_err()
702+
})
703+
}
691704
}))
692705
.buffered(ENV_VARS.block_batch_size)
693706
}
@@ -1587,10 +1600,9 @@ impl EthereumAdapterTrait for EthereumAdapter {
15871600

15881601
// Return a stream that lazily loads batches of blocks.
15891602
debug!(logger, "Requesting {} block(s)", missing_blocks.len());
1590-
let new_blocks = self
1603+
let new_blocks: Vec<Arc<LightEthereumBlock>> = self
15911604
.load_blocks_rpc(logger.clone(), missing_blocks)
1592-
.collect()
1593-
.compat()
1605+
.try_collect()
15941606
.await?;
15951607
let upsert_blocks: Vec<_> = new_blocks
15961608
.iter()

0 commit comments

Comments
 (0)