@@ -28,6 +28,8 @@ use graph::tokio_retry::Retry;
2828use graph:: util:: futures:: retry_strategy;
2929use graph:: util:: futures:: RETRY_DEFAULT_LIMIT ;
3030
31+ const MAX_AUTO_GRAFT_SYNC_DEPTH : u32 = 42 ;
32+
3133pub struct SubgraphRegistrar < P , S , SM > {
3234 logger : Logger ,
3335 logger_factory : LoggerFactory ,
@@ -303,20 +305,8 @@ where
303305 . logger_factory
304306 . subgraph_logger ( & DeploymentLocator :: new ( DeploymentId ( 0 ) , hash. clone ( ) ) ) ;
305307
306- let raw: serde_yaml:: Mapping = {
307- let file_bytes = self
308- . resolver
309- . cat ( & logger, & hash. to_ipfs_link ( ) )
310- . await
311- . map_err ( |e| {
312- SubgraphRegistrarError :: ResolveError (
313- SubgraphManifestResolveError :: ResolveError ( e) ,
314- )
315- } ) ?;
316-
317- serde_yaml:: from_slice ( & file_bytes)
318- . map_err ( |e| SubgraphRegistrarError :: ResolveError ( e. into ( ) ) ) ?
319- } ;
308+ let raw: serde_yaml:: Mapping =
309+ resolve_raw_manifest ( & self . resolver , & self . logger , & hash) . await ?;
320310
321311 let kind = BlockchainKind :: from_manifest ( & raw ) . map_err ( |e| {
322312 SubgraphRegistrarError :: ResolveError ( SubgraphManifestResolveError :: ResolveError ( e) )
@@ -326,9 +316,15 @@ where
326316 let history_blocks =
327317 history_blocks. or ( self . settings . for_name ( & name) . map ( |c| c. history_blocks ) ) ;
328318
319+ let auto_graft_sync_depth = if self . store . auto_graft_sync ( ) {
320+ Some ( 0 )
321+ } else {
322+ None
323+ } ;
324+
329325 let deployment_locator = match kind {
330326 BlockchainKind :: Arweave => {
331- create_subgraph_version :: < graph_chain_arweave:: Chain , _ > (
327+ create_subgraph_version :: < graph_chain_arweave:: Chain , _ , _ > (
332328 & logger,
333329 self . store . clone ( ) ,
334330 self . chains . cheap_clone ( ) ,
@@ -342,11 +338,13 @@ where
342338 self . version_switching_mode ,
343339 & self . resolver ,
344340 history_blocks,
341+ auto_graft_sync_depth,
342+ self . provider . clone ( ) ,
345343 )
346344 . await ?
347345 }
348346 BlockchainKind :: Ethereum => {
349- create_subgraph_version :: < graph_chain_ethereum:: Chain , _ > (
347+ create_subgraph_version :: < graph_chain_ethereum:: Chain , _ , _ > (
350348 & logger,
351349 self . store . clone ( ) ,
352350 self . chains . cheap_clone ( ) ,
@@ -360,11 +358,13 @@ where
360358 self . version_switching_mode ,
361359 & self . resolver ,
362360 history_blocks,
361+ auto_graft_sync_depth,
362+ self . provider . clone ( ) ,
363363 )
364364 . await ?
365365 }
366366 BlockchainKind :: Near => {
367- create_subgraph_version :: < graph_chain_near:: Chain , _ > (
367+ create_subgraph_version :: < graph_chain_near:: Chain , _ , _ > (
368368 & logger,
369369 self . store . clone ( ) ,
370370 self . chains . cheap_clone ( ) ,
@@ -378,11 +378,13 @@ where
378378 self . version_switching_mode ,
379379 & self . resolver ,
380380 history_blocks,
381+ auto_graft_sync_depth,
382+ self . provider . clone ( ) ,
381383 )
382384 . await ?
383385 }
384386 BlockchainKind :: Cosmos => {
385- create_subgraph_version :: < graph_chain_cosmos:: Chain , _ > (
387+ create_subgraph_version :: < graph_chain_cosmos:: Chain , _ , _ > (
386388 & logger,
387389 self . store . clone ( ) ,
388390 self . chains . cheap_clone ( ) ,
@@ -396,11 +398,13 @@ where
396398 self . version_switching_mode ,
397399 & self . resolver ,
398400 history_blocks,
401+ auto_graft_sync_depth,
402+ self . provider . clone ( ) ,
399403 )
400404 . await ?
401405 }
402406 BlockchainKind :: Substreams => {
403- create_subgraph_version :: < graph_chain_substreams:: Chain , _ > (
407+ create_subgraph_version :: < graph_chain_substreams:: Chain , _ , _ > (
404408 & logger,
405409 self . store . clone ( ) ,
406410 self . chains . cheap_clone ( ) ,
@@ -414,11 +418,13 @@ where
414418 self . version_switching_mode ,
415419 & self . resolver ,
416420 history_blocks,
421+ auto_graft_sync_depth,
422+ self . provider . clone ( ) ,
417423 )
418424 . await ?
419425 }
420426 BlockchainKind :: Starknet => {
421- create_subgraph_version :: < graph_chain_starknet:: Chain , _ > (
427+ create_subgraph_version :: < graph_chain_starknet:: Chain , _ , _ > (
422428 & logger,
423429 self . store . clone ( ) ,
424430 self . chains . cheap_clone ( ) ,
@@ -432,6 +438,8 @@ where
432438 self . version_switching_mode ,
433439 & self . resolver ,
434440 history_blocks,
441+ auto_graft_sync_depth,
442+ self . provider . clone ( ) ,
435443 )
436444 . await ?
437445 }
@@ -555,9 +563,9 @@ async fn start_subgraph(
555563}
556564
557565/// Resolves the subgraph's earliest block
558- async fn resolve_start_block (
559- manifest : & SubgraphManifest < impl Blockchain > ,
560- chain : & impl Blockchain ,
566+ async fn resolve_start_block < C : Blockchain > (
567+ manifest : & SubgraphManifest < C > ,
568+ chain : & C ,
561569 logger : & Logger ,
562570) -> Result < Option < BlockPtr > , SubgraphRegistrarError > {
563571 // If the minimum start block is 0 (i.e. the genesis block),
@@ -591,20 +599,26 @@ async fn resolve_graft_block(
591599 chain : & impl Blockchain ,
592600 logger : & Logger ,
593601) -> Result < BlockPtr , SubgraphRegistrarError > {
602+ debug ! ( & logger, "Resolve graft block" ; "base" => base. base. to_string( ) , "block" => base. block) ;
594603 chain
595604 . block_pointer_from_number ( logger, base. block )
596605 . await
597- . map_err ( |_| {
606+ . map_err ( |err| {
607+ error ! ( & logger, "Failed to resolve graft block" ; "error" => err. to_string( ) ) ;
598608 SubgraphRegistrarError :: ManifestValidationError ( vec ! [
599609 SubgraphManifestValidationError :: BlockNotFound ( format!(
600- "graft base block {} not found" ,
601- base. block
610+ "graft base {} block {} not found" ,
611+ base. base , base . block
602612 ) ) ,
603613 ] )
604614 } )
605615}
606616
607- async fn create_subgraph_version < C : Blockchain , S : SubgraphStore > (
617+ async fn create_subgraph_version <
618+ C : Blockchain ,
619+ S : SubgraphStore ,
620+ P : SubgraphAssignmentProviderTrait ,
621+ > (
608622 logger : & Logger ,
609623 store : Arc < S > ,
610624 chains : Arc < BlockchainMap > ,
@@ -618,9 +632,13 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
618632 version_switching_mode : SubgraphVersionSwitchingMode ,
619633 resolver : & Arc < dyn LinkResolver > ,
620634 history_blocks_override : Option < i32 > ,
635+ depth : Option < u32 > ,
636+ provider : Arc < P > ,
621637) -> Result < DeploymentLocator , SubgraphRegistrarError > {
622638 let raw_string = serde_yaml:: to_string ( & raw ) . unwrap ( ) ;
623- let unvalidated = UnvalidatedSubgraphManifest :: < C > :: resolve (
639+
640+ // We need to defer validation of the manifest until after we have synced the base subgraph.
641+ let unvalidated_manifest = UnvalidatedSubgraphManifest :: < C > :: resolve (
624642 deployment. clone ( ) ,
625643 raw,
626644 resolver,
@@ -630,16 +648,38 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
630648 . map_err ( SubgraphRegistrarError :: ResolveError )
631649 . await ?;
632650
633- // Determine if the graft_base should be validated.
634- // Validate the graft_base if there is a pending graft, ensuring its presence.
635- // If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated.
636- // If the subgraph already exists and there is no pending graft, graft_base validation is not required.
651+ if let ( Some ( depth) , Some ( graft) ) = ( depth, unvalidated_manifest. unvalidated_graft ( ) ) {
652+ if depth < MAX_AUTO_GRAFT_SYNC_DEPTH {
653+ Box :: pin ( auto_sync_graft :: < C , S , P > (
654+ graft,
655+ resolver,
656+ logger,
657+ & store,
658+ & chains,
659+ & name,
660+ & node_id,
661+ & debug_fork,
662+ version_switching_mode,
663+ history_blocks_override,
664+ depth,
665+ provider,
666+ ) )
667+ . await ?;
668+ } else {
669+ warn ! (
670+ logger,
671+ "auto-graft-sync: subgraph grafts depth limit reached" ;
672+ "depth" => depth
673+ ) ;
674+ }
675+ }
676+
637677 let should_validate = match store. graft_pending ( & deployment) {
638678 Ok ( graft_pending) => graft_pending,
639679 Err ( StoreError :: DeploymentNotFound ( _) ) => true ,
640680 Err ( e) => return Err ( SubgraphRegistrarError :: StoreError ( e) ) ,
641681 } ;
642- let manifest = unvalidated
682+ let manifest = unvalidated_manifest
643683 . validate ( store. cheap_clone ( ) , should_validate)
644684 . await
645685 . map_err ( SubgraphRegistrarError :: ManifestValidationError ) ?;
@@ -732,3 +772,115 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
732772 )
733773 . map_err ( SubgraphRegistrarError :: SubgraphDeploymentError )
734774}
775+
776+ /// Automatically syncs a subgraph graft from the base subgraph.
777+ /// This will await the syncing of the base subgraph before proceeding.
778+ /// Recursively calls `create_subgraph_version` to create any grafts of
779+ /// this graft up to `MAX_AUTO_GRAFT_SYNC_DEPTH`.`
780+ async fn auto_sync_graft < C : Blockchain , S : SubgraphStore , P : SubgraphAssignmentProviderTrait > (
781+ graft : & Graft ,
782+ resolver : & Arc < dyn LinkResolver > ,
783+ logger : & Logger ,
784+ store : & Arc < S > ,
785+ chains : & Arc < BlockchainMap > ,
786+ name : & SubgraphName ,
787+ node_id : & NodeId ,
788+ debug_fork : & Option < DeploymentHash > ,
789+ version_switching_mode : SubgraphVersionSwitchingMode ,
790+ history_blocks_override : Option < i32 > ,
791+ depth : u32 ,
792+ provider : Arc < P > ,
793+ ) -> Result < DeploymentLocator , SubgraphRegistrarError > {
794+ info ! (
795+ logger,
796+ "auto-graft-sync: begin graft sync" ;
797+ "subgraph" => name. to_string( ) ,
798+ "hash" => graft. base. to_string( ) ,
799+ "depth" => depth,
800+ "block" => graft. block
801+ ) ;
802+ let subgraft_raw_manifest = resolve_raw_manifest ( resolver, logger, & graft. base ) . await ?;
803+
804+ let deployment = graft. base . clone ( ) ;
805+
806+ let name = & deployment[ deployment. len ( ) . saturating_sub ( 10 ) ..] ;
807+ let name = format ! ( "auto-graft-sync/{}" , name) ;
808+ let name =
809+ SubgraphName :: new ( name. clone ( ) ) . map_err ( |_| SubgraphRegistrarError :: NameNotValid ( name) ) ?;
810+
811+ info ! (
812+ logger,
813+ "auto-graft-sync: create subgraph" ;
814+ "subgraph" => name. to_string( ) ,
815+ "hash" => graft. base. to_string( )
816+ ) ;
817+
818+ let _ = store. create_subgraph ( name. clone ( ) ) ?;
819+ info ! ( logger, "Created subgraph" ; "subgraph_name" => name. to_string( ) , "id" => deployment. to_string( ) ) ;
820+
821+ let locator = create_subgraph_version :: < C , S , P > (
822+ logger,
823+ store. clone ( ) ,
824+ chains. clone ( ) ,
825+ name. clone ( ) ,
826+ graft. base . clone ( ) ,
827+ None ,
828+ None ,
829+ subgraft_raw_manifest. clone ( ) ,
830+ node_id. clone ( ) ,
831+ debug_fork. clone ( ) ,
832+ version_switching_mode,
833+ resolver,
834+ history_blocks_override,
835+ Some ( depth + 1 ) ,
836+ provider. clone ( ) ,
837+ )
838+ . await ?;
839+
840+ info ! (
841+ logger,
842+ "auto-graft-sync: awaiting subgraph sync" ;
843+ "subgraph" => name. to_string( ) ,
844+ "hash" => graft. base. to_string( )
845+ ) ;
846+
847+ info ! ( & logger, "auto-graft-sync: starting graft sync" ; "subgraph" => name. to_string( ) , "hash" => graft. base. to_string( ) ) ;
848+ provider
849+ . start ( locator. clone ( ) , Some ( graft. block ) )
850+ . await
851+ . map_err ( SubgraphRegistrarError :: AutoGraftSubgraphAssignmentError ) ?;
852+
853+ info ! ( & logger, "auto-graft-sync: waiting for graft sync" ; "subgraph" => name. to_string( ) , "hash" => graft. base. to_string( ) ) ;
854+ graft
855+ . await_sync ( store. clone ( ) , Duration :: from_secs ( 1 ) )
856+ . await ?;
857+
858+ info ! (
859+ logger,
860+ "auto-graft-sync: sync complete" ;
861+ "subgraph" => name. to_string( ) ,
862+ "graft-hash" => graft. base. to_string( ) ,
863+ "depth" => depth,
864+ "hash" => graft. base. to_string( )
865+ ) ;
866+ Ok ( locator)
867+ }
868+
869+ async fn resolve_raw_manifest (
870+ resolver : & Arc < dyn LinkResolver > ,
871+ logger : & Logger ,
872+ deployment_hash : & DeploymentHash ,
873+ ) -> Result < serde_yaml:: Mapping , SubgraphRegistrarError > {
874+ let subgraft_raw_manifest: serde_yaml:: Mapping = {
875+ let file_bytes = resolver
876+ . cat ( & logger, & deployment_hash. to_ipfs_link ( ) )
877+ . await
878+ . map_err ( |e| {
879+ SubgraphRegistrarError :: ResolveError ( SubgraphManifestResolveError :: ResolveError ( e) )
880+ } ) ?;
881+
882+ serde_yaml:: from_slice ( & file_bytes)
883+ . map_err ( |e| SubgraphRegistrarError :: ResolveError ( e. into ( ) ) ) ?
884+ } ;
885+ Ok ( subgraft_raw_manifest)
886+ }
0 commit comments