Skip to content

Commit 03d4611

Browse files
committed
chain: Simplify EthereumAdapter.load_blocks
1 parent 2f47ed5 commit 03d4611

File tree

3 files changed

+26
-35
lines changed

3 files changed

+26
-35
lines changed

chain/ethereum/src/adapter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use graph::prelude::*;
2626
use graph::{
2727
blockchain as bc,
2828
components::metrics::{CounterVec, GaugeVec, HistogramVec},
29-
futures01::Stream,
3029
petgraph::{self, graphmap::GraphMap},
3130
};
3231

@@ -1107,7 +1106,7 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11071106
logger: Logger,
11081107
chain_store: Arc<dyn ChainStore>,
11091108
block_hashes: HashSet<H256>,
1110-
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
1109+
) -> Result<Vec<Arc<LightEthereumBlock>>, Error>;
11111110

11121111
/// Find a block by its hash.
11131112
fn block_by_hash(

chain/ethereum/src/chain.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use graph::components::network_provider::ChainName;
1010
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::firehose::{FirehoseEndpoint, ForkStep};
13-
use graph::futures03::compat::Future01CompatExt;
1413
use graph::futures03::TryStreamExt;
1514
use graph::prelude::{
1615
retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
@@ -1060,7 +1059,6 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10601059
}
10611060

10621061
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
1063-
use graph::futures01::stream::Stream;
10641062
use graph::prelude::LightEthereumBlockExt;
10651063

10661064
let block = match self.chain_client.as_ref() {
@@ -1111,9 +1109,6 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
11111109
self.chain_store.cheap_clone(),
11121110
HashSet::from_iter(Some(block.hash_as_h256())),
11131111
)
1114-
.await
1115-
.collect()
1116-
.compat()
11171112
.await?;
11181113
assert_eq!(blocks.len(), 1);
11191114

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,7 +1734,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
17341734
logger: Logger,
17351735
chain_store: Arc<dyn ChainStore>,
17361736
block_hashes: HashSet<H256>,
1737-
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
1737+
) -> Result<Vec<Arc<LightEthereumBlock>>, Error> {
17381738
let block_hashes: Vec<_> = block_hashes.iter().cloned().collect();
17391739
// Search for the block in the store first then use json-rpc as a backup.
17401740
let mut blocks: Vec<Arc<LightEthereumBlock>> = chain_store
@@ -1756,27 +1756,25 @@ impl EthereumAdapterTrait for EthereumAdapter {
17561756

17571757
// Return a stream that lazily loads batches of blocks.
17581758
debug!(logger, "Requesting {} block(s)", missing_blocks.len());
1759-
Box::new(
1760-
self.load_blocks_rpc(logger.clone(), missing_blocks)
1761-
.collect()
1762-
.map(move |new_blocks| {
1763-
let upsert_blocks: Vec<_> = new_blocks
1764-
.iter()
1765-
.map(|block| BlockFinality::Final(block.clone()))
1766-
.collect();
1767-
let block_refs: Vec<_> = upsert_blocks
1768-
.iter()
1769-
.map(|block| block as &dyn graph::blockchain::Block)
1770-
.collect();
1771-
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
1772-
error!(logger, "Error writing to block cache {}", e);
1773-
}
1774-
blocks.extend(new_blocks);
1775-
blocks.sort_by_key(|block| block.number);
1776-
stream::iter_ok(blocks)
1777-
})
1778-
.flatten_stream(),
1779-
)
1759+
let new_blocks = self
1760+
.load_blocks_rpc(logger.clone(), missing_blocks)
1761+
.collect()
1762+
.compat()
1763+
.await?;
1764+
let upsert_blocks: Vec<_> = new_blocks
1765+
.iter()
1766+
.map(|block| BlockFinality::Final(block.clone()))
1767+
.collect();
1768+
let block_refs: Vec<_> = upsert_blocks
1769+
.iter()
1770+
.map(|block| block as &dyn graph::blockchain::Block)
1771+
.collect();
1772+
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
1773+
error!(logger, "Error writing to block cache {}", e);
1774+
}
1775+
blocks.extend(new_blocks);
1776+
blocks.sort_by_key(|block| block.number);
1777+
Ok(blocks)
17801778
}
17811779
}
17821780

@@ -1911,10 +1909,11 @@ pub(crate) async fn blocks_with_triggers(
19111909

19121910
let logger2 = logger.cheap_clone();
19131911

1914-
let blocks = eth
1912+
let blocks: Vec<_> = eth
19151913
.load_blocks(logger.cheap_clone(), chain_store.clone(), block_hashes)
1916-
.await
1917-
.and_then(
1914+
.await?
1915+
.into_iter()
1916+
.map(
19181917
move |block| match triggers_by_block.remove(&(block.number() as BlockNumber)) {
19191918
Some(triggers) => Ok(BlockWithTriggers::new(
19201919
BlockFinality::Final(block),
@@ -1927,9 +1926,7 @@ pub(crate) async fn blocks_with_triggers(
19271926
)),
19281927
},
19291928
)
1930-
.collect()
1931-
.compat()
1932-
.await?;
1929+
.collect::<Result<_, _>>()?;
19331930

19341931
// Filter out call triggers that come from unsuccessful transactions
19351932
let futures = blocks.into_iter().map(|block| {

0 commit comments

Comments
 (0)