Skip to content

Commit 641f4fa

Browse files
committed
core, graph: Simplify assignment handling
We analyze and react to assignment changes directly rather than worrying abut translating one stream into another. With that, We don't need the AssignmentEvent struct anymore.
1 parent 2e9725e commit 641f4fa

File tree

3 files changed

+42
-126
lines changed

3 files changed

+42
-126
lines changed

core/src/subgraph/registrar.rs

Lines changed: 41 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ use graph::data::subgraph::Graft;
1212
use graph::data::value::Word;
1313
use graph::futures03;
1414
use graph::futures03::future::TryFutureExt;
15-
use graph::futures03::stream;
16-
use graph::futures03::stream::TryStreamExt;
1715
use graph::futures03::Stream;
1816
use graph::futures03::StreamExt;
1917
use graph::prelude::{
@@ -90,68 +88,47 @@ where
9088
//
9189
// The discrepancy between the start time of the event stream and the table read can result
9290
// in some extraneous events on start up. Examples:
93-
// - The event stream sees an Add event for subgraph A, but the table query finds that
91+
// - The event stream sees an 'set' event for subgraph A, but the table query finds that
9492
// subgraph A is already in the table.
95-
// - The event stream sees a Remove event for subgraph B, but the table query finds that
93+
// - The event stream sees a 'removed' event for subgraph B, but the table query finds that
9694
// subgraph B has already been removed.
97-
// The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
98-
// (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent.
99-
100-
fn panic_on_cancel(
101-
logger: &Logger,
102-
e: CancelableError<SubgraphAssignmentProviderError>,
103-
) -> ! {
104-
match e {
105-
CancelableError::Cancel => {
106-
panic!("assignment event stream canceled")
107-
}
108-
CancelableError::Error(e) => {
109-
error!(logger, "Assignment event stream failed: {}", e);
110-
panic!("assignment event stream failed: {}", e);
111-
}
112-
}
113-
}
95+
// The `change_assignment` function handles these cases by ignoring
96+
// such cases which makes the operations idempotent
11497

11598
// Start event stream
11699
let assignment_event_stream = self.cheap_clone().assignment_events().await;
117100

118101
// Deploy named subgraphs found in store
119102
self.start_assigned_subgraphs().await?;
120103

104+
let cancel_handle = self.assignment_event_stream_cancel_guard.handle();
105+
121106
// Spawn a task to handle assignment events.
122-
let assignment_event_stream_cancel_handle =
123-
self.assignment_event_stream_cancel_guard.handle();
124-
125-
let fut =
126-
Box::pin(assignment_event_stream.map_err(SubgraphAssignmentProviderError::Unknown))
127-
.cancelable(&assignment_event_stream_cancel_handle)
128-
.for_each({
129-
move |event| {
130-
let this = self.cheap_clone();
131-
let provider = self.provider.clone();
132-
async move {
133-
match event {
134-
Ok(event) => {
135-
assert_eq!(event.node_id(), &this.node_id);
136-
handle_assignment_event(event, provider.clone()).await
137-
}
138-
Err(e) => panic_on_cancel(&this.logger, e),
139-
};
140-
}
141-
}
142-
});
107+
let fut = assignment_event_stream.for_each({
108+
move |event| {
109+
// The assignment stream should run forever. If it gets
110+
// cancelled, that probably indicates a serious problem and
111+
// we panic
112+
if cancel_handle.is_canceled() {
113+
panic!("assignment event stream canceled");
114+
}
115+
116+
let this = self.cheap_clone();
117+
async move {
118+
this.change_assignment(event).await;
119+
}
120+
}
121+
});
143122

144123
graph::spawn(fut);
145124
Ok(())
146125
}
147126

148-
/// Maps an assignment change to an assignment event by checking the
149-
/// current state in the database, ignoring changes that do not affect
150-
/// this node or do not require anything to change.
151-
async fn map_assignment(
152-
&self,
153-
change: AssignmentChange,
154-
) -> Result<Option<AssignmentEvent>, Error> {
127+
/// Start/stop subgraphs as needed, considering the current assignment
128+
/// state in the database, ignoring changes that do not affect this
129+
/// node, do not require anything to change, or for which we can not
130+
/// find the assignment status from the database
131+
async fn change_assignment(&self, change: AssignmentChange) {
155132
let (deployment, operation) = change.into_parts();
156133

157134
trace!(self.logger, "Received assignment change";
@@ -161,75 +138,53 @@ where
161138

162139
match operation {
163140
AssignmentOperation::Set => {
164-
let assigned = self
165-
.store
166-
.assignment_status(&deployment)
167-
.await
168-
.map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?;
141+
let assigned = match self.store.assignment_status(&deployment).await {
142+
Ok(assigned) => assigned,
143+
Err(e) => {
144+
error!(
145+
self.logger,
146+
"Failed to get subgraph assignment entity"; "deployment" => deployment, "error" => e.to_string()
147+
);
148+
return;
149+
}
150+
};
169151

170152
let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string()));
171153
if let Some((assigned, is_paused)) = assigned {
172154
if &assigned == &self.node_id {
173155
if is_paused {
174156
// Subgraph is paused, so we don't start it
175157
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore");
176-
return Ok(None);
158+
return;
177159
}
178160

179161
// Start subgraph on this node
180162
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add");
181-
Ok(Some(AssignmentEvent::Add {
182-
deployment,
183-
node_id: self.node_id.clone(),
184-
}))
163+
self.provider.start(deployment, None).await;
185164
} else {
186165
// Ensure it is removed from this node
187166
debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove");
188-
Ok(Some(AssignmentEvent::Remove {
189-
deployment,
190-
node_id: self.node_id.clone(),
191-
}))
167+
self.provider.stop(deployment).await
192168
}
193169
} else {
194170
// Was added/updated, but is now gone.
195171
debug!(self.logger, "Deployment assignee not found in database"; "action" => "ignore");
196-
Ok(None)
197172
}
198173
}
199174
AssignmentOperation::Removed => {
200175
// Send remove event without checking node ID.
201176
// If node ID does not match, then this is a no-op when handled in
202177
// assignment provider.
203-
Ok(Some(AssignmentEvent::Remove {
204-
deployment,
205-
node_id: self.node_id.clone(),
206-
}))
178+
self.provider.stop(deployment).await;
207179
}
208180
}
209181
}
210182

211-
pub async fn assignment_events(
212-
self: Arc<Self>,
213-
) -> impl Stream<Item = Result<AssignmentEvent, Error>> + Send {
183+
pub async fn assignment_events(self: Arc<Self>) -> impl Stream<Item = AssignmentChange> + Send {
214184
self.subscription_manager
215185
.subscribe()
216186
.map(|event| futures03::stream::iter(event.changes.clone()))
217187
.flatten()
218-
.then({
219-
let this = self.cheap_clone();
220-
move |change| {
221-
let this = this.cheap_clone();
222-
223-
async move {
224-
match this.map_assignment(change).await {
225-
Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(),
226-
Ok(None) => stream::empty().boxed(),
227-
Err(e) => stream::once(futures03::future::err(e)).boxed(),
228-
}
229-
}
230-
}
231-
})
232-
.flatten()
233188
}
234189

235190
async fn start_assigned_subgraphs(&self) -> Result<(), Error> {
@@ -456,22 +411,6 @@ where
456411
}
457412
}
458413

459-
async fn handle_assignment_event(
460-
event: AssignmentEvent,
461-
provider: Arc<impl SubgraphAssignmentProviderTrait>,
462-
) {
463-
match event {
464-
AssignmentEvent::Add {
465-
deployment,
466-
node_id: _,
467-
} => provider.start(deployment, None).await,
468-
AssignmentEvent::Remove {
469-
deployment,
470-
node_id: _,
471-
} => provider.stop(deployment).await,
472-
}
473-
}
474-
475414
/// Resolves the subgraph's earliest block
476415
async fn resolve_start_block(
477416
manifest: &SubgraphManifest<impl Blockchain>,

graph/src/data/store/mod.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::{
2-
components::store::DeploymentLocator,
32
derive::CacheWeight,
43
prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError},
54
runtime::gas::{Gas, GasSizeOf},
@@ -83,28 +82,6 @@ impl<'de> de::Deserialize<'de> for NodeId {
8382
}
8483
}
8584

86-
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
87-
#[serde(tag = "type")]
88-
pub enum AssignmentEvent {
89-
Add {
90-
deployment: DeploymentLocator,
91-
node_id: NodeId,
92-
},
93-
Remove {
94-
deployment: DeploymentLocator,
95-
node_id: NodeId,
96-
},
97-
}
98-
99-
impl AssignmentEvent {
100-
pub fn node_id(&self) -> &NodeId {
101-
match self {
102-
AssignmentEvent::Add { node_id, .. } => node_id,
103-
AssignmentEvent::Remove { node_id, .. } => node_id,
104-
}
105-
}
106-
}
107-
10885
/// An entity attribute name is represented as a string.
10986
pub type Attribute = String;
11087

graph/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ pub mod prelude {
152152
Query, QueryError, QueryExecutionError, QueryResult, QueryTarget, QueryVariables,
153153
};
154154
pub use crate::data::store::scalar::{BigDecimal, BigInt, BigIntSign};
155-
pub use crate::data::store::{AssignmentEvent, Attribute, Entity, NodeId, Value, ValueType};
155+
pub use crate::data::store::{Attribute, Entity, NodeId, Value, ValueType};
156156
pub use crate::data::subgraph::schema::SubgraphDeploymentEntity;
157157
pub use crate::data::subgraph::{
158158
CreateSubgraphResult, DataSourceContext, DeploymentHash, DeploymentState, Link,

0 commit comments

Comments
 (0)