Skip to content

Commit 1486e8c

Browse files
committed
all: Use a WritableStore much earlier when running a subgraph
1 parent d6af534 commit 1486e8c

File tree

7 files changed

+23
-30
lines changed

7 files changed

+23
-30
lines changed

chain/ethereum/src/chain.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::{Context, Error};
22
use graph::blockchain::BlockchainKind;
3+
use graph::components::store::WritableStore;
34
use graph::data::subgraph::UnifiedMappingApiVersion;
45
use graph::firehose::{FirehoseEndpoints, ForkStep};
56
use graph::prelude::{
@@ -22,7 +23,6 @@ use graph::{
2223
prelude::{
2324
async_trait, lazy_static, o, serde_json as json, BlockNumber, ChainStore,
2425
EthereumBlockWithCalls, Future01CompatExt, Logger, LoggerFactory, MetricsRegistry, NodeId,
25-
SubgraphStore,
2626
},
2727
};
2828
use prost::Message;
@@ -72,7 +72,6 @@ pub struct Chain {
7272
eth_adapters: Arc<EthereumNetworkAdapters>,
7373
chain_store: Arc<dyn ChainStore>,
7474
call_cache: Arc<dyn EthereumCallCache>,
75-
subgraph_store: Arc<dyn SubgraphStore>,
7675
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
7776
reorg_threshold: BlockNumber,
7877
pub is_ingestible: bool,
@@ -92,7 +91,6 @@ impl Chain {
9291
registry: Arc<dyn MetricsRegistry>,
9392
chain_store: Arc<dyn ChainStore>,
9493
call_cache: Arc<dyn EthereumCallCache>,
95-
subgraph_store: Arc<dyn SubgraphStore>,
9694
firehose_endpoints: FirehoseEndpoints,
9795
eth_adapters: EthereumNetworkAdapters,
9896
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
@@ -108,7 +106,6 @@ impl Chain {
108106
eth_adapters: Arc::new(eth_adapters),
109107
chain_store,
110108
call_cache,
111-
subgraph_store,
112109
chain_head_update_listener,
113110
reorg_threshold,
114111
is_ingestible,
@@ -190,8 +187,8 @@ impl Blockchain for Chain {
190187
async fn new_firehose_block_stream(
191188
&self,
192189
deployment: DeploymentLocator,
190+
writable: Arc<dyn WritableStore>,
193191
start_blocks: Vec<BlockNumber>,
194-
firehose_cursor: Option<String>,
195192
filter: Arc<Self::TriggerFilter>,
196193
metrics: Arc<BlockStreamMetrics>,
197194
unified_api_version: UnifiedMappingApiVersion,
@@ -220,6 +217,7 @@ impl Blockchain for Chain {
220217
.new(o!("component" => "FirehoseBlockStream"));
221218

222219
let firehose_mapper = Arc::new(FirehoseMapper {});
220+
let firehose_cursor = writable.block_cursor()?;
223221

224222
Ok(Box::new(FirehoseBlockStream::new(
225223
firehose_endpoint,
@@ -235,6 +233,7 @@ impl Blockchain for Chain {
235233
async fn new_polling_block_stream(
236234
&self,
237235
deployment: DeploymentLocator,
236+
writable: Arc<dyn WritableStore>,
238237
start_blocks: Vec<BlockNumber>,
239238
subgraph_start_block: Option<BlockPtr>,
240239
filter: Arc<Self::TriggerFilter>,
@@ -259,12 +258,6 @@ impl Blockchain for Chain {
259258
.subgraph_logger(&deployment)
260259
.new(o!("component" => "BlockStream"));
261260
let chain_store = self.chain_store().clone();
262-
let writable = self
263-
.subgraph_store
264-
.cheap_clone()
265-
.writable(logger.clone(), deployment.id)
266-
.await
267-
.with_context(|| format!("no store for deployment `{}`", deployment.hash))?;
268261
let chain_head_update_stream = self
269262
.chain_head_update_listener
270263
.subscribe(self.name.clone(), logger.clone());

chain/near/src/chain.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use graph::blockchain::BlockchainKind;
22
use graph::cheap_clone::CheapClone;
3+
use graph::components::store::WritableStore;
34
use graph::data::subgraph::UnifiedMappingApiVersion;
45
use graph::firehose::FirehoseEndpoints;
56
use graph::prelude::StopwatchMetrics;
@@ -100,8 +101,8 @@ impl Blockchain for Chain {
100101
async fn new_firehose_block_stream(
101102
&self,
102103
deployment: DeploymentLocator,
104+
store: Arc<dyn WritableStore>,
103105
start_blocks: Vec<BlockNumber>,
104-
firehose_cursor: Option<String>,
105106
filter: Arc<Self::TriggerFilter>,
106107
metrics: Arc<BlockStreamMetrics>,
107108
unified_api_version: UnifiedMappingApiVersion,
@@ -126,6 +127,7 @@ impl Blockchain for Chain {
126127
.new(o!("component" => "FirehoseBlockStream"));
127128

128129
let firehose_mapper = Arc::new(FirehoseMapper {});
130+
let firehose_cursor = store.block_cursor()?;
129131

130132
Ok(Box::new(FirehoseBlockStream::new(
131133
firehose_endpoint,
@@ -141,6 +143,7 @@ impl Blockchain for Chain {
141143
async fn new_polling_block_stream(
142144
&self,
143145
_deployment: DeploymentLocator,
146+
_writable: Arc<dyn WritableStore>,
144147
_start_blocks: Vec<BlockNumber>,
145148
_subgraph_start_block: Option<BlockPtr>,
146149
_filter: Arc<Self::TriggerFilter>,

core/src/subgraph/instance_manager.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -472,23 +472,20 @@ async fn new_block_stream<C: Blockchain>(
472472
};
473473

474474
let block_stream = match is_firehose {
475-
true => {
476-
let firehose_cursor = inputs.store.block_cursor()?;
477-
478-
chain.new_firehose_block_stream(
479-
inputs.deployment.clone(),
480-
inputs.start_blocks.clone(),
481-
firehose_cursor,
482-
Arc::new(filter.clone()),
483-
block_stream_metrics.clone(),
484-
inputs.unified_api_version.clone(),
485-
)
486-
}
475+
true => chain.new_firehose_block_stream(
476+
inputs.deployment.clone(),
477+
inputs.store.clone(),
478+
inputs.start_blocks.clone(),
479+
Arc::new(filter.clone()),
480+
block_stream_metrics.clone(),
481+
inputs.unified_api_version.clone(),
482+
),
487483
false => {
488484
let start_block = inputs.store.block_ptr()?;
489485

490486
chain.new_polling_block_stream(
491487
inputs.deployment.clone(),
488+
inputs.store.clone(),
492489
inputs.start_blocks.clone(),
493490
start_block,
494491
Arc::new(filter.clone()),

graph/src/blockchain/mock.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use anyhow::Error;
66
use async_trait::async_trait;
77
use core::fmt;
88
use serde::Deserialize;
9-
use std::convert::TryFrom;
9+
use std::{convert::TryFrom, sync::Arc};
1010

1111
use super::{block_stream, HostFn, IngestorError, TriggerWithHandler};
1212

@@ -296,8 +296,8 @@ impl Blockchain for MockBlockchain {
296296
async fn new_firehose_block_stream(
297297
&self,
298298
_deployment: crate::components::store::DeploymentLocator,
299+
_store: Arc<dyn crate::components::store::WritableStore>,
299300
_start_blocks: Vec<crate::components::store::BlockNumber>,
300-
_firehose_cursor: Option<String>,
301301
_filter: std::sync::Arc<Self::TriggerFilter>,
302302
_metrics: std::sync::Arc<block_stream::BlockStreamMetrics>,
303303
_unified_api_version: crate::data::subgraph::UnifiedMappingApiVersion,
@@ -308,6 +308,7 @@ impl Blockchain for MockBlockchain {
308308
async fn new_polling_block_stream(
309309
&self,
310310
_deployment: crate::components::store::DeploymentLocator,
311+
_writable: Arc<dyn crate::components::store::WritableStore>,
311312
_start_blocks: Vec<crate::components::store::BlockNumber>,
312313
_subgraph_start_block: Option<BlockPtr>,
313314
_filter: std::sync::Arc<Self::TriggerFilter>,

graph/src/blockchain/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
};
2323
use crate::{
2424
components::{
25-
store::{BlockNumber, ChainStore},
25+
store::{BlockNumber, ChainStore, WritableStore},
2626
subgraph::DataSourceTemplateInfo,
2727
},
2828
prelude::{thiserror::Error, LinkResolver},
@@ -111,8 +111,8 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
111111
async fn new_firehose_block_stream(
112112
&self,
113113
deployment: DeploymentLocator,
114+
store: Arc<dyn WritableStore>,
114115
start_blocks: Vec<BlockNumber>,
115-
firehose_cursor: Option<String>,
116116
filter: Arc<Self::TriggerFilter>,
117117
metrics: Arc<BlockStreamMetrics>,
118118
unified_api_version: UnifiedMappingApiVersion,
@@ -121,6 +121,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
121121
async fn new_polling_block_stream(
122122
&self,
123123
deployment: DeploymentLocator,
124+
writable: Arc<dyn WritableStore>,
124125
start_blocks: Vec<BlockNumber>,
125126
subgraph_start_block: Option<BlockPtr>,
126127
filter: Arc<Self::TriggerFilter>,

node/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,6 @@ fn ethereum_networks_as_chains(
479479
registry.clone(),
480480
chain_store.cheap_clone(),
481481
chain_store,
482-
store.subgraph_store(),
483482
firehose_endpoints.map_or_else(|| FirehoseEndpoints::new(), |v| v.clone()),
484483
eth_adapters.clone(),
485484
chain_head_update_listener.clone(),

node/src/manager/commands/run.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ pub async fn run(
102102
metrics_registry.clone(),
103103
chain_store.cheap_clone(),
104104
chain_store,
105-
subgraph_store.clone(),
106105
firehose_endpoints.map_or_else(|| FirehoseEndpoints::new(), |v| v.clone()),
107106
eth_adapters,
108107
chain_head_update_listener,

0 commit comments

Comments
 (0)