Skip to content

Commit 34984d5

Browse files
committed
core, node: Asyncify starting all subgraphs
1 parent a675c22 commit 34984d5

File tree

2 files changed

+57
-62
lines changed

2 files changed

+57
-62
lines changed

core/src/subgraph/registrar.rs

Lines changed: 55 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ use graph::components::subgraph::Settings;
1111
use graph::data::subgraph::schema::DeploymentCreate;
1212
use graph::data::subgraph::Graft;
1313
use graph::data::value::Word;
14-
use graph::futures01;
15-
use graph::futures01::future;
1614
use graph::futures01::stream;
1715
use graph::futures01::Future;
1816
use graph::futures01::Stream;
17+
use graph::futures03;
1918
use graph::futures03::compat::Future01CompatExt;
2019
use graph::futures03::compat::Stream01CompatExt;
2120
use graph::futures03::future::FutureExt;
2221
use graph::futures03::future::TryFutureExt;
2322
use graph::futures03::stream::TryStreamExt;
23+
use graph::futures03::StreamExt;
2424
use graph::prelude::{
2525
CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait,
2626
SubgraphRegistrar as SubgraphRegistrarTrait, *,
@@ -80,7 +80,7 @@ where
8080
}
8181
}
8282

83-
pub fn start(&self) -> impl Future<Item = (), Error = Error> {
83+
pub async fn start(self: Arc<Self>) -> Result<(), Error> {
8484
let logger_clone1 = self.logger.clone();
8585
let logger_clone2 = self.logger.clone();
8686
let provider = self.provider.clone();
@@ -113,37 +113,37 @@ where
113113
let assignment_event_stream = self.assignment_events();
114114

115115
// Deploy named subgraphs found in store
116-
self.start_assigned_subgraphs().and_then(move |()| {
117-
// Spawn a task to handle assignment events.
118-
// Blocking due to store interactions. Won't be blocking after #905.
119-
graph::spawn_blocking(
120-
assignment_event_stream
121-
.compat()
122-
.map_err(SubgraphAssignmentProviderError::Unknown)
123-
.cancelable(&assignment_event_stream_cancel_handle)
116+
self.start_assigned_subgraphs().await?;
117+
118+
// Spawn a task to handle assignment events.
119+
// Blocking due to store interactions. Won't be blocking after #905.
120+
graph::spawn_blocking(
121+
assignment_event_stream
122+
.compat()
123+
.map_err(SubgraphAssignmentProviderError::Unknown)
124+
.cancelable(&assignment_event_stream_cancel_handle)
125+
.compat()
126+
.for_each(move |assignment_event| {
127+
assert_eq!(assignment_event.node_id(), &node_id);
128+
handle_assignment_event(
129+
assignment_event,
130+
provider.clone(),
131+
logger_clone1.clone(),
132+
)
133+
.boxed()
124134
.compat()
125-
.for_each(move |assignment_event| {
126-
assert_eq!(assignment_event.node_id(), &node_id);
127-
handle_assignment_event(
128-
assignment_event,
129-
provider.clone(),
130-
logger_clone1.clone(),
131-
)
132-
.boxed()
133-
.compat()
134-
})
135-
.map_err(move |e| match e {
136-
CancelableError::Cancel => panic!("assignment event stream canceled"),
137-
CancelableError::Error(e) => {
138-
error!(logger_clone2, "Assignment event stream failed: {}", e);
139-
panic!("assignment event stream failed: {}", e);
140-
}
141-
})
142-
.compat(),
143-
);
135+
})
136+
.map_err(move |e| match e {
137+
CancelableError::Cancel => panic!("assignment event stream canceled"),
138+
CancelableError::Error(e) => {
139+
error!(logger_clone2, "Assignment event stream failed: {}", e);
140+
panic!("assignment event stream failed: {}", e);
141+
}
142+
})
143+
.compat(),
144+
);
144145

145-
Ok(())
146-
})
146+
Ok(())
147147
}
148148

149149
pub fn assignment_events(&self) -> impl Stream<Item = AssignmentEvent, Error = Error> + Send {
@@ -220,36 +220,33 @@ where
220220
.flatten()
221221
}
222222

223-
fn start_assigned_subgraphs(&self) -> impl Future<Item = (), Error = Error> {
223+
async fn start_assigned_subgraphs(&self) -> Result<(), Error> {
224224
let provider = self.provider.clone();
225225
let logger = self.logger.clone();
226226
let node_id = self.node_id.clone();
227227

228-
future::result(self.store.active_assignments(&self.node_id))
229-
.map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))
230-
.and_then(move |deployments| {
231-
// This operation should finish only after all subgraphs are
232-
// started. We wait for the spawned tasks to complete by giving
233-
// each a `sender` and waiting for all of them to be dropped, so
234-
// the receiver terminates without receiving anything.
235-
let deployments = HashSet::<DeploymentLocator>::from_iter(deployments);
236-
let deployments_len = deployments.len();
237-
let (sender, receiver) = futures01::sync::mpsc::channel::<()>(1);
238-
for id in deployments {
239-
let sender = sender.clone();
240-
let logger = logger.clone();
241-
242-
graph::spawn(
243-
start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)),
244-
);
245-
}
246-
drop(sender);
247-
receiver.collect().then(move |_| {
248-
info!(logger, "Started all assigned subgraphs";
249-
"count" => deployments_len, "node_id" => &node_id);
250-
future::ok(())
251-
})
252-
})
228+
let deployments = self
229+
.store
230+
.active_assignments(&self.node_id)
231+
.map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))?;
232+
// This operation should finish only after all subgraphs are
233+
// started. We wait for the spawned tasks to complete by giving
234+
// each a `sender` and waiting for all of them to be dropped, so
235+
// the receiver terminates without receiving anything.
236+
let deployments = HashSet::<DeploymentLocator>::from_iter(deployments);
237+
let deployments_len = deployments.len();
238+
let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1);
239+
for id in deployments {
240+
let sender = sender.clone();
241+
let logger = logger.clone();
242+
243+
graph::spawn(start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)));
244+
}
245+
drop(sender);
246+
let _: Vec<_> = receiver.collect().await;
247+
info!(logger, "Started all assigned subgraphs";
248+
"count" => deployments_len, "node_id" => &node_id);
249+
Ok(())
253250
}
254251
}
255252

node/src/launcher.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use anyhow::Result;
22

33
use git_testament::{git_testament, render_testament};
4-
use graph::futures01::Future as _;
5-
use graph::futures03::compat::Future01CompatExt;
64
use graph::futures03::future::TryFutureExt;
75

86
use crate::config::Config;
@@ -524,9 +522,9 @@ pub async fn run(
524522

525523
graph::spawn(
526524
subgraph_registrar
525+
.cheap_clone()
527526
.start()
528-
.map_err(|e| panic!("failed to initialize subgraph provider {}", e))
529-
.compat(),
527+
.map_err(|e| panic!("failed to initialize subgraph provider {}", e)),
530528
);
531529

532530
// Start admin JSON-RPC server.

0 commit comments

Comments
 (0)