Skip to content

Commit cc82e95

Browse files
committed
feat(core, store): load raw subgraph manifests from store
1 parent 3576eb2 commit cc82e95

File tree

13 files changed

+238
-107
lines changed

13 files changed

+238
-107
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
mod subgraph_manifest;
2+
13
pub mod amp_subgraph;
24
pub mod polling_monitor;
3-
45
pub mod subgraph;
56
pub mod subgraph_provider;

core/src/subgraph/instance_manager.rs

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use async_trait::async_trait;
1313
use graph::amp;
1414
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
1515
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
16-
use graph::components::link_resolver::LinkResolverContext;
1716
use graph::components::metrics::gas::GasMetrics;
1817
use graph::components::metrics::subgraph::DeploymentStatusMetric;
1918
use graph::components::store::SourceableStore;
@@ -30,7 +29,7 @@ use tokio::task;
3029

3130
use super::context::OffchainMonitor;
3231
use super::SubgraphTriggerProcessor;
33-
use crate::subgraph::runner::SubgraphRunnerError;
32+
use crate::{subgraph::runner::SubgraphRunnerError, subgraph_manifest};
3433

3534
#[derive(Clone)]
3635
pub struct SubgraphInstanceManager<S: SubgraphStore, AC> {
@@ -84,30 +83,22 @@ where
8483
let deployment_status_metric = deployment_status_metric.clone();
8584

8685
async move {
87-
let link_resolver = self
88-
.link_resolver
89-
.for_manifest(&loc.hash.to_string())
90-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
91-
92-
let file_bytes = link_resolver
93-
.cat(
94-
&LinkResolverContext::new(&loc.hash, &logger),
95-
&loc.hash.to_ipfs_link(),
96-
)
97-
.await
98-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
99-
100-
let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
101-
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
102-
103-
match BlockchainKind::from_manifest(&manifest)? {
86+
let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
87+
&logger,
88+
&*instance_manager.subgraph_store,
89+
&*instance_manager.link_resolver,
90+
&loc.hash,
91+
)
92+
.await?;
93+
94+
match BlockchainKind::from_manifest(&raw_manifest)? {
10495
BlockchainKind::Ethereum => {
10596
let runner = instance_manager
10697
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
10798
logger.clone(),
10899
self.env_vars.cheap_clone(),
109100
loc.clone(),
110-
manifest,
101+
raw_manifest,
111102
stop_block,
112103
Box::new(SubgraphTriggerProcessor {}),
113104
deployment_status_metric,
@@ -122,7 +113,7 @@ where
122113
logger.clone(),
123114
self.env_vars.cheap_clone(),
124115
loc.clone(),
125-
manifest,
116+
raw_manifest,
126117
stop_block,
127118
Box::new(SubgraphTriggerProcessor {}),
128119
deployment_status_metric,
@@ -137,7 +128,7 @@ where
137128
logger.clone(),
138129
self.env_vars.cheap_clone(),
139130
loc.cheap_clone(),
140-
manifest,
131+
raw_manifest,
141132
stop_block,
142133
Box::new(graph_chain_substreams::TriggerProcessor::new(
143134
loc.clone(),
@@ -253,7 +244,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
253244
logger: Logger,
254245
env_vars: Arc<EnvVars>,
255246
deployment: DeploymentLocator,
256-
manifest: serde_yaml::Mapping,
247+
raw_manifest: serde_yaml::Mapping,
257248
stop_block: Option<BlockNumber>,
258249
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
259250
deployment_status_metric: DeploymentStatusMetric,
@@ -266,7 +257,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
266257
logger,
267258
env_vars,
268259
deployment,
269-
manifest,
260+
raw_manifest,
270261
stop_block,
271262
tp,
272263
deployment_status_metric,
@@ -280,7 +271,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
280271
logger: Logger,
281272
env_vars: Arc<EnvVars>,
282273
deployment: DeploymentLocator,
283-
manifest: serde_yaml::Mapping,
274+
raw_manifest: serde_yaml::Mapping,
284275
stop_block: Option<BlockNumber>,
285276
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
286277
deployment_status_metric: DeploymentStatusMetric,
@@ -293,8 +284,8 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
293284
let subgraph_store = self.subgraph_store.cheap_clone();
294285
let registry = self.metrics_registry.cheap_clone();
295286

296-
let raw_yaml = serde_yaml::to_string(&manifest).unwrap();
297-
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;
287+
let manifest =
288+
UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?;
298289

299290
// Allow for infinite retries for subgraph definition files.
300291
let link_resolver = Arc::from(
@@ -304,24 +295,16 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
304295
.with_retries(),
305296
);
306297

307-
// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
308-
self.subgraph_store
309-
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
310-
.await?;
311298
if let Some(graft) = &manifest.graft {
312299
if self.subgraph_store.is_deployed(&graft.base).await? {
313-
let file_bytes = self
314-
.link_resolver
315-
.cat(
316-
&LinkResolverContext::new(&deployment.hash, &logger),
317-
&graft.base.to_ipfs_link(),
318-
)
319-
.await?;
320-
let yaml = String::from_utf8(file_bytes)?;
321-
322-
self.subgraph_store
323-
.set_manifest_raw_yaml(&graft.base, yaml)
324-
.await?;
300+
// Makes sure the raw manifest is cached in the subgraph store
301+
let _raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
302+
&logger,
303+
&*self.subgraph_store,
304+
&*self.link_resolver,
305+
&graft.base,
306+
)
307+
.await?;
325308
}
326309
}
327310

core/src/subgraph_manifest.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use graph::{
2+
cheap_clone::CheapClone as _,
3+
components::{
4+
link_resolver::{LinkResolver, LinkResolverContext},
5+
store::{StoreError, SubgraphStore},
6+
},
7+
data::subgraph::DeploymentHash,
8+
};
9+
use slog::{debug, Logger};
10+
11+
pub(super) async fn load_raw_subgraph_manifest(
12+
logger: &Logger,
13+
subgraph_store: &dyn SubgraphStore,
14+
link_resolver: &dyn LinkResolver,
15+
hash: &DeploymentHash,
16+
) -> Result<serde_yaml::Mapping, Error> {
17+
if let Some(raw_manifest) =
18+
subgraph_store
19+
.raw_manifest(hash)
20+
.await
21+
.map_err(|e| Error::LoadManifest {
22+
hash: hash.cheap_clone(),
23+
source: anyhow::Error::from(e),
24+
})?
25+
{
26+
debug!(logger, "Loaded raw manifest from the subgraph store");
27+
return Ok(raw_manifest);
28+
}
29+
30+
debug!(logger, "Loading raw manifest using link resolver");
31+
32+
let link_resolver =
33+
link_resolver
34+
.for_manifest(&hash.to_string())
35+
.map_err(|e| Error::CreateLinkResolver {
36+
hash: hash.cheap_clone(),
37+
source: e,
38+
})?;
39+
40+
let file_bytes = link_resolver
41+
.cat(
42+
&LinkResolverContext::new(hash, logger),
43+
&hash.to_ipfs_link(),
44+
)
45+
.await
46+
.map_err(|e| Error::LoadManifest {
47+
hash: hash.cheap_clone(),
48+
source: e,
49+
})?;
50+
51+
let raw_manifest: serde_yaml::Mapping =
52+
serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest {
53+
hash: hash.cheap_clone(),
54+
source: e,
55+
})?;
56+
57+
subgraph_store
58+
.set_raw_manifest_once(hash, &raw_manifest)
59+
.await
60+
.map_err(|e| Error::StoreManifest {
61+
hash: hash.cheap_clone(),
62+
source: e,
63+
})?;
64+
65+
Ok(raw_manifest)
66+
}
67+
68+
#[derive(Debug, thiserror::Error)]
69+
pub(super) enum Error {
70+
#[error("failed to create link resolver for '{hash}': {source:#}")]
71+
CreateLinkResolver {
72+
hash: DeploymentHash,
73+
source: anyhow::Error,
74+
},
75+
76+
#[error("failed to load manifest for '{hash}': {source:#}")]
77+
LoadManifest {
78+
hash: DeploymentHash,
79+
source: anyhow::Error,
80+
},
81+
82+
#[error("failed to parse manifest for '{hash}': {source:#}")]
83+
ParseManifest {
84+
hash: DeploymentHash,
85+
source: serde_yaml::Error,
86+
},
87+
88+
#[error("failed to store manifest for '{hash}': {source:#}")]
89+
StoreManifest {
90+
hash: DeploymentHash,
91+
source: StoreError,
92+
},
93+
}

core/src/subgraph_provider.rs

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use graph::{
44
amp,
55
cheap_clone::CheapClone as _,
66
components::{
7-
link_resolver::{LinkResolver, LinkResolverContext},
7+
link_resolver::LinkResolver,
88
metrics::subgraph::SubgraphCountMetric,
9-
store::DeploymentLocator,
9+
store::{DeploymentLocator, SubgraphStore},
1010
subgraph::SubgraphInstanceManager,
1111
},
1212
log::factory::LoggerFactory,
@@ -16,6 +16,8 @@ use parking_lot::RwLock;
1616
use slog::{debug, error};
1717
use tokio_util::sync::CancellationToken;
1818

19+
use super::subgraph_manifest;
20+
1921
/// Starts and stops subgraph deployments.
2022
///
2123
/// For each subgraph deployment, checks the subgraph processing kind
@@ -27,6 +29,7 @@ use tokio_util::sync::CancellationToken;
2729
pub struct SubgraphProvider {
2830
logger_factory: LoggerFactory,
2931
count_metrics: Arc<SubgraphCountMetric>,
32+
subgraph_store: Arc<dyn SubgraphStore>,
3033
link_resolver: Arc<dyn LinkResolver>,
3134

3235
/// Stops active subgraph start request tasks.
@@ -54,12 +57,14 @@ impl SubgraphProvider {
5457
/// # Arguments
5558
/// - `logger_factory`: Creates loggers for each subgraph deployment start/stop request
5659
/// - `count_metrics`: Tracks the number of started subgraph deployments
60+
/// - `subgraph_store`: Loads subgraph manifests to determine the subgraph processing kinds
5761
/// - `link_resolver`: Loads subgraph manifests to determine the subgraph processing kinds
5862
/// - `cancel_token`: Stops active subgraph start request tasks
5963
/// - `instance_managers`: Contains the enabled subgraph instance managers
6064
pub fn new(
6165
logger_factory: &LoggerFactory,
6266
count_metrics: Arc<SubgraphCountMetric>,
67+
subgraph_store: Arc<dyn SubgraphStore>,
6368
link_resolver: Arc<dyn LinkResolver>,
6469
cancel_token: CancellationToken,
6570
instance_managers: SubgraphInstanceManagers,
@@ -74,6 +79,7 @@ impl SubgraphProvider {
7479
Self {
7580
logger_factory,
7681
count_metrics,
82+
subgraph_store,
7783
link_resolver,
7884
cancel_token,
7985
instance_managers,
@@ -94,30 +100,17 @@ impl SubgraphProvider {
94100
) -> Result<(), Error> {
95101
let logger = self.logger_factory.subgraph_logger(&loc);
96102

97-
let link_resolver = self
98-
.link_resolver
99-
.for_manifest(&loc.hash.to_string())
100-
.map_err(|e| Error::CreateLinkResolver {
101-
loc: loc.cheap_clone(),
102-
source: e,
103-
})?;
104-
105-
let file_bytes = link_resolver
106-
.cat(
107-
&LinkResolverContext::new(&loc.hash, &logger),
108-
&loc.hash.to_ipfs_link(),
109-
)
110-
.await
111-
.map_err(|e| Error::LoadManifest {
112-
loc: loc.cheap_clone(),
113-
source: e,
114-
})?;
115-
116-
let raw_manifest: serde_yaml::Mapping =
117-
serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest {
118-
loc: loc.cheap_clone(),
119-
source: e,
120-
})?;
103+
let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
104+
&logger,
105+
&*self.subgraph_store,
106+
&*self.link_resolver,
107+
&loc.hash,
108+
)
109+
.await
110+
.map_err(|e| Error::LoadManifest {
111+
loc: loc.cheap_clone(),
112+
source: e,
113+
})?;
121114

122115
let subgraph_kind = SubgraphProcessingKind::from_manifest(&raw_manifest);
123116
self.assignments.set_subgraph_kind(&loc, subgraph_kind);
@@ -220,22 +213,10 @@ impl SubgraphInstanceManager for SubgraphProvider {
220213
/// Enumerates all possible errors of the subgraph provider.
221214
#[derive(Debug, thiserror::Error)]
222215
enum Error {
223-
#[error("failed to create link resolver for '{loc}': {source:#}")]
224-
CreateLinkResolver {
225-
loc: DeploymentLocator,
226-
source: anyhow::Error,
227-
},
228-
229216
#[error("failed to load manifest for '{loc}': {source:#}")]
230217
LoadManifest {
231218
loc: DeploymentLocator,
232-
source: anyhow::Error,
233-
},
234-
235-
#[error("failed to parse manifest for '{loc}': {source:#}")]
236-
ParseManifest {
237-
loc: DeploymentLocator,
238-
source: serde_yaml::Error,
219+
source: subgraph_manifest::Error,
239220
},
240221

241222
#[error("failed to get instance manager for '{loc}' with kind '{subgraph_kind}'")]

0 commit comments

Comments
 (0)