Skip to content

Commit a051073

Browse files
committed
Subgraph Composition: Reading the entities for subgraph as a datasource
1 parent 4fa04b2 commit a051073

File tree

20 files changed

+457
-114
lines changed

20 files changed

+457
-114
lines changed

chain/arweave/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use graph::blockchain::{
77
};
88
use graph::cheap_clone::CheapClone;
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
10+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::env::EnvVars;
1313
use graph::firehose::FirehoseEndpoint;
14-
use graph::prelude::{DeploymentHash, MetricsRegistry};
14+
use graph::prelude::MetricsRegistry;
1515
use graph::substreams::Clock;
1616
use graph::{
1717
blockchain::{
@@ -121,7 +121,7 @@ impl Blockchain for Chain {
121121
deployment: DeploymentLocator,
122122
store: impl DeploymentCursorTracker,
123123
start_blocks: Vec<BlockNumber>,
124-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
124+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
125125
filter: Arc<TriggerFilterWrapper<Self>>,
126126
unified_api_version: UnifiedMappingApiVersion,
127127
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/cosmos/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
22
use graph::blockchain::{BlockIngestor, NoopDecoderHook, TriggerFilterWrapper};
33
use graph::components::network_provider::ChainName;
44
use graph::env::EnvVars;
5-
use graph::prelude::{DeploymentHash, MetricsRegistry};
5+
use graph::prelude::MetricsRegistry;
66
use graph::substreams::Clock;
77
use std::collections::HashSet;
88
use std::convert::TryFrom;
@@ -12,7 +12,7 @@ use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, Fireh
1212
use graph::blockchain::client::ChainClient;
1313
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
1414
use graph::cheap_clone::CheapClone;
15-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
15+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1616
use graph::data::subgraph::UnifiedMappingApiVersion;
1717
use graph::{
1818
blockchain::{
@@ -114,7 +114,7 @@ impl Blockchain for Chain {
114114
deployment: DeploymentLocator,
115115
store: impl DeploymentCursorTracker,
116116
start_blocks: Vec<BlockNumber>,
117-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
117+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
118118
filter: Arc<TriggerFilterWrapper<Self>>,
119119
unified_api_version: UnifiedMappingApiVersion,
120120
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/ethereum/src/chain.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ use graph::blockchain::{
77
TriggersAdapterSelector,
88
};
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
10+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::firehose::{FirehoseEndpoint, ForkStep};
1313
use graph::futures03::compat::Future01CompatExt;
1414
use graph::prelude::{
15-
BlockHash, ComponentLoggerConfig, DeploymentHash, ElasticComponentLoggerConfig, EthereumBlock,
15+
BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
1616
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1717
};
1818
use graph::schema::InputSchema;
@@ -128,7 +128,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
128128
chain: &Chain,
129129
deployment: DeploymentLocator,
130130
start_blocks: Vec<BlockNumber>,
131-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
131+
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
132132
subgraph_current_block: Option<BlockPtr>,
133133
filter: Arc<TriggerFilterWrapper<Chain>>,
134134
unified_api_version: UnifiedMappingApiVersion,
@@ -150,7 +150,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
150150
chain: &Chain,
151151
deployment: DeploymentLocator,
152152
start_blocks: Vec<BlockNumber>,
153-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
153+
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
154154
subgraph_current_block: Option<BlockPtr>,
155155
filter: Arc<TriggerFilterWrapper<Chain>>,
156156
unified_api_version: UnifiedMappingApiVersion,
@@ -437,7 +437,7 @@ impl Blockchain for Chain {
437437
deployment: DeploymentLocator,
438438
store: impl DeploymentCursorTracker,
439439
start_blocks: Vec<BlockNumber>,
440-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
440+
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
441441
filter: Arc<TriggerFilterWrapper<Self>>,
442442
unified_api_version: UnifiedMappingApiVersion,
443443
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/near/src/chain.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use graph::blockchain::{
88
};
99
use graph::cheap_clone::CheapClone;
1010
use graph::components::network_provider::ChainName;
11-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
11+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1212
use graph::data::subgraph::UnifiedMappingApiVersion;
1313
use graph::env::EnvVars;
1414
use graph::firehose::FirehoseEndpoint;
1515
use graph::futures03::TryFutureExt;
16-
use graph::prelude::{DeploymentHash, MetricsRegistry};
16+
use graph::prelude::MetricsRegistry;
1717
use graph::schema::InputSchema;
1818
use graph::substreams::{Clock, Package};
1919
use graph::{
@@ -152,7 +152,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
152152
_chain: &Chain,
153153
_deployment: DeploymentLocator,
154154
_start_blocks: Vec<BlockNumber>,
155-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
155+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
156156
_subgraph_current_block: Option<BlockPtr>,
157157
_filter: Arc<TriggerFilterWrapper<Chain>>,
158158
_unified_api_version: UnifiedMappingApiVersion,
@@ -232,7 +232,7 @@ impl Blockchain for Chain {
232232
deployment: DeploymentLocator,
233233
store: impl DeploymentCursorTracker,
234234
start_blocks: Vec<BlockNumber>,
235-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
235+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
236236
filter: Arc<TriggerFilterWrapper<Self>>,
237237
unified_api_version: UnifiedMappingApiVersion,
238238
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/substreams/src/block_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use graph::{
99
substreams_block_stream::SubstreamsBlockStream,
1010
Blockchain, TriggerFilterWrapper,
1111
},
12-
components::store::{DeploymentLocator, ReadStore},
12+
components::store::{DeploymentLocator, SourceableStore},
1313
data::subgraph::UnifiedMappingApiVersion,
14-
prelude::{async_trait, BlockNumber, BlockPtr, DeploymentHash},
14+
prelude::{async_trait, BlockNumber, BlockPtr},
1515
schema::InputSchema,
1616
slog::o,
1717
};
@@ -104,7 +104,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
104104
_chain: &Chain,
105105
_deployment: DeploymentLocator,
106106
_start_blocks: Vec<BlockNumber>,
107-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
107+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
108108
_subgraph_current_block: Option<BlockPtr>,
109109
_filter: Arc<TriggerFilterWrapper<Chain>>,
110110
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/chain.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ use graph::blockchain::{
77
NoopRuntimeAdapter, TriggerFilterWrapper,
88
};
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
10+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::env::EnvVars;
12-
use graph::prelude::{
13-
BlockHash, CheapClone, DeploymentHash, Entity, LoggerFactory, MetricsRegistry,
14-
};
12+
use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
1513
use graph::schema::EntityKey;
1614
use graph::{
1715
blockchain::{
@@ -142,7 +140,7 @@ impl Blockchain for Chain {
142140
deployment: DeploymentLocator,
143141
store: impl DeploymentCursorTracker,
144142
_start_blocks: Vec<BlockNumber>,
145-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
143+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
146144
filter: Arc<TriggerFilterWrapper<Self>>,
147145
_unified_api_version: UnifiedMappingApiVersion,
148146
) -> Result<Box<dyn BlockStream<Self>>, Error> {

core/src/subgraph/inputs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use graph::{
22
blockchain::{block_stream::TriggersAdapterWrapper, Blockchain},
33
components::{
4-
store::{DeploymentLocator, ReadStore, SubgraphFork, WritableStore},
4+
store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore},
55
subgraph::ProofOfIndexingVersion,
66
},
77
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
88
data_source::DataSourceTemplate,
9-
prelude::{BlockNumber, DeploymentHash},
9+
prelude::BlockNumber,
1010
};
1111
use std::collections::BTreeSet;
1212
use std::sync::Arc;
@@ -16,7 +16,7 @@ pub struct IndexingInputs<C: Blockchain> {
1616
pub features: BTreeSet<SubgraphFeature>,
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
19-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
19+
pub source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
2020
pub stop_block: Option<BlockNumber>,
2121
pub max_end_block: Option<BlockNumber>,
2222
pub store: Arc<dyn WritableStore>,

core/src/subgraph/instance_manager.rs

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,13 @@ use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper
1313
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
1414
use graph::components::metrics::gas::GasMetrics;
1515
use graph::components::metrics::subgraph::DeploymentStatusMetric;
16-
use graph::components::store::ReadStore;
16+
use graph::components::store::SourceableStore;
1717
use graph::components::subgraph::ProofOfIndexingVersion;
1818
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
1919
use graph::data::value::Word;
2020
use graph::data_source::causality_region::CausalityRegionSeq;
2121
use graph::env::EnvVars;
2222
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
23-
use graph::semver::Version;
2423
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
2524
use graph_runtime_wasm::module::ToAscPtr;
2625
use graph_runtime_wasm::RuntimeHostBuilder;
@@ -230,50 +229,28 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
230229
}
231230
}
232231

233-
pub async fn hashes_to_read_store<C: Blockchain>(
232+
pub async fn get_sourceable_stores<C: Blockchain>(
234233
&self,
235-
logger: &Logger,
236-
link_resolver: &Arc<dyn LinkResolver>,
237234
hashes: Vec<DeploymentHash>,
238-
max_spec_version: Version,
239235
is_runner_test: bool,
240-
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn ReadStore>)>> {
241-
let mut writable_stores = Vec::new();
242-
let subgraph_store = self.subgraph_store.clone();
243-
236+
) -> anyhow::Result<Vec<Arc<dyn SourceableStore>>> {
244237
if is_runner_test {
245-
return Ok(writable_stores);
238+
return Ok(Vec::new());
246239
}
247240

248-
for hash in hashes {
249-
let file_bytes = link_resolver
250-
.cat(logger, &hash.to_ipfs_link())
251-
.await
252-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
253-
let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
254-
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
255-
let manifest = UnresolvedSubgraphManifest::<C>::parse(hash.cheap_clone(), raw)?;
256-
let manifest = manifest
257-
.resolve(&link_resolver, &logger, max_spec_version.clone())
258-
.await?;
241+
let mut sourceable_stores = Vec::new();
242+
let subgraph_store = self.subgraph_store.clone();
259243

244+
for hash in hashes {
260245
let loc = subgraph_store
261246
.active_locator(&hash)?
262247
.ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?;
263248

264-
let readable_store = subgraph_store
265-
.clone()
266-
.readable(
267-
logger.clone(),
268-
loc.id.clone(),
269-
Arc::new(manifest.template_idx_and_name().collect()),
270-
)
271-
.await?;
272-
273-
writable_stores.push((loc.hash, readable_store));
249+
let sourceable_store = subgraph_store.clone().sourceable(loc.id.clone()).await?;
250+
sourceable_stores.push(sourceable_store);
274251
}
275252

276-
Ok(writable_stores)
253+
Ok(sourceable_stores)
277254
}
278255

279256
pub async fn build_subgraph_runner<C>(
@@ -539,27 +516,21 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
539516

540517
let decoder = Box::new(Decoder::new(decoder_hook));
541518

542-
let subgraph_data_source_read_stores = self
543-
.hashes_to_read_store::<C>(
544-
&logger,
545-
&link_resolver,
546-
subgraph_ds_source_deployments,
547-
manifest.spec_version.clone(),
548-
is_runner_test,
549-
)
519+
let subgraph_data_source_stores = self
520+
.get_sourceable_stores::<C>(subgraph_ds_source_deployments, is_runner_test)
550521
.await?;
551522

552523
let triggers_adapter = Arc::new(TriggersAdapterWrapper::new(
553524
triggers_adapter,
554-
subgraph_data_source_read_stores.clone(),
525+
subgraph_data_source_stores.clone(),
555526
));
556527

557528
let inputs = IndexingInputs {
558529
deployment: deployment.clone(),
559530
features,
560531
start_blocks,
561532
end_blocks,
562-
source_subgraph_stores: subgraph_data_source_read_stores,
533+
source_subgraph_stores: subgraph_data_source_stores,
563534
stop_block,
564535
max_end_block,
565536
store,

graph/src/blockchain/block_stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use super::{
1919
Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
2020
};
2121
use crate::anyhow::Result;
22-
use crate::components::store::{BlockNumber, DeploymentLocator, ReadStore};
22+
use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
2323
use crate::data::subgraph::UnifiedMappingApiVersion;
2424
use crate::firehose::{self, FirehoseEndpoint};
2525
use crate::futures03::stream::StreamExt as _;
@@ -149,7 +149,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
149149
chain: &C,
150150
deployment: DeploymentLocator,
151151
start_blocks: Vec<BlockNumber>,
152-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
152+
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
153153
subgraph_current_block: Option<BlockPtr>,
154154
filter: Arc<TriggerFilterWrapper<C>>,
155155
unified_api_version: UnifiedMappingApiVersion,
@@ -160,7 +160,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
160160
chain: &C,
161161
deployment: DeploymentLocator,
162162
start_blocks: Vec<BlockNumber>,
163-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
163+
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
164164
subgraph_current_block: Option<BlockPtr>,
165165
filter: Arc<TriggerFilterWrapper<C>>,
166166
unified_api_version: UnifiedMappingApiVersion,
@@ -320,13 +320,13 @@ impl<C: Blockchain> BlockWithTriggers<C> {
320320
/// logic for each chain, increasing code repetition.
321321
pub struct TriggersAdapterWrapper<C: Blockchain> {
322322
pub adapter: Arc<dyn TriggersAdapter<C>>,
323-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
323+
pub source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
324324
}
325325

326326
impl<C: Blockchain> TriggersAdapterWrapper<C> {
327327
pub fn new(
328328
adapter: Arc<dyn TriggersAdapter<C>>,
329-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
329+
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
330330
) -> Self {
331331
Self {
332332
adapter,

graph/src/blockchain/mock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use crate::{
22
bail,
33
components::{
44
link_resolver::LinkResolver,
5-
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, ReadStore},
5+
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, SourceableStore},
66
subgraph::InstanceDSTemplateInfo,
77
},
88
data::subgraph::UnifiedMappingApiVersion,
9-
prelude::{BlockHash, DataSourceTemplateInfo, DeploymentHash},
9+
prelude::{BlockHash, DataSourceTemplateInfo},
1010
};
1111
use anyhow::{Error, Result};
1212
use async_trait::async_trait;
@@ -386,7 +386,7 @@ impl Blockchain for MockBlockchain {
386386
_deployment: DeploymentLocator,
387387
_store: impl DeploymentCursorTracker,
388388
_start_blocks: Vec<BlockNumber>,
389-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
389+
_source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
390390
_filter: Arc<TriggerFilterWrapper<Self>>,
391391
_unified_api_version: UnifiedMappingApiVersion,
392392
) -> Result<Box<dyn BlockStream<Self>>, Error> {

0 commit comments

Comments
 (0)