Skip to content

Commit 2aae288

Browse files
committed
feat(core, store): load raw subgraph manifests from store
1 parent 2418aa1 commit 2aae288

File tree

13 files changed

+240
-107
lines changed

13 files changed

+240
-107
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
mod subgraph_manifest;
2+
13
pub mod amp_subgraph;
24
pub mod polling_monitor;
3-
45
pub mod subgraph;
56
pub mod subgraph_provider;

core/src/subgraph/instance_manager.rs

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use crate::subgraph::runner::SubgraphRunner;
1212
use graph::amp;
1313
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
1414
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
15-
use graph::components::link_resolver::LinkResolverContext;
1615
use graph::components::metrics::gas::GasMetrics;
1716
use graph::components::metrics::subgraph::DeploymentStatusMetric;
1817
use graph::components::store::SourceableStore;
@@ -29,7 +28,7 @@ use tokio::task;
2928

3029
use super::context::OffchainMonitor;
3130
use super::SubgraphTriggerProcessor;
32-
use crate::subgraph::runner::SubgraphRunnerError;
31+
use crate::{subgraph::runner::SubgraphRunnerError, subgraph_manifest};
3332

3433
#[derive(Clone)]
3534
pub struct SubgraphInstanceManager<S: SubgraphStore, AC> {
@@ -83,30 +82,22 @@ where
8382
let deployment_status_metric = deployment_status_metric.clone();
8483

8584
async move {
86-
let link_resolver = self
87-
.link_resolver
88-
.for_manifest(&loc.hash.to_string())
89-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
90-
91-
let file_bytes = link_resolver
92-
.cat(
93-
&LinkResolverContext::new(&loc.hash, &logger),
94-
&loc.hash.to_ipfs_link(),
95-
)
96-
.await
97-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
98-
99-
let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
100-
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
101-
102-
match BlockchainKind::from_manifest(&manifest)? {
85+
let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
86+
&logger,
87+
&*instance_manager.subgraph_store,
88+
&*instance_manager.link_resolver,
89+
&loc.hash,
90+
)
91+
.await?;
92+
93+
match BlockchainKind::from_manifest(&raw_manifest)? {
10394
BlockchainKind::Ethereum => {
10495
let runner = instance_manager
10596
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
10697
logger.clone(),
10798
self.env_vars.cheap_clone(),
10899
loc.clone(),
109-
manifest,
100+
raw_manifest,
110101
stop_block,
111102
Box::new(SubgraphTriggerProcessor {}),
112103
deployment_status_metric,
@@ -121,7 +112,7 @@ where
121112
logger.clone(),
122113
self.env_vars.cheap_clone(),
123114
loc.clone(),
124-
manifest,
115+
raw_manifest,
125116
stop_block,
126117
Box::new(SubgraphTriggerProcessor {}),
127118
deployment_status_metric,
@@ -136,7 +127,7 @@ where
136127
logger.clone(),
137128
self.env_vars.cheap_clone(),
138129
loc.cheap_clone(),
139-
manifest,
130+
raw_manifest,
140131
stop_block,
141132
Box::new(graph_chain_substreams::TriggerProcessor::new(
142133
loc.clone(),
@@ -251,7 +242,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
251242
logger: Logger,
252243
env_vars: Arc<EnvVars>,
253244
deployment: DeploymentLocator,
254-
manifest: serde_yaml::Mapping,
245+
raw_manifest: serde_yaml::Mapping,
255246
stop_block: Option<BlockNumber>,
256247
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
257248
deployment_status_metric: DeploymentStatusMetric,
@@ -264,7 +255,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
264255
logger,
265256
env_vars,
266257
deployment,
267-
manifest,
258+
raw_manifest,
268259
stop_block,
269260
tp,
270261
deployment_status_metric,
@@ -278,7 +269,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
278269
logger: Logger,
279270
env_vars: Arc<EnvVars>,
280271
deployment: DeploymentLocator,
281-
manifest: serde_yaml::Mapping,
272+
raw_manifest: serde_yaml::Mapping,
282273
stop_block: Option<BlockNumber>,
283274
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
284275
deployment_status_metric: DeploymentStatusMetric,
@@ -291,8 +282,8 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
291282
let subgraph_store = self.subgraph_store.cheap_clone();
292283
let registry = self.metrics_registry.cheap_clone();
293284

294-
let raw_yaml = serde_yaml::to_string(&manifest).unwrap();
295-
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;
285+
let manifest =
286+
UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?;
296287

297288
// Allow for infinite retries for subgraph definition files.
298289
let link_resolver = Arc::from(
@@ -302,24 +293,16 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
302293
.with_retries(),
303294
);
304295

305-
// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
306-
self.subgraph_store
307-
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
308-
.await?;
309296
if let Some(graft) = &manifest.graft {
310297
if self.subgraph_store.is_deployed(&graft.base)? {
311-
let file_bytes = self
312-
.link_resolver
313-
.cat(
314-
&LinkResolverContext::new(&deployment.hash, &logger),
315-
&graft.base.to_ipfs_link(),
316-
)
317-
.await?;
318-
let yaml = String::from_utf8(file_bytes)?;
319-
320-
self.subgraph_store
321-
.set_manifest_raw_yaml(&graft.base, yaml)
322-
.await?;
298+
// Makes sure the raw manifest is cached in the subgraph store
299+
let _raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
300+
&logger,
301+
&*self.subgraph_store,
302+
&*self.link_resolver,
303+
&graft.base,
304+
)
305+
.await?;
323306
}
324307
}
325308

core/src/subgraph_manifest.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use graph::{
2+
cheap_clone::CheapClone as _,
3+
components::{
4+
link_resolver::{LinkResolver, LinkResolverContext},
5+
store::{StoreError, SubgraphStore},
6+
},
7+
data::subgraph::DeploymentHash,
8+
};
9+
use slog::{debug, Logger};
10+
11+
pub(super) async fn load_raw_subgraph_manifest(
12+
logger: &Logger,
13+
subgraph_store: &dyn SubgraphStore,
14+
link_resolver: &dyn LinkResolver,
15+
hash: &DeploymentHash,
16+
) -> Result<serde_yaml::Mapping, Error> {
17+
if let Some(raw_manifest) =
18+
subgraph_store
19+
.raw_manifest(hash)
20+
.await
21+
.map_err(|e| Error::LoadManifest {
22+
hash: hash.cheap_clone(),
23+
source: anyhow::Error::from(e),
24+
})?
25+
{
26+
debug!(logger, "Loaded raw manifest from the subgraph store");
27+
return Ok(raw_manifest);
28+
}
29+
30+
debug!(logger, "Loading raw manifest using link resolver");
31+
32+
let link_resolver =
33+
link_resolver
34+
.for_manifest(&hash.to_string())
35+
.map_err(|e| Error::CreateLinkResolver {
36+
hash: hash.cheap_clone(),
37+
source: e,
38+
})?;
39+
40+
let file_bytes = link_resolver
41+
.cat(
42+
&LinkResolverContext::new(hash, logger),
43+
&hash.to_ipfs_link(),
44+
)
45+
.await
46+
.map_err(|e| Error::LoadManifest {
47+
hash: hash.cheap_clone(),
48+
source: e,
49+
})?;
50+
51+
let raw_manifest: serde_yaml::Mapping =
52+
serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest {
53+
hash: hash.cheap_clone(),
54+
source: e,
55+
})?;
56+
57+
subgraph_store
58+
.set_raw_manifest_once(hash, &raw_manifest)
59+
.await
60+
.map_err(|e| Error::StoreManifest {
61+
hash: hash.cheap_clone(),
62+
source: e,
63+
})?;
64+
65+
Ok(raw_manifest)
66+
}
67+
68+
#[derive(Debug, thiserror::Error)]
69+
pub(super) enum Error {
70+
#[error("failed to create link resolver for '{hash}': {source:#}")]
71+
CreateLinkResolver {
72+
hash: DeploymentHash,
73+
source: anyhow::Error,
74+
},
75+
76+
#[error("failed to load manifest for '{hash}': {source:#}")]
77+
LoadManifest {
78+
hash: DeploymentHash,
79+
source: anyhow::Error,
80+
},
81+
82+
#[error("failed to parse manifest for '{hash}': {source:#}")]
83+
ParseManifest {
84+
hash: DeploymentHash,
85+
source: serde_yaml::Error,
86+
},
87+
88+
#[error("failed to store manifest for '{hash}': {source:#}")]
89+
StoreManifest {
90+
hash: DeploymentHash,
91+
source: StoreError,
92+
},
93+
}

core/src/subgraph_provider.rs

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use graph::{
44
amp,
55
cheap_clone::CheapClone as _,
66
components::{
7-
link_resolver::{LinkResolver, LinkResolverContext},
7+
link_resolver::LinkResolver,
88
metrics::subgraph::SubgraphCountMetric,
9-
store::DeploymentLocator,
9+
store::{DeploymentLocator, SubgraphStore},
1010
subgraph::SubgraphInstanceManager,
1111
},
1212
log::factory::LoggerFactory,
@@ -16,6 +16,8 @@ use parking_lot::RwLock;
1616
use slog::{debug, error};
1717
use tokio_util::sync::CancellationToken;
1818

19+
use super::subgraph_manifest;
20+
1921
/// Starts and stops subgraph deployments.
2022
///
2123
/// For each subgraph deployment, checks the subgraph processing kind
@@ -27,6 +29,7 @@ use tokio_util::sync::CancellationToken;
2729
pub struct SubgraphProvider {
2830
logger_factory: LoggerFactory,
2931
count_metrics: Arc<SubgraphCountMetric>,
32+
subgraph_store: Arc<dyn SubgraphStore>,
3033
link_resolver: Arc<dyn LinkResolver>,
3134

3235
/// Stops active subgraph start request tasks.
@@ -54,12 +57,14 @@ impl SubgraphProvider {
5457
/// # Arguments
5558
/// - `logger_factory`: Creates loggers for each subgraph deployment start/stop request
5659
/// - `count_metrics`: Tracks the number of started subgraph deployments
60+
/// - `subgraph_store`: Loads subgraph manifests to determine the subgraph processing kinds
5761
/// - `link_resolver`: Loads subgraph manifests to determine the subgraph processing kinds
5862
/// - `cancel_token`: Stops active subgraph start request tasks
5963
/// - `instance_managers`: Contains the enabled subgraph instance managers
6064
pub fn new(
6165
logger_factory: &LoggerFactory,
6266
count_metrics: Arc<SubgraphCountMetric>,
67+
subgraph_store: Arc<dyn SubgraphStore>,
6368
link_resolver: Arc<dyn LinkResolver>,
6469
cancel_token: CancellationToken,
6570
instance_managers: SubgraphInstanceManagers,
@@ -74,6 +79,7 @@ impl SubgraphProvider {
7479
Self {
7580
logger_factory,
7681
count_metrics,
82+
subgraph_store,
7783
link_resolver,
7884
cancel_token,
7985
instance_managers,
@@ -94,30 +100,17 @@ impl SubgraphProvider {
94100
) -> Result<(), Error> {
95101
let logger = self.logger_factory.subgraph_logger(&loc);
96102

97-
let link_resolver = self
98-
.link_resolver
99-
.for_manifest(&loc.hash.to_string())
100-
.map_err(|e| Error::CreateLinkResolver {
101-
loc: loc.cheap_clone(),
102-
source: e,
103-
})?;
104-
105-
let file_bytes = link_resolver
106-
.cat(
107-
&LinkResolverContext::new(&loc.hash, &logger),
108-
&loc.hash.to_ipfs_link(),
109-
)
110-
.await
111-
.map_err(|e| Error::LoadManifest {
112-
loc: loc.cheap_clone(),
113-
source: e,
114-
})?;
115-
116-
let raw_manifest: serde_yaml::Mapping =
117-
serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest {
118-
loc: loc.cheap_clone(),
119-
source: e,
120-
})?;
103+
let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
104+
&logger,
105+
&*self.subgraph_store,
106+
&*self.link_resolver,
107+
&loc.hash,
108+
)
109+
.await
110+
.map_err(|e| Error::LoadManifest {
111+
loc: loc.cheap_clone(),
112+
source: e,
113+
})?;
121114

122115
let subgraph_kind = SubgraphProcessingKind::from_manifest(&raw_manifest);
123116
self.assignments.set_subgraph_kind(&loc, subgraph_kind);
@@ -220,22 +213,10 @@ impl SubgraphInstanceManager for SubgraphProvider {
220213
/// Enumerates all possible errors of the subgraph provider.
221214
#[derive(Debug, thiserror::Error)]
222215
enum Error {
223-
#[error("failed to create link resolver for '{loc}': {source:#}")]
224-
CreateLinkResolver {
225-
loc: DeploymentLocator,
226-
source: anyhow::Error,
227-
},
228-
229216
#[error("failed to load manifest for '{loc}': {source:#}")]
230217
LoadManifest {
231218
loc: DeploymentLocator,
232-
source: anyhow::Error,
233-
},
234-
235-
#[error("failed to parse manifest for '{loc}': {source:#}")]
236-
ParseManifest {
237-
loc: DeploymentLocator,
238-
source: serde_yaml::Error,
219+
source: subgraph_manifest::Error,
239220
},
240221

241222
#[error("failed to get instance manager for '{loc}' with kind '{subgraph_kind}'")]

0 commit comments

Comments
 (0)