Skip to content

Commit eff3e19

Browse files
committed
core, graph, store: Asyncify SubgraphStore.assignment_status
1 parent a5ef1bd commit eff3e19

File tree

4 files changed

+16
-9
lines changed

4 files changed

+16
-9
lines changed

core/src/subgraph/registrar.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ where
121121
self.start_assigned_subgraphs().await?;
122122

123123
// Spawn a task to handle assignment events.
124-
// Blocking due to store interactions. Won't be blocking after #905.
125124
let assignment_event_stream_cancel_handle =
126125
self.assignment_event_stream_cancel_guard.handle();
127126

@@ -147,14 +146,17 @@ where
147146
}
148147
});
149148

150-
graph::spawn_blocking(fut);
149+
graph::spawn(fut);
151150
Ok(())
152151
}
153152

154153
/// Maps an assignment change to an assignment event by checking the
155154
/// current state in the database, ignoring changes that do not affect
156155
/// this node or do not require anything to change.
157-
fn map_assignment(&self, change: AssignmentChange) -> Result<Option<AssignmentEvent>, Error> {
156+
async fn map_assignment(
157+
&self,
158+
change: AssignmentChange,
159+
) -> Result<Option<AssignmentEvent>, Error> {
158160
let (deployment, operation) = change.into_parts();
159161

160162
trace!(self.logger, "Received assignment change";
@@ -167,6 +169,7 @@ where
167169
let assigned = self
168170
.store
169171
.assignment_status(&deployment)
172+
.await
170173
.map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?;
171174

172175
let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string()));
@@ -223,7 +226,7 @@ where
223226
let this = this.cheap_clone();
224227

225228
async move {
226-
match this.map_assignment(change) {
229+
match this.map_assignment(change).await {
227230
Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(),
228231
Ok(None) => stream::empty().boxed(),
229232
Err(e) => stream::once(futures03::future::err(e)).boxed(),

graph/src/components/store/traits.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ pub trait SubgraphStore: Send + Sync + 'static {
121121
/// the subgraph is assigned to, and `is_paused` is true if the
122122
/// subgraph is paused.
123123
/// Returns None if the deployment does not exist.
124-
fn assignment_status(
124+
async fn assignment_status(
125125
&self,
126126
deployment: &DeploymentLocator,
127127
) -> Result<Option<(NodeId, bool)>, StoreError>;

store/postgres/src/primary.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,8 +2088,12 @@ impl Mirror {
20882088
/// the subgraph is assigned to, and `is_paused` is true if the
20892089
/// subgraph is paused.
20902090
/// Returns None if the deployment does not exist.
2091-
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
2092-
self.read(|conn| queries::assignment_status(conn, site))
2091+
pub async fn assignment_status(
2092+
&self,
2093+
site: Arc<Site>,
2094+
) -> Result<Option<(NodeId, bool)>, StoreError> {
2095+
self.read_async(move |conn| queries::assignment_status(conn, &site))
2096+
.await
20932097
}
20942098

20952099
pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result<Option<Site>, StoreError> {

store/postgres/src/subgraph_store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,12 +1455,12 @@ impl SubgraphStoreTrait for SubgraphStore {
14551455
/// the subgraph is assigned to, and `is_paused` is true if the
14561456
/// subgraph is paused.
14571457
/// Returns None if the deployment does not exist.
1458-
fn assignment_status(
1458+
async fn assignment_status(
14591459
&self,
14601460
deployment: &DeploymentLocator,
14611461
) -> Result<Option<(NodeId, bool)>, StoreError> {
14621462
let site = self.find_site(deployment.id.into())?;
1463-
self.mirror.assignment_status(site.as_ref())
1463+
self.mirror.assignment_status(site).await
14641464
}
14651465

14661466
fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {

0 commit comments

Comments
 (0)