Skip to content

Commit 698d668

Browse files
committed
store: Make SubgraphStore.writable idempotent
1 parent 1486e8c commit 698d668

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

store/postgres/src/subgraph_store.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use diesel::{
44
sql_types::Text,
55
types::{FromSql, ToSql},
66
};
7-
use std::{collections::BTreeMap, collections::HashMap, sync::Arc};
7+
use std::{
8+
collections::BTreeMap,
9+
collections::HashMap,
10+
sync::{Arc, Mutex},
11+
};
812
use std::{fmt, io::Write};
913
use std::{iter::FromIterator, time::Duration};
1014

@@ -256,6 +260,7 @@ pub struct SubgraphStoreInner {
256260
sites: TimedCache<DeploymentHash, Site>,
257261
placer: Arc<dyn DeploymentPlacer + Send + Sync + 'static>,
258262
sender: Arc<NotificationSender>,
263+
writables: Mutex<HashMap<DeploymentId, Arc<WritableStore>>>,
259264
}
260265

261266
impl SubgraphStoreInner {
@@ -309,6 +314,7 @@ impl SubgraphStoreInner {
309314
sites,
310315
placer,
311316
sender,
317+
writables: Mutex::new(HashMap::new()),
312318
}
313319
}
314320

@@ -1045,19 +1051,28 @@ impl SubgraphStoreTrait for SubgraphStore {
10451051
logger: Logger,
10461052
deployment: graph::components::store::DeploymentId,
10471053
) -> Result<Arc<dyn store::WritableStore>, StoreError> {
1054+
let deployment = deployment.into();
1055+
// We cache writables to make sure calls to this method are
1056+
// idempotent and there is ever only one `WritableStore` for any
1057+
// deployment
1058+
if let Some(writable) = self.writables.lock().unwrap().get(&deployment) {
1059+
return Ok(writable.cheap_clone());
1060+
}
1061+
10481062
// Ideally the lower level functions would be asyncified.
10491063
let this = self.clone();
10501064
let site = graph::spawn_blocking_allow_panic(move || -> Result<_, StoreError> {
1051-
this.find_site(deployment.into())
1065+
this.find_site(deployment)
10521066
})
10531067
.await
10541068
.unwrap()?; // Propagate panics, there shouldn't be any.
10551069

1056-
Ok(Arc::new(WritableStore::new(
1057-
self.as_ref().clone(),
1058-
logger,
1059-
site,
1060-
)?))
1070+
let writable = Arc::new(WritableStore::new(self.as_ref().clone(), logger, site)?);
1071+
self.writables
1072+
.lock()
1073+
.unwrap()
1074+
.insert(deployment, writable.cheap_clone());
1075+
Ok(writable)
10611076
}
10621077

10631078
fn writable_for_network_indexer(

0 commit comments

Comments
 (0)