@@ -3,8 +3,8 @@ use anyhow::{Context, Error};
33use graph:: blockchain:: client:: ChainClient ;
44use graph:: blockchain:: firehose_block_ingestor:: { FirehoseBlockIngestor , Transforms } ;
55use graph:: blockchain:: {
6- BlockIngestor , BlockTime , BlockchainKind , ChainIdentifier , TriggerFilterWrapper ,
7- TriggersAdapterSelector ,
6+ BlockIngestor , BlockTime , BlockchainKind , ChainIdentifier , ExtendedBlockPtr ,
7+ TriggerFilterWrapper , TriggersAdapterSelector ,
88} ;
99use graph:: components:: network_provider:: ChainName ;
1010use graph:: components:: store:: { DeploymentCursorTracker , SourceableStore } ;
@@ -156,6 +156,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
156156 unified_api_version : UnifiedMappingApiVersion ,
157157 ) -> Result < Box < dyn BlockStream < Chain > > > {
158158 let requirements = filter. chain_filter . node_capabilities ( ) ;
159+ let is_using_subgraph_composition = !source_subgraph_stores. is_empty ( ) ;
159160 let adapter = TriggersAdapterWrapper :: new (
160161 chain
161162 . triggers_adapter ( & deployment, & requirements, unified_api_version. clone ( ) )
@@ -181,20 +182,32 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
181182 // This is ok because Celo blocks are always final. And we _need_ to do this because
182183 // some events appear only in eth_getLogs but not in transaction receipts.
183184 // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
184- let chain_id = match chain. chain_client ( ) . as_ref ( ) {
185+ let reorg_threshold = match chain. chain_client ( ) . as_ref ( ) {
185186 ChainClient :: Rpc ( adapter) => {
186- adapter
187+ let chain_id = adapter
187188 . cheapest ( )
188189 . await
189190 . ok_or ( anyhow ! ( "unable to get eth adapter for chan_id call" ) ) ?
190191 . chain_id ( )
191- . await ?
192+ . await ?;
193+
194+ if CELO_CHAIN_IDS . contains ( & chain_id) {
195+ 0
196+ } else {
197+ chain. reorg_threshold
198+ }
192199 }
193- _ => panic ! ( "expected rpc when using polling blockstream" ) ,
200+ _ if is_using_subgraph_composition => chain. reorg_threshold ,
201+ _ => panic ! (
202+ "expected rpc when using polling blockstream : {}" ,
203+ is_using_subgraph_composition
204+ ) ,
194205 } ;
195- let reorg_threshold = match CELO_CHAIN_IDS . contains ( & chain_id) {
196- false => chain. reorg_threshold ,
197- true => 0 ,
206+
207+ let max_block_range_size = if is_using_subgraph_composition {
208+ ENV_VARS . max_block_range_size * 10
209+ } else {
210+ ENV_VARS . max_block_range_size
198211 } ;
199212
200213 Ok ( Box :: new ( PollingBlockStream :: new (
@@ -207,7 +220,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
207220 start_blocks,
208221 reorg_threshold,
209222 logger,
210- ENV_VARS . max_block_range_size ,
223+ max_block_range_size,
211224 ENV_VARS . target_triggers_per_block_range ,
212225 unified_api_version,
213226 subgraph_current_block,
@@ -617,6 +630,8 @@ pub enum BlockFinality {
617630
618631 // If a block may still be reorged, we need to work with more local data.
619632 NonFinal ( EthereumBlockWithCalls ) ,
633+
634+ Ptr ( Arc < ExtendedBlockPtr > ) ,
620635}
621636
622637impl Default for BlockFinality {
@@ -630,6 +645,7 @@ impl BlockFinality {
630645 match self {
631646 BlockFinality :: Final ( block) => block,
632647 BlockFinality :: NonFinal ( block) => & block. ethereum_block . block ,
648+ BlockFinality :: Ptr ( _) => unreachable ! ( "light_block called on HeaderOnly" ) ,
633649 }
634650 }
635651}
@@ -639,6 +655,7 @@ impl<'a> From<&'a BlockFinality> for BlockPtr {
639655 match block {
640656 BlockFinality :: Final ( b) => BlockPtr :: from ( & * * b) ,
641657 BlockFinality :: NonFinal ( b) => BlockPtr :: from ( & b. ethereum_block ) ,
658+ BlockFinality :: Ptr ( b) => BlockPtr :: new ( b. hash . clone ( ) , b. number ) ,
642659 }
643660 }
644661}
@@ -648,13 +665,17 @@ impl Block for BlockFinality {
648665 match self {
649666 BlockFinality :: Final ( block) => block. block_ptr ( ) ,
650667 BlockFinality :: NonFinal ( block) => block. ethereum_block . block . block_ptr ( ) ,
668+ BlockFinality :: Ptr ( block) => BlockPtr :: new ( block. hash . clone ( ) , block. number ) ,
651669 }
652670 }
653671
654672 fn parent_ptr ( & self ) -> Option < BlockPtr > {
655673 match self {
656674 BlockFinality :: Final ( block) => block. parent_ptr ( ) ,
657675 BlockFinality :: NonFinal ( block) => block. ethereum_block . block . parent_ptr ( ) ,
676+ BlockFinality :: Ptr ( block) => {
677+ Some ( BlockPtr :: new ( block. parent_hash . clone ( ) , block. number - 1 ) )
678+ }
658679 }
659680 }
660681
@@ -687,13 +708,15 @@ impl Block for BlockFinality {
687708 json:: to_value ( eth_block)
688709 }
689710 BlockFinality :: NonFinal ( block) => json:: to_value ( & block. ethereum_block ) ,
711+ BlockFinality :: Ptr ( _) => Ok ( json:: Value :: Null ) ,
690712 }
691713 }
692714
693715 fn timestamp ( & self ) -> BlockTime {
694716 let ts = match self {
695717 BlockFinality :: Final ( block) => block. timestamp ,
696718 BlockFinality :: NonFinal ( block) => block. ethereum_block . block . timestamp ,
719+ BlockFinality :: Ptr ( block) => block. timestamp ,
697720 } ;
698721 let ts = i64:: try_from ( ts. as_u64 ( ) ) . unwrap ( ) ;
699722 BlockTime :: since_epoch ( ts, 0 )
@@ -735,7 +758,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
735758 . await
736759 }
737760
738- async fn load_blocks_by_numbers (
761+ async fn load_block_ptrs_by_numbers (
739762 & self ,
740763 logger : Logger ,
741764 block_numbers : HashSet < BlockNumber > ,
@@ -749,9 +772,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
749772 . await ?;
750773
751774 let blocks = adapter
752- . load_blocks_by_numbers ( logger, self . chain_store . clone ( ) , block_numbers)
775+ . load_block_ptrs_by_numbers ( logger, self . chain_store . clone ( ) , block_numbers)
753776 . await
754- . map ( |block| BlockFinality :: Final ( block) )
777+ . map ( |block| BlockFinality :: Ptr ( block) )
755778 . collect ( )
756779 . compat ( )
757780 . await ?;
@@ -812,6 +835,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
812835 triggers. append ( & mut parse_block_triggers ( & filter. block , full_block) ) ;
813836 Ok ( BlockWithTriggers :: new ( block, triggers, logger) )
814837 }
838+ BlockFinality :: Ptr ( _) => unreachable ! ( "triggers_in_block called on HeaderOnly" ) ,
815839 }
816840 }
817841
0 commit comments