Skip to content

Commit f62b6fe

Browse files
committed
all: Modernize the async code for starting subgraphs
1 parent a7e9255 commit f62b6fe

File tree

4 files changed

+127
-120
lines changed

4 files changed

+127
-120
lines changed

core/src/subgraph/registrar.rs

Lines changed: 118 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,12 @@ use graph::components::subgraph::Settings;
1111
use graph::data::subgraph::schema::DeploymentCreate;
1212
use graph::data::subgraph::Graft;
1313
use graph::data::value::Word;
14-
use graph::futures01::stream;
15-
use graph::futures01::Future;
16-
use graph::futures01::Stream;
1714
use graph::futures03;
18-
use graph::futures03::compat::Future01CompatExt;
19-
use graph::futures03::compat::Stream01CompatExt;
2015
use graph::futures03::future::FutureExt;
2116
use graph::futures03::future::TryFutureExt;
17+
use graph::futures03::stream;
2218
use graph::futures03::stream::TryStreamExt;
19+
use graph::futures03::Stream;
2320
use graph::futures03::StreamExt;
2421
use graph::prelude::{
2522
CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait,
@@ -81,13 +78,6 @@ where
8178
}
8279

8380
pub async fn start(self: Arc<Self>) -> Result<(), Error> {
84-
let logger_clone1 = self.logger.clone();
85-
let logger_clone2 = self.logger.clone();
86-
let provider = self.provider.clone();
87-
let node_id = self.node_id.clone();
88-
let assignment_event_stream_cancel_handle =
89-
self.assignment_event_stream_cancel_guard.handle();
90-
9181
// The order of the following three steps is important:
9282
// - Start assignment event stream
9383
// - Read assignments table and start assigned subgraphs
@@ -109,114 +99,138 @@ where
10999
// The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
110100
// (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent.
111101

102+
fn panic_on_cancel(
103+
logger: &Logger,
104+
e: CancelableError<SubgraphAssignmentProviderError>,
105+
) -> ! {
106+
match e {
107+
CancelableError::Cancel => {
108+
panic!("assignment event stream canceled")
109+
}
110+
CancelableError::Error(e) => {
111+
error!(logger, "Assignment event stream failed: {}", e);
112+
panic!("assignment event stream failed: {}", e);
113+
}
114+
}
115+
}
116+
112117
// Start event stream
113-
let assignment_event_stream = self.assignment_events();
118+
let assignment_event_stream = self.cheap_clone().assignment_events().await;
114119

115120
// Deploy named subgraphs found in store
116121
self.start_assigned_subgraphs().await?;
117122

118123
// Spawn a task to handle assignment events.
119124
// Blocking due to store interactions. Won't be blocking after #905.
120-
graph::spawn_blocking(
121-
assignment_event_stream
122-
.compat()
123-
.map_err(SubgraphAssignmentProviderError::Unknown)
125+
let assignment_event_stream_cancel_handle =
126+
self.assignment_event_stream_cancel_guard.handle();
127+
128+
let fut =
129+
Box::pin(assignment_event_stream.map_err(SubgraphAssignmentProviderError::Unknown))
124130
.cancelable(&assignment_event_stream_cancel_handle)
125-
.compat()
126-
.for_each(move |assignment_event| {
127-
assert_eq!(assignment_event.node_id(), &node_id);
128-
handle_assignment_event(
129-
assignment_event,
130-
provider.clone(),
131-
logger_clone1.clone(),
132-
)
133-
.boxed()
134-
.compat()
135-
})
136-
.map_err(move |e| match e {
137-
CancelableError::Cancel => panic!("assignment event stream canceled"),
138-
CancelableError::Error(e) => {
139-
error!(logger_clone2, "Assignment event stream failed: {}", e);
140-
panic!("assignment event stream failed: {}", e);
131+
.for_each({
132+
move |event| {
133+
let this = self.cheap_clone();
134+
let provider = self.provider.clone();
135+
async move {
136+
if let Err(e) = match event {
137+
Ok(event) => {
138+
assert_eq!(event.node_id(), &this.node_id);
139+
handle_assignment_event(event, provider.clone(), &this.logger)
140+
.await
141+
}
142+
Err(e) => Err(e),
143+
} {
144+
panic_on_cancel(&this.logger, e);
145+
};
146+
}
141147
}
142-
})
143-
.compat(),
144-
);
148+
});
145149

150+
graph::spawn_blocking(fut);
146151
Ok(())
147152
}
148153

149-
pub fn assignment_events(&self) -> impl Stream<Item = AssignmentEvent, Error = Error> + Send {
150-
let store = self.store.clone();
151-
let node_id = self.node_id.clone();
152-
let logger = self.logger.clone();
154+
/// Maps an assignment change to an assignment event by checking the
155+
/// current state in the database, ignoring changes that do not affect
156+
/// this node or do not require anything to change.
157+
fn map_assignment(&self, change: AssignmentChange) -> Result<Option<AssignmentEvent>, Error> {
158+
let (deployment, operation) = change.into_parts();
153159

160+
trace!(self.logger, "Received assignment change";
161+
"deployment" => %deployment,
162+
"operation" => format!("{:?}", operation),
163+
);
164+
165+
match operation {
166+
AssignmentOperation::Set => {
167+
let assigned = self
168+
.store
169+
.assignment_status(&deployment)
170+
.map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?;
171+
172+
let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string()));
173+
if let Some((assigned, is_paused)) = assigned {
174+
if &assigned == &self.node_id {
175+
if is_paused {
176+
// Subgraph is paused, so we don't start it
177+
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore");
178+
return Ok(None);
179+
}
180+
181+
// Start subgraph on this node
182+
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add");
183+
Ok(Some(AssignmentEvent::Add {
184+
deployment,
185+
node_id: self.node_id.clone(),
186+
}))
187+
} else {
188+
// Ensure it is removed from this node
189+
debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove");
190+
Ok(Some(AssignmentEvent::Remove {
191+
deployment,
192+
node_id: self.node_id.clone(),
193+
}))
194+
}
195+
} else {
196+
// Was added/updated, but is now gone.
197+
debug!(self.logger, "Deployment assignee not found in database"; "action" => "ignore");
198+
Ok(None)
199+
}
200+
}
201+
AssignmentOperation::Removed => {
202+
// Send remove event without checking node ID.
203+
// If node ID does not match, then this is a no-op when handled in
204+
// assignment provider.
205+
Ok(Some(AssignmentEvent::Remove {
206+
deployment,
207+
node_id: self.node_id.clone(),
208+
}))
209+
}
210+
}
211+
}
212+
213+
pub async fn assignment_events(
214+
self: Arc<Self>,
215+
) -> impl Stream<Item = Result<AssignmentEvent, Error>> + Send {
154216
self.subscription_manager
155217
.subscribe()
156-
.map_err(|()| anyhow!("Entity change stream failed"))
157-
.map(|event| {
158-
let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect();
159-
stream::iter_ok(changes)
160-
})
218+
.map(|event| futures03::stream::iter(event.changes.clone()))
161219
.flatten()
162-
.and_then(
163-
move |(deployment, operation)| -> Result<Box<dyn Stream<Item = _, Error = _> + Send>, _> {
164-
trace!(logger, "Received assignment change";
165-
"deployment" => %deployment,
166-
"operation" => format!("{:?}", operation),
167-
);
168-
169-
match operation {
170-
AssignmentOperation::Set => {
171-
store
172-
.assignment_status(&deployment)
173-
.map_err(|e| {
174-
anyhow!("Failed to get subgraph assignment entity: {}", e)
175-
})
176-
.map(|assigned| -> Box<dyn Stream<Item = _, Error = _> + Send> {
177-
let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string()));
178-
if let Some((assigned,is_paused)) = assigned {
179-
if assigned == node_id {
180-
181-
if is_paused{
182-
// Subgraph is paused, so we don't start it
183-
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore");
184-
return Box::new(stream::empty());
185-
}
186-
187-
// Start subgraph on this node
188-
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add");
189-
Box::new(stream::once(Ok(AssignmentEvent::Add {
190-
deployment,
191-
node_id: node_id.clone(),
192-
})))
193-
} else {
194-
// Ensure it is removed from this node
195-
debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove");
196-
Box::new(stream::once(Ok(AssignmentEvent::Remove {
197-
deployment,
198-
node_id: node_id.clone(),
199-
})))
200-
}
201-
} else {
202-
// Was added/updated, but is now gone.
203-
debug!(logger, "Deployment assignee not found in database"; "action" => "ignore");
204-
Box::new(stream::empty())
205-
}
206-
})
207-
}
208-
AssignmentOperation::Removed => {
209-
// Send remove event without checking node ID.
210-
// If node ID does not match, then this is a no-op when handled in
211-
// assignment provider.
212-
Ok(Box::new(stream::once(Ok(AssignmentEvent::Remove {
213-
deployment,
214-
node_id: node_id.clone(),
215-
}))))
220+
.then({
221+
let this = self.cheap_clone();
222+
move |change| {
223+
let this = this.cheap_clone();
224+
225+
async move {
226+
match this.map_assignment(change) {
227+
Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(),
228+
Ok(None) => stream::empty().boxed(),
229+
Err(e) => stream::once(futures03::future::err(e)).boxed(),
216230
}
217231
}
218-
},
219-
)
232+
}
233+
})
220234
.flatten()
221235
}
222236

@@ -235,6 +249,8 @@ where
235249
// the receiver terminates without receiving anything.
236250
let deployments = HashSet::<DeploymentLocator>::from_iter(deployments);
237251
let deployments_len = deployments.len();
252+
debug!(logger, "Starting all assigned subgraphs";
253+
"count" => deployments_len, "node_id" => &node_id);
238254
let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1);
239255
for id in deployments {
240256
let sender = sender.clone();
@@ -442,7 +458,7 @@ where
442458
async fn handle_assignment_event(
443459
event: AssignmentEvent,
444460
provider: Arc<impl SubgraphAssignmentProviderTrait>,
445-
logger: Logger,
461+
logger: &Logger,
446462
) -> Result<(), CancelableError<SubgraphAssignmentProviderError>> {
447463
let logger = logger.clone();
448464

graph/src/components/store/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use diesel::sql_types::Integer;
1010
use diesel_derives::{AsExpression, FromSqlRow};
1111
pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache};
1212
use slog::Logger;
13+
use tokio_stream::wrappers::ReceiverStream;
1314

1415
pub use super::subgraph::Entity;
1516
pub use err::{StoreError, StoreResult};
@@ -18,7 +19,6 @@ use strum_macros::Display;
1819
pub use traits::*;
1920
pub use write::Batch;
2021

21-
use futures01::Stream;
2222
use serde::{Deserialize, Serialize};
2323
use std::collections::btree_map::Entry;
2424
use std::collections::{BTreeMap, BTreeSet, HashSet};
@@ -634,7 +634,7 @@ impl PartialEq for StoreEvent {
634634
}
635635

636636
/// A boxed `StoreEventStream`
637-
pub type StoreEventStreamBox = Box<dyn Stream<Item = Arc<StoreEvent>, Error = ()> + Send>;
637+
pub type StoreEventStreamBox = ReceiverStream<Arc<StoreEvent>>;
638638

639639
/// An entity operation that can be transacted into the store.
640640
#[derive(Clone, Debug, PartialEq)]

node/src/manager/commands/listen.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::io::Write;
22
use std::sync::Arc;
33

4-
use graph::futures01::Stream as _;
5-
use graph::futures03::compat::Future01CompatExt;
4+
use graph::futures03::{future, StreamExt};
5+
66
use graph::{
77
components::store::SubscriptionManager as _,
88
prelude::{serde_json, Error},
@@ -12,25 +12,16 @@ use graph_store_postgres::SubscriptionManager;
1212
async fn listen(mgr: Arc<SubscriptionManager>) -> Result<(), Error> {
1313
let events = mgr.subscribe();
1414
println!("press ctrl-c to stop");
15-
let res = events
16-
.inspect(move |event| {
17-
serde_json::to_writer_pretty(std::io::stdout(), event)
15+
events
16+
.for_each(move |event| {
17+
serde_json::to_writer_pretty(std::io::stdout(), &event)
1818
.expect("event can be serialized to JSON");
1919
writeln!(std::io::stdout()).unwrap();
2020
std::io::stdout().flush().unwrap();
21+
future::ready(())
2122
})
22-
.collect()
23-
.compat()
2423
.await;
2524

26-
match res {
27-
Ok(_) => {
28-
println!("stream finished")
29-
}
30-
Err(()) => {
31-
eprintln!("stream failed")
32-
}
33-
}
3425
Ok(())
3526
}
3627

store/postgres/src/store_events.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,6 @@ impl SubscriptionManagerTrait for SubscriptionManager {
221221
self.subscriptions.write().unwrap().insert(id, sender);
222222

223223
// Return the subscription ID and entity change stream
224-
Box::new(ReceiverStream::new(receiver).map(Ok).compat())
224+
ReceiverStream::new(receiver)
225225
}
226226
}

0 commit comments

Comments
 (0)