Skip to content

Commit 3aba98f

Browse files
committed
chain, graph : move subgraph trigger scanning back to TriggersAdapterWrapper
1 parent 686e7e1 commit 3aba98f

File tree

16 files changed

+228
-146
lines changed

16 files changed

+228
-146
lines changed

chain/arweave/src/chain.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ use graph::{
2727
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
2828
};
2929
use prost::Message;
30+
use std::collections::HashSet;
3031
use std::sync::Arc;
3132

3233
use crate::adapter::TriggerFilter;
3334
use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
3435
use crate::trigger::{self, ArweaveTrigger};
36+
use crate::Block as ArweaveBlock;
3537
use crate::{
3638
codec,
3739
data_source::{DataSource, UnresolvedDataSource},
@@ -138,7 +140,7 @@ impl Blockchain for Chain {
138140

139141
let firehose_mapper = Arc::new(FirehoseMapper {
140142
adapter,
141-
filter: filter.filter.clone(),
143+
filter: filter.chain_filter.clone(),
142144
});
143145

144146
Ok(Box::new(FirehoseBlockStream::new(
@@ -198,7 +200,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
198200
&self,
199201
_from: BlockNumber,
200202
_to: BlockNumber,
201-
_filter: &Arc<TriggerFilterWrapper<Chain>>,
203+
_filter: &TriggerFilter,
202204
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
203205
panic!("Should never be called since not used by FirehoseBlockStream")
204206
}
@@ -266,6 +268,14 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
266268
number: block.number.saturating_sub(1),
267269
}))
268270
}
271+
272+
async fn load_blocks_by_numbers(
273+
&self,
274+
_logger: Logger,
275+
_block_numbers: HashSet<BlockNumber>,
276+
) -> Result<Vec<ArweaveBlock>, Error> {
277+
todo!()
278+
}
269279
}
270280

271281
pub struct FirehoseMapper {

chain/cosmos/src/chain.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use graph::components::adapter::ChainId;
44
use graph::env::EnvVars;
55
use graph::prelude::{DeploymentHash, MetricsRegistry};
66
use graph::substreams::Clock;
7+
use std::collections::HashSet;
78
use std::convert::TryFrom;
89
use std::sync::Arc;
910

@@ -33,7 +34,7 @@ use crate::data_source::{
3334
DataSource, DataSourceTemplate, EventOrigin, UnresolvedDataSource, UnresolvedDataSourceTemplate,
3435
};
3536
use crate::trigger::CosmosTrigger;
36-
use crate::{codec, TriggerFilter};
37+
use crate::{codec, Block, TriggerFilter};
3738

3839
pub struct Chain {
3940
logger_factory: LoggerFactory,
@@ -132,7 +133,7 @@ impl Blockchain for Chain {
132133

133134
let firehose_mapper = Arc::new(FirehoseMapper {
134135
adapter,
135-
filter: filter.filter.clone(),
136+
filter: filter.chain_filter.clone(),
136137
});
137138

138139
Ok(Box::new(FirehoseBlockStream::new(
@@ -197,6 +198,14 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
197198
panic!("Should never be called since not used by FirehoseBlockStream")
198199
}
199200

201+
async fn load_blocks_by_numbers(
202+
&self,
203+
_logger: Logger,
204+
_block_numbers: HashSet<BlockNumber>,
205+
) -> Result<Vec<Block>, Error> {
206+
unimplemented!()
207+
}
208+
200209
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
201210
unimplemented!()
202211
}
@@ -205,7 +214,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
205214
&self,
206215
_from: BlockNumber,
207216
_to: BlockNumber,
208-
_filter: &Arc<TriggerFilterWrapper<Chain>>,
217+
_filter: &TriggerFilter,
209218
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
210219
panic!("Should never be called since not used by FirehoseBlockStream")
211220
}

chain/ethereum/src/adapter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,9 +1111,9 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11111111

11121112
async fn load_blocks_by_numbers(
11131113
&self,
1114-
logger: Logger,
1115-
chain_store: Arc<dyn ChainStore>,
1116-
block_numbers: HashSet<BlockNumber>,
1114+
_logger: Logger,
1115+
_chain_store: Arc<dyn ChainStore>,
1116+
_block_numbers: HashSet<BlockNumber>,
11171117
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
11181118

11191119
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.

chain/ethereum/src/chain.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
155155
filter: Arc<TriggerFilterWrapper<Chain>>,
156156
unified_api_version: UnifiedMappingApiVersion,
157157
) -> Result<Box<dyn BlockStream<Chain>>> {
158-
let requirements = filter.filter.node_capabilities();
158+
let requirements = filter.chain_filter.node_capabilities();
159159
let adapter = TriggersAdapterWrapper::new(
160160
chain
161161
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
@@ -480,7 +480,7 @@ impl Blockchain for Chain {
480480
store.firehose_cursor(),
481481
start_blocks,
482482
current_ptr,
483-
filter.filter.clone(),
483+
filter.chain_filter.clone(),
484484
unified_api_version,
485485
)
486486
.await
@@ -717,7 +717,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
717717
&self,
718718
from: BlockNumber,
719719
to: BlockNumber,
720-
filter: &Arc<TriggerFilterWrapper<Chain>>,
720+
filter: &TriggerFilter,
721721
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
722722
blocks_with_triggers(
723723
self.chain_client
@@ -735,6 +735,30 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
735735
.await
736736
}
737737

738+
async fn load_blocks_by_numbers(
739+
&self,
740+
logger: Logger,
741+
block_numbers: HashSet<BlockNumber>,
742+
) -> Result<Vec<BlockFinality>> {
743+
use graph::futures01::stream::Stream;
744+
745+
let adapter = self
746+
.chain_client
747+
.rpc()?
748+
.cheapest_with(&self.capabilities)
749+
.await?;
750+
751+
let blocks = adapter
752+
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
753+
.await
754+
.map(|block| BlockFinality::Final(block))
755+
.collect()
756+
.compat()
757+
.await?;
758+
759+
Ok(blocks)
760+
}
761+
738762
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
739763
let chain_store = self.chain_store.clone();
740764
chain_store.chain_head_ptr().await
@@ -771,7 +795,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
771795
self.ethrpc_metrics.clone(),
772796
block_number,
773797
block_number,
774-
&Arc::new(TriggerFilterWrapper::<Chain>::new(filter.clone(), vec![])), // TODO(krishna): This is temporary until we take TriggerFilterWrapper as param in triggers_in_block
798+
filter,
775799
self.unified_api_version.clone(),
776800
)
777801
.await?;

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 3 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
22
use graph::blockchain::client::ChainClient;
33
use graph::blockchain::BlockHash;
44
use graph::blockchain::ChainIdentifier;
5-
use graph::blockchain::SubgraphFilter;
6-
use graph::blockchain::TriggerFilterWrapper;
5+
76
use graph::components::transaction_receipt::LightTransactionReceipt;
87
use graph::data::store::ethereum::call;
98
use graph::data::store::scalar;
109
use graph::data::subgraph::UnifiedMappingApiVersion;
1110
use graph::data::subgraph::API_VERSION_0_0_7;
12-
use graph::data_source::subgraph;
1311
use graph::futures01::stream;
1412
use graph::futures01::Future;
1513
use graph::futures01::Stream;
@@ -21,10 +19,6 @@ use graph::prelude::ethabi::ParamType;
2119
use graph::prelude::ethabi::Token;
2220
use graph::prelude::tokio::try_join;
2321
use graph::prelude::web3::types::U256;
24-
use graph::prelude::DeploymentHash;
25-
use graph::prelude::Entity;
26-
use graph::prelude::Value;
27-
use graph::schema::InputSchema;
2822
use graph::slog::o;
2923
use graph::tokio::sync::RwLock;
3024
use graph::tokio::time::timeout;
@@ -65,6 +59,7 @@ use crate::chain::BlockFinality;
6559
use crate::trigger::LogRef;
6660
use crate::Chain;
6761
use crate::NodeCapabilities;
62+
use crate::TriggerFilter;
6863
use crate::{
6964
adapter::{
7065
ContractCall, ContractCallError, EthGetLogsFilter, EthereumAdapter as EthereumAdapterTrait,
@@ -1729,81 +1724,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
17291724
}
17301725
}
17311726

1732-
// TODO(krishna): Currently this is a mock implementation of subgraph triggers.
1733-
// This will be replaced with the actual implementation which will use the filters to
1734-
// query the database of the source subgraph and return the entity triggers.
1735-
async fn subgraph_triggers(
1736-
adapter: Arc<EthereumAdapter>,
1737-
logger: Logger,
1738-
chain_store: Arc<dyn ChainStore>,
1739-
_subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
1740-
from: BlockNumber,
1741-
to: BlockNumber,
1742-
filter: &Arc<TriggerFilterWrapper<Chain>>,
1743-
_unified_api_version: UnifiedMappingApiVersion,
1744-
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
1745-
let logger2 = logger.cheap_clone();
1746-
let eth = adapter.clone();
1747-
let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
1748-
let to = to_ptr.block_number();
1749-
1750-
let first_filter = filter.subgraph_filter.first().unwrap();
1751-
1752-
let blocks = adapter
1753-
.load_blocks_by_numbers(
1754-
logger.cheap_clone(),
1755-
chain_store.clone(),
1756-
HashSet::from_iter(from..=to),
1757-
)
1758-
.await
1759-
.and_then(move |block| {
1760-
Ok(BlockWithTriggers::<Chain>::new_with_subgraph_triggers(
1761-
BlockFinality::Final(block.clone()),
1762-
vec![create_mock_subgraph_trigger(first_filter, &block)],
1763-
&logger2,
1764-
))
1765-
})
1766-
.collect()
1767-
.compat()
1768-
.await?;
1769-
1770-
Ok((blocks, to))
1771-
}
1772-
1773-
fn create_mock_subgraph_trigger(
1774-
filter: &SubgraphFilter,
1775-
block: &LightEthereumBlock,
1776-
) -> subgraph::TriggerData {
1777-
let mock_entity = create_mock_entity(block);
1778-
subgraph::TriggerData {
1779-
source: filter.subgraph.clone(),
1780-
entity: mock_entity,
1781-
entity_type: filter.entities.first().unwrap().clone(),
1782-
}
1783-
}
1784-
1785-
fn create_mock_entity(block: &LightEthereumBlock) -> Entity {
1786-
let id = DeploymentHash::new("test").unwrap();
1787-
let data_schema = InputSchema::parse_latest(
1788-
"type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }",
1789-
id.clone(),
1790-
)
1791-
.unwrap();
1792-
let hash = Value::Bytes(scalar::Bytes::from(block.hash.unwrap().as_bytes().to_vec()));
1793-
let data = data_schema
1794-
.make_entity(vec![
1795-
("id".into(), hash.clone()),
1796-
(
1797-
"number".into(),
1798-
Value::BigInt(scalar::BigInt::from(block.number())),
1799-
),
1800-
("hash".into(), hash),
1801-
])
1802-
.unwrap();
1803-
1804-
data
1805-
}
1806-
18071727
/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
18081728
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
18091729
/// If a block contains no triggers, there may be no corresponding item in the stream.
@@ -1825,33 +1745,13 @@ pub(crate) async fn blocks_with_triggers(
18251745
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
18261746
from: BlockNumber,
18271747
to: BlockNumber,
1828-
filter: &Arc<TriggerFilterWrapper<Chain>>,
1748+
filter: &TriggerFilter,
18291749
unified_api_version: UnifiedMappingApiVersion,
18301750
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
18311751
// Each trigger filter needs to be queried for the same block range
18321752
// and the blocks yielded need to be deduped. If any error occurs
18331753
// while searching for a trigger type, the entire operation fails.
18341754
let eth = adapter.clone();
1835-
let subgraph_filter = filter.subgraph_filter.clone();
1836-
1837-
// TODO(krishna): In the initial implementation we do not allow any other datasource type
1838-
// When using subgraph data sources, there if subgraph_filter is not empty, we can return
1839-
// by just processing the subgraph triggers.
1840-
if !subgraph_filter.is_empty() {
1841-
return subgraph_triggers(
1842-
adapter.clone(),
1843-
logger.clone(),
1844-
chain_store.clone(),
1845-
subgraph_metrics.clone(),
1846-
from,
1847-
to,
1848-
filter,
1849-
unified_api_version,
1850-
)
1851-
.await;
1852-
}
1853-
1854-
let filter = filter.filter.clone();
18551755
let call_filter = EthereumCallFilter::from(&filter.block);
18561756

18571757
// Scan the block range to find relevant triggers

chain/near/src/chain.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ use graph::{
3232
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
3333
};
3434
use prost::Message;
35+
use std::collections::HashSet;
3536
use std::sync::Arc;
3637

3738
use crate::adapter::TriggerFilter;
3839
use crate::codec::substreams_triggers::BlockAndReceipts;
40+
use crate::codec::Block;
3941
use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
4042
use crate::trigger::{self, NearTrigger};
4143
use crate::{
@@ -243,7 +245,7 @@ impl Blockchain for Chain {
243245
deployment,
244246
store.firehose_cursor(),
245247
store.block_ptr(),
246-
filter.filter.clone(),
248+
filter.chain_filter.clone(),
247249
)
248250
.await;
249251
}
@@ -255,7 +257,7 @@ impl Blockchain for Chain {
255257
store.firehose_cursor(),
256258
start_blocks,
257259
store.block_ptr(),
258-
filter.filter.clone(),
260+
filter.chain_filter.clone(),
259261
unified_api_version,
260262
)
261263
.await
@@ -318,11 +320,19 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
318320
&self,
319321
_from: BlockNumber,
320322
_to: BlockNumber,
321-
_filter: &Arc<TriggerFilterWrapper<Chain>>,
323+
_filter: &TriggerFilter,
322324
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
323325
panic!("Should never be called since not used by FirehoseBlockStream")
324326
}
325327

328+
async fn load_blocks_by_numbers(
329+
&self,
330+
_logger: Logger,
331+
_block_numbers: HashSet<BlockNumber>,
332+
) -> Result<Vec<Block>> {
333+
unimplemented!()
334+
}
335+
326336
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
327337
unimplemented!()
328338
}

0 commit comments

Comments
 (0)