@@ -7,9 +7,8 @@ use anyhow::Error;
77use async_stream:: stream;
88use futures03:: Stream ;
99use prost_types:: Any ;
10- use std:: collections:: { BTreeMap , HashSet } ;
10+ use std:: collections:: { BTreeMap , HashMap , HashSet } ;
1111use std:: fmt;
12- use std:: ops:: Range ;
1312use std:: sync:: Arc ;
1413use std:: time:: Instant ;
1514use thiserror:: Error ;
@@ -22,7 +21,7 @@ use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
2221use crate :: data:: subgraph:: UnifiedMappingApiVersion ;
2322use crate :: firehose:: { self , FirehoseEndpoint } ;
2423use crate :: futures03:: stream:: StreamExt as _;
25- use crate :: schema:: InputSchema ;
24+ use crate :: schema:: { EntityType , InputSchema } ;
2625use crate :: substreams_rpc:: response:: Message ;
2726use crate :: { prelude:: * , prometheus:: labels} ;
2827
@@ -319,19 +318,159 @@ impl<C: Blockchain> BlockWithTriggers<C> {
319318/// logic for each chain, increasing code repetition.
320319pub struct TriggersAdapterWrapper < C : Blockchain > {
321320 pub adapter : Arc < dyn TriggersAdapter < C > > ,
322- pub source_subgraph_stores : Vec < Arc < dyn SourceableStore > > ,
321+ pub source_subgraph_stores : HashMap < DeploymentHash , Arc < dyn SourceableStore > > ,
323322}
324323
325324impl < C : Blockchain > TriggersAdapterWrapper < C > {
326325 pub fn new (
327326 adapter : Arc < dyn TriggersAdapter < C > > ,
328327 source_subgraph_stores : Vec < Arc < dyn SourceableStore > > ,
329328 ) -> Self {
329+ let stores_map: HashMap < _ , _ > = source_subgraph_stores
330+ . iter ( )
331+ . map ( |store| ( store. input_schema ( ) . id ( ) . clone ( ) , store. clone ( ) ) )
332+ . collect ( ) ;
330333 Self {
331334 adapter,
332- source_subgraph_stores,
335+ source_subgraph_stores : stores_map ,
333336 }
334337 }
338+
339+ pub async fn blocks_with_subgraph_triggers (
340+ & self ,
341+ logger : & Logger ,
342+ subgraph_filter : & SubgraphFilter ,
343+ range : SubgraphTriggerScanRange < C > ,
344+ ) -> Result < Vec < BlockWithTriggers < C > > , Error > {
345+ let store = self
346+ . source_subgraph_stores
347+ . get ( & subgraph_filter. subgraph )
348+ . ok_or_else ( || anyhow ! ( "Store not found for subgraph: {}" , subgraph_filter. subgraph) ) ?;
349+
350+ let schema = <dyn crate :: components:: store:: SourceableStore >:: input_schema ( store) ;
351+
352+ let adapter = self . adapter . clone ( ) ;
353+
354+ scan_subgraph_triggers :: < C > ( logger, store, & adapter, & schema, & subgraph_filter, range) . await
355+ }
356+ }
357+
358+ fn create_subgraph_trigger_from_entities (
359+ filter : & SubgraphFilter ,
360+ entities : & Vec < EntityWithType > ,
361+ ) -> Vec < subgraph:: TriggerData > {
362+ entities
363+ . iter ( )
364+ . map ( |e| subgraph:: TriggerData {
365+ source : filter. subgraph . clone ( ) ,
366+ entity : e. entity . clone ( ) ,
367+ entity_type : e. entity_type . as_str ( ) . to_string ( ) ,
368+ } )
369+ . collect ( )
370+ }
371+
372+ async fn create_subgraph_triggers < C : Blockchain > (
373+ logger : Logger ,
374+ blocks : Vec < C :: Block > ,
375+ filter : & SubgraphFilter ,
376+ entities : BTreeMap < BlockNumber , Vec < EntityWithType > > ,
377+ ) -> Result < Vec < BlockWithTriggers < C > > , Error > {
378+ let logger_clone = logger. cheap_clone ( ) ;
379+
380+ let blocks: Vec < BlockWithTriggers < C > > = blocks
381+ . into_iter ( )
382+ . map ( |block| {
383+ let block_number = block. number ( ) ;
384+ match entities. get ( & block_number) {
385+ Some ( e) => {
386+ let trigger_data = create_subgraph_trigger_from_entities ( filter, e) ;
387+ BlockWithTriggers :: new_with_subgraph_triggers (
388+ block,
389+ trigger_data,
390+ & logger_clone,
391+ )
392+ }
393+ None => BlockWithTriggers :: new_with_subgraph_triggers ( block, vec ! [ ] , & logger_clone) ,
394+ }
395+ } )
396+ . collect ( ) ;
397+
398+ Ok ( blocks)
399+ }
400+
401+ pub enum SubgraphTriggerScanRange < C : Blockchain > {
402+ Single ( C :: Block ) ,
403+ Range ( BlockNumber , BlockNumber ) ,
404+ }
405+
406+ async fn scan_subgraph_triggers < C : Blockchain > (
407+ logger : & Logger ,
408+ store : & Arc < dyn SourceableStore > ,
409+ adapter : & Arc < dyn TriggersAdapter < C > > ,
410+ schema : & InputSchema ,
411+ filter : & SubgraphFilter ,
412+ range : SubgraphTriggerScanRange < C > ,
413+ ) -> Result < Vec < BlockWithTriggers < C > > , Error > {
414+ match range {
415+ SubgraphTriggerScanRange :: Single ( block) => {
416+ let entities =
417+ get_entities_for_range ( store, filter, schema, block. number ( ) , block. number ( ) )
418+ . await ?;
419+ create_subgraph_triggers :: < C > ( logger. clone ( ) , vec ! [ block] , filter, entities) . await
420+ }
421+ SubgraphTriggerScanRange :: Range ( from, to) => {
422+ let entities = get_entities_for_range ( store, filter, schema, from, to) . await ?;
423+ let mut block_numbers: HashSet < BlockNumber > = entities. keys ( ) . cloned ( ) . collect ( ) ;
424+ // Ensure the 'to' block is included in the block_numbers
425+ block_numbers. insert ( to) ;
426+
427+ let blocks = adapter
428+ . load_blocks_by_numbers ( logger. clone ( ) , block_numbers)
429+ . await ?;
430+
431+ create_subgraph_triggers :: < C > ( logger. clone ( ) , blocks, filter, entities) . await
432+ }
433+ }
434+ }
435+
436+ pub struct EntityWithType {
437+ pub entity_type : EntityType ,
438+ pub entity : Entity ,
439+ }
440+
441+ async fn get_entities_for_range (
442+ store : & Arc < dyn SourceableStore > ,
443+ filter : & SubgraphFilter ,
444+ schema : & InputSchema ,
445+ from : BlockNumber ,
446+ to : BlockNumber ,
447+ ) -> Result < BTreeMap < BlockNumber , Vec < EntityWithType > > , Error > {
448+ let mut entities_by_block = BTreeMap :: new ( ) ;
449+
450+ for entity_name in & filter. entities {
451+ let entity_type = schema. entity_type ( entity_name) ?;
452+
453+ let entity_ranges = store. get_range ( & entity_type, from..to) ?;
454+
455+ for ( block_number, entity_vec) in entity_ranges {
456+ let mut entity_vec = entity_vec
457+ . into_iter ( )
458+ . map ( |e| EntityWithType {
459+ entity_type : entity_type. clone ( ) ,
460+ entity : e,
461+ } )
462+ . collect ( ) ;
463+
464+ entities_by_block
465+ . entry ( block_number)
466+ . and_modify ( |existing_vec : & mut Vec < EntityWithType > | {
467+ existing_vec. append ( & mut entity_vec) ;
468+ } )
469+ . or_insert ( entity_vec) ;
470+ }
471+ }
472+
473+ Ok ( entities_by_block)
335474}
336475
337476impl < C : Blockchain > TriggersAdapterWrapper < C > {
@@ -344,45 +483,25 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
344483 self . adapter . ancestor_block ( ptr, offset, root) . await
345484 }
346485
347- // TODO: Do a proper implementation, this is a complete mock implementation
348486 pub async fn scan_triggers (
349487 & self ,
488+ logger : & Logger ,
350489 from : BlockNumber ,
351490 to : BlockNumber ,
352491 filter : & Arc < TriggerFilterWrapper < C > > ,
353492 ) -> Result < ( Vec < BlockWithTriggers < C > > , BlockNumber ) , Error > {
354- if !filter. subgraph_filter . is_empty ( ) {
355- // TODO: handle empty range, or empty entity set bellow
356-
357- if let Some ( SubgraphFilter {
358- subgraph : dh,
359- start_block : _sb,
360- entities : ent,
361- } ) = filter. subgraph_filter . first ( )
362- {
363- if let Some ( store) = self . source_subgraph_stores . first ( ) {
364- let schema = store. input_schema ( ) ;
365- let dh2 = schema. id ( ) ;
366- if dh == dh2 {
367- if let Some ( entity_type) = ent. first ( ) {
368- let et = schema. entity_type ( entity_type) . unwrap ( ) ;
369-
370- let br: Range < BlockNumber > = from..to;
371- let entities = store. get_range ( & et, br) ?;
372- return self
373- . subgraph_triggers (
374- Logger :: root ( slog:: Discard , o ! ( ) ) ,
375- from,
376- to,
377- filter,
378- entities,
379- )
380- . await ;
381- }
382- }
383- }
384- }
493+ if let Some ( subgraph_filter) = filter. subgraph_filter . first ( ) {
494+ let blocks_with_triggers = self
495+ . blocks_with_subgraph_triggers (
496+ logger,
497+ subgraph_filter,
498+ SubgraphTriggerScanRange :: Range ( from, to) ,
499+ )
500+ . await ?;
501+
502+ return Ok ( ( blocks_with_triggers, to) ) ;
385503 }
504+
386505 self . adapter
387506 . scan_triggers ( from, to, & filter. chain_filter )
388507 . await
@@ -392,9 +511,30 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
392511 & self ,
393512 logger : & Logger ,
394513 block : C :: Block ,
395- filter : & C :: TriggerFilter ,
514+ filter : & Arc < TriggerFilterWrapper < C > > ,
396515 ) -> Result < BlockWithTriggers < C > , Error > {
397- self . adapter . triggers_in_block ( logger, block, filter) . await
516+ trace ! (
517+ logger,
518+ "triggers_in_block" ;
519+ "block_number" => block. number( ) ,
520+ "block_hash" => block. hash( ) . hash_hex( ) ,
521+ ) ;
522+
523+ if let Some ( subgraph_filter) = filter. subgraph_filter . first ( ) {
524+ let blocks_with_triggers = self
525+ . blocks_with_subgraph_triggers (
526+ logger,
527+ subgraph_filter,
528+ SubgraphTriggerScanRange :: Single ( block) ,
529+ )
530+ . await ?;
531+
532+ return Ok ( blocks_with_triggers. into_iter ( ) . next ( ) . unwrap ( ) ) ;
533+ }
534+
535+ self . adapter
536+ . triggers_in_block ( logger, block, & filter. chain_filter )
537+ . await
398538 }
399539
400540 pub async fn is_on_main_chain ( & self , ptr : BlockPtr ) -> Result < bool , Error > {
@@ -406,57 +546,20 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
406546 }
407547
408548 pub async fn chain_head_ptr ( & self ) -> Result < Option < BlockPtr > , Error > {
409- self . adapter . chain_head_ptr ( ) . await
410- }
549+ if self . source_subgraph_stores . is_empty ( ) {
550+ return self . adapter . chain_head_ptr ( ) . await ;
551+ }
411552
412- async fn subgraph_triggers (
413- & self ,
414- logger : Logger ,
415- from : BlockNumber ,
416- to : BlockNumber ,
417- filter : & Arc < TriggerFilterWrapper < C > > ,
418- entities : BTreeMap < BlockNumber , Vec < Entity > > ,
419- ) -> Result < ( Vec < BlockWithTriggers < C > > , BlockNumber ) , Error > {
420- let logger2 = logger. cheap_clone ( ) ;
421- let adapter = self . adapter . clone ( ) ;
422- let first_filter = filter. subgraph_filter . first ( ) . unwrap ( ) ;
423- let blocks = adapter
424- . load_blocks_by_numbers ( logger, HashSet :: from_iter ( from..to) )
425- . await ?
426- . into_iter ( )
427- . map ( |block| {
428- let key = block. number ( ) ;
429- match entities. get ( & key) {
430- Some ( e) => {
431- let trigger_data =
432- Self :: create_subgraph_trigger_from_entity ( first_filter, e) ;
433- Some ( BlockWithTriggers :: new_with_subgraph_triggers (
434- block,
435- trigger_data,
436- & logger2,
437- ) )
438- }
439- None => None ,
440- }
441- } )
442- . flatten ( )
443- . collect ( ) ;
553+ let ptrs = futures03:: future:: try_join_all (
554+ self . source_subgraph_stores
555+ . iter ( )
556+ . map ( |( _, store) | store. block_ptr ( ) ) ,
557+ )
558+ . await ?;
444559
445- Ok ( ( blocks, to) )
446- }
560+ let min_ptr = ptrs. into_iter ( ) . flatten ( ) . min_by_key ( |ptr| ptr. number ) ;
447561
448- fn create_subgraph_trigger_from_entity (
449- filter : & SubgraphFilter ,
450- entity : & Vec < Entity > ,
451- ) -> Vec < subgraph:: TriggerData > {
452- entity
453- . iter ( )
454- . map ( |e| subgraph:: TriggerData {
455- source : filter. subgraph . clone ( ) ,
456- entity : e. clone ( ) ,
457- entity_type : filter. entities . first ( ) . unwrap ( ) . clone ( ) ,
458- } )
459- . collect ( )
562+ Ok ( min_ptr)
460563 }
461564}
462565
0 commit comments