Skip to content

Commit 8e3e871

Browse files
committed
store: Move WritableStore into its own module
1 parent 698d668 commit 8e3e871

File tree

3 files changed

+333
-305
lines changed

3 files changed

+333
-305
lines changed

store/postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ mod store;
4848
mod store_events;
4949
mod subgraph_store;
5050
pub mod transaction_receipt;
51+
mod writable;
5152

5253
#[cfg(debug_assertions)]
5354
pub mod layout_for_tests {

store/postgres/src/subgraph_store.rs

Lines changed: 9 additions & 305 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use diesel::{
55
types::{FromSql, ToSql},
66
};
77
use std::{
8-
collections::BTreeMap,
98
collections::HashMap,
109
sync::{Arc, Mutex},
1110
};
@@ -17,32 +16,29 @@ use graph::{
1716
components::{
1817
server::index_node::VersionInfo,
1918
store::{
20-
self, DeploymentLocator, EnsLookup as EnsLookupTrait, EntityType,
19+
self, DeploymentLocator, EnsLookup as EnsLookupTrait,
2120
WritableStore as WritableStoreTrait,
2221
},
2322
},
2423
constraint_violation,
2524
data::query::QueryTarget,
26-
data::subgraph::schema::{self, SubgraphError},
2725
data::subgraph::status,
2826
prelude::StoreEvent,
2927
prelude::SubgraphDeploymentEntity,
3028
prelude::{
3129
anyhow, futures03::future::join_all, lazy_static, o, web3::types::Address, ApiSchema,
32-
BlockPtr, DeploymentHash, Entity, EntityKey, EntityModification, Error, Logger, NodeId,
33-
Schema, StopwatchMetrics, StoreError, SubgraphName, SubgraphStore as SubgraphStoreTrait,
34-
SubgraphVersionSwitchingMode,
30+
BlockPtr, DeploymentHash, Logger, NodeId, Schema, StoreError, SubgraphName,
31+
SubgraphStore as SubgraphStoreTrait, SubgraphVersionSwitchingMode,
3532
},
36-
slog::{error, warn},
37-
util::{backoff::ExponentialBackoff, timed_cache::TimedCache},
33+
util::timed_cache::TimedCache,
3834
};
39-
use store::StoredDynamicDataSource;
4035

4136
use crate::{
4237
connection_pool::ConnectionPool,
4338
primary,
4439
primary::{DeploymentId, Mirror as PrimaryMirror, Site},
4540
relational::Layout,
41+
writable::WritableStore,
4642
NotificationSender,
4743
};
4844
use crate::{
@@ -382,13 +378,13 @@ impl SubgraphStoreInner {
382378
Ok((store, site))
383379
}
384380

385-
fn for_site(&self, site: &Site) -> Result<&Arc<DeploymentStore>, StoreError> {
381+
pub(crate) fn for_site(&self, site: &Site) -> Result<&Arc<DeploymentStore>, StoreError> {
386382
self.stores
387383
.get(&site.shard)
388384
.ok_or(StoreError::UnknownShard(site.shard.as_str().to_string()))
389385
}
390386

391-
fn layout(&self, id: &DeploymentHash) -> Result<Arc<Layout>, StoreError> {
387+
pub(crate) fn layout(&self, id: &DeploymentHash) -> Result<Arc<Layout>, StoreError> {
392388
let (store, site) = self.store(id)?;
393389
store.find_layout(site)
394390
}
@@ -669,7 +665,7 @@ impl SubgraphStoreInner {
669665
/// connections can deadlock the entire process if the pool runs out
670666
/// of connections in between getting the first one and trying to get the
671667
/// second one.
672-
fn primary_conn(&self) -> Result<primary::Connection, StoreError> {
668+
pub(crate) fn primary_conn(&self) -> Result<primary::Connection, StoreError> {
673669
let conn = self.mirror.primary().get()?;
674670
Ok(primary::Connection::new(conn))
675671
}
@@ -926,7 +922,7 @@ impl SubgraphStoreInner {
926922
pub fn find(
927923
&self,
928924
query: graph::prelude::EntityQuery,
929-
) -> Result<Vec<Entity>, graph::prelude::QueryExecutionError> {
925+
) -> Result<Vec<graph::prelude::Entity>, graph::prelude::QueryExecutionError> {
930926
let (store, site) = self.store(&query.subgraph_id)?;
931927
store.find(site, query)
932928
}
@@ -1107,295 +1103,3 @@ impl SubgraphStoreTrait for SubgraphStore {
11071103
.collect())
11081104
}
11091105
}
1110-
1111-
/// A wrapper around `SubgraphStore` that only exposes functions that are
1112-
/// safe to call from `WritableStore`, i.e., functions that either do not
1113-
/// deal with anything that depends on a specific deployment
1114-
/// location/instance, or where the result is independent of the deployment
1115-
/// instance
1116-
struct WritableSubgraphStore(SubgraphStore);
1117-
1118-
impl WritableSubgraphStore {
1119-
fn primary_conn(&self) -> Result<primary::Connection, StoreError> {
1120-
self.0.primary_conn()
1121-
}
1122-
1123-
pub(crate) fn send_store_event(&self, event: &StoreEvent) -> Result<(), StoreError> {
1124-
self.0.send_store_event(event)
1125-
}
1126-
1127-
fn layout(&self, id: &DeploymentHash) -> Result<Arc<Layout>, StoreError> {
1128-
self.0.layout(id)
1129-
}
1130-
}
1131-
1132-
struct WritableStore {
1133-
logger: Logger,
1134-
store: WritableSubgraphStore,
1135-
writable: Arc<DeploymentStore>,
1136-
site: Arc<Site>,
1137-
input_schema: Arc<Schema>,
1138-
}
1139-
1140-
impl WritableStore {
1141-
const BACKOFF_BASE: Duration = Duration::from_millis(100);
1142-
const BACKOFF_CEIL: Duration = Duration::from_secs(10);
1143-
1144-
fn new(
1145-
subgraph_store: SubgraphStore,
1146-
logger: Logger,
1147-
site: Arc<Site>,
1148-
) -> Result<Self, StoreError> {
1149-
let store = WritableSubgraphStore(subgraph_store.clone());
1150-
let writable = subgraph_store.for_site(site.as_ref())?.clone();
1151-
let input_schema = subgraph_store.input_schema(&site.deployment)?;
1152-
Ok(Self {
1153-
logger,
1154-
store,
1155-
writable,
1156-
site,
1157-
input_schema,
1158-
})
1159-
}
1160-
1161-
fn log_backoff_warning(&self, op: &str, backoff: &ExponentialBackoff) {
1162-
warn!(self.logger,
1163-
"database unavailable, will retry";
1164-
"operation" => op,
1165-
"attempt" => backoff.attempt,
1166-
"delay_ms" => backoff.delay().as_millis());
1167-
}
1168-
1169-
fn retry<T, F>(&self, op: &str, f: F) -> Result<T, StoreError>
1170-
where
1171-
F: Fn() -> Result<T, StoreError>,
1172-
{
1173-
let mut backoff = ExponentialBackoff::new(Self::BACKOFF_BASE, Self::BACKOFF_CEIL);
1174-
loop {
1175-
match f() {
1176-
Ok(v) => return Ok(v),
1177-
Err(StoreError::DatabaseUnavailable) => {
1178-
self.log_backoff_warning(op, &backoff);
1179-
}
1180-
Err(e) => return Err(e),
1181-
}
1182-
backoff.sleep();
1183-
}
1184-
}
1185-
1186-
async fn retry_async<T, F, Fut>(&self, op: &str, f: F) -> Result<T, StoreError>
1187-
where
1188-
F: Fn() -> Fut,
1189-
Fut: std::future::Future<Output = Result<T, StoreError>>,
1190-
{
1191-
let mut backoff = ExponentialBackoff::new(Self::BACKOFF_BASE, Self::BACKOFF_CEIL);
1192-
loop {
1193-
match f().await {
1194-
Ok(v) => return Ok(v),
1195-
Err(StoreError::DatabaseUnavailable) => {
1196-
self.log_backoff_warning(op, &backoff);
1197-
}
1198-
Err(e) => return Err(e),
1199-
}
1200-
backoff.sleep_async().await;
1201-
}
1202-
}
1203-
1204-
/// Try to send a `StoreEvent`; if sending fails, log the error but
1205-
/// return `Ok(())`
1206-
fn try_send_store_event(&self, event: StoreEvent) -> Result<(), StoreError> {
1207-
if *SEND_SUBSCRIPTION_NOTIFICATIONS {
1208-
let _ = self.store.send_store_event(&event).map_err(
1209-
|e| error!(self.logger, "Could not send store event"; "error" => e.to_string()),
1210-
);
1211-
Ok(())
1212-
} else {
1213-
Ok(())
1214-
}
1215-
}
1216-
}
1217-
1218-
#[async_trait::async_trait]
1219-
impl WritableStoreTrait for WritableStore {
1220-
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
1221-
self.retry("block_ptr", || self.writable.block_ptr(self.site.as_ref()))
1222-
}
1223-
1224-
fn block_cursor(&self) -> Result<Option<String>, StoreError> {
1225-
self.writable.block_cursor(self.site.as_ref())
1226-
}
1227-
1228-
fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> {
1229-
self.retry("start_subgraph_deployment", || {
1230-
let store = &self.writable;
1231-
1232-
let graft_base = match store.graft_pending(&self.site.deployment)? {
1233-
Some((base_id, base_ptr)) => {
1234-
let src = self.store.layout(&base_id)?;
1235-
Some((src, base_ptr))
1236-
}
1237-
None => None,
1238-
};
1239-
store.start_subgraph(logger, self.site.clone(), graft_base)?;
1240-
self.store.primary_conn()?.copy_finished(self.site.as_ref())
1241-
})
1242-
}
1243-
1244-
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
1245-
self.retry("revert_block_operations", || {
1246-
let event = self
1247-
.writable
1248-
.revert_block_operations(self.site.clone(), block_ptr_to.clone())?;
1249-
self.try_send_store_event(event)
1250-
})
1251-
}
1252-
1253-
fn unfail_deterministic_error(
1254-
&self,
1255-
current_ptr: &BlockPtr,
1256-
parent_ptr: &BlockPtr,
1257-
) -> Result<(), StoreError> {
1258-
self.retry("unfail_deterministic_error", || {
1259-
self.writable
1260-
.unfail_deterministic_error(self.site.clone(), current_ptr, parent_ptr)
1261-
})
1262-
}
1263-
1264-
fn unfail_non_deterministic_error(&self, current_ptr: &BlockPtr) -> Result<(), StoreError> {
1265-
self.retry("unfail_non_deterministic_error", || {
1266-
self.writable
1267-
.unfail_non_deterministic_error(self.site.clone(), current_ptr)
1268-
})
1269-
}
1270-
1271-
async fn fail_subgraph(&self, error: SubgraphError) -> Result<(), StoreError> {
1272-
self.retry_async("fail_subgraph", || {
1273-
let error = error.clone();
1274-
async {
1275-
self.writable
1276-
.clone()
1277-
.fail_subgraph(self.site.deployment.clone(), error)
1278-
.await
1279-
}
1280-
})
1281-
.await
1282-
}
1283-
1284-
async fn supports_proof_of_indexing(&self) -> Result<bool, StoreError> {
1285-
self.retry_async("supports_proof_of_indexing", || async {
1286-
self.writable
1287-
.supports_proof_of_indexing(self.site.clone())
1288-
.await
1289-
})
1290-
.await
1291-
}
1292-
1293-
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError> {
1294-
self.retry("get", || self.writable.get(self.site.cheap_clone(), key))
1295-
}
1296-
1297-
fn transact_block_operations(
1298-
&self,
1299-
block_ptr_to: BlockPtr,
1300-
firehose_cursor: Option<String>,
1301-
mods: Vec<EntityModification>,
1302-
stopwatch: StopwatchMetrics,
1303-
data_sources: Vec<StoredDynamicDataSource>,
1304-
deterministic_errors: Vec<SubgraphError>,
1305-
) -> Result<(), StoreError> {
1306-
assert!(
1307-
same_subgraph(&mods, &self.site.deployment),
1308-
"can only transact operations within one shard"
1309-
);
1310-
self.retry("transact_block_operations", move || {
1311-
let event = self.writable.transact_block_operations(
1312-
self.site.clone(),
1313-
&block_ptr_to,
1314-
firehose_cursor.as_deref(),
1315-
&mods,
1316-
stopwatch.cheap_clone(),
1317-
&data_sources,
1318-
&deterministic_errors,
1319-
)?;
1320-
1321-
let _section = stopwatch.start_section("send_store_event");
1322-
self.try_send_store_event(event)
1323-
})
1324-
}
1325-
1326-
fn get_many(
1327-
&self,
1328-
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
1329-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
1330-
self.retry("get_many", || {
1331-
self.writable
1332-
.get_many(self.site.cheap_clone(), &ids_for_type)
1333-
})
1334-
}
1335-
1336-
async fn is_deployment_synced(&self) -> Result<bool, StoreError> {
1337-
self.retry_async("is_deployment_synced", || async {
1338-
self.writable
1339-
.exists_and_synced(self.site.deployment.cheap_clone())
1340-
.await
1341-
})
1342-
.await
1343-
}
1344-
1345-
fn unassign_subgraph(&self) -> Result<(), StoreError> {
1346-
self.retry("unassign_subgraph", || {
1347-
let pconn = self.store.primary_conn()?;
1348-
pconn.transaction(|| -> Result<_, StoreError> {
1349-
let changes = pconn.unassign_subgraph(self.site.as_ref())?;
1350-
pconn.send_store_event(&self.store.0.sender, &StoreEvent::new(changes))
1351-
})
1352-
})
1353-
}
1354-
1355-
async fn load_dynamic_data_sources(&self) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
1356-
self.retry_async("load_dynamic_data_sources", || async {
1357-
self.writable
1358-
.load_dynamic_data_sources(self.site.deployment.clone())
1359-
.await
1360-
})
1361-
.await
1362-
}
1363-
1364-
fn deployment_synced(&self) -> Result<(), StoreError> {
1365-
self.retry("deployment_synced", || {
1366-
let event = {
1367-
// Make sure we drop `pconn` before we call into the deployment
1368-
// store so that we do not hold two database connections which
1369-
// might come from the same pool and could therefore deadlock
1370-
let pconn = self.store.primary_conn()?;
1371-
pconn.transaction(|| -> Result<_, Error> {
1372-
let changes = pconn.promote_deployment(&self.site.deployment)?;
1373-
Ok(StoreEvent::new(changes))
1374-
})?
1375-
};
1376-
1377-
self.writable.deployment_synced(&self.site.deployment)?;
1378-
1379-
self.store.send_store_event(&event)
1380-
})
1381-
}
1382-
1383-
fn shard(&self) -> &str {
1384-
self.site.shard.as_str()
1385-
}
1386-
1387-
async fn health(&self, id: &DeploymentHash) -> Result<schema::SubgraphHealth, StoreError> {
1388-
self.retry_async("health", || async {
1389-
self.writable.health(id).await.map(Into::into)
1390-
})
1391-
.await
1392-
}
1393-
1394-
fn input_schema(&self) -> Arc<Schema> {
1395-
self.input_schema.clone()
1396-
}
1397-
}
1398-
1399-
fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
1400-
mods.iter().all(|md| &md.entity_key().subgraph_id == id)
1401-
}

0 commit comments

Comments
 (0)