Skip to content

Commit d0c7215

Browse files
authored
Move the substreams block stream process to earlier in the pipeline when parallel processing is possible (#4851)
1 parent 24bc7b0 commit d0c7215

File tree

28 files changed

+599
-390
lines changed

28 files changed

+599
-390
lines changed

Cargo.lock

Lines changed: 27 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/src/chain.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use graph::prelude::{
1010
BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
1111
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1212
};
13+
use graph::schema::InputSchema;
1314
use graph::{
1415
blockchain::{
1516
block_stream::{
@@ -102,6 +103,18 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
102103
)))
103104
}
104105

106+
async fn build_substreams(
107+
&self,
108+
_chain: &Chain,
109+
_schema: Arc<InputSchema>,
110+
_deployment: DeploymentLocator,
111+
_block_cursor: FirehoseCursor,
112+
_subgraph_current_block: Option<BlockPtr>,
113+
_filter: Arc<<Chain as Blockchain>::TriggerFilter>,
114+
) -> Result<Box<dyn BlockStream<Chain>>> {
115+
unimplemented!()
116+
}
117+
105118
async fn build_polling(
106119
&self,
107120
chain: &Chain,

chain/near/src/chain.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use graph::components::store::DeploymentCursorTracker;
99
use graph::data::subgraph::UnifiedMappingApiVersion;
1010
use graph::firehose::FirehoseEndpoint;
1111
use graph::prelude::{MetricsRegistry, TryFutureExt};
12+
use graph::schema::InputSchema;
1213
use graph::{
1314
anyhow::Result,
1415
blockchain::{
@@ -40,6 +41,18 @@ pub struct NearStreamBuilder {}
4041

4142
#[async_trait]
4243
impl BlockStreamBuilder<Chain> for NearStreamBuilder {
44+
async fn build_substreams(
45+
&self,
46+
_chain: &Chain,
47+
_schema: Arc<InputSchema>,
48+
_deployment: DeploymentLocator,
49+
_block_cursor: FirehoseCursor,
50+
_subgraph_current_block: Option<BlockPtr>,
51+
_filter: Arc<<Chain as Blockchain>::TriggerFilter>,
52+
) -> Result<Box<dyn BlockStream<Chain>>> {
53+
unimplemented!()
54+
}
55+
4356
async fn build_firehose(
4457
&self,
4558
chain: &Chain,

chain/substreams/examples/substreams.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async fn main() -> Result<(), Error> {
6868
client,
6969
None,
7070
None,
71-
Arc::new(Mapper {}),
71+
Arc::new(Mapper { schema: None }),
7272
package.modules.clone(),
7373
module_name.to_string(),
7474
vec![12369621],

chain/substreams/src/block_ingestor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl SubstreamsBlockIngestor {
125125
#[async_trait]
126126
impl BlockIngestor for SubstreamsBlockIngestor {
127127
async fn run(self: Box<Self>) {
128-
let mapper = Arc::new(Mapper {});
128+
let mapper = Arc::new(Mapper { schema: None });
129129
let mut latest_cursor = self.fetch_head_cursor().await;
130130
let mut backoff =
131131
ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));

chain/substreams/src/block_stream.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use graph::{
1212
components::store::DeploymentLocator,
1313
data::subgraph::UnifiedMappingApiVersion,
1414
prelude::{async_trait, BlockNumber, BlockPtr},
15+
schema::InputSchema,
1516
slog::o,
1617
};
1718

@@ -30,17 +31,18 @@ impl BlockStreamBuilder {
3031
/// is very similar, so we can re-use the configuration and the builder for it.
3132
/// This is probably something to improve but for now it works.
3233
impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
33-
async fn build_firehose(
34+
async fn build_substreams(
3435
&self,
3536
chain: &Chain,
37+
schema: Arc<InputSchema>,
3638
deployment: DeploymentLocator,
3739
block_cursor: FirehoseCursor,
38-
_start_blocks: Vec<BlockNumber>,
3940
subgraph_current_block: Option<BlockPtr>,
40-
filter: Arc<TriggerFilter>,
41-
_unified_api_version: UnifiedMappingApiVersion,
41+
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
4242
) -> Result<Box<dyn BlockStream<Chain>>> {
43-
let mapper = Arc::new(Mapper {});
43+
let mapper = Arc::new(Mapper {
44+
schema: Some(schema),
45+
});
4446

4547
let logger = chain
4648
.logger_factory
@@ -62,6 +64,19 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
6264
)))
6365
}
6466

67+
async fn build_firehose(
68+
&self,
69+
_chain: &Chain,
70+
_deployment: DeploymentLocator,
71+
_block_cursor: FirehoseCursor,
72+
_start_blocks: Vec<BlockNumber>,
73+
_subgraph_current_block: Option<BlockPtr>,
74+
_filter: Arc<TriggerFilter>,
75+
_unified_api_version: UnifiedMappingApiVersion,
76+
) -> Result<Box<dyn BlockStream<Chain>>> {
77+
unimplemented!()
78+
}
79+
6580
async fn build_polling(
6681
&self,
6782
_chain: &Chain,

chain/substreams/src/chain.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ use crate::{data_source::*, EntityChanges, TriggerData, TriggerFilter, TriggersA
33
use anyhow::Error;
44
use graph::blockchain::client::ChainClient;
55
use graph::blockchain::{
6-
BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, EmptyNodeCapabilities,
7-
NoopRuntimeAdapter,
6+
BasicBlockchainBuilder, BlockIngestor, EmptyNodeCapabilities, NoopRuntimeAdapter,
87
};
9-
use graph::components::store::DeploymentCursorTracker;
8+
use graph::components::store::{DeploymentCursorTracker, EntityKey};
109
use graph::firehose::FirehoseEndpoints;
11-
use graph::prelude::{BlockHash, CheapClone, LoggerFactory, MetricsRegistry};
10+
use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
1211
use graph::{
1312
blockchain::{
1413
self,
@@ -20,13 +19,29 @@ use graph::{
2019
prelude::{async_trait, BlockNumber, ChainStore},
2120
slog::Logger,
2221
};
22+
2323
use std::sync::Arc;
2424

25+
// ParsedChanges is an internal representation of the equivalent operations defined by the
26+
// graph-out format used by substreams.
27+
// Unset serves as a sentinel value: if for some reason an unknown value is sent or the value
28+
// was empty, that is probably unintended behaviour. This code was moved here for performance
29+
// reasons, but the validation is still performed during trigger processing, so while Unset will
30+
// very likely just indicate an error somewhere, as far as the stream is concerned we just pass
31+
// that along and let the downstream components deal with it.
32+
#[derive(Debug, Clone)]
33+
pub enum ParsedChanges {
34+
Unset,
35+
Delete(EntityKey),
36+
Upsert { key: EntityKey, entity: Entity },
37+
}
38+
2539
#[derive(Default, Debug, Clone)]
2640
pub struct Block {
2741
pub hash: BlockHash,
2842
pub number: BlockNumber,
2943
pub changes: EntityChanges,
44+
pub parsed_changes: Vec<ParsedChanges>,
3045
}
3146

3247
impl blockchain::Block for Block {
@@ -112,19 +127,18 @@ impl Blockchain for Chain {
112127
&self,
113128
deployment: DeploymentLocator,
114129
store: impl DeploymentCursorTracker,
115-
start_blocks: Vec<BlockNumber>,
130+
_start_blocks: Vec<BlockNumber>,
116131
filter: Arc<Self::TriggerFilter>,
117-
unified_api_version: UnifiedMappingApiVersion,
132+
_unified_api_version: UnifiedMappingApiVersion,
118133
) -> Result<Box<dyn BlockStream<Self>>, Error> {
119134
self.block_stream_builder
120-
.build_firehose(
135+
.build_substreams(
121136
self,
137+
store.input_schema(),
122138
deployment,
123139
store.firehose_cursor(),
124-
start_blocks,
125140
store.block_ptr(),
126141
filter,
127-
unified_api_version,
128142
)
129143
.await
130144
}
@@ -177,7 +191,7 @@ impl Blockchain for Chain {
177191
}
178192
}
179193

180-
impl BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
194+
impl blockchain::BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
181195
fn build(self) -> super::Chain {
182196
let BasicBlockchainBuilder {
183197
logger_factory,

0 commit comments

Comments
 (0)