Skip to content

Commit aaf42b2

Browse files
committed
store: Introduce a WritableAgent
Right now, it is only a wrapper around WritableStore, but we will evolve it into an agent that manages writing to the database asynchronously. We also stop impelementing the `WritableStore` trait for `WritableStore` since we will need to modify some method signatures.
1 parent 8e3e871 commit aaf42b2

File tree

2 files changed

+118
-6
lines changed

2 files changed

+118
-6
lines changed

store/postgres/src/subgraph_store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::{
3838
primary,
3939
primary::{DeploymentId, Mirror as PrimaryMirror, Site},
4040
relational::Layout,
41-
writable::WritableStore,
41+
writable::WritableAgent,
4242
NotificationSender,
4343
};
4444
use crate::{
@@ -256,7 +256,7 @@ pub struct SubgraphStoreInner {
256256
sites: TimedCache<DeploymentHash, Site>,
257257
placer: Arc<dyn DeploymentPlacer + Send + Sync + 'static>,
258258
sender: Arc<NotificationSender>,
259-
writables: Mutex<HashMap<DeploymentId, Arc<WritableStore>>>,
259+
writables: Mutex<HashMap<DeploymentId, Arc<WritableAgent>>>,
260260
}
261261

262262
impl SubgraphStoreInner {
@@ -1063,7 +1063,7 @@ impl SubgraphStoreTrait for SubgraphStore {
10631063
.await
10641064
.unwrap()?; // Propagate panics, there shouldn't be any.
10651065

1066-
let writable = Arc::new(WritableStore::new(self.as_ref().clone(), logger, site)?);
1066+
let writable = Arc::new(WritableAgent::new(self.as_ref().clone(), logger, site)?);
10671067
self.writables
10681068
.lock()
10691069
.unwrap()
@@ -1077,7 +1077,7 @@ impl SubgraphStoreTrait for SubgraphStore {
10771077
id: &DeploymentHash,
10781078
) -> Result<Arc<dyn WritableStoreTrait>, StoreError> {
10791079
let site = self.site(id)?;
1080-
Ok(Arc::new(WritableStore::new(self.clone(), logger, site)?))
1080+
Ok(Arc::new(WritableAgent::new(self.clone(), logger, site)?))
10811081
}
10821082

10831083
fn is_deployed(&self, id: &DeploymentHash) -> Result<bool, StoreError> {

store/postgres/src/writable.rs

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ impl WritableStore {
136136
}
137137
}
138138

139-
#[async_trait::async_trait]
140-
impl WritableStoreTrait for WritableStore {
139+
// Methods that mirror `WritableStoreTrait`
140+
impl WritableStore {
141141
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
142142
self.retry("block_ptr", || self.writable.block_ptr(self.site.as_ref()))
143143
}
@@ -321,3 +321,115 @@ impl WritableStoreTrait for WritableStore {
321321
fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
322322
mods.iter().all(|md| &md.entity_key().subgraph_id == id)
323323
}
324+
325+
#[allow(dead_code)]
326+
pub struct WritableAgent {
327+
store: Arc<WritableStore>,
328+
}
329+
330+
impl WritableAgent {
331+
pub(crate) fn new(
332+
subgraph_store: SubgraphStore,
333+
logger: Logger,
334+
site: Arc<Site>,
335+
) -> Result<Self, StoreError> {
336+
Ok(Self {
337+
store: Arc::new(WritableStore::new(subgraph_store, logger, site)?),
338+
})
339+
}
340+
}
341+
342+
#[allow(unused_variables)]
343+
#[async_trait::async_trait]
344+
impl WritableStoreTrait for WritableAgent {
345+
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
346+
self.store.block_ptr()
347+
}
348+
349+
fn block_cursor(&self) -> Result<Option<String>, StoreError> {
350+
self.store.block_cursor()
351+
}
352+
353+
fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> {
354+
// TODO: Spin up a background writer thread and establish a channel
355+
self.store.start_subgraph_deployment(logger)
356+
}
357+
358+
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
359+
// TODO: If we haven't written the block yet, revert in memory. If
360+
// we have, revert in the database
361+
self.store.revert_block_operations(block_ptr_to)
362+
}
363+
364+
fn unfail_deterministic_error(
365+
&self,
366+
current_ptr: &BlockPtr,
367+
parent_ptr: &BlockPtr,
368+
) -> Result<(), StoreError> {
369+
self.store
370+
.unfail_deterministic_error(current_ptr, parent_ptr)
371+
}
372+
373+
fn unfail_non_deterministic_error(&self, current_ptr: &BlockPtr) -> Result<(), StoreError> {
374+
self.store.unfail_non_deterministic_error(current_ptr)
375+
}
376+
377+
async fn fail_subgraph(&self, error: SubgraphError) -> Result<(), StoreError> {
378+
self.store.fail_subgraph(error).await
379+
}
380+
381+
async fn supports_proof_of_indexing(&self) -> Result<bool, StoreError> {
382+
self.store.supports_proof_of_indexing().await
383+
}
384+
385+
fn get(&self, key: &EntityKey) -> Result<Option<EntityVersion>, StoreError> {
386+
self.store.get(key)
387+
}
388+
389+
fn transact_block_operations(
390+
&self,
391+
block_ptr_to: BlockPtr,
392+
firehose_cursor: Option<String>,
393+
mods: Vec<EntityModification>,
394+
stopwatch: StopwatchMetrics,
395+
data_sources: Vec<StoredDynamicDataSource>,
396+
deterministic_errors: Vec<SubgraphError>,
397+
) -> Result<Vec<(EntityKey, Vid)>, StoreError> {
398+
self.store.transact_block_operations(
399+
block_ptr_to,
400+
firehose_cursor,
401+
mods,
402+
stopwatch,
403+
data_sources,
404+
deterministic_errors,
405+
)
406+
}
407+
408+
fn get_many(
409+
&self,
410+
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
411+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError> {
412+
self.store.get_many(ids_for_type)
413+
}
414+
415+
fn deployment_synced(&self) -> Result<(), StoreError> {
416+
self.store.deployment_synced()
417+
}
418+
419+
async fn is_deployment_synced(&self) -> Result<bool, StoreError> {
420+
self.store.is_deployment_synced().await
421+
}
422+
423+
fn unassign_subgraph(&self) -> Result<(), StoreError> {
424+
self.store.unassign_subgraph()
425+
}
426+
427+
async fn load_dynamic_data_sources(&self) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
428+
// TODO: Combine in-memory and stored data sources
429+
self.store.load_dynamic_data_sources().await
430+
}
431+
432+
fn shard(&self) -> &str {
433+
self.store.shard()
434+
}
435+
}

0 commit comments

Comments
 (0)