Skip to content

Commit 686e7e1

Browse files
committed
chain, core, graph : use TriggersAdapterWrapper at top level
1 parent ae7559d commit 686e7e1

File tree

9 files changed

+69
-155
lines changed

9 files changed

+69
-155
lines changed

chain/ethereum/src/chain.rs

Lines changed: 22 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -133,82 +133,40 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
133133
filter: Arc<TriggerFilterWrapper<Chain>>,
134134
unified_api_version: UnifiedMappingApiVersion,
135135
) -> Result<Box<dyn BlockStream<Chain>>> {
136-
let requirements = filter.filter.node_capabilities();
137-
let adapter = chain
138-
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
139-
.unwrap_or_else(|_| {
140-
panic!(
141-
"no adapter for network {} with capabilities {}",
142-
chain.name, requirements
143-
)
144-
});
145-
146-
let adapter = Arc::new(TriggersAdapterWrapper::new(adapter, source_subgraph_stores));
147-
148-
let logger = chain
149-
.logger_factory
150-
.subgraph_logger(&deployment)
151-
.new(o!("component" => "BlockStream"));
152-
let chain_store = chain.chain_store();
153-
let chain_head_update_stream = chain
154-
.chain_head_update_listener
155-
.subscribe(chain.name.to_string(), logger.clone());
156-
157-
// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
158-
// This is ok because Celo blocks are always final. And we _need_ to do this because
159-
// some events appear only in eth_getLogs but not in transaction receipts.
160-
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
161-
let chain_id = match chain.chain_client().as_ref() {
162-
ChainClient::Rpc(adapter) => {
163-
adapter
164-
.cheapest()
165-
.await
166-
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
167-
.chain_id()
168-
.await?
169-
}
170-
_ => panic!("expected rpc when using polling blockstream"),
171-
};
172-
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
173-
false => chain.reorg_threshold,
174-
true => 0,
175-
};
176-
177-
Ok(Box::new(PollingBlockStream::new(
178-
chain_store,
179-
chain_head_update_stream,
180-
adapter,
181-
chain.node_id.clone(),
182-
deployment.hash,
183-
filter,
136+
self.build_polling(
137+
chain,
138+
deployment,
184139
start_blocks,
185-
reorg_threshold,
186-
logger,
187-
ENV_VARS.max_block_range_size,
188-
ENV_VARS.target_triggers_per_block_range,
189-
unified_api_version,
140+
source_subgraph_stores,
190141
subgraph_current_block,
191-
)))
142+
filter,
143+
unified_api_version,
144+
)
145+
.await
192146
}
193147

194148
async fn build_polling(
195149
&self,
196150
chain: &Chain,
197151
deployment: DeploymentLocator,
198152
start_blocks: Vec<BlockNumber>,
153+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
199154
subgraph_current_block: Option<BlockPtr>,
200155
filter: Arc<TriggerFilterWrapper<Chain>>,
201156
unified_api_version: UnifiedMappingApiVersion,
202157
) -> Result<Box<dyn BlockStream<Chain>>> {
203158
let requirements = filter.filter.node_capabilities();
204-
let adapter = chain
205-
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
206-
.unwrap_or_else(|_| {
207-
panic!(
208-
"no adapter for network {} with capabilities {}",
209-
chain.name, requirements
210-
)
211-
});
159+
let adapter = TriggersAdapterWrapper::new(
160+
chain
161+
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
162+
.unwrap_or_else(|_| {
163+
panic!(
164+
"no adapter for network {} with capabilities {}",
165+
chain.name, requirements
166+
)
167+
}),
168+
source_subgraph_stores,
169+
);
212170

213171
let logger = chain
214172
.logger_factory
@@ -242,7 +200,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
242200
Ok(Box::new(PollingBlockStream::new(
243201
chain_store,
244202
chain_head_update_stream,
245-
adapter,
203+
Arc::new(adapter),
246204
chain.node_id.clone(),
247205
deployment.hash,
248206
filter,
@@ -507,6 +465,7 @@ impl Blockchain for Chain {
507465
self,
508466
deployment,
509467
start_blocks,
468+
source_subgraph_stores,
510469
current_ptr,
511470
filter,
512471
unified_api_version,

chain/near/src/chain.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,20 +108,6 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
108108
chain.metrics_registry.clone(),
109109
)))
110110
}
111-
112-
async fn build_subgraph_block_stream(
113-
&self,
114-
_chain: &Chain,
115-
_deployment: DeploymentLocator,
116-
_start_blocks: Vec<BlockNumber>,
117-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
118-
_subgraph_current_block: Option<BlockPtr>,
119-
_filter: Arc<TriggerFilterWrapper<Chain>>,
120-
_unified_api_version: UnifiedMappingApiVersion,
121-
) -> Result<Box<dyn BlockStream<Chain>>> {
122-
unimplemented!()
123-
}
124-
125111
async fn build_firehose(
126112
&self,
127113
chain: &Chain,
@@ -164,6 +150,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
164150
_chain: &Chain,
165151
_deployment: DeploymentLocator,
166152
_start_blocks: Vec<BlockNumber>,
153+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
167154
_subgraph_current_block: Option<BlockPtr>,
168155
_filter: Arc<TriggerFilterWrapper<Chain>>,
169156
_unified_api_version: UnifiedMappingApiVersion,

chain/starknet/src/chain.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -197,19 +197,6 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
197197
unimplemented!()
198198
}
199199

200-
async fn build_subgraph_block_stream(
201-
&self,
202-
_chain: &Chain,
203-
_deployment: DeploymentLocator,
204-
_start_blocks: Vec<BlockNumber>,
205-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
206-
_subgraph_current_block: Option<BlockPtr>,
207-
_filter: Arc<TriggerFilterWrapper<Chain>>,
208-
_unified_api_version: UnifiedMappingApiVersion,
209-
) -> Result<Box<dyn BlockStream<Chain>>> {
210-
unimplemented!()
211-
}
212-
213200
async fn build_firehose(
214201
&self,
215202
chain: &Chain,
@@ -252,6 +239,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
252239
_chain: &Chain,
253240
_deployment: DeploymentLocator,
254241
_start_blocks: Vec<BlockNumber>,
242+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
255243
_subgraph_current_block: Option<BlockPtr>,
256244
_filter: Arc<TriggerFilterWrapper<Chain>>,
257245
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/block_stream.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -99,24 +99,12 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
9999
unimplemented!()
100100
}
101101

102-
async fn build_subgraph_block_stream(
103-
&self,
104-
_chain: &Chain,
105-
_deployment: DeploymentLocator,
106-
_start_blocks: Vec<BlockNumber>,
107-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
108-
_subgraph_current_block: Option<BlockPtr>,
109-
_filter: Arc<TriggerFilterWrapper<Chain>>,
110-
_unified_api_version: UnifiedMappingApiVersion,
111-
) -> Result<Box<dyn BlockStream<Chain>>> {
112-
unimplemented!()
113-
}
114-
115102
async fn build_polling(
116103
&self,
117104
_chain: &Chain,
118105
_deployment: DeploymentLocator,
119106
_start_blocks: Vec<BlockNumber>,
107+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
120108
_subgraph_current_block: Option<BlockPtr>,
121109
_filter: Arc<TriggerFilterWrapper<Chain>>,
122110
_unified_api_version: UnifiedMappingApiVersion,

core/src/subgraph/inputs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use graph::{
2-
blockchain::{Blockchain, TriggersAdapter},
2+
blockchain::{block_stream::TriggersAdapterWrapper, Blockchain},
33
components::{
44
store::{DeploymentLocator, SubgraphFork, WritableStore},
55
subgraph::ProofOfIndexingVersion,
@@ -20,7 +20,7 @@ pub struct IndexingInputs<C: Blockchain> {
2020
pub stop_block: Option<BlockNumber>,
2121
pub store: Arc<dyn WritableStore>,
2222
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
23-
pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
23+
pub triggers_adapter: Arc<TriggersAdapterWrapper<C>>,
2424
pub chain: Arc<C>,
2525
pub templates: Arc<Vec<DataSourceTemplate<C>>>,
2626
pub unified_api_version: UnifiedMappingApiVersion,

core/src/subgraph/instance_manager.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::subgraph::Decoder;
66
use std::collections::BTreeSet;
77

88
use crate::subgraph::runner::SubgraphRunner;
9-
use graph::blockchain::block_stream::BlockStreamMetrics;
9+
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
1010
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
1111
use graph::components::metrics::gas::GasMetrics;
1212
use graph::components::store::WritableStore;
@@ -501,6 +501,11 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
501501
)
502502
.await?;
503503

504+
let triggers_adapter = Arc::new(TriggersAdapterWrapper::new(
505+
triggers_adapter,
506+
subgraph_data_source_writables.clone(),
507+
));
508+
504509
let inputs = IndexingInputs {
505510
deployment: deployment.clone(),
506511
features,

graph/src/blockchain/block_stream.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
145145
chain: &C,
146146
deployment: DeploymentLocator,
147147
start_blocks: Vec<BlockNumber>,
148+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
148149
subgraph_current_block: Option<BlockPtr>,
149150
filter: Arc<TriggerFilterWrapper<C>>,
150151
unified_api_version: UnifiedMappingApiVersion,
@@ -159,7 +160,18 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
159160
subgraph_current_block: Option<BlockPtr>,
160161
filter: Arc<TriggerFilterWrapper<C>>,
161162
unified_api_version: UnifiedMappingApiVersion,
162-
) -> Result<Box<dyn BlockStream<C>>>;
163+
) -> Result<Box<dyn BlockStream<C>>> {
164+
self.build_polling(
165+
chain,
166+
deployment,
167+
start_blocks,
168+
source_subgraph_stores,
169+
subgraph_current_block,
170+
filter,
171+
unified_api_version,
172+
)
173+
.await
174+
}
163175
}
164176

165177
#[derive(Debug, Clone)]
@@ -316,9 +328,8 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
316328
}
317329
}
318330

319-
#[async_trait]
320-
impl<C: Blockchain> TriggersAdapter<C> for TriggersAdapterWrapper<C> {
321-
async fn ancestor_block(
331+
impl<C: Blockchain> TriggersAdapterWrapper<C> {
332+
pub async fn ancestor_block(
322333
&self,
323334
ptr: BlockPtr,
324335
offset: BlockNumber,
@@ -328,7 +339,7 @@ impl<C: Blockchain> TriggersAdapter<C> for TriggersAdapterWrapper<C> {
328339
}
329340

330341
// TODO: Do a proper implementation, this is a complete mock implementation
331-
async fn scan_triggers(
342+
pub async fn scan_triggers(
332343
&self,
333344
from: BlockNumber,
334345
to: BlockNumber,
@@ -337,7 +348,7 @@ impl<C: Blockchain> TriggersAdapter<C> for TriggersAdapterWrapper<C> {
337348
self.adapter.scan_triggers(from, to, filter).await
338349
}
339350

340-
async fn triggers_in_block(
351+
pub async fn triggers_in_block(
341352
&self,
342353
logger: &Logger,
343354
block: C::Block,
@@ -346,15 +357,15 @@ impl<C: Blockchain> TriggersAdapter<C> for TriggersAdapterWrapper<C> {
346357
self.adapter.triggers_in_block(logger, block, filter).await
347358
}
348359

349-
async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
360+
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
350361
self.adapter.is_on_main_chain(ptr).await
351362
}
352363

353-
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
364+
pub async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
354365
self.adapter.parent_ptr(block).await
355366
}
356367

357-
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
368+
pub async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
358369
self.adapter.chain_head_ptr().await
359370
}
360371
}

graph/src/blockchain/polling_block_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::time::Duration;
99

1010
use super::block_stream::{
1111
BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream,
12-
FirehoseCursor, TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE,
12+
FirehoseCursor, TriggersAdapterWrapper, BUFFERED_BLOCK_STREAM_SIZE,
1313
};
1414
use super::{Block, BlockPtr, Blockchain, TriggerFilterWrapper};
1515

@@ -79,7 +79,7 @@ where
7979
C: Blockchain,
8080
{
8181
chain_store: Arc<dyn ChainStore>,
82-
adapter: Arc<dyn TriggersAdapter<C>>,
82+
adapter: Arc<TriggersAdapterWrapper<C>>,
8383
node_id: NodeId,
8484
subgraph_id: DeploymentHash,
8585
// This is not really a block number, but the (unsigned) difference
@@ -146,7 +146,7 @@ where
146146
pub fn new(
147147
chain_store: Arc<dyn ChainStore>,
148148
chain_head_update_stream: ChainHeadUpdateStream,
149-
adapter: Arc<dyn TriggersAdapter<C>>,
149+
adapter: Arc<TriggersAdapterWrapper<C>>,
150150
node_id: NodeId,
151151
subgraph_id: DeploymentHash,
152152
filter: Arc<TriggerFilterWrapper<C>>,

0 commit comments

Comments
 (0)