Skip to content

Commit d66db1c

Browse files
committed
graph, chain/ethereum: use block ptrs when fetching blocks for composed subgraphs
1 parent 6f92f9e commit d66db1c

File tree

16 files changed

+228
-64
lines changed

16 files changed

+228
-64
lines changed

chain/arweave/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
269269
}))
270270
}
271271

272-
async fn load_blocks_by_numbers(
272+
async fn load_block_ptrs_by_numbers(
273273
&self,
274274
_logger: Logger,
275275
_block_numbers: HashSet<BlockNumber>,

chain/cosmos/src/chain.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,13 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
197197
) -> Result<Option<codec::Block>, Error> {
198198
panic!("Should never be called since not used by FirehoseBlockStream")
199199
}
200-
201-
async fn load_blocks_by_numbers(
200+
async fn load_block_ptrs_by_numbers(
202201
&self,
203202
_logger: Logger,
204203
_block_numbers: HashSet<BlockNumber>,
205204
) -> Result<Vec<Block>, Error> {
206-
unimplemented!()
205+
todo!()
207206
}
208-
209207
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
210208
unimplemented!()
211209
}

chain/ethereum/src/adapter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Error;
22
use ethabi::{Error as ABIError, Function, ParamType, Token};
3+
use graph::blockchain::BlockPtrExt;
34
use graph::blockchain::ChainIdentifier;
45
use graph::components::subgraph::MappingError;
56
use graph::data::store::ethereum::call;
@@ -1109,12 +1110,12 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11091110
block_hash: H256,
11101111
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
11111112

1112-
async fn load_blocks_by_numbers(
1113+
async fn load_block_ptrs_by_numbers(
11131114
&self,
11141115
_logger: Logger,
11151116
_chain_store: Arc<dyn ChainStore>,
11161117
_block_numbers: HashSet<BlockNumber>,
1117-
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
1118+
) -> Box<dyn Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send>;
11181119

11191120
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
11201121
/// May use the `chain_store` as a cache.

chain/ethereum/src/chain.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use anyhow::{Context, Error};
33
use graph::blockchain::client::ChainClient;
44
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
55
use graph::blockchain::{
6-
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper,
6+
BlockIngestor, BlockPtrExt, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper,
77
TriggersAdapterSelector,
88
};
99
use graph::components::adapter::ChainId;
@@ -156,6 +156,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
156156
unified_api_version: UnifiedMappingApiVersion,
157157
) -> Result<Box<dyn BlockStream<Chain>>> {
158158
let requirements = filter.chain_filter.node_capabilities();
159+
let is_using_subgraph_composition = !source_subgraph_stores.is_empty();
159160
let adapter = TriggersAdapterWrapper::new(
160161
chain
161162
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
@@ -181,20 +182,26 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
181182
// This is ok because Celo blocks are always final. And we _need_ to do this because
182183
// some events appear only in eth_getLogs but not in transaction receipts.
183184
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
184-
let chain_id = match chain.chain_client().as_ref() {
185+
let reorg_threshold = match chain.chain_client().as_ref() {
185186
ChainClient::Rpc(adapter) => {
186-
adapter
187+
let chain_id = adapter
187188
.cheapest()
188189
.await
189190
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
190191
.chain_id()
191-
.await?
192+
.await?;
193+
194+
if CELO_CHAIN_IDS.contains(&chain_id) {
195+
0
196+
} else {
197+
chain.reorg_threshold
198+
}
192199
}
193-
_ => panic!("expected rpc when using polling blockstream"),
194-
};
195-
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
196-
false => chain.reorg_threshold,
197-
true => 0,
200+
_ if is_using_subgraph_composition => chain.reorg_threshold,
201+
_ => panic!(
202+
"expected rpc when using polling blockstream : {}",
203+
is_using_subgraph_composition
204+
),
198205
};
199206

200207
Ok(Box::new(PollingBlockStream::new(
@@ -617,6 +624,8 @@ pub enum BlockFinality {
617624

618625
// If a block may still be reorged, we need to work with more local data.
619626
NonFinal(EthereumBlockWithCalls),
627+
628+
Ptr(Arc<BlockPtrExt>),
620629
}
621630

622631
impl Default for BlockFinality {
@@ -630,6 +639,7 @@ impl BlockFinality {
630639
match self {
631640
BlockFinality::Final(block) => block,
632641
BlockFinality::NonFinal(block) => &block.ethereum_block.block,
642+
BlockFinality::Ptr(_) => unreachable!("light_block called on HeaderOnly"),
633643
}
634644
}
635645
}
@@ -639,6 +649,7 @@ impl<'a> From<&'a BlockFinality> for BlockPtr {
639649
match block {
640650
BlockFinality::Final(b) => BlockPtr::from(&**b),
641651
BlockFinality::NonFinal(b) => BlockPtr::from(&b.ethereum_block),
652+
BlockFinality::Ptr(b) => BlockPtr::new(b.hash.clone(), b.number),
642653
}
643654
}
644655
}
@@ -648,13 +659,17 @@ impl Block for BlockFinality {
648659
match self {
649660
BlockFinality::Final(block) => block.block_ptr(),
650661
BlockFinality::NonFinal(block) => block.ethereum_block.block.block_ptr(),
662+
BlockFinality::Ptr(block) => BlockPtr::new(block.hash.clone(), block.number),
651663
}
652664
}
653665

654666
fn parent_ptr(&self) -> Option<BlockPtr> {
655667
match self {
656668
BlockFinality::Final(block) => block.parent_ptr(),
657669
BlockFinality::NonFinal(block) => block.ethereum_block.block.parent_ptr(),
670+
BlockFinality::Ptr(block) => {
671+
Some(BlockPtr::new(block.parent_hash.clone(), block.number - 1))
672+
}
658673
}
659674
}
660675

@@ -687,13 +702,15 @@ impl Block for BlockFinality {
687702
json::to_value(eth_block)
688703
}
689704
BlockFinality::NonFinal(block) => json::to_value(&block.ethereum_block),
705+
BlockFinality::Ptr(_) => Ok(json::Value::Null),
690706
}
691707
}
692708

693709
fn timestamp(&self) -> BlockTime {
694710
let ts = match self {
695711
BlockFinality::Final(block) => block.timestamp,
696712
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
713+
BlockFinality::Ptr(block) => block.timestamp,
697714
};
698715
let ts = i64::try_from(ts.as_u64()).unwrap();
699716
BlockTime::since_epoch(ts, 0)
@@ -735,7 +752,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
735752
.await
736753
}
737754

738-
async fn load_blocks_by_numbers(
755+
async fn load_block_ptrs_by_numbers(
739756
&self,
740757
logger: Logger,
741758
block_numbers: HashSet<BlockNumber>,
@@ -749,9 +766,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
749766
.await?;
750767

751768
let blocks = adapter
752-
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
769+
.load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers)
753770
.await
754-
.map(|block| BlockFinality::Final(block))
771+
.map(|block| BlockFinality::Ptr(block))
755772
.collect()
756773
.compat()
757774
.await?;
@@ -812,6 +829,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
812829
triggers.append(&mut parse_block_triggers(&filter.block, full_block));
813830
Ok(BlockWithTriggers::new(block, triggers, logger))
814831
}
832+
BlockFinality::Ptr(_) => unreachable!("triggers_in_block called on HeaderOnly"),
815833
}
816834
}
817835

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use futures03::{future::BoxFuture, stream::FuturesUnordered};
22
use graph::blockchain::client::ChainClient;
33
use graph::blockchain::BlockHash;
4+
use graph::blockchain::BlockPtrExt;
45
use graph::blockchain::ChainIdentifier;
56

67
use graph::components::transaction_receipt::LightTransactionReceipt;
@@ -783,11 +784,11 @@ impl EthereumAdapter {
783784
}
784785

785786
/// Request blocks by number through JSON-RPC.
786-
fn load_blocks_by_numbers_rpc(
787+
fn load_block_ptrs_by_numbers_rpc(
787788
&self,
788789
logger: Logger,
789790
numbers: Vec<BlockNumber>,
790-
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
791+
) -> impl Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send {
791792
let web3 = self.web3.clone();
792793

793794
stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
@@ -798,19 +799,29 @@ impl EthereumAdapter {
798799
.run(move || {
799800
Box::pin(
800801
web3.eth()
801-
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
802-
number.into(),
803-
))),
802+
.block(BlockId::Number(Web3BlockNumber::Number(number.into()))),
804803
)
805804
.compat()
806805
.from_err::<Error>()
807806
.and_then(move |block| {
808-
block.map(Arc::new).ok_or_else(|| {
809-
anyhow::anyhow!(
810-
"Ethereum node did not find block with number {:?}",
811-
number
812-
)
813-
})
807+
block
808+
.map(|block| {
809+
let ptr = BlockPtrExt::try_from((
810+
block.hash,
811+
block.number,
812+
block.parent_hash,
813+
block.timestamp,
814+
))
815+
.unwrap();
816+
817+
Arc::new(ptr)
818+
})
819+
.ok_or_else(|| {
820+
anyhow::anyhow!(
821+
"Ethereum node did not find block with number {:?}",
822+
number
823+
)
824+
})
814825
})
815826
.compat()
816827
})
@@ -1690,23 +1701,23 @@ impl EthereumAdapterTrait for EthereumAdapter {
16901701
}
16911702

16921703
/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
1693-
async fn load_blocks_by_numbers(
1704+
async fn load_block_ptrs_by_numbers(
16941705
&self,
16951706
logger: Logger,
16961707
chain_store: Arc<dyn ChainStore>,
16971708
block_numbers: HashSet<BlockNumber>,
1698-
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
1709+
) -> Box<dyn Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send> {
16991710
let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
17001711
.cheap_clone()
1701-
.blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
1712+
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
17021713
.await
17031714
.map_err(|e| {
17041715
error!(&logger, "Error accessing block cache {}", e);
17051716
e
17061717
})
17071718
.unwrap_or_default();
17081719

1709-
let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
1720+
let mut blocks: Vec<Arc<BlockPtrExt>> = blocks_map
17101721
.into_iter()
17111722
.filter_map(|(_number, values)| {
17121723
if values.len() == 1 {
@@ -1719,7 +1730,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
17191730

17201731
let missing_blocks: Vec<i32> = block_numbers
17211732
.into_iter()
1722-
.filter(|&number| !blocks.iter().any(|block| block.number() == number))
1733+
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
17231734
.collect();
17241735

17251736
if !missing_blocks.is_empty() {
@@ -1731,20 +1742,9 @@ impl EthereumAdapterTrait for EthereumAdapter {
17311742
}
17321743

17331744
Box::new(
1734-
self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
1745+
self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks)
17351746
.collect()
17361747
.map(move |new_blocks| {
1737-
let upsert_blocks: Vec<_> = new_blocks
1738-
.iter()
1739-
.map(|block| BlockFinality::Final(block.clone()))
1740-
.collect();
1741-
let block_refs: Vec<_> = upsert_blocks
1742-
.iter()
1743-
.map(|block| block as &dyn graph::blockchain::Block)
1744-
.collect();
1745-
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
1746-
error!(logger, "Error writing to block cache {}", e);
1747-
}
17481748
blocks.extend(new_blocks);
17491749
blocks.sort_by_key(|block| block.number);
17501750
stream::iter_ok(blocks)
@@ -2028,6 +2028,9 @@ pub(crate) async fn get_calls(
20282028
calls: Some(calls),
20292029
}))
20302030
}
2031+
BlockFinality::Ptr(_) => {
2032+
unreachable!("get_calls called with BlockFinality::Ptr")
2033+
}
20312034
}
20322035
}
20332036

@@ -2209,6 +2212,11 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
22092212
"this function should not be called when dealing with non-final blocks"
22102213
)
22112214
}
2215+
BlockFinality::Ptr(_block) => {
2216+
unreachable!(
2217+
"this function should not be called when dealing with header-only blocks"
2218+
)
2219+
}
22122220
}
22132221
};
22142222

chain/near/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
325325
panic!("Should never be called since not used by FirehoseBlockStream")
326326
}
327327

328-
async fn load_blocks_by_numbers(
328+
async fn load_block_ptrs_by_numbers(
329329
&self,
330330
_logger: Logger,
331331
_block_numbers: HashSet<BlockNumber>,

chain/starknet/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
375375
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
376376
}
377377

378-
async fn load_blocks_by_numbers(
378+
async fn load_block_ptrs_by_numbers(
379379
&self,
380380
_logger: Logger,
381381
_block_numbers: HashSet<BlockNumber>,

chain/substreams/src/trigger.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
136136
unimplemented!()
137137
}
138138

139-
async fn load_blocks_by_numbers(
139+
async fn load_block_ptrs_by_numbers(
140140
&self,
141141
_logger: Logger,
142142
_block_numbers: HashSet<BlockNumber>,

graph/src/blockchain/block_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ async fn scan_subgraph_triggers<C: Blockchain>(
412412
block_numbers.insert(to);
413413

414414
let blocks = adapter
415-
.load_blocks_by_numbers(logger.clone(), block_numbers)
415+
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
416416
.await?;
417417

418418
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
@@ -581,7 +581,7 @@ pub trait TriggersAdapter<C: Blockchain>: Send + Sync {
581581
/// Get pointer to parent of `block`. This is called when reverting `block`.
582582
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
583583

584-
async fn load_blocks_by_numbers(
584+
async fn load_block_ptrs_by_numbers(
585585
&self,
586586
logger: Logger,
587587
block_numbers: HashSet<BlockNumber>,

graph/src/blockchain/mock.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ impl TriggersAdapter<MockBlockchain> for MockTriggersAdapter {
233233
todo!()
234234
}
235235

236-
async fn load_blocks_by_numbers(
236+
async fn load_block_ptrs_by_numbers(
237237
&self,
238238
_logger: Logger,
239239
_block_numbers: HashSet<BlockNumber>,

0 commit comments

Comments
 (0)