Skip to content

Commit a5ef1bd

Browse files
committed
core, graph,store: Asyncify SubgraphStore.active_assignments
1 parent f62b6fe commit a5ef1bd

File tree

4 files changed

+39
-7
lines changed

4 files changed

+39
-7
lines changed

core/src/subgraph/registrar.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ where
242242
let deployments = self
243243
.store
244244
.active_assignments(&self.node_id)
245+
.await
245246
.map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))?;
246247
// This operation should finish only after all subgraphs are
247248
// started. We wait for the spawned tasks to complete by giving

graph/src/components/store/traits.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ pub trait SubgraphStore: Send + Sync + 'static {
129129
fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;
130130

131131
/// Returns assignments that are not paused
132-
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;
132+
async fn active_assignments(&self, node: &NodeId)
133+
-> Result<Vec<DeploymentLocator>, StoreError>;
133134

134135
/// Return `true` if a subgraph `name` exists, regardless of whether the
135136
/// subgraph has any deployments attached to it

store/postgres/src/primary.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use diesel::{
3030
Connection as _,
3131
};
3232
use graph::{
33+
cheap_clone::CheapClone,
3334
components::store::DeploymentLocator,
3435
data::{
3536
store::scalar::ToPrimitive,
@@ -1886,8 +1887,9 @@ pub fn is_empty(conn: &mut PgConnection) -> Result<bool, StoreError> {
18861887
/// a query returns either success or anything but a
18871888
/// `Err(StoreError::DatabaseUnavailable)`. This only works for tables that
18881889
/// are mirrored through `refresh_tables`
1890+
#[derive(Clone, CheapClone)]
18891891
pub struct Mirror {
1890-
pools: Vec<ConnectionPool>,
1892+
pools: Arc<Vec<ConnectionPool>>,
18911893
}
18921894

18931895
impl Mirror {
@@ -1917,6 +1919,7 @@ impl Mirror {
19171919
pools.push(pool.clone());
19181920
pools
19191921
});
1922+
let pools = Arc::new(pools);
19201923
Mirror { pools }
19211924
}
19221925

@@ -1925,7 +1928,7 @@ impl Mirror {
19251928
/// used for non-critical uses like command line tools
19261929
pub fn primary_only(primary: ConnectionPool) -> Mirror {
19271930
Mirror {
1928-
pools: vec![primary],
1931+
pools: Arc::new(vec![primary]),
19291932
}
19301933
}
19311934

@@ -1940,7 +1943,7 @@ impl Mirror {
19401943
mut f: impl 'a
19411944
+ FnMut(&mut PooledConnection<ConnectionManager<PgConnection>>) -> Result<T, StoreError>,
19421945
) -> Result<T, StoreError> {
1943-
for pool in &self.pools {
1946+
for pool in self.pools.as_ref() {
19441947
let mut conn = match pool.get() {
19451948
Ok(conn) => conn,
19461949
Err(StoreError::DatabaseUnavailable) => continue,
@@ -1955,6 +1958,27 @@ impl Mirror {
19551958
Err(StoreError::DatabaseUnavailable)
19561959
}
19571960

1961+
/// An async version of `read` that spawns a blocking task to do the
1962+
/// actual work. This is useful when you want to call `read` from an
1963+
/// async context
1964+
pub(crate) async fn read_async<T, F>(&self, mut f: F) -> Result<T, StoreError>
1965+
where
1966+
T: 'static + Send,
1967+
F: 'static
1968+
+ Send
1969+
+ FnMut(&mut PooledConnection<ConnectionManager<PgConnection>>) -> Result<T, StoreError>,
1970+
{
1971+
let this = self.cheap_clone();
1972+
let res = graph::spawn_blocking(async move { this.read(|conn| f(conn)) }).await;
1973+
match res {
1974+
Ok(v) => v,
1975+
Err(e) => Err(internal_error!(
1976+
"spawn_blocking in read_async failed: {}",
1977+
e
1978+
)),
1979+
}
1980+
}
1981+
19581982
/// Refresh the contents of mirrored tables from the primary (through
19591983
/// the fdw mapping that `ForeignServer` establishes)
19601984
pub(crate) fn refresh_tables(
@@ -2050,8 +2074,10 @@ impl Mirror {
20502074
self.read(|conn| queries::assignments(conn, node))
20512075
}
20522076

2053-
pub fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
2054-
self.read(|conn| queries::active_assignments(conn, node))
2077+
pub async fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
2078+
let node = node.clone();
2079+
self.read_async(move |conn| queries::active_assignments(conn, &node))
2080+
.await
20552081
}
20562082

20572083
pub fn assigned_node(&self, site: &Site) -> Result<Option<NodeId>, StoreError> {

store/postgres/src/subgraph_store.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1469,9 +1469,13 @@ impl SubgraphStoreTrait for SubgraphStore {
14691469
.map(|sites| sites.iter().map(|site| site.into()).collect())
14701470
}
14711471

1472-
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {
1472+
async fn active_assignments(
1473+
&self,
1474+
node: &NodeId,
1475+
) -> Result<Vec<DeploymentLocator>, StoreError> {
14731476
self.mirror
14741477
.active_assignments(node)
1478+
.await
14751479
.map(|sites| sites.iter().map(|site| site.into()).collect())
14761480
}
14771481

0 commit comments

Comments
 (0)