Skip to content

Commit 0271efc

Browse files
committed
all: Defer getting manifest from IPFS when starting subgraph
The current code in `SubgraphAssignmentProvider.start` fetched the manifest from IPFS before starting a subgraph. But the code calling `start`, ultimately `SubgraphRegistrar.start_assigned_subgraphs` waited for all subgraphs to start successfully before processing assignment events. That could lead to a situation where a slow IPFS server, even if it was slow for just one subgraph, could keep a node from processing assignment events. With these changes, interacting with IPFS is deferred to the future that is spawned for running the subgraph so that slow IPFS can slow how long it takes for a subgraph to start, but not the system overall.
1 parent 4de51ff commit 0271efc

File tree

6 files changed

+16
-27
lines changed

6 files changed

+16
-27
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
6060
async fn start_subgraph(
6161
self: Arc<Self>,
6262
loc: DeploymentLocator,
63-
manifest: serde_yaml::Mapping,
6463
stop_block: Option<BlockNumber>,
6564
) {
6665
let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst);
@@ -78,6 +77,19 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
7877
let deployment_status_metric = deployment_status_metric.clone();
7978

8079
async move {
80+
let link_resolver = self
81+
.link_resolver
82+
.for_manifest(&loc.hash.to_string())
83+
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
84+
85+
let file_bytes = link_resolver
86+
.cat(&logger, &loc.hash.to_ipfs_link())
87+
.await
88+
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
89+
90+
let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
91+
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
92+
8193
match BlockchainKind::from_manifest(&manifest)? {
8294
BlockchainKind::Ethereum => {
8395
let runner = instance_manager

core/src/subgraph/provider.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,12 @@ impl DeploymentRegistry {
4444
pub struct SubgraphAssignmentProvider<I> {
4545
logger_factory: LoggerFactory,
4646
deployment_registry: DeploymentRegistry,
47-
link_resolver: Arc<dyn LinkResolver>,
4847
instance_manager: Arc<I>,
4948
}
5049

5150
impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {
5251
pub fn new(
5352
logger_factory: &LoggerFactory,
54-
link_resolver: Arc<dyn LinkResolver>,
5553
instance_manager: I,
5654
subgraph_metrics: Arc<SubgraphCountMetric>,
5755
) -> Self {
@@ -61,7 +59,6 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {
6159
// Create the subgraph provider
6260
SubgraphAssignmentProvider {
6361
logger_factory,
64-
link_resolver: link_resolver.with_retries().into(),
6562
instance_manager: Arc::new(instance_manager),
6663
deployment_registry: DeploymentRegistry::new(subgraph_metrics),
6764
}
@@ -86,22 +83,9 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
8683
));
8784
}
8885

89-
let link_resolver = self
90-
.link_resolver
91-
.for_manifest(&loc.hash.to_string())
92-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
93-
94-
let file_bytes = link_resolver
95-
.cat(&logger, &loc.hash.to_ipfs_link())
96-
.await
97-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
98-
99-
let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
100-
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
101-
10286
self.instance_manager
10387
.cheap_clone()
104-
.start_subgraph(loc, raw, stop_block)
88+
.start_subgraph(loc, stop_block)
10589
.await;
10690

10791
Ok(())

graph/src/components/subgraph/instance_manager.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static {
1313
async fn start_subgraph(
1414
self: Arc<Self>,
1515
deployment: DeploymentLocator,
16-
manifest: serde_yaml::Mapping,
1716
stop_block: Option<BlockNumber>,
1817
);
1918
async fn stop_subgraph(&self, deployment: DeploymentLocator);

node/src/launcher.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -294,12 +294,8 @@ fn build_subgraph_registrar(
294294
);
295295

296296
// Create IPFS-based subgraph provider
297-
let subgraph_provider = IpfsSubgraphAssignmentProvider::new(
298-
&logger_factory,
299-
link_resolver.clone(),
300-
subgraph_instance_manager,
301-
sg_count,
302-
);
297+
let subgraph_provider =
298+
IpfsSubgraphAssignmentProvider::new(&logger_factory, subgraph_instance_manager, sg_count);
303299

304300
// Check version switching mode environment variable
305301
let version_switching_mode = ENV_VARS.subgraph_version_switching_mode;

node/src/manager/commands/run.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ pub async fn run(
158158
// Create IPFS-based subgraph provider
159159
let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new(
160160
&logger_factory,
161-
link_resolver.cheap_clone(),
162161
subgraph_instance_manager,
163162
sg_metrics,
164163
));

tests/src/fixture/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,6 @@ pub async fn setup_inner<C: Blockchain>(
574574
// Create IPFS-based subgraph provider
575575
let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new(
576576
&logger_factory,
577-
link_resolver.cheap_clone(),
578577
subgraph_instance_manager.clone(),
579578
sg_count,
580579
));

0 commit comments

Comments
 (0)