Skip to content

Commit 4fa04b2

Browse files
committed
Subgraph composition : TriggersAdapter wrapper
1 parent 070072b commit 4fa04b2

File tree

44 files changed

+1684
-225
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1684
-225
lines changed

chain/arweave/src/chain.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ use graph::blockchain::client::ChainClient;
33
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
44
use graph::blockchain::{
55
BasicBlockchainBuilder, Block, BlockIngestor, BlockchainBuilder, BlockchainKind,
6-
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
6+
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter, TriggerFilterWrapper,
77
};
88
use graph::cheap_clone::CheapClone;
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::DeploymentCursorTracker;
10+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::env::EnvVars;
1313
use graph::firehose::FirehoseEndpoint;
14-
use graph::prelude::MetricsRegistry;
14+
use graph::prelude::{DeploymentHash, MetricsRegistry};
1515
use graph::substreams::Clock;
1616
use graph::{
1717
blockchain::{
@@ -27,11 +27,13 @@ use graph::{
2727
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
2828
};
2929
use prost::Message;
30+
use std::collections::HashSet;
3031
use std::sync::Arc;
3132

3233
use crate::adapter::TriggerFilter;
3334
use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
3435
use crate::trigger::{self, ArweaveTrigger};
36+
use crate::Block as ArweaveBlock;
3537
use crate::{
3638
codec,
3739
data_source::{DataSource, UnresolvedDataSource},
@@ -119,7 +121,8 @@ impl Blockchain for Chain {
119121
deployment: DeploymentLocator,
120122
store: impl DeploymentCursorTracker,
121123
start_blocks: Vec<BlockNumber>,
122-
filter: Arc<Self::TriggerFilter>,
124+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
125+
filter: Arc<TriggerFilterWrapper<Self>>,
123126
unified_api_version: UnifiedMappingApiVersion,
124127
) -> Result<Box<dyn BlockStream<Self>>, Error> {
125128
let adapter = self
@@ -135,7 +138,10 @@ impl Blockchain for Chain {
135138
.subgraph_logger(&deployment)
136139
.new(o!("component" => "FirehoseBlockStream"));
137140

138-
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
141+
let firehose_mapper = Arc::new(FirehoseMapper {
142+
adapter,
143+
filter: filter.chain_filter.clone(),
144+
});
139145

140146
Ok(Box::new(FirehoseBlockStream::new(
141147
deployment.hash,
@@ -199,6 +205,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
199205
panic!("Should never be called since not used by FirehoseBlockStream")
200206
}
201207

208+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
209+
unimplemented!()
210+
}
211+
202212
async fn triggers_in_block(
203213
&self,
204214
logger: &Logger,
@@ -258,6 +268,14 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
258268
number: block.number.saturating_sub(1),
259269
}))
260270
}
271+
272+
async fn load_blocks_by_numbers(
273+
&self,
274+
_logger: Logger,
275+
_block_numbers: HashSet<BlockNumber>,
276+
) -> Result<Vec<ArweaveBlock>, Error> {
277+
todo!()
278+
}
261279
}
262280

263281
pub struct FirehoseMapper {

chain/cosmos/src/chain.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
2-
use graph::blockchain::{BlockIngestor, NoopDecoderHook};
2+
use graph::blockchain::{BlockIngestor, NoopDecoderHook, TriggerFilterWrapper};
33
use graph::components::network_provider::ChainName;
44
use graph::env::EnvVars;
5-
use graph::prelude::MetricsRegistry;
5+
use graph::prelude::{DeploymentHash, MetricsRegistry};
66
use graph::substreams::Clock;
7+
use std::collections::HashSet;
78
use std::convert::TryFrom;
89
use std::sync::Arc;
910

1011
use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, FirehoseCursor};
1112
use graph::blockchain::client::ChainClient;
1213
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
1314
use graph::cheap_clone::CheapClone;
14-
use graph::components::store::DeploymentCursorTracker;
15+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1516
use graph::data::subgraph::UnifiedMappingApiVersion;
1617
use graph::{
1718
blockchain::{
@@ -33,7 +34,7 @@ use crate::data_source::{
3334
DataSource, DataSourceTemplate, EventOrigin, UnresolvedDataSource, UnresolvedDataSourceTemplate,
3435
};
3536
use crate::trigger::CosmosTrigger;
36-
use crate::{codec, TriggerFilter};
37+
use crate::{codec, Block, TriggerFilter};
3738

3839
pub struct Chain {
3940
logger_factory: LoggerFactory,
@@ -113,7 +114,8 @@ impl Blockchain for Chain {
113114
deployment: DeploymentLocator,
114115
store: impl DeploymentCursorTracker,
115116
start_blocks: Vec<BlockNumber>,
116-
filter: Arc<Self::TriggerFilter>,
117+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
118+
filter: Arc<TriggerFilterWrapper<Self>>,
117119
unified_api_version: UnifiedMappingApiVersion,
118120
) -> Result<Box<dyn BlockStream<Self>>, Error> {
119121
let adapter = self
@@ -129,7 +131,10 @@ impl Blockchain for Chain {
129131
.subgraph_logger(&deployment)
130132
.new(o!("component" => "FirehoseBlockStream"));
131133

132-
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
134+
let firehose_mapper = Arc::new(FirehoseMapper {
135+
adapter,
136+
filter: filter.chain_filter.clone(),
137+
});
133138

134139
Ok(Box::new(FirehoseBlockStream::new(
135140
deployment.hash,
@@ -193,6 +198,18 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
193198
panic!("Should never be called since not used by FirehoseBlockStream")
194199
}
195200

201+
async fn load_blocks_by_numbers(
202+
&self,
203+
_logger: Logger,
204+
_block_numbers: HashSet<BlockNumber>,
205+
) -> Result<Vec<Block>, Error> {
206+
unimplemented!()
207+
}
208+
209+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
210+
unimplemented!()
211+
}
212+
196213
async fn scan_triggers(
197214
&self,
198215
_from: BlockNumber,
@@ -467,9 +484,12 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
467484

468485
#[cfg(test)]
469486
mod test {
470-
use graph::prelude::{
471-
slog::{o, Discard, Logger},
472-
tokio,
487+
use graph::{
488+
blockchain::Trigger,
489+
prelude::{
490+
slog::{o, Discard, Logger},
491+
tokio,
492+
},
473493
};
474494

475495
use super::*;
@@ -600,7 +620,10 @@ mod test {
600620
// they may not be in the same order
601621
for trigger in expected_triggers {
602622
assert!(
603-
triggers.trigger_data.contains(&trigger),
623+
triggers.trigger_data.iter().any(|t| match t {
624+
Trigger::Chain(t) => t == &trigger,
625+
_ => false,
626+
}),
604627
"Expected trigger list to contain {:?}, but it only contains: {:?}",
605628
trigger,
606629
triggers.trigger_data

chain/ethereum/src/adapter.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,13 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11091109
block_hash: H256,
11101110
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
11111111

1112+
async fn load_blocks_by_numbers(
1113+
&self,
1114+
_logger: Logger,
1115+
_chain_store: Arc<dyn ChainStore>,
1116+
_block_numbers: HashSet<BlockNumber>,
1117+
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
1118+
11121119
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
11131120
/// May use the `chain_store` as a cache.
11141121
async fn load_blocks(

chain/ethereum/src/chain.rs

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ use anyhow::{Context, Error};
33
use graph::blockchain::client::ChainClient;
44
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
55
use graph::blockchain::{
6-
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
6+
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper,
7+
TriggersAdapterSelector,
78
};
89
use graph::components::network_provider::ChainName;
9-
use graph::components::store::DeploymentCursorTracker;
10+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1011
use graph::data::subgraph::UnifiedMappingApiVersion;
1112
use graph::firehose::{FirehoseEndpoint, ForkStep};
1213
use graph::futures03::compat::Future01CompatExt;
1314
use graph::prelude::{
14-
BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
15+
BlockHash, ComponentLoggerConfig, DeploymentHash, ElasticComponentLoggerConfig, EthereumBlock,
1516
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1617
};
1718
use graph::schema::InputSchema;
@@ -61,6 +62,7 @@ use crate::{BufferedCallCache, NodeCapabilities};
6162
use crate::{EthereumAdapter, RuntimeAdapter};
6263
use graph::blockchain::block_stream::{
6364
BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
65+
TriggersAdapterWrapper,
6466
};
6567

6668
/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
@@ -121,24 +123,50 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
121123
unimplemented!()
122124
}
123125

126+
async fn build_subgraph_block_stream(
127+
&self,
128+
chain: &Chain,
129+
deployment: DeploymentLocator,
130+
start_blocks: Vec<BlockNumber>,
131+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
132+
subgraph_current_block: Option<BlockPtr>,
133+
filter: Arc<TriggerFilterWrapper<Chain>>,
134+
unified_api_version: UnifiedMappingApiVersion,
135+
) -> Result<Box<dyn BlockStream<Chain>>> {
136+
self.build_polling(
137+
chain,
138+
deployment,
139+
start_blocks,
140+
source_subgraph_stores,
141+
subgraph_current_block,
142+
filter,
143+
unified_api_version,
144+
)
145+
.await
146+
}
147+
124148
async fn build_polling(
125149
&self,
126150
chain: &Chain,
127151
deployment: DeploymentLocator,
128152
start_blocks: Vec<BlockNumber>,
153+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
129154
subgraph_current_block: Option<BlockPtr>,
130-
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
155+
filter: Arc<TriggerFilterWrapper<Chain>>,
131156
unified_api_version: UnifiedMappingApiVersion,
132157
) -> Result<Box<dyn BlockStream<Chain>>> {
133-
let requirements = filter.node_capabilities();
134-
let adapter = chain
135-
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
136-
.unwrap_or_else(|_| {
137-
panic!(
138-
"no adapter for network {} with capabilities {}",
139-
chain.name, requirements
140-
)
141-
});
158+
let requirements = filter.chain_filter.node_capabilities();
159+
let adapter = TriggersAdapterWrapper::new(
160+
chain
161+
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
162+
.unwrap_or_else(|_| {
163+
panic!(
164+
"no adapter for network {} with capabilities {}",
165+
chain.name, requirements
166+
)
167+
}),
168+
source_subgraph_stores,
169+
);
142170

143171
let logger = chain
144172
.logger_factory
@@ -172,7 +200,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
172200
Ok(Box::new(PollingBlockStream::new(
173201
chain_store,
174202
chain_head_update_stream,
175-
adapter,
203+
Arc::new(adapter),
176204
chain.node_id.clone(),
177205
deployment.hash,
178206
filter,
@@ -409,17 +437,35 @@ impl Blockchain for Chain {
409437
deployment: DeploymentLocator,
410438
store: impl DeploymentCursorTracker,
411439
start_blocks: Vec<BlockNumber>,
412-
filter: Arc<Self::TriggerFilter>,
440+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
441+
filter: Arc<TriggerFilterWrapper<Self>>,
413442
unified_api_version: UnifiedMappingApiVersion,
414443
) -> Result<Box<dyn BlockStream<Self>>, Error> {
415444
let current_ptr = store.block_ptr();
445+
446+
if !filter.subgraph_filter.is_empty() {
447+
return self
448+
.block_stream_builder
449+
.build_subgraph_block_stream(
450+
self,
451+
deployment,
452+
start_blocks,
453+
source_subgraph_stores,
454+
current_ptr,
455+
filter,
456+
unified_api_version,
457+
)
458+
.await;
459+
}
460+
416461
match self.chain_client().as_ref() {
417462
ChainClient::Rpc(_) => {
418463
self.block_stream_builder
419464
.build_polling(
420465
self,
421466
deployment,
422467
start_blocks,
468+
source_subgraph_stores,
423469
current_ptr,
424470
filter,
425471
unified_api_version,
@@ -434,7 +480,7 @@ impl Blockchain for Chain {
434480
store.firehose_cursor(),
435481
start_blocks,
436482
current_ptr,
437-
filter,
483+
filter.chain_filter.clone(),
438484
unified_api_version,
439485
)
440486
.await
@@ -689,6 +735,35 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
689735
.await
690736
}
691737

738+
async fn load_blocks_by_numbers(
739+
&self,
740+
logger: Logger,
741+
block_numbers: HashSet<BlockNumber>,
742+
) -> Result<Vec<BlockFinality>> {
743+
use graph::futures01::stream::Stream;
744+
745+
let adapter = self
746+
.chain_client
747+
.rpc()?
748+
.cheapest_with(&self.capabilities)
749+
.await?;
750+
751+
let blocks = adapter
752+
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
753+
.await
754+
.map(|block| BlockFinality::Final(block))
755+
.collect()
756+
.compat()
757+
.await?;
758+
759+
Ok(blocks)
760+
}
761+
762+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
763+
let chain_store = self.chain_store.clone();
764+
chain_store.chain_head_ptr().await
765+
}
766+
692767
async fn triggers_in_block(
693768
&self,
694769
logger: &Logger,

0 commit comments

Comments
 (0)