@@ -11,11 +11,13 @@ use graph::components::store::{DeploymentCursorTracker, WritableStore};
1111use graph:: data:: subgraph:: UnifiedMappingApiVersion ;
1212use graph:: firehose:: { FirehoseEndpoint , ForkStep } ;
1313use graph:: futures03:: compat:: Future01CompatExt ;
14+ use graph:: futures03:: TryStreamExt ;
1415use graph:: prelude:: {
1516 BlockHash , ComponentLoggerConfig , DeploymentHash , ElasticComponentLoggerConfig , EthereumBlock ,
1617 EthereumCallCache , LightEthereumBlock , LightEthereumBlockExt , MetricsRegistry ,
1718} ;
1819use graph:: schema:: InputSchema ;
20+ use graph:: slog:: { debug, error, warn} ;
1921use graph:: substreams:: Clock ;
2022use graph:: {
2123 blockchain:: {
@@ -243,7 +245,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
243245 logger : & Logger ,
244246 cursor : FirehoseCursor ,
245247 ) -> Result < BlockFinality , Error > {
246- let endpoint = chain. chain_client ( ) . firehose_endpoint ( ) . await ?;
248+ let endpoint: Arc < FirehoseEndpoint > = chain. chain_client ( ) . firehose_endpoint ( ) . await ?;
247249 let block = endpoint. get_block :: < codec:: Block > ( cursor, logger) . await ?;
248250 let ethereum_block: EthereumBlockWithCalls = ( & block) . try_into ( ) ?;
249251 Ok ( BlockFinality :: NonFinal ( ethereum_block) )
@@ -714,13 +716,17 @@ impl Block for BlockFinality {
714716 }
715717
716718 fn timestamp ( & self ) -> BlockTime {
717- let ts = match self {
718- BlockFinality :: Final ( block) => block. timestamp ,
719- BlockFinality :: NonFinal ( block) => block. ethereum_block . block . timestamp ,
719+ match self {
720+ BlockFinality :: Final ( block) => {
721+ let ts = i64:: try_from ( block. timestamp . as_u64 ( ) ) . unwrap ( ) ;
722+ BlockTime :: since_epoch ( ts, 0 )
723+ }
724+ BlockFinality :: NonFinal ( block) => {
725+ let ts = i64:: try_from ( block. ethereum_block . block . timestamp . as_u64 ( ) ) . unwrap ( ) ;
726+ BlockTime :: since_epoch ( ts, 0 )
727+ }
720728 BlockFinality :: Ptr ( block) => block. timestamp ,
721- } ;
722- let ts = i64:: try_from ( ts. as_u64 ( ) ) . unwrap ( ) ;
723- BlockTime :: since_epoch ( ts, 0 )
729+ }
724730 }
725731}
726732
@@ -735,6 +741,61 @@ pub struct TriggersAdapter {
735741 unified_api_version : UnifiedMappingApiVersion ,
736742}
737743
744+ /// Fetches blocks from the cache based on block numbers, excluding duplicates
745+ /// (i.e., multiple blocks for the same number), and identifying missing blocks that
746+ /// need to be fetched via RPC/Firehose. Returns a tuple of the found blocks and the missing block numbers.
747+ async fn fetch_unique_blocks_from_cache (
748+ logger : & Logger ,
749+ chain_store : Arc < dyn ChainStore > ,
750+ block_numbers : HashSet < BlockNumber > ,
751+ ) -> ( Vec < Arc < BlockPtrExt > > , Vec < i32 > ) {
752+ // Load blocks from the cache
753+ let blocks_map = chain_store
754+ . cheap_clone ( )
755+ . block_ptrs_by_numbers ( block_numbers. iter ( ) . map ( |& b| b. into ( ) ) . collect :: < Vec < _ > > ( ) )
756+ . await
757+ . map_err ( |e| {
758+ error ! ( logger, "Error accessing block cache {}" , e) ;
759+ e
760+ } )
761+ . unwrap_or_default ( ) ;
762+
763+ // Collect blocks and filter out ones with multiple entries
764+ let blocks: Vec < Arc < BlockPtrExt > > = blocks_map
765+ . into_iter ( )
766+ . filter_map ( |( number, values) | {
767+ if values. len ( ) == 1 {
768+ Some ( Arc :: new ( values[ 0 ] . clone ( ) ) )
769+ } else {
770+ warn ! (
771+ logger,
772+ "Expected one block for block number {:?}, found {}" ,
773+ number,
774+ values. len( )
775+ ) ;
776+ None
777+ }
778+ } )
779+ . collect ( ) ;
780+
781+ // Identify missing blocks
782+ let missing_blocks: Vec < i32 > = block_numbers
783+ . into_iter ( )
784+ . filter ( |& number| !blocks. iter ( ) . any ( |block| block. block_number ( ) == number) )
785+ . collect ( ) ;
786+
787+ if !missing_blocks. is_empty ( ) {
788+ debug ! (
789+ logger,
790+ "Loading {} block(s) not in the block cache" ,
791+ missing_blocks. len( )
792+ ) ;
793+ debug ! ( logger, "Missing blocks {:?}" , missing_blocks) ;
794+ }
795+
796+ ( blocks, missing_blocks)
797+ }
798+
738799#[ async_trait]
739800impl TriggersAdapterTrait < Chain > for TriggersAdapter {
740801 async fn scan_triggers (
@@ -764,21 +825,71 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
764825 logger : Logger ,
765826 block_numbers : HashSet < BlockNumber > ,
766827 ) -> Result < Vec < BlockFinality > > {
767- use graph:: futures01:: stream:: Stream ;
828+ let blocks = match & * self . chain_client {
829+ ChainClient :: Firehose ( endpoints, _) => {
830+ let endpoint = endpoints. endpoint ( ) . await ?;
831+ let chain_store = self . chain_store . clone ( ) ;
832+
833+ // Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
834+ // that we can fetch the right block from the RPC.
835+ let ( mut cached_blocks, missing_block_numbers) =
836+ fetch_unique_blocks_from_cache ( & logger, chain_store, block_numbers) . await ;
837+
838+ // Then fetch missing blocks from RPC
839+ if !missing_block_numbers. is_empty ( ) {
840+ let missing_blocks = endpoint
841+ . load_blocks_by_numbers :: < codec:: Block > (
842+ missing_block_numbers. iter ( ) . map ( |& n| n as u64 ) . collect ( ) ,
843+ & logger,
844+ )
845+ . await ?
846+ . into_iter ( )
847+ . map ( |block| {
848+ let block: BlockPtrExt = BlockPtrExt {
849+ hash : block. hash ( ) ,
850+ number : block. number ( ) ,
851+ parent_hash : block. parent_hash ( ) . unwrap_or_default ( ) ,
852+ timestamp : block. timestamp ( ) ,
853+ } ;
854+ Arc :: new ( block)
855+ } )
856+ . collect :: < Vec < _ > > ( ) ;
857+
858+ // Combine cached and newly fetched blocks
859+ cached_blocks. extend ( missing_blocks) ;
860+ cached_blocks. sort_by_key ( |block| block. number ) ;
861+ }
768862
769- let adapter = self
770- . chain_client
771- . rpc ( ) ?
772- . cheapest_with ( & self . capabilities )
773- . await ?;
863+ vec ! [ ]
864+ }
865+ ChainClient :: Rpc ( client) => {
866+ let adapter = client. cheapest_with ( & self . capabilities ) . await ?;
867+ let chain_store = self . chain_store . clone ( ) ;
868+
869+ // Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
870+ // that we can fetch the right block from the RPC.
871+ let ( mut cached_blocks, missing_block_numbers) =
872+ fetch_unique_blocks_from_cache ( & logger, chain_store, block_numbers) . await ;
873+
874+ // Then fetch missing blocks from RPC
875+ if !missing_block_numbers. is_empty ( ) {
876+ let missing_blocks: Vec < Arc < BlockPtrExt > > = adapter
877+ . load_block_ptrs_by_numbers_rpc ( logger. clone ( ) , missing_block_numbers)
878+ . try_collect ( )
879+ . await ?;
880+
881+ // Combine cached and newly fetched blocks
882+ cached_blocks. extend ( missing_blocks) ;
883+ cached_blocks. sort_by_key ( |block| block. number ) ;
884+ }
774885
775- let blocks = adapter
776- . load_block_ptrs_by_numbers ( logger , self . chain_store . clone ( ) , block_numbers )
777- . await
778- . map ( |block| BlockFinality :: Ptr ( block ) )
779- . collect ( )
780- . compat ( )
781- . await ? ;
886+ // Convert to BlockFinality
887+ let blocks : Vec < BlockFinality > =
888+ cached_blocks . into_iter ( ) . map ( BlockFinality :: Ptr ) . collect ( ) ;
889+
890+ blocks
891+ }
892+ } ;
782893
783894 Ok ( blocks)
784895 }
0 commit comments