Skip to content

Commit 854383b

Browse files
committed
Subgraph composition: Use block cache to get blocks for subgraph triggers
1 parent 5f6071b commit 854383b

File tree

4 files changed

+297
-16
lines changed

4 files changed

+297
-16
lines changed

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,45 @@ impl EthereumAdapter {
782782
.buffered(ENV_VARS.block_batch_size)
783783
}
784784

785+
/// Request blocks by number through JSON-RPC.
786+
fn load_blocks_by_numbers_rpc(
787+
&self,
788+
logger: Logger,
789+
numbers: Vec<BlockNumber>,
790+
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
791+
let web3 = self.web3.clone();
792+
793+
stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
794+
let web3 = web3.clone();
795+
retry(format!("load block {}", number), &logger)
796+
.limit(ENV_VARS.request_retries)
797+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
798+
.run(move || {
799+
Box::pin(
800+
web3.eth()
801+
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
802+
number.into(),
803+
))),
804+
)
805+
.compat()
806+
.from_err::<Error>()
807+
.and_then(move |block| {
808+
block.map(Arc::new).ok_or_else(|| {
809+
anyhow::anyhow!(
810+
"Ethereum node did not find block with number {:?}",
811+
number
812+
)
813+
})
814+
})
815+
.compat()
816+
})
817+
.boxed()
818+
.compat()
819+
.from_err()
820+
}))
821+
.buffered(ENV_VARS.block_batch_size)
822+
}
823+
785824
/// Request blocks ptrs for numbers through JSON-RPC.
786825
///
787826
/// Reorg safety: If ids are numbers, they must be a final blocks.
@@ -1650,26 +1689,68 @@ impl EthereumAdapterTrait for EthereumAdapter {
16501689
Ok(decoded)
16511690
}
16521691

1653-
// This is a ugly temporary implementation to get the block ptrs for a range of blocks
1692+
/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
16541693
async fn load_blocks_by_numbers(
16551694
&self,
16561695
logger: Logger,
16571696
chain_store: Arc<dyn ChainStore>,
16581697
block_numbers: HashSet<BlockNumber>,
16591698
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
1660-
let block_hashes = block_numbers
1699+
let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
1700+
.cheap_clone()
1701+
.blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
1702+
.await
1703+
.map_err(|e| {
1704+
error!(&logger, "Error accessing block cache {}", e);
1705+
e
1706+
})
1707+
.unwrap_or_default();
1708+
1709+
let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
16611710
.into_iter()
1662-
.map(|number| {
1663-
chain_store
1664-
.block_hashes_by_block_number(number)
1665-
.unwrap()
1666-
.first()
1667-
.unwrap()
1668-
.as_h256()
1711+
.filter_map(|(_number, values)| {
1712+
if values.len() == 1 {
1713+
json::from_value(values[0].clone()).ok()
1714+
} else {
1715+
None
1716+
}
16691717
})
1670-
.collect::<HashSet<_>>();
1718+
.collect::<Vec<_>>();
16711719

1672-
self.load_blocks(logger, chain_store, block_hashes).await
1720+
let missing_blocks: Vec<i32> = block_numbers
1721+
.into_iter()
1722+
.filter(|&number| !blocks.iter().any(|block| block.number() == number))
1723+
.collect();
1724+
1725+
if !missing_blocks.is_empty() {
1726+
debug!(
1727+
logger,
1728+
"Loading {} block(s) not in the block cache",
1729+
missing_blocks.len()
1730+
);
1731+
}
1732+
1733+
Box::new(
1734+
self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
1735+
.collect()
1736+
.map(move |new_blocks| {
1737+
let upsert_blocks: Vec<_> = new_blocks
1738+
.iter()
1739+
.map(|block| BlockFinality::Final(block.clone()))
1740+
.collect();
1741+
let block_refs: Vec<_> = upsert_blocks
1742+
.iter()
1743+
.map(|block| block as &dyn graph::blockchain::Block)
1744+
.collect();
1745+
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
1746+
error!(logger, "Error writing to block cache {}", e);
1747+
}
1748+
blocks.extend(new_blocks);
1749+
blocks.sort_by_key(|block| block.number);
1750+
stream::iter_ok(blocks)
1751+
})
1752+
.flatten_stream(),
1753+
)
16731754
}
16741755

16751756
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.

graph/src/blockchain/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl<C: Blockchain> ChainClient<C> {
4141
pub fn rpc(&self) -> anyhow::Result<&C::Client> {
4242
match self {
4343
Self::Rpc(rpc) => Ok(rpc),
44-
_ => Err(anyhow!("rpc endpoint requested on firehose chain client")),
44+
Self::Firehose(_) => Err(anyhow!("rpc endpoint requested on firehose chain client")),
4545
}
4646
}
4747
}

graph/src/components/store/traits.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,12 @@ pub trait ChainStore: Send + Sync + 'static {
522522
hashes: Vec<BlockHash>,
523523
) -> Result<Vec<serde_json::Value>, Error>;
524524

525+
/// Returns the blocks present in the store for the given block numbers.
526+
async fn blocks_by_numbers(
527+
self: Arc<Self>,
528+
numbers: Vec<BlockNumber>,
529+
) -> Result<BTreeMap<BlockNumber, Vec<serde_json::Value>>, Error>;
530+
525531
/// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching
526532
/// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding
527533
/// a child of `root`. Returns None if unable to complete due to missing blocks in the chain

0 commit comments

Comments
 (0)