Skip to content

Commit 8878caf

Browse files
author
Zoran Cvetkov
committed
cleanup WritableStore
1 parent 77850be commit 8878caf

File tree

5 files changed

+25
-46
lines changed

5 files changed

+25
-46
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use graph::data::value::Word;
1616
use graph::data_source::causality_region::CausalityRegionSeq;
1717
use graph::env::EnvVars;
1818
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
19-
use graph::semver::Version;
2019
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
2120
use graph_runtime_wasm::module::ToAscPtr;
2221
use graph_runtime_wasm::RuntimeHostBuilder;
@@ -206,10 +205,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
206205

207206
pub async fn hashes_to_read_store<C: Blockchain>(
208207
&self,
209-
logger: &Logger,
210-
link_resolver: &Arc<dyn LinkResolver>,
211208
hashes: Vec<DeploymentHash>,
212-
max_spec_version: Version,
213209
is_runner_test: bool,
214210
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn SourceableStore>)>> {
215211
let mut sourceable_stores = Vec::new();
@@ -220,29 +216,11 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
220216
}
221217

222218
for hash in hashes {
223-
let file_bytes = link_resolver
224-
.cat(logger, &hash.to_ipfs_link())
225-
.await
226-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
227-
let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
228-
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
229-
let manifest = UnresolvedSubgraphManifest::<C>::parse(hash.cheap_clone(), raw)?;
230-
let manifest = manifest
231-
.resolve(&link_resolver, &logger, max_spec_version.clone())
232-
.await?;
233-
234219
let loc = subgraph_store
235220
.active_locator(&hash)?
236221
.ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?;
237222

238-
let sourceable_store = subgraph_store
239-
.clone()
240-
.sourceable(
241-
logger.clone(),
242-
loc.id.clone(),
243-
Arc::new(manifest.template_idx_and_name().collect()),
244-
)
245-
.await?;
223+
let sourceable_store = subgraph_store.clone().sourceable(loc.id.clone()).await?;
246224

247225
sourceable_stores.push((loc.hash, sourceable_store));
248226
}
@@ -492,13 +470,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
492470
let decoder = Box::new(Decoder::new(decoder_hook));
493471

494472
let subgraph_data_source_read_stores = self
495-
.hashes_to_read_store::<C>(
496-
&logger,
497-
&link_resolver,
498-
subgraph_ds_source_deployments,
499-
manifest.spec_version.clone(),
500-
is_runner_test,
501-
)
473+
.hashes_to_read_store::<C>(subgraph_ds_source_deployments, is_runner_test)
502474
.await?;
503475

504476
let triggers_adapter = Arc::new(TriggersAdapterWrapper::new(

graph/src/components/store/traits.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,7 @@ pub trait SubgraphStore: Send + Sync + 'static {
188188

189189
async fn sourceable(
190190
self: Arc<Self>,
191-
logger: Logger,
192191
deployment: DeploymentId,
193-
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
194192
) -> Result<Arc<dyn SourceableStore>, StoreError>;
195193

196194
/// Initiate a graceful shutdown of the writable that a previous call to

store/postgres/src/subgraph_store.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,18 +1539,21 @@ impl SubgraphStoreTrait for SubgraphStore {
15391539

15401540
async fn sourceable(
15411541
self: Arc<Self>,
1542-
logger: Logger,
15431542
deployment: graph::components::store::DeploymentId,
1544-
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
15451543
) -> Result<Arc<dyn store::SourceableStore>, StoreError> {
1546-
let writable = self
1547-
.clone()
1548-
.writable(logger, deployment, manifest_idx_and_name)
1549-
.await?;
15501544
let deployment = deployment.into();
15511545
let site = self.find_site(deployment)?;
15521546
let store = self.for_site(&site)?;
1553-
let s = Arc::new(SourceableStore::new(site, store.clone(), writable));
1547+
let input_schema = self.input_schema(&site.deployment)?;
1548+
let block_ptr = store.block_ptr(site.clone()).await?;
1549+
let block_cursor = store.block_cursor(site.clone()).await?;
1550+
let s = Arc::new(SourceableStore::new(
1551+
site,
1552+
store.clone(),
1553+
block_ptr,
1554+
block_cursor,
1555+
input_schema,
1556+
));
15541557
Ok(s as Arc<dyn store::SourceableStore>)
15551558
}
15561559

store/postgres/src/writable.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,19 +1572,25 @@ impl ReadStore for WritableStore {
15721572
pub struct SourceableStore {
15731573
site: Arc<Site>,
15741574
store: Arc<DeploymentStore>,
1575-
writable: Arc<dyn store::WritableStore>,
1575+
block_ptr: Option<BlockPtr>,
1576+
block_cursor: FirehoseCursor,
1577+
input_schema: InputSchema,
15761578
}
15771579

15781580
impl SourceableStore {
15791581
pub fn new(
15801582
site: Arc<Site>,
15811583
store: Arc<DeploymentStore>,
1582-
writable: Arc<dyn store::WritableStore>,
1584+
block_ptr: Option<BlockPtr>,
1585+
block_cursor: FirehoseCursor,
1586+
input_schema: InputSchema,
15831587
) -> Self {
15841588
Self {
15851589
site,
15861590
store,
1587-
writable,
1591+
block_ptr,
1592+
block_cursor,
1593+
input_schema,
15881594
}
15891595
}
15901596
}
@@ -1601,15 +1607,15 @@ impl store::SourceableStore for SourceableStore {
16011607
}
16021608
impl DeploymentCursorTracker for SourceableStore {
16031609
fn input_schema(&self) -> InputSchema {
1604-
ReadStore::input_schema(&self.writable)
1610+
self.input_schema.cheap_clone()
16051611
}
16061612

16071613
fn block_ptr(&self) -> Option<BlockPtr> {
1608-
self.writable.block_ptr()
1614+
self.block_ptr.clone()
16091615
}
16101616

16111617
fn firehose_cursor(&self) -> FirehoseCursor {
1612-
self.writable.firehose_cursor()
1618+
self.block_cursor.clone()
16131619
}
16141620
}
16151621

store/test-store/tests/postgres/writable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ where
113113
.expect("we can get a writable store");
114114
let sourceable = store
115115
.subgraph_store()
116-
.sourceable(LOGGER.clone(), deployment.id, Arc::new(Vec::new()))
116+
.sourceable(deployment.id)
117117
.await
118118
.expect("we can get a writable store");
119119

0 commit comments

Comments
 (0)