Skip to content

Commit ae28ebc

Browse files
committed
chain, core, graph : Refactor subgraph trigger creation
1 parent 6b83c3d commit ae28ebc

File tree

13 files changed

+131
-107
lines changed

13 files changed

+131
-107
lines changed

chain/arweave/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use graph::{
2727
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
2828
};
2929
use prost::Message;
30-
use std::collections::HashSet;
30+
use std::collections::{HashMap, HashSet};
3131
use std::sync::Arc;
3232

3333
use crate::adapter::TriggerFilter;
@@ -121,7 +121,7 @@ impl Blockchain for Chain {
121121
deployment: DeploymentLocator,
122122
store: impl DeploymentCursorTracker,
123123
start_blocks: Vec<BlockNumber>,
124-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
124+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
125125
filter: Arc<TriggerFilterWrapper<Self>>,
126126
unified_api_version: UnifiedMappingApiVersion,
127127
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/cosmos/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use graph::components::adapter::ChainId;
44
use graph::env::EnvVars;
55
use graph::prelude::{DeploymentHash, MetricsRegistry};
66
use graph::substreams::Clock;
7-
use std::collections::HashSet;
7+
use std::collections::{HashMap, HashSet};
88
use std::convert::TryFrom;
99
use std::sync::Arc;
1010

@@ -114,7 +114,7 @@ impl Blockchain for Chain {
114114
deployment: DeploymentLocator,
115115
store: impl DeploymentCursorTracker,
116116
start_blocks: Vec<BlockNumber>,
117-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
117+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
118118
filter: Arc<TriggerFilterWrapper<Self>>,
119119
unified_api_version: UnifiedMappingApiVersion,
120120
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/ethereum/src/chain.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use graph::{
3737
},
3838
};
3939
use prost::Message;
40-
use std::collections::HashSet;
40+
use std::collections::{HashMap, HashSet};
4141
use std::iter::FromIterator;
4242
use std::sync::Arc;
4343
use std::time::Duration;
@@ -128,7 +128,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
128128
chain: &Chain,
129129
deployment: DeploymentLocator,
130130
start_blocks: Vec<BlockNumber>,
131-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
131+
source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
132132
subgraph_current_block: Option<BlockPtr>,
133133
filter: Arc<TriggerFilterWrapper<Chain>>,
134134
unified_api_version: UnifiedMappingApiVersion,
@@ -150,7 +150,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
150150
chain: &Chain,
151151
deployment: DeploymentLocator,
152152
start_blocks: Vec<BlockNumber>,
153-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
153+
source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
154154
subgraph_current_block: Option<BlockPtr>,
155155
filter: Arc<TriggerFilterWrapper<Chain>>,
156156
unified_api_version: UnifiedMappingApiVersion,
@@ -437,7 +437,7 @@ impl Blockchain for Chain {
437437
deployment: DeploymentLocator,
438438
store: impl DeploymentCursorTracker,
439439
start_blocks: Vec<BlockNumber>,
440-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
440+
source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
441441
filter: Arc<TriggerFilterWrapper<Self>>,
442442
unified_api_version: UnifiedMappingApiVersion,
443443
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/near/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use graph::{
3232
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
3333
};
3434
use prost::Message;
35-
use std::collections::HashSet;
35+
use std::collections::{HashMap, HashSet};
3636
use std::sync::Arc;
3737

3838
use crate::adapter::TriggerFilter;
@@ -152,7 +152,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
152152
_chain: &Chain,
153153
_deployment: DeploymentLocator,
154154
_start_blocks: Vec<BlockNumber>,
155-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
155+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
156156
_subgraph_current_block: Option<BlockPtr>,
157157
_filter: Arc<TriggerFilterWrapper<Chain>>,
158158
_unified_api_version: UnifiedMappingApiVersion,
@@ -232,7 +232,7 @@ impl Blockchain for Chain {
232232
deployment: DeploymentLocator,
233233
store: impl DeploymentCursorTracker,
234234
start_blocks: Vec<BlockNumber>,
235-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
235+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
236236
filter: Arc<TriggerFilterWrapper<Self>>,
237237
unified_api_version: UnifiedMappingApiVersion,
238238
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/starknet/src/chain.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use graph::{
3030
slog::o,
3131
};
3232
use prost::Message;
33-
use std::{collections::HashSet, sync::Arc};
33+
use std::{
34+
collections::{HashMap, HashSet},
35+
sync::Arc,
36+
};
3437

3538
use crate::{
3639
adapter::TriggerFilter,
@@ -116,7 +119,7 @@ impl Blockchain for Chain {
116119
deployment: DeploymentLocator,
117120
store: impl DeploymentCursorTracker,
118121
start_blocks: Vec<BlockNumber>,
119-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
122+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
120123
filter: Arc<TriggerFilterWrapper<Self>>,
121124
unified_api_version: UnifiedMappingApiVersion,
122125
) -> Result<Box<dyn BlockStream<Self>>, Error> {
@@ -240,7 +243,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
240243
_chain: &Chain,
241244
_deployment: DeploymentLocator,
242245
_start_blocks: Vec<BlockNumber>,
243-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
246+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
244247
_subgraph_current_block: Option<BlockPtr>,
245248
_filter: Arc<TriggerFilterWrapper<Chain>>,
246249
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/block_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::Result;
2-
use std::sync::Arc;
2+
use std::{collections::HashMap, sync::Arc};
33

44
use graph::{
55
blockchain::{
@@ -104,7 +104,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
104104
_chain: &Chain,
105105
_deployment: DeploymentLocator,
106106
_start_blocks: Vec<BlockNumber>,
107-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
107+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
108108
_subgraph_current_block: Option<BlockPtr>,
109109
_filter: Arc<TriggerFilterWrapper<Chain>>,
110110
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use graph::{
2525
slog::Logger,
2626
};
2727

28+
use std::collections::HashMap;
2829
use std::sync::Arc;
2930

3031
// ParsedChanges are an internal representation of the equivalent operations defined on the
@@ -142,7 +143,7 @@ impl Blockchain for Chain {
142143
deployment: DeploymentLocator,
143144
store: impl DeploymentCursorTracker,
144145
_start_blocks: Vec<BlockNumber>,
145-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
146+
_source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
146147
filter: Arc<TriggerFilterWrapper<Self>>,
147148
_unified_api_version: UnifiedMappingApiVersion,
148149
) -> Result<Box<dyn BlockStream<Self>>, Error> {

core/src/subgraph/inputs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ use graph::{
88
data_source::DataSourceTemplate,
99
prelude::{BlockNumber, DeploymentHash},
1010
};
11-
use std::collections::BTreeSet;
11+
use std::collections::{BTreeSet, HashMap};
1212
use std::sync::Arc;
1313

1414
pub struct IndexingInputs<C: Blockchain> {
1515
pub deployment: DeploymentLocator,
1616
pub features: BTreeSet<SubgraphFeature>,
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
19-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
19+
pub source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn WritableStore>>,
2020
pub stop_block: Option<BlockNumber>,
2121
pub store: Arc<dyn WritableStore>,
2222
pub debug_fork: Option<Arc<dyn SubgraphFork>>,

core/src/subgraph/instance_manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::subgraph::context::{IndexingContext, SubgraphKeepAlive};
33
use crate::subgraph::inputs::IndexingInputs;
44
use crate::subgraph::loader::load_dynamic_data_sources;
55
use crate::subgraph::Decoder;
6-
use std::collections::BTreeSet;
6+
use std::collections::{BTreeSet, HashMap};
77

88
use crate::subgraph::runner::SubgraphRunner;
99
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
@@ -211,8 +211,8 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
211211
hashes: Vec<DeploymentHash>,
212212
max_spec_version: Version,
213213
is_runner_test: bool,
214-
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn WritableStore>)>> {
215-
let mut writable_stores = Vec::new();
214+
) -> anyhow::Result<HashMap<DeploymentHash, Arc<dyn WritableStore>>> {
215+
let mut writable_stores = HashMap::new();
216216
let subgraph_store = self.subgraph_store.clone();
217217

218218
if is_runner_test {
@@ -244,7 +244,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
244244
)
245245
.await?;
246246

247-
writable_stores.push((loc.hash, writable_store));
247+
writable_stores.insert(hash, writable_store);
248248
}
249249

250250
Ok(writable_stores)

0 commit comments

Comments
 (0)