Skip to content

Commit a441339

Browse files
committed
all: Fold the reachable parts of registrar::start_subgraph into provider
1 parent eff3e19 commit a441339

File tree

5 files changed

+20
-62
lines changed

5 files changed

+20
-62
lines changed

core/src/subgraph/provider.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use std::collections::HashSet;
21
use std::sync::Mutex;
2+
use std::{collections::HashSet, time::Instant};
33

44
use async_trait::async_trait;
55

@@ -67,28 +67,28 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {
6767

6868
#[async_trait]
6969
impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider<I> {
70-
async fn start(
71-
&self,
72-
loc: DeploymentLocator,
73-
stop_block: Option<BlockNumber>,
74-
) -> Result<(), SubgraphAssignmentProviderError> {
70+
async fn start(&self, loc: DeploymentLocator, stop_block: Option<BlockNumber>) {
7571
let logger = self.logger_factory.subgraph_logger(&loc);
7672

7773
// If subgraph ID already in set
7874
if !self.deployment_registry.insert(loc.id) {
7975
info!(logger, "Subgraph deployment is already running");
8076

81-
return Err(SubgraphAssignmentProviderError::AlreadyRunning(
82-
loc.hash.clone(),
83-
));
77+
return;
8478
}
8579

80+
let start_time = Instant::now();
81+
8682
self.instance_manager
8783
.cheap_clone()
8884
.start_subgraph(loc, stop_block)
8985
.await;
9086

91-
Ok(())
87+
debug!(
88+
logger,
89+
"Subgraph started";
90+
"start_ms" => start_time.elapsed().as_millis()
91+
);
9292
}
9393

9494
async fn stop(

core/src/subgraph/registrar.rs

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::collections::HashSet;
2-
use std::time::Instant;
32

43
use async_trait::async_trait;
54
use graph::blockchain::Blockchain;
@@ -12,7 +11,6 @@ use graph::data::subgraph::schema::DeploymentCreate;
1211
use graph::data::subgraph::Graft;
1312
use graph::data::value::Word;
1413
use graph::futures03;
15-
use graph::futures03::future::FutureExt;
1614
use graph::futures03::future::TryFutureExt;
1715
use graph::futures03::stream;
1816
use graph::futures03::stream::TryStreamExt;
@@ -238,7 +236,6 @@ where
238236
}
239237

240238
async fn start_assigned_subgraphs(&self) -> Result<(), Error> {
241-
let provider = self.provider.clone();
242239
let logger = self.logger.clone();
243240
let node_id = self.node_id.clone();
244241

@@ -258,9 +255,12 @@ where
258255
let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1);
259256
for id in deployments {
260257
let sender = sender.clone();
261-
let logger = logger.clone();
258+
let provider = self.provider.cheap_clone();
262259

263-
graph::spawn(start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)));
260+
graph::spawn(async move {
261+
provider.start(id, None).await;
262+
drop(sender)
263+
});
264264
}
265265
drop(sender);
266266
let _: Vec<_> = receiver.collect().await;
@@ -473,7 +473,7 @@ async fn handle_assignment_event(
473473
deployment,
474474
node_id: _,
475475
} => {
476-
start_subgraph(deployment, provider.clone(), logger).await;
476+
provider.start(deployment, None).await;
477477
Ok(())
478478
}
479479
AssignmentEvent::Remove {
@@ -486,39 +486,6 @@ async fn handle_assignment_event(
486486
}
487487
}
488488

489-
async fn start_subgraph(
490-
deployment: DeploymentLocator,
491-
provider: Arc<impl SubgraphAssignmentProviderTrait>,
492-
logger: Logger,
493-
) {
494-
let logger = logger
495-
.new(o!("subgraph_id" => deployment.hash.to_string(), "sgd" => deployment.id.to_string()));
496-
497-
trace!(logger, "Start subgraph");
498-
499-
let start_time = Instant::now();
500-
let result = provider.start(deployment.clone(), None).await;
501-
502-
debug!(
503-
logger,
504-
"Subgraph started";
505-
"start_ms" => start_time.elapsed().as_millis()
506-
);
507-
508-
match result {
509-
Ok(()) => (),
510-
Err(SubgraphAssignmentProviderError::AlreadyRunning(_)) => (),
511-
Err(e) => {
512-
// Errors here are likely an issue with the subgraph.
513-
error!(
514-
logger,
515-
"Subgraph instance failed to start";
516-
"error" => e.to_string()
517-
);
518-
}
519-
}
520-
}
521-
522489
/// Resolves the subgraph's earliest block
523490
async fn resolve_start_block(
524491
manifest: &SubgraphManifest<impl Blockchain>,

graph/src/components/subgraph/provider.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@ use crate::{components::store::DeploymentLocator, prelude::*};
55
/// Common trait for subgraph providers.
66
#[async_trait]
77
pub trait SubgraphAssignmentProvider: Send + Sync + 'static {
8-
async fn start(
9-
&self,
10-
deployment: DeploymentLocator,
11-
stop_block: Option<BlockNumber>,
12-
) -> Result<(), SubgraphAssignmentProviderError>;
8+
async fn start(&self, deployment: DeploymentLocator, stop_block: Option<BlockNumber>);
139
async fn stop(
1410
&self,
1511
deployment: DeploymentLocator,

node/src/manager/commands/run.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,7 @@ pub async fn run(
216216

217217
let locator = locate(subgraph_store.as_ref(), &hash)?;
218218

219-
SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block))
220-
.await?;
219+
SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)).await;
221220

222221
loop {
223222
tokio::time::sleep(Duration::from_millis(1000)).await;

tests/src/fixture/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,7 @@ impl TestContext {
286286

287287
self.provider
288288
.start(self.deployment.clone(), Some(stop_block.number))
289-
.await
290-
.expect("unable to start subgraph");
289+
.await;
291290

292291
debug!(self.logger, "TEST: syncing to {}", stop_block.number);
293292

@@ -305,10 +304,7 @@ impl TestContext {
305304
// In case the subgraph has been previously started.
306305
self.provider.stop(self.deployment.clone()).await.unwrap();
307306

308-
self.provider
309-
.start(self.deployment.clone(), None)
310-
.await
311-
.expect("unable to start subgraph");
307+
self.provider.start(self.deployment.clone(), None).await;
312308

313309
wait_for_sync(
314310
&self.logger,

0 commit comments

Comments
 (0)