Skip to content

Commit 6302611

Browse files
committed
implement recursive auto sync for grafts
create subgraph before trying to create the deployment add config value to flip on/off auto syncing grafts add auto_sync_grafts value and plumb max depth when enabled restore manifest validation comment implement awaiting for auto graft sync
1 parent b72621e commit 6302611

File tree

9 files changed

+241
-31
lines changed

9 files changed

+241
-31
lines changed

core/src/subgraph/registrar.rs

Lines changed: 149 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use graph::tokio_retry::Retry;
2828
use graph::util::futures::retry_strategy;
2929
use graph::util::futures::RETRY_DEFAULT_LIMIT;
3030

31+
const MAX_AUTO_GRAFT_SYNC_DEPTH: u32 = 42;
32+
3133
pub struct SubgraphRegistrar<P, S, SM> {
3234
logger: Logger,
3335
logger_factory: LoggerFactory,
@@ -303,20 +305,8 @@ where
303305
.logger_factory
304306
.subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone()));
305307

306-
let raw: serde_yaml::Mapping = {
307-
let file_bytes = self
308-
.resolver
309-
.cat(&logger, &hash.to_ipfs_link())
310-
.await
311-
.map_err(|e| {
312-
SubgraphRegistrarError::ResolveError(
313-
SubgraphManifestResolveError::ResolveError(e),
314-
)
315-
})?;
316-
317-
serde_yaml::from_slice(&file_bytes)
318-
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
319-
};
308+
let raw: serde_yaml::Mapping =
309+
resolve_raw_manifest(&self.resolver, &self.logger, &hash).await?;
320310

321311
let kind = BlockchainKind::from_manifest(&raw).map_err(|e| {
322312
SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e))
@@ -326,6 +316,12 @@ where
326316
let history_blocks =
327317
history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks));
328318

319+
let auto_graft_sync_depth = if self.store.auto_sync_grafts() {
320+
Some(0)
321+
} else {
322+
None
323+
};
324+
329325
let deployment_locator = match kind {
330326
BlockchainKind::Arweave => {
331327
create_subgraph_version::<graph_chain_arweave::Chain, _>(
@@ -342,6 +338,7 @@ where
342338
self.version_switching_mode,
343339
&self.resolver,
344340
history_blocks,
341+
auto_graft_sync_depth,
345342
)
346343
.await?
347344
}
@@ -360,6 +357,7 @@ where
360357
self.version_switching_mode,
361358
&self.resolver,
362359
history_blocks,
360+
auto_graft_sync_depth,
363361
)
364362
.await?
365363
}
@@ -378,6 +376,7 @@ where
378376
self.version_switching_mode,
379377
&self.resolver,
380378
history_blocks,
379+
auto_graft_sync_depth,
381380
)
382381
.await?
383382
}
@@ -396,6 +395,7 @@ where
396395
self.version_switching_mode,
397396
&self.resolver,
398397
history_blocks,
398+
auto_graft_sync_depth,
399399
)
400400
.await?
401401
}
@@ -414,6 +414,7 @@ where
414414
self.version_switching_mode,
415415
&self.resolver,
416416
history_blocks,
417+
auto_graft_sync_depth,
417418
)
418419
.await?
419420
}
@@ -432,6 +433,7 @@ where
432433
self.version_switching_mode,
433434
&self.resolver,
434435
history_blocks,
436+
auto_graft_sync_depth,
435437
)
436438
.await?
437439
}
@@ -555,9 +557,9 @@ async fn start_subgraph(
555557
}
556558

557559
/// Resolves the subgraph's earliest block
558-
async fn resolve_start_block(
559-
manifest: &SubgraphManifest<impl Blockchain>,
560-
chain: &impl Blockchain,
560+
async fn resolve_start_block<C: Blockchain>(
561+
manifest: &SubgraphManifest<C>,
562+
chain: &C,
561563
logger: &Logger,
562564
) -> Result<Option<BlockPtr>, SubgraphRegistrarError> {
563565
// If the minimum start block is 0 (i.e. the genesis block),
@@ -618,9 +620,12 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
618620
version_switching_mode: SubgraphVersionSwitchingMode,
619621
resolver: &Arc<dyn LinkResolver>,
620622
history_blocks_override: Option<i32>,
623+
depth: Option<u32>,
621624
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
622625
let raw_string = serde_yaml::to_string(&raw).unwrap();
623-
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
626+
627+
// We need to defer validation of the manifest until after we have synced the base subgraph.
628+
let unvalidated_manifest = UnvalidatedSubgraphManifest::<C>::resolve(
624629
deployment.clone(),
625630
raw,
626631
resolver,
@@ -630,16 +635,37 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
630635
.map_err(SubgraphRegistrarError::ResolveError)
631636
.await?;
632637

633-
// Determine if the graft_base should be validated.
634-
// Validate the graft_base if there is a pending graft, ensuring its presence.
635-
// If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated.
636-
// If the subgraph already exists and there is no pending graft, graft_base validation is not required.
638+
if let (Some(depth), Some(graft)) = (depth, unvalidated_manifest.unvalidated_graft()) {
639+
if depth < MAX_AUTO_GRAFT_SYNC_DEPTH {
640+
Box::pin(auto_sync_graft::<C, S>(
641+
graft,
642+
resolver,
643+
logger,
644+
&store,
645+
&chains,
646+
&name,
647+
&node_id,
648+
&debug_fork,
649+
version_switching_mode,
650+
history_blocks_override,
651+
depth,
652+
))
653+
.await?;
654+
} else {
655+
warn!(
656+
logger,
657+
"Auto-syncing subgraph grafts depth limit reached";
658+
"depth" => depth
659+
);
660+
}
661+
}
662+
637663
let should_validate = match store.graft_pending(&deployment) {
638664
Ok(graft_pending) => graft_pending,
639665
Err(StoreError::DeploymentNotFound(_)) => true,
640666
Err(e) => return Err(SubgraphRegistrarError::StoreError(e)),
641667
};
642-
let manifest = unvalidated
668+
let manifest = unvalidated_manifest
643669
.validate(store.cheap_clone(), should_validate)
644670
.await
645671
.map_err(SubgraphRegistrarError::ManifestValidationError)?;
@@ -732,3 +758,103 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
732758
)
733759
.map_err(SubgraphRegistrarError::SubgraphDeploymentError)
734760
}
761+
762+
/// Automatically syncs a subgraph graft from the base subgraph.
763+
/// This will await the syncing of the base subgraph before proceeding.
764+
/// Recursively calls `create_subgraph_version` to create any grafts of
765+
/// this graft up to `MAX_AUTO_GRAFT_SYNC_DEPTH`.`
766+
async fn auto_sync_graft<C: Blockchain, S: SubgraphStore>(
767+
graft: &Graft,
768+
resolver: &Arc<dyn LinkResolver>,
769+
logger: &Logger,
770+
store: &Arc<S>,
771+
chains: &Arc<BlockchainMap>,
772+
name: &SubgraphName,
773+
node_id: &NodeId,
774+
debug_fork: &Option<DeploymentHash>,
775+
version_switching_mode: SubgraphVersionSwitchingMode,
776+
history_blocks_override: Option<i32>,
777+
depth: u32,
778+
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
779+
info!(
780+
logger,
781+
"Auto-syncing subgraph graft";
782+
"subgraph" => name.to_string(),
783+
"hash" => graft.base.to_string(),
784+
"depth" => depth,
785+
"block" => graft.block
786+
);
787+
let subgraft_raw_manifest = resolve_raw_manifest(resolver, logger, &graft.base).await?;
788+
789+
let id = graft.base.clone();
790+
let name = &id[id.len().saturating_sub(10)..];
791+
let name = format!("auto-graft-sync/{}", name);
792+
let name =
793+
SubgraphName::new(name.clone()).map_err(|_| SubgraphRegistrarError::NameNotValid(name))?;
794+
795+
info!(
796+
logger,
797+
"auto-syncing subgraph";
798+
"subgraph" => name.to_string(),
799+
"hash" => graft.base.to_string()
800+
);
801+
802+
let _ = store.create_subgraph(name.clone())?;
803+
info!(logger, "Created subgraph"; "subgraph_name" => name.to_string(), "id" => id.to_string());
804+
805+
let locator = create_subgraph_version::<C, S>(
806+
logger,
807+
store.clone(),
808+
chains.clone(),
809+
name.clone(),
810+
graft.base.clone(),
811+
None,
812+
None,
813+
subgraft_raw_manifest.clone(),
814+
node_id.clone(),
815+
debug_fork.clone(),
816+
version_switching_mode,
817+
resolver,
818+
history_blocks_override,
819+
Some(depth + 1),
820+
)
821+
.await?;
822+
823+
info!(
824+
logger,
825+
"Awaiting subgraph sync";
826+
"subgraph" => name.to_string(),
827+
"hash" => graft.base.to_string()
828+
);
829+
830+
graft
831+
.await_sync(store.clone(), Duration::from_secs(1))
832+
.await?;
833+
834+
info!(
835+
logger,
836+
"auto-syncing subgraph complete";
837+
"subgraph" => name.to_string(),
838+
"hash" => graft.base.to_string()
839+
);
840+
Ok(locator)
841+
}
842+
843+
async fn resolve_raw_manifest(
844+
resolver: &Arc<dyn LinkResolver>,
845+
logger: &Logger,
846+
deployment_hash: &DeploymentHash,
847+
) -> Result<serde_yaml::Mapping, SubgraphRegistrarError> {
848+
let subgraft_raw_manifest: serde_yaml::Mapping = {
849+
let file_bytes = resolver
850+
.cat(&logger, &deployment_hash.to_ipfs_link())
851+
.await
852+
.map_err(|e| {
853+
SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e))
854+
})?;
855+
856+
serde_yaml::from_slice(&file_bytes)
857+
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
858+
};
859+
Ok(subgraft_raw_manifest)
860+
}

graph/src/components/store/traits.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ pub trait SubgraphStore: Send + Sync + 'static {
215215
/// When this flag is set, indexing of the deployment should log
216216
/// additional diagnostic information
217217
fn instrument(&self, deployment: &DeploymentLocator) -> Result<bool, StoreError>;
218+
219+
fn auto_sync_grafts(&self) -> bool;
218220
}
219221

220222
pub trait ReadStore: Send + Sync + 'static {

graph/src/data/subgraph/mod.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use stable_hash_legacy::SequenceNumber;
2626
use std::{
2727
collections::{BTreeSet, HashMap, HashSet},
2828
marker::PhantomData,
29+
time::Duration,
30+
time::Instant,
2931
};
3032
use thiserror::Error;
3133
use wasmparser;
@@ -284,6 +286,8 @@ pub enum SubgraphRegistrarError {
284286
NameExists(String),
285287
#[error("subgraph name not found: {0}")]
286288
NameNotFound(String),
289+
#[error("subgraph name not valid: {0}")]
290+
NameNotValid(String),
287291
#[error("network not supported by registrar: {0}")]
288292
NetworkNotSupported(Error),
289293
#[error("deployment not found: {0}")]
@@ -473,14 +477,7 @@ impl Graft {
473477
) -> Result<(), SubgraphManifestValidationError> {
474478
use SubgraphManifestValidationError::*;
475479

476-
let last_processed_block = store
477-
.least_block_ptr(&self.base)
478-
.await
479-
.map_err(|e| GraftBaseInvalid(e.to_string()))?;
480-
let is_base_healthy = store
481-
.is_healthy(&self.base)
482-
.await
483-
.map_err(|e| GraftBaseInvalid(e.to_string()))?;
480+
let (last_processed_block, is_base_healthy) = self.graft_status(store).await?;
484481

485482
// We are being defensive here: we don't know which specific
486483
// instance of a subgraph we will use as the base for the graft,
@@ -522,6 +519,60 @@ impl Graft {
522519
(Some(_), _) => Ok(()),
523520
}
524521
}
522+
523+
async fn graft_status<S: SubgraphStore>(
524+
&self,
525+
store: Arc<S>,
526+
) -> Result<(Option<BlockPtr>, bool), SubgraphManifestValidationError> {
527+
use SubgraphManifestValidationError::*;
528+
let last_processed_block = store
529+
.least_block_ptr(&self.base)
530+
.await
531+
.map_err(|e| GraftBaseInvalid(e.to_string()))?;
532+
let is_base_healthy = store
533+
.is_healthy(&self.base)
534+
.await
535+
.map_err(|e| GraftBaseInvalid(e.to_string()))?;
536+
Ok((last_processed_block, is_base_healthy))
537+
}
538+
539+
/// Awaits the target block sync for the graft.
540+
pub async fn await_sync<S: SubgraphStore>(
541+
&self,
542+
store: Arc<S>,
543+
interval: Duration,
544+
) -> Result<(), SubgraphManifestValidationError> {
545+
use SubgraphManifestValidationError::*;
546+
547+
const MAX_WAIT_NO_BLOCKS: Duration = Duration::from_secs(60 * 60);
548+
let start = Instant::now();
549+
550+
loop {
551+
let (ptr, healthy) = self.graft_status(store.clone()).await?;
552+
match ptr {
553+
Some(ptr) if ptr.number < self.block => {
554+
tokio::time::sleep(interval).await;
555+
}
556+
Some(_) => {
557+
if !healthy {
558+
return Err(GraftBaseInvalid(format!(
559+
"failed to graft onto `{}` at block {} since it's not healthy",
560+
self.base, self.block
561+
)));
562+
}
563+
break Ok(());
564+
}
565+
None => {
566+
if start.elapsed() > MAX_WAIT_NO_BLOCKS {
567+
return Err(GraftBaseInvalid(format!(
568+
"failed to graft onto `{}` at block {} since it has not processed any blocks",
569+
self.base, self.block
570+
)));
571+
}
572+
}
573+
}
574+
}
575+
}
525576
}
526577

527578
#[derive(Clone, Debug)]
@@ -753,6 +804,11 @@ impl<C: Blockchain> UnvalidatedSubgraphManifest<C> {
753804
pub fn spec_version(&self) -> &Version {
754805
&self.0.spec_version
755806
}
807+
808+
/// Get the graft from this unvalidated manifest.
809+
pub fn unvalidated_graft(&self) -> Option<&Graft> {
810+
self.0.graft.as_ref()
811+
}
756812
}
757813

758814
impl<C: Blockchain> SubgraphManifest<C> {

node/resources/tests/full_config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[general]
22
query = "query_node_.*"
3+
auto_sync_grafts = false
34

45
[store]
56
[store.primary]

node/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,7 @@ mod test {
706706
ethereum_ws: vec![],
707707
ethereum_ipc: vec![],
708708
unsafe_config: false,
709+
auto_sync_grafts: false,
709710
};
710711

711712
let metrics = Arc::new(EndpointMetrics::mock());

0 commit comments

Comments
 (0)