diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs
index f49611ddf93..912e4f384b2 100644
--- a/chain/arweave/src/chain.rs
+++ b/chain/arweave/src/chain.rs
@@ -3,11 +3,11 @@ use graph::blockchain::client::ChainClient;
 use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
 use graph::blockchain::{
     BasicBlockchainBuilder, Block, BlockIngestor, BlockchainBuilder, BlockchainKind,
-    EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
+    EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter, TriggerFilterWrapper,
 };
 use graph::cheap_clone::CheapClone;
 use graph::components::network_provider::ChainName;
-use graph::components::store::DeploymentCursorTracker;
+use graph::components::store::{DeploymentCursorTracker, SourceableStore};
 use graph::data::subgraph::UnifiedMappingApiVersion;
 use graph::env::EnvVars;
 use graph::firehose::FirehoseEndpoint;
@@ -27,11 +27,13 @@ use graph::{
     prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
 };
 use prost::Message;
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::adapter::TriggerFilter;
 use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
 use crate::trigger::{self, ArweaveTrigger};
+use crate::Block as ArweaveBlock;
 use crate::{
     codec,
     data_source::{DataSource, UnresolvedDataSource},
@@ -119,7 +121,8 @@ impl Blockchain for Chain {
         deployment: DeploymentLocator,
         store: impl DeploymentCursorTracker,
         start_blocks: Vec<BlockNumber>,
-        filter: Arc<Self::TriggerFilter>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        filter: Arc<TriggerFilterWrapper<Self>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error> {
         let adapter = self
@@ -135,7 +138,10 @@ impl Blockchain for Chain {
             .subgraph_logger(&deployment)
             .new(o!("component" => "FirehoseBlockStream"));
 
-        let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
+        let firehose_mapper = Arc::new(FirehoseMapper {
+            adapter,
+            filter: filter.chain_filter.clone(),
+        });
 
         Ok(Box::new(FirehoseBlockStream::new(
             deployment.hash,
@@ -199,6 +205,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
         panic!("Should never be called since not used by FirehoseBlockStream")
     }
 
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        unimplemented!()
+    }
+
     async fn triggers_in_block(
         &self,
         logger: &Logger,
@@ -258,6 +268,14 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
             number: block.number.saturating_sub(1),
         }))
     }
+
+    async fn load_blocks_by_numbers(
+        &self,
+        _logger: Logger,
+        _block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<ArweaveBlock>, Error> {
+        todo!()
+    }
 }
 
 pub struct FirehoseMapper {
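[Reviewer note] The pattern above repeats for every Firehose-backed chain in this PR: `new_block_stream` now receives a `TriggerFilterWrapper` (defined later in `graph/src/blockchain/mod.rs`), but Firehose streams only understand the chain-level half, so the mapper keeps `filter.chain_filter` and drops any subgraph filters. A minimal sketch of that unwrap, assuming the types introduced further down in this diff:

```rust
use std::sync::Arc;

use graph::blockchain::{Blockchain, TriggerFilterWrapper};

// Sketch: what arweave (and cosmos, near, substreams below) do with the wrapper.
fn chain_filter_for_firehose<C: Blockchain>(
    filter: &Arc<TriggerFilterWrapper<C>>,
) -> Arc<C::TriggerFilter> {
    // `subgraph_filter` is intentionally ignored: Firehose chains do not
    // support subgraph data sources yet, so only the chain filter is used.
    filter.chain_filter.clone()
}
```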
diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs
index bd6b66e55c6..d276c017e7c 100644
--- a/chain/cosmos/src/chain.rs
+++ b/chain/cosmos/src/chain.rs
@@ -1,9 +1,10 @@
 use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
-use graph::blockchain::{BlockIngestor, NoopDecoderHook};
+use graph::blockchain::{BlockIngestor, NoopDecoderHook, TriggerFilterWrapper};
 use graph::components::network_provider::ChainName;
 use graph::env::EnvVars;
 use graph::prelude::MetricsRegistry;
 use graph::substreams::Clock;
+use std::collections::HashSet;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
@@ -11,7 +12,7 @@ use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, Fireh
 use graph::blockchain::client::ChainClient;
 use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
 use graph::cheap_clone::CheapClone;
-use graph::components::store::DeploymentCursorTracker;
+use graph::components::store::{DeploymentCursorTracker, SourceableStore};
 use graph::data::subgraph::UnifiedMappingApiVersion;
 use graph::{
     blockchain::{
@@ -33,7 +34,7 @@ use crate::data_source::{
     DataSource, DataSourceTemplate, EventOrigin, UnresolvedDataSource,
     UnresolvedDataSourceTemplate,
 };
 use crate::trigger::CosmosTrigger;
-use crate::{codec, TriggerFilter};
+use crate::{codec, Block, TriggerFilter};
 
 pub struct Chain {
     logger_factory: LoggerFactory,
@@ -113,7 +114,8 @@ impl Blockchain for Chain {
         deployment: DeploymentLocator,
         store: impl DeploymentCursorTracker,
         start_blocks: Vec<BlockNumber>,
-        filter: Arc<Self::TriggerFilter>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        filter: Arc<TriggerFilterWrapper<Self>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error> {
         let adapter = self
@@ -129,7 +131,10 @@ impl Blockchain for Chain {
             .subgraph_logger(&deployment)
             .new(o!("component" => "FirehoseBlockStream"));
 
-        let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
+        let firehose_mapper = Arc::new(FirehoseMapper {
+            adapter,
+            filter: filter.chain_filter.clone(),
+        });
 
         Ok(Box::new(FirehoseBlockStream::new(
             deployment.hash,
@@ -193,6 +198,18 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
         panic!("Should never be called since not used by FirehoseBlockStream")
     }
 
+    async fn load_blocks_by_numbers(
+        &self,
+        _logger: Logger,
+        _block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<Block>, Error> {
+        unimplemented!()
+    }
+
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        unimplemented!()
+    }
+
     async fn scan_triggers(
         &self,
         _from: BlockNumber,
@@ -467,9 +484,12 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
 
 #[cfg(test)]
 mod test {
-    use graph::prelude::{
-        slog::{o, Discard, Logger},
-        tokio,
+    use graph::{
+        blockchain::Trigger,
+        prelude::{
+            slog::{o, Discard, Logger},
+            tokio,
+        },
     };
 
     use super::*;
@@ -600,7 +620,10 @@ mod test {
         // they may not be in the same order
         for trigger in expected_triggers {
             assert!(
-                triggers.trigger_data.contains(&trigger),
+                triggers.trigger_data.iter().any(|t| match t {
+                    Trigger::Chain(t) => t == &trigger,
+                    _ => false,
+                }),
                 "Expected trigger list to contain {:?}, but it only contains: {:?}",
                 trigger,
                 triggers.trigger_data
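[Reviewer note] The updated assertion has to look through the new `Trigger` enum. A helper like the following (a sketch, not part of the PR; `Trigger::as_chain` is added in `graph/src/blockchain/mod.rs` later in this diff) would express the same containment check:

```rust
use graph::blockchain::{Blockchain, Trigger};

// True if `expected` occurs among the chain (non-subgraph) triggers.
fn contains_chain_trigger<C: Blockchain>(triggers: &[Trigger<C>], expected: &C::TriggerData) -> bool
where
    C::TriggerData: PartialEq,
{
    triggers.iter().filter_map(Trigger::as_chain).any(|t| t == expected)
}
```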
diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs
index f78ff1b0bec..3d4dc00c030 100644
--- a/chain/ethereum/src/adapter.rs
+++ b/chain/ethereum/src/adapter.rs
@@ -1109,6 +1109,13 @@ pub trait EthereumAdapter: Send + Sync + 'static {
         block_hash: H256,
     ) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
 
+    async fn load_blocks_by_numbers(
+        &self,
+        _logger: Logger,
+        _chain_store: Arc<dyn ChainStore>,
+        _block_numbers: HashSet<BlockNumber>,
+    ) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
+
     /// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
     /// May use the `chain_store` as a cache.
     async fn load_blocks(
diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs
index cf46a675212..ff28c975c40 100644
--- a/chain/ethereum/src/chain.rs
+++ b/chain/ethereum/src/chain.rs
@@ -3,10 +3,11 @@ use anyhow::{Context, Error};
 use graph::blockchain::client::ChainClient;
 use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
 use graph::blockchain::{
-    BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
+    BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper,
+    TriggersAdapterSelector,
 };
 use graph::components::network_provider::ChainName;
-use graph::components::store::DeploymentCursorTracker;
+use graph::components::store::{DeploymentCursorTracker, SourceableStore};
 use graph::data::subgraph::UnifiedMappingApiVersion;
 use graph::firehose::{FirehoseEndpoint, ForkStep};
 use graph::futures03::compat::Future01CompatExt;
@@ -61,6 +62,7 @@ use crate::{BufferedCallCache, NodeCapabilities};
 use crate::{EthereumAdapter, RuntimeAdapter};
 use graph::blockchain::block_stream::{
     BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
+    TriggersAdapterWrapper,
 };
 
 /// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
@@ -121,24 +123,50 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
         unimplemented!()
     }
 
+    async fn build_subgraph_block_stream(
+        &self,
+        chain: &Chain,
+        deployment: DeploymentLocator,
+        start_blocks: Vec<BlockNumber>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        subgraph_current_block: Option<BlockPtr>,
+        filter: Arc<TriggerFilterWrapper<Chain>>,
+        unified_api_version: UnifiedMappingApiVersion,
+    ) -> Result<Box<dyn BlockStream<Chain>>> {
+        self.build_polling(
+            chain,
+            deployment,
+            start_blocks,
+            source_subgraph_stores,
+            subgraph_current_block,
+            filter,
+            unified_api_version,
+        )
+        .await
+    }
+
     async fn build_polling(
         &self,
         chain: &Chain,
         deployment: DeploymentLocator,
         start_blocks: Vec<BlockNumber>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
         subgraph_current_block: Option<BlockPtr>,
-        filter: Arc<<Chain as Blockchain>::TriggerFilter>,
+        filter: Arc<TriggerFilterWrapper<Chain>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Chain>>> {
-        let requirements = filter.node_capabilities();
-        let adapter = chain
-            .triggers_adapter(&deployment, &requirements, unified_api_version.clone())
-            .unwrap_or_else(|_| {
-                panic!(
-                    "no adapter for network {} with capabilities {}",
-                    chain.name, requirements
-                )
-            });
+        let requirements = filter.chain_filter.node_capabilities();
+        let adapter = TriggersAdapterWrapper::new(
+            chain
+                .triggers_adapter(&deployment, &requirements, unified_api_version.clone())
+                .unwrap_or_else(|_| {
+                    panic!(
+                        "no adapter for network {} with capabilities {}",
+                        chain.name, requirements
+                    )
+                }),
+            source_subgraph_stores,
+        );
 
         let logger = chain
             .logger_factory
@@ -172,7 +200,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
         Ok(Box::new(PollingBlockStream::new(
             chain_store,
             chain_head_update_stream,
-            adapter,
+            Arc::new(adapter),
             chain.node_id.clone(),
             deployment.hash,
             filter,
@@ -409,10 +437,27 @@ impl Blockchain for Chain {
         deployment: DeploymentLocator,
         store: impl DeploymentCursorTracker,
         start_blocks: Vec<BlockNumber>,
-        filter: Arc<Self::TriggerFilter>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        filter: Arc<TriggerFilterWrapper<Self>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error> {
         let current_ptr = store.block_ptr();
+
+        if !filter.subgraph_filter.is_empty() {
+            return self
+                .block_stream_builder
+                .build_subgraph_block_stream(
+                    self,
+                    deployment,
+                    start_blocks,
+                    source_subgraph_stores,
+                    current_ptr,
+                    filter,
+                    unified_api_version,
+                )
+                .await;
+        }
+
         match self.chain_client().as_ref() {
             ChainClient::Rpc(_) => {
                 self.block_stream_builder
@@ -420,6 +465,7 @@ impl Blockchain for Chain {
                         self,
                         deployment,
                         start_blocks,
+                        source_subgraph_stores,
                        current_ptr,
                        filter,
                        unified_api_version,
@@ -434,7 +480,7 @@ impl Blockchain for Chain {
                         store.firehose_cursor(),
                         start_blocks,
                         current_ptr,
-                        filter,
+                        filter.chain_filter.clone(),
                         unified_api_version,
                     )
                     .await
@@ -689,6 +735,35 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
             .await
     }
 
+    async fn load_blocks_by_numbers(
+        &self,
+        logger: Logger,
+        block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<BlockFinality>> {
+        use graph::futures01::stream::Stream;
+
+        let adapter = self
+            .chain_client
+            .rpc()?
+            .cheapest_with(&self.capabilities)
+            .await?;
+
+        let blocks = adapter
+            .load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
+            .await
+            .map(|block| BlockFinality::Final(block))
+            .collect()
+            .compat()
+            .await?;
+
+        Ok(blocks)
+    }
+
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        let chain_store = self.chain_store.clone();
+        chain_store.chain_head_ptr().await
+    }
+
     async fn triggers_in_block(
         &self,
         logger: &Logger,
diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs
index c4ea6323c7d..9fe0b8262b2 100644
--- a/chain/ethereum/src/ethereum_adapter.rs
+++ b/chain/ethereum/src/ethereum_adapter.rs
@@ -2,6 +2,7 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
 use graph::blockchain::client::ChainClient;
 use graph::blockchain::BlockHash;
 use graph::blockchain::ChainIdentifier;
+
 use graph::components::transaction_receipt::LightTransactionReceipt;
 use graph::data::store::ethereum::call;
 use graph::data::store::scalar;
@@ -58,6 +59,7 @@ use crate::chain::BlockFinality;
 use crate::trigger::LogRef;
 use crate::Chain;
 use crate::NodeCapabilities;
+use crate::TriggerFilter;
 use crate::{
     adapter::{
         ContractCall, ContractCallError, EthGetLogsFilter, EthereumAdapter as EthereumAdapterTrait,
@@ -66,7 +68,7 @@ use crate::{
     },
     transport::Transport,
     trigger::{EthereumBlockTriggerType, EthereumTrigger},
-    TriggerFilter, ENV_VARS,
+    ENV_VARS,
 };
 
 #[derive(Debug, Clone)]
@@ -1648,6 +1650,28 @@ impl EthereumAdapterTrait for EthereumAdapter {
         Ok(decoded)
     }
 
+    // This is an ugly temporary implementation to get the block ptrs for a range of blocks
+    async fn load_blocks_by_numbers(
+        &self,
+        logger: Logger,
+        chain_store: Arc<dyn ChainStore>,
+        block_numbers: HashSet<BlockNumber>,
+    ) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
+        let block_hashes = block_numbers
+            .into_iter()
+            .map(|number| {
+                chain_store
+                    .block_hashes_by_block_number(number)
+                    .unwrap()
+                    .first()
+                    .unwrap()
+                    .as_h256()
+            })
+            .collect::<HashSet<H256>>();
+
+        self.load_blocks(logger, chain_store, block_hashes).await
+    }
+
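[Reviewer note] The temporary implementation above unwraps twice, so a block number with no hash in the chain store panics the indexer. A non-panicking sketch of the same lookup (an assumption, not part of the PR; it reuses `BlockHash::as_h256` added in `graph/src/blockchain/types.rs` below):

```rust
use std::{collections::HashSet, sync::Arc};

use anyhow::{anyhow, Error};
use graph::prelude::{web3::types::H256, BlockNumber, ChainStore};

fn hashes_for_numbers(
    chain_store: &Arc<dyn ChainStore>,
    block_numbers: HashSet<BlockNumber>,
) -> Result<HashSet<H256>, Error> {
    block_numbers
        .into_iter()
        .map(|number| {
            // Propagate store errors and surface missing hashes instead of panicking.
            chain_store
                .block_hashes_by_block_number(number)?
                .first()
                .map(|hash| hash.as_h256())
                .ok_or_else(|| anyhow!("no hash stored for block #{}", number))
        })
        .collect()
}
```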
     /// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
     async fn load_blocks(
         &self,
@@ -2077,8 +2101,8 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
     let transaction_hashes: BTreeSet<H256> = block
         .trigger_data
         .iter()
-        .filter_map(|trigger| match trigger {
-            EthereumTrigger::Call(call_trigger) => Some(call_trigger.transaction_hash),
+        .filter_map(|trigger| match trigger.as_chain() {
+            Some(EthereumTrigger::Call(call_trigger)) => Some(call_trigger.transaction_hash),
             _ => None,
         })
         .collect::<Option<BTreeSet<H256>>>()
@@ -2169,7 +2193,7 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
     // Filter call triggers from unsuccessful transactions
     block.trigger_data.retain(|trigger| {
-        if let EthereumTrigger::Call(call_trigger) = trigger {
+        if let Some(EthereumTrigger::Call(call_trigger)) = trigger.as_chain() {
             // Unwrap: We already checked that those values exist
             transaction_success[&call_trigger.transaction_hash.unwrap()]
         } else {
diff --git a/chain/ethereum/src/tests.rs b/chain/ethereum/src/tests.rs
index 455a7c07432..00873f8ea87 100644
--- a/chain/ethereum/src/tests.rs
+++ b/chain/ethereum/src/tests.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use graph::{
-    blockchain::{block_stream::BlockWithTriggers, BlockPtr},
+    blockchain::{block_stream::BlockWithTriggers, BlockPtr, Trigger},
     prelude::{
         web3::types::{Address, Bytes, Log, H160, H256, U64},
         EthereumCall, LightEthereumBlock,
@@ -107,10 +107,12 @@ fn test_trigger_ordering() {
         &logger,
     );
 
-    assert_eq!(
-        block_with_triggers.trigger_data,
-        vec![log1, log2, call1, log3, call2, call4, call3, block2, block1]
-    );
+    let expected = vec![log1, log2, call1, log3, call2, call4, call3, block2, block1]
+        .into_iter()
+        .map(|t| Trigger::Chain(t))
+        .collect::<Vec<_>>();
+
+    assert_eq!(block_with_triggers.trigger_data, expected);
 }
 
 #[test]
@@ -203,8 +205,10 @@ fn test_trigger_dedup() {
         &logger,
     );
 
-    assert_eq!(
-        block_with_triggers.trigger_data,
-        vec![log1, log2, call1, log3, call2, call3, block2, block1]
-    );
+    let expected = vec![log1, log2, call1, log3, call2, call3, block2, block1]
+        .into_iter()
+        .map(|t| Trigger::Chain(t))
+        .collect::<Vec<_>>();
+
+    assert_eq!(block_with_triggers.trigger_data, expected);
 }
diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs
index 02c8e57d6a0..9fd7d510519 100644
--- a/chain/near/src/chain.rs
+++ b/chain/near/src/chain.rs
@@ -4,11 +4,11 @@ use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
 use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
 use graph::blockchain::{
     BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, BlockchainKind, NoopDecoderHook,
-    NoopRuntimeAdapter,
+    NoopRuntimeAdapter, Trigger, TriggerFilterWrapper,
 };
 use graph::cheap_clone::CheapClone;
 use graph::components::network_provider::ChainName;
-use graph::components::store::DeploymentCursorTracker;
+use graph::components::store::{DeploymentCursorTracker, SourceableStore};
 use graph::data::subgraph::UnifiedMappingApiVersion;
 use graph::env::EnvVars;
 use graph::firehose::FirehoseEndpoint;
@@ -32,10 +32,12 @@ use graph::{
     prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
 };
 use prost::Message;
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::adapter::TriggerFilter;
 use crate::codec::substreams_triggers::BlockAndReceipts;
+use crate::codec::Block;
 use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
 use crate::trigger::{self, NearTrigger};
 use crate::{
@@ -108,7 +110,6 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
             chain.metrics_registry.clone(),
         )))
     }
-
     async fn build_firehose(
         &self,
         chain: &Chain,
@@ -151,8 +152,9 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
         _chain: &Chain,
         _deployment: DeploymentLocator,
         _start_blocks: Vec<BlockNumber>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
         _subgraph_current_block: Option<BlockPtr>,
-        _filter: Arc<<Chain as Blockchain>::TriggerFilter>,
+        _filter: Arc<TriggerFilterWrapper<Chain>>,
         _unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Chain>>> {
         todo!()
@@ -230,7 +232,8 @@ impl Blockchain for Chain {
         deployment: DeploymentLocator,
         store: impl DeploymentCursorTracker,
         start_blocks: Vec<BlockNumber>,
-        filter: Arc<Self::TriggerFilter>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        filter: Arc<TriggerFilterWrapper<Self>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error> {
         if self.prefer_substreams {
@@ -242,7 +245,7 @@ impl Blockchain for Chain {
                 deployment,
                 store.firehose_cursor(),
                 store.block_ptr(),
-                filter,
+                filter.chain_filter.clone(),
             )
             .await;
         }
@@ -254,7 +257,7 @@ impl Blockchain for Chain {
             store.firehose_cursor(),
             start_blocks,
             store.block_ptr(),
-            filter,
+            filter.chain_filter.clone(),
             unified_api_version,
         )
         .await
@@ -322,6 +325,18 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
         panic!("Should never be called since not used by FirehoseBlockStream")
     }
 
+    async fn load_blocks_by_numbers(
+        &self,
+        _logger: Logger,
+        _block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<Block>> {
+        unimplemented!()
+    }
+
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        unimplemented!()
+    }
+
     async fn triggers_in_block(
         &self,
         logger: &Logger,
@@ -462,11 +477,13 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
             .into_iter()
             .zip(receipt.into_iter())
             .map(|(outcome, receipt)| {
-                NearTrigger::Receipt(Arc::new(trigger::ReceiptWithOutcome {
-                    outcome,
-                    receipt,
-                    block: arc_block.clone(),
-                }))
+                Trigger::Chain(NearTrigger::Receipt(Arc::new(
+                    trigger::ReceiptWithOutcome {
+                        outcome,
+                        receipt,
+                        block: arc_block.clone(),
+                    },
+                )))
             })
             .collect();
 
@@ -973,8 +990,8 @@ mod test {
             .trigger_data
             .clone()
             .into_iter()
-            .filter_map(|x| match x {
-                crate::trigger::NearTrigger::Block(b) => b.header.clone().map(|x| x.height),
+            .filter_map(|x| match x.as_chain() {
+                Some(crate::trigger::NearTrigger::Block(b)) => b.header.clone().map(|x| x.height),
                 _ => None,
             })
             .collect()
diff --git a/chain/substreams/src/block_stream.rs b/chain/substreams/src/block_stream.rs
index 8844df0610e..8008694f66b 100644
--- a/chain/substreams/src/block_stream.rs
+++ b/chain/substreams/src/block_stream.rs
@@ -7,9 +7,9 @@ use graph::{
         BlockStream, BlockStreamBuilder as BlockStreamBuilderTrait, FirehoseCursor,
     },
         substreams_block_stream::SubstreamsBlockStream,
-        Blockchain,
+        Blockchain, TriggerFilterWrapper,
     },
-    components::store::DeploymentLocator,
+    components::store::{DeploymentLocator, SourceableStore},
     data::subgraph::UnifiedMappingApiVersion,
     prelude::{async_trait, BlockNumber, BlockPtr},
     schema::InputSchema,
@@ -104,8 +104,9 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
         _chain: &Chain,
         _deployment: DeploymentLocator,
         _start_blocks: Vec<BlockNumber>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
         _subgraph_current_block: Option<BlockPtr>,
-        _filter: Arc<TriggerFilter>,
+        _filter: Arc<TriggerFilterWrapper<Chain>>,
         _unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Chain>>> {
         unimplemented!("polling block stream is not supported for substreams")
     }
 }
diff --git a/chain/substreams/src/chain.rs b/chain/substreams/src/chain.rs
index d2efe6dec91..a15dbb0f269 100644
--- a/chain/substreams/src/chain.rs
+++ b/chain/substreams/src/chain.rs
@@ -4,10 +4,10 @@ use anyhow::Error;
 use graph::blockchain::client::ChainClient;
 use graph::blockchain::{
     BasicBlockchainBuilder, BlockIngestor, BlockTime, EmptyNodeCapabilities, NoopDecoderHook,
-    NoopRuntimeAdapter,
+    NoopRuntimeAdapter, TriggerFilterWrapper,
 };
 use graph::components::network_provider::ChainName;
-use graph::components::store::DeploymentCursorTracker;
+use graph::components::store::{DeploymentCursorTracker, SourceableStore};
 use graph::env::EnvVars;
 use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
 use graph::schema::EntityKey;
@@ -140,7 +140,8 @@ impl Blockchain for Chain {
         deployment: DeploymentLocator,
         store: impl DeploymentCursorTracker,
         _start_blocks: Vec<BlockNumber>,
-        filter: Arc<Self::TriggerFilter>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        filter: Arc<TriggerFilterWrapper<Self>>,
         _unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error> {
         self.block_stream_builder
@@ -150,7 +151,7 @@ impl Blockchain for Chain {
             deployment,
             store.firehose_cursor(),
             store.block_ptr(),
-            filter,
+            filter.chain_filter.clone(),
         )
         .await
     }
diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs
index ed7016216d5..d2d10cde9e9 100644
--- a/chain/substreams/src/trigger.rs
+++ b/chain/substreams/src/trigger.rs
@@ -16,7 +16,7 @@ use graph::{
 };
 use graph_runtime_wasm::module::ToAscPtr;
 use lazy_static::__Deref;
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
 
 use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges};
 
@@ -136,6 +136,18 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
         unimplemented!()
     }
 
+    async fn load_blocks_by_numbers(
+        &self,
+        _logger: Logger,
+        _block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<Block>, Error> {
+        unimplemented!()
+    }
+
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        unimplemented!()
+    }
+
     async fn scan_triggers(
         &self,
         _from: BlockNumber,
diff --git a/core/src/subgraph/context/instance/hosts.rs b/core/src/subgraph/context/instance/hosts.rs
index 2ec1e4578f0..9c18e12ce1e 100644
--- a/core/src/subgraph/context/instance/hosts.rs
+++ b/core/src/subgraph/context/instance/hosts.rs
@@ -57,7 +57,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> OnchainHosts<C, T> {
     }
 
     pub fn push(&mut self, host: Arc<T::Host>) {
-        assert!(host.data_source().as_onchain().is_some());
+        assert!(host.data_source().is_chain_based());
 
         self.hosts.push(host.cheap_clone());
         let idx = self.hosts.len() - 1;
diff --git a/core/src/subgraph/context/instance/mod.rs b/core/src/subgraph/context/instance/mod.rs
index ed242836a28..86b64195493 100644
--- a/core/src/subgraph/context/instance/mod.rs
+++ b/core/src/subgraph/context/instance/mod.rs
@@ -22,13 +22,17 @@ pub(crate) struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
     pub(super) static_data_sources: Arc<Vec<DataSource<C>>>,
     host_metrics: Arc<HostMetrics>,
 
-    /// The hosts represent the data sources in the subgraph. There is one host per data source.
+    /// The hosts represent the onchain data sources in the subgraph. There is one host per data source.
     /// Data sources with no mappings (e.g. direct substreams) have no host.
     ///
     /// Onchain hosts must be created in increasing order of block number. `fn hosts_for_trigger`
     /// will return the onchain hosts in the same order as they were inserted.
     onchain_hosts: OnchainHosts<C, T>,
 
+    /// `subgraph_hosts` represent subgraph data sources declared in the manifest. These are a special
+    /// kind of data source that depends on the data from another source subgraph.
+    subgraph_hosts: OnchainHosts<C, T>,
+
     offchain_hosts: OffchainHosts<C, T>,
 
     /// Maps the hash of a module to a channel to the thread in which the module is instantiated.
@@ -79,6 +83,7 @@ where
             network,
             static_data_sources: Arc::new(manifest.data_sources),
             onchain_hosts: OnchainHosts::new(),
+            subgraph_hosts: OnchainHosts::new(),
             offchain_hosts: OffchainHosts::new(),
             module_cache: HashMap::new(),
             templates,
@@ -138,34 +143,44 @@ where
             );
         }
 
-        let is_onchain = data_source.is_onchain();
         let Some(host) = self.new_host(logger.clone(), data_source)? else {
             return Ok(None);
         };
 
         // Check for duplicates and add the host.
-        if is_onchain {
-            // `onchain_hosts` will remain ordered by the creation block.
-            // See also 8f1bca33-d3b7-4035-affc-fd6161a12448.
-            ensure!(
-                self.onchain_hosts
-                    .last()
-                    .and_then(|h| h.creation_block_number())
-                    <= host.data_source().creation_block(),
-            );
+        match host.data_source() {
+            DataSource::Onchain(_) => {
+                // `onchain_hosts` will remain ordered by the creation block.
+                // See also 8f1bca33-d3b7-4035-affc-fd6161a12448.
+                ensure!(
+                    self.onchain_hosts
+                        .last()
+                        .and_then(|h| h.creation_block_number())
+                        <= host.data_source().creation_block(),
+                );
 
-            if self.onchain_hosts.contains(&host) {
-                Ok(None)
-            } else {
-                self.onchain_hosts.push(host.cheap_clone());
-                Ok(Some(host))
+                if self.onchain_hosts.contains(&host) {
+                    Ok(None)
+                } else {
+                    self.onchain_hosts.push(host.cheap_clone());
+                    Ok(Some(host))
+                }
             }
-        } else {
-            if self.offchain_hosts.contains(&host) {
-                Ok(None)
-            } else {
-                self.offchain_hosts.push(host.cheap_clone());
-                Ok(Some(host))
+            DataSource::Offchain(_) => {
+                if self.offchain_hosts.contains(&host) {
+                    Ok(None)
+                } else {
+                    self.offchain_hosts.push(host.cheap_clone());
+                    Ok(Some(host))
+                }
+            }
+            DataSource::Subgraph(_) => {
+                if self.subgraph_hosts.contains(&host) {
+                    Ok(None)
+                } else {
+                    self.subgraph_hosts.push(host.cheap_clone());
+                    Ok(Some(host))
+                }
             }
         }
     }
@@ -226,6 +241,9 @@ where
             TriggerData::Offchain(trigger) => self
                 .offchain_hosts
                 .matches_by_address(trigger.source.address().as_ref().map(|a| a.as_slice())),
+            TriggerData::Subgraph(trigger) => self
+                .subgraph_hosts
+                .matches_by_address(Some(trigger.source.to_bytes().as_slice())),
         }
     }
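[Reviewer note] Subgraph hosts are keyed by the source deployment's hash bytes, so the trigger's `source` and the data source's `source.address()` must serialize identically. A small sketch of the lookup key (an assumption for illustration; `DeploymentHash::to_bytes` is added in `graph/src/data/subgraph/mod.rs` below):

```rust
use graph::prelude::DeploymentHash;

// The byte key under which a subgraph host is registered and later matched:
// the IPFS hash string of the source deployment, as UTF-8 bytes.
fn host_lookup_key(source: &DeploymentHash) -> Vec<u8> {
    source.to_bytes()
}
```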
diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs
index ea42d2ff503..ef265978ede 100644
--- a/core/src/subgraph/context/mod.rs
+++ b/core/src/subgraph/context/mod.rs
@@ -6,7 +6,7 @@ use crate::polling_monitor::{
 use anyhow::{self, Error};
 use bytes::Bytes;
 use graph::{
-    blockchain::{BlockTime, Blockchain},
+    blockchain::{BlockTime, Blockchain, TriggerFilterWrapper},
     components::{
         store::{DeploymentId, SubgraphFork},
         subgraph::{HostMetrics, MappingError, RuntimeHost as _, SharedProofOfIndexing},
@@ -77,7 +77,7 @@ where
     pub(crate) instance: SubgraphInstance<C, T>,
     pub instances: SubgraphKeepAlive,
     pub offchain_monitor: OffchainMonitor,
-    pub filter: Option<C::TriggerFilter>,
+    pub filter: Option<TriggerFilterWrapper<C>>,
     pub(crate) trigger_processor: Box<dyn TriggerProcessor<C, T>>,
     pub(crate) decoder: Box<Decoder<C, T>>,
 }
diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs
index 02b20c089e3..ca52073ab06 100644
--- a/core/src/subgraph/inputs.rs
+++ b/core/src/subgraph/inputs.rs
@@ -1,7 +1,7 @@
 use graph::{
-    blockchain::{Blockchain, TriggersAdapter},
+    blockchain::{block_stream::TriggersAdapterWrapper, Blockchain},
     components::{
-        store::{DeploymentLocator, SubgraphFork, WritableStore},
+        store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore},
         subgraph::ProofOfIndexingVersion,
     },
     data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
@@ -16,11 +16,12 @@ pub struct IndexingInputs<C: Blockchain> {
     pub features: BTreeSet<SubgraphFeature>,
     pub start_blocks: Vec<BlockNumber>,
     pub end_blocks: BTreeSet<BlockNumber>,
+    pub source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
     pub stop_block: Option<BlockNumber>,
     pub max_end_block: Option<BlockNumber>,
     pub store: Arc<dyn WritableStore>,
     pub debug_fork: Option<Arc<dyn SubgraphFork>>,
-    pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
+    pub triggers_adapter: Arc<TriggersAdapterWrapper<C>>,
     pub chain: Arc<C>,
     pub templates: Arc<Vec<DataSourceTemplate<C>>>,
     pub unified_api_version: UnifiedMappingApiVersion,
@@ -40,6 +41,7 @@ impl<C: Blockchain> IndexingInputs<C> {
             features,
             start_blocks,
             end_blocks,
+            source_subgraph_stores,
             stop_block,
             max_end_block,
             store: _,
@@ -58,6 +60,7 @@ impl<C: Blockchain> IndexingInputs<C> {
             features: features.clone(),
             start_blocks: start_blocks.clone(),
             end_blocks: end_blocks.clone(),
+            source_subgraph_stores: source_subgraph_stores.clone(),
             stop_block: stop_block.clone(),
             max_end_block: max_end_block.clone(),
             store,
diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index f9bb390d018..2d54a90417c 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -9,10 +9,11 @@ use crate::subgraph::Decoder;
 use std::collections::BTreeSet;
 
 use crate::subgraph::runner::SubgraphRunner;
-use graph::blockchain::block_stream::BlockStreamMetrics;
+use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
 use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
 use graph::components::metrics::gas::GasMetrics;
 use graph::components::metrics::subgraph::DeploymentStatusMetric;
+use graph::components::store::SourceableStore;
 use graph::components::subgraph::ProofOfIndexingVersion;
 use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
 use graph::data::value::Word;
@@ -228,6 +229,30 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
         }
     }
 
+    pub async fn get_sourceable_stores<C: Blockchain>(
+        &self,
+        hashes: Vec<DeploymentHash>,
+        is_runner_test: bool,
+    ) -> anyhow::Result<Vec<Arc<dyn SourceableStore>>> {
+        if is_runner_test {
+            return Ok(Vec::new());
+        }
+
+        let mut sourceable_stores = Vec::new();
+        let subgraph_store = self.subgraph_store.clone();
+
+        for hash in hashes {
+            let loc = subgraph_store
+                .active_locator(&hash)?
+                .ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?;
+
+            let sourceable_store = subgraph_store.clone().sourceable(loc.id.clone()).await?;
+            sourceable_stores.push(sourceable_store);
+        }
+
+        Ok(sourceable_stores)
+    }
+
     pub async fn build_subgraph_runner(
         &self,
         logger: Logger,
         env_vars: Arc<EnvVars>,
         deployment: DeploymentLocator,
         manifest: serde_yaml::Mapping,
         stop_block: Option<BlockNumber>,
         tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
         deployment_status_metric: DeploymentStatusMetric,
     ) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
+    where
+        C: Blockchain,
+        <C as Blockchain>::MappingTrigger: ToAscPtr,
+    {
+        self.build_subgraph_runner_inner(
+            logger,
+            env_vars,
+            deployment,
+            manifest,
+            stop_block,
+            tp,
+            deployment_status_metric,
+            false,
+        )
+        .await
+    }
+
+    pub async fn build_subgraph_runner_inner(
+        &self,
+        logger: Logger,
+        env_vars: Arc<EnvVars>,
+        deployment: DeploymentLocator,
+        manifest: serde_yaml::Mapping,
+        stop_block: Option<BlockNumber>,
+        tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
+        deployment_status_metric: DeploymentStatusMetric,
+        is_runner_test: bool,
+    ) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
     where
         C: Blockchain,
         <C as Blockchain>::MappingTrigger: ToAscPtr,
     {
@@ -334,6 +387,16 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
             .filter_map(|d| d.as_onchain().cloned())
             .collect::<Vec<_>>();
 
+        let subgraph_data_sources = data_sources
+            .iter()
+            .filter_map(|d| d.as_subgraph())
+            .collect::<Vec<_>>();
+
+        let subgraph_ds_source_deployments = subgraph_data_sources
+            .iter()
+            .map(|d| d.source.address())
+            .collect::<Vec<_>>();
+
         let required_capabilities = C::NodeCapabilities::from_data_sources(&onchain_data_sources);
         let network: Word = manifest.network_name().into();
@@ -345,7 +408,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
 
         let start_blocks: Vec<BlockNumber> = data_sources
             .iter()
-            .filter_map(|d| d.as_onchain().map(|d: &C::DataSource| d.start_block()))
+            .filter_map(|d| d.start_block())
             .collect();
 
         let end_blocks: BTreeSet<BlockNumber> = manifest
@@ -453,11 +516,21 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
 
         let decoder = Box::new(Decoder::new(decoder_hook));
 
+        let subgraph_data_source_stores = self
+            .get_sourceable_stores::<C>(subgraph_ds_source_deployments, is_runner_test)
+            .await?;
+
+        let triggers_adapter = Arc::new(TriggersAdapterWrapper::new(
+            triggers_adapter,
+            subgraph_data_source_stores.clone(),
+        ));
+
         let inputs = IndexingInputs {
             deployment: deployment.clone(),
             features,
             start_blocks,
             end_blocks,
+            source_subgraph_stores: subgraph_data_source_stores,
             stop_block,
             max_end_block,
             store,
diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs
index b7d45613b74..258f4bbcd15 100644
--- a/core/src/subgraph/registrar.rs
+++ b/core/src/subgraph/registrar.rs
@@ -611,7 +611,6 @@ async fn create_subgraph_version(
     )
     .map_err(SubgraphRegistrarError::ResolveError)
     .await?;
-
     // Determine if the graft_base should be validated.
     // Validate the graft_base if there is a pending graft, ensuring its presence.
     // If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated.
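[Reviewer note] Wiring sketch for `get_sourceable_stores` added above (the `manager` handle and import paths are assumptions for illustration): the deployment hashes come from the manifest's subgraph data sources, each is resolved to its active deployment, and the resulting stores travel both into `TriggersAdapterWrapper` and `IndexingInputs`.

```rust
use std::sync::Arc;

use graph::blockchain::Blockchain;
use graph::components::store::{SourceableStore, SubgraphStore};
use graph::prelude::DeploymentHash;

async fn resolve_source_stores<C: Blockchain, S: SubgraphStore>(
    manager: &SubgraphInstanceManager<S>, // hypothetical handle to the manager
    source_hashes: Vec<DeploymentHash>,
) -> anyhow::Result<Vec<Arc<dyn SourceableStore>>> {
    // `is_runner_test = false`: actually resolve each hash to its active
    // deployment; runner tests short-circuit to an empty Vec.
    manager.get_sourceable_stores::<C>(source_hashes, false).await
}
```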
diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index 33b98b21ec7..5724fef150c 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -7,7 +7,10 @@ use atomic_refcell::AtomicRefCell;
 use graph::blockchain::block_stream::{
     BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor,
 };
-use graph::blockchain::{Block, BlockTime, Blockchain, DataSource as _, TriggerFilter as _};
+use graph::blockchain::{
+    Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _,
+    TriggerFilterWrapper,
+};
 use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource};
 use graph::components::subgraph::InstanceDSTemplate;
 use graph::components::{
@@ -125,7 +128,7 @@ where
         self.inputs.static_filters || self.ctx.hosts_len() > ENV_VARS.static_filters_threshold
     }
 
-    fn build_filter(&self) -> C::TriggerFilter {
+    fn build_filter(&self) -> TriggerFilterWrapper<C> {
         let current_ptr = self.inputs.store.block_ptr();
         let static_filters = self.is_static_filters_enabled();
 
@@ -137,10 +140,30 @@ where
             None => true,
         };
 
+        let data_sources = self.ctx.static_data_sources();
+
+        let subgraph_filter = data_sources
+            .iter()
+            .filter_map(|ds| ds.as_subgraph())
+            .map(|ds| SubgraphFilter {
+                subgraph: ds.source.address(),
+                start_block: ds.source.start_block,
+                entities: ds
+                    .mapping
+                    .handlers
+                    .iter()
+                    .map(|handler| handler.entity.clone())
+                    .collect(),
+            })
+            .collect::<Vec<_>>();
+
         // if static_filters is not enabled we just stick to the filter based on all the data sources.
         if !static_filters {
-            return C::TriggerFilter::from_data_sources(
-                self.ctx.onchain_data_sources().filter(end_block_filter),
+            return TriggerFilterWrapper::new(
+                C::TriggerFilter::from_data_sources(
+                    self.ctx.onchain_data_sources().filter(end_block_filter),
+                ),
+                subgraph_filter,
             );
         }
 
@@ -167,11 +190,11 @@ where
 
         filter.extend_with_template(templates.iter().filter_map(|ds| ds.as_onchain()).cloned());
 
-        filter
+        TriggerFilterWrapper::new(filter, subgraph_filter)
     }
 
     #[cfg(debug_assertions)]
-    pub fn build_filter_for_test(&self) -> C::TriggerFilter {
+    pub fn build_filter_for_test(&self) -> TriggerFilterWrapper<C> {
         self.build_filter()
     }
 
@@ -229,7 +252,7 @@ where
 
         let mut block_stream = new_block_stream(
             &self.inputs,
-            self.ctx.filter.as_ref().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line
+            self.ctx.filter.clone().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line
             &self.metrics.subgraph,
         )
        .await?
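[Reviewer note] For reference, the shape `build_filter` derives per subgraph data source (hash value is hypothetical): one `SubgraphFilter` per data source, with `entities` collected from the entity names of the mapping's handlers.

```rust
use graph::blockchain::SubgraphFilter;
use graph::prelude::DeploymentHash;

fn example_filter() -> SubgraphFilter {
    SubgraphFilter {
        // The source subgraph's deployment hash (hypothetical value).
        subgraph: DeploymentHash::new("QmSourceSubgraph").unwrap(),
        start_block: 0,
        // Entity types the handlers subscribe to, i.e. the `handler.entity` values.
        entities: vec!["Block".to_string()],
    }
}
```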
@@ -369,7 +392,10 @@ where
                     .match_and_decode_many(
                         &logger,
                         &block,
-                        triggers.into_iter().map(TriggerData::Onchain),
+                        triggers.into_iter().map(|t| match t {
+                            Trigger::Chain(t) => TriggerData::Onchain(t),
+                            Trigger::Subgraph(t) => TriggerData::Subgraph(t),
+                        }),
                         hosts_filter,
                         &self.metrics.subgraph,
                     )
@@ -528,7 +554,10 @@ where
                     .match_and_decode_many(
                         &logger,
                         &block,
-                        triggers.into_iter().map(TriggerData::Onchain),
+                        triggers.into_iter().map(|t| match t {
+                            Trigger::Chain(t) => TriggerData::Onchain(t),
+                            Trigger::Subgraph(_) => unreachable!(), // TODO(krishna): Re-evaluate this
+                        }),
                         |_| Box::new(runtime_hosts.iter().map(Arc::as_ref)),
                         &self.metrics.subgraph,
                     )
diff --git a/core/src/subgraph/stream.rs b/core/src/subgraph/stream.rs
index c1d767e3fcf..5547543f13d 100644
--- a/core/src/subgraph/stream.rs
+++ b/core/src/subgraph/stream.rs
@@ -1,13 +1,13 @@
 use crate::subgraph::inputs::IndexingInputs;
 use anyhow::bail;
 use graph::blockchain::block_stream::{BlockStream, BufferedBlockStream};
-use graph::blockchain::Blockchain;
+use graph::blockchain::{Blockchain, TriggerFilterWrapper};
 use graph::prelude::{CheapClone, Error, SubgraphInstanceMetrics};
 use std::sync::Arc;
 
 pub async fn new_block_stream<C: Blockchain>(
     inputs: &IndexingInputs<C>,
-    filter: &C::TriggerFilter,
+    filter: TriggerFilterWrapper<C>,
     metrics: &SubgraphInstanceMetrics,
 ) -> Result<Box<dyn BlockStream<C>>, Error> {
     let is_firehose = inputs.chain.chain_client().is_firehose();
@@ -18,6 +18,7 @@ pub async fn new_block_stream<C: Blockchain>(
             inputs.deployment.clone(),
             inputs.store.cheap_clone(),
             inputs.start_blocks.clone(),
+            inputs.source_subgraph_stores.clone(),
             Arc::new(filter.clone()),
             inputs.unified_api_version.clone(),
         )
diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs
index 5d9c8b3534a..614d9f09d27 100644
--- a/graph/src/blockchain/block_stream.rs
+++ b/graph/src/blockchain/block_stream.rs
@@ -1,3 +1,5 @@
+use crate::data::store::scalar;
+use crate::data_source::subgraph;
 use crate::substreams::Clock;
 use crate::substreams_rpc::response::Message as SubstreamsMessage;
 use crate::substreams_rpc::BlockScopedData;
@@ -5,6 +7,7 @@ use anyhow::Error;
 use async_stream::stream;
 use futures03::Stream;
 use prost_types::Any;
+use std::collections::HashSet;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Instant;
@@ -12,9 +15,11 @@ use thiserror::Error;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 
 use super::substreams_block_stream::SubstreamsLogData;
-use super::{Block, BlockPtr, BlockTime, Blockchain};
+use super::{
+    Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
+};
 use crate::anyhow::Result;
-use crate::components::store::{BlockNumber, DeploymentLocator};
+use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
 use crate::data::subgraph::UnifiedMappingApiVersion;
 use crate::firehose::{self, FirehoseEndpoint};
 use crate::futures03::stream::StreamExt as _;
@@ -144,10 +149,33 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
         chain: &C,
         deployment: DeploymentLocator,
         start_blocks: Vec<BlockNumber>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
         subgraph_current_block: Option<BlockPtr>,
-        filter: Arc<C::TriggerFilter>,
+        filter: Arc<TriggerFilterWrapper<C>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<C>>>;
+
+    async fn build_subgraph_block_stream(
+        &self,
+        chain: &C,
+        deployment: DeploymentLocator,
+        start_blocks: Vec<BlockNumber>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        subgraph_current_block: Option<BlockPtr>,
+        filter: Arc<TriggerFilterWrapper<C>>,
+        unified_api_version: UnifiedMappingApiVersion,
+    ) -> Result<Box<dyn BlockStream<C>>> {
+        self.build_polling(
+            chain,
+            deployment,
+            start_blocks,
+            source_subgraph_stores,
+            subgraph_current_block,
+            filter,
+            unified_api_version,
+        )
+        .await
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -198,7 +226,7 @@ impl AsRef<Vec<u8>> for FirehoseCursor {
 
 #[derive(Debug)]
 pub struct BlockWithTriggers<C: Blockchain> {
     pub block: C::Block,
-    pub trigger_data: Vec<C::TriggerData>,
+    pub trigger_data: Vec<Trigger<C>>,
 }
 
 impl<C: Blockchain> Clone for BlockWithTriggers<C>
 where
@@ -216,7 +244,31 @@ where
 impl<C: Blockchain> BlockWithTriggers<C> {
     /// Creates a BlockWithTriggers structure, which holds
     /// the trigger data ordered and without any duplicates.
-    pub fn new(block: C::Block, mut trigger_data: Vec<C::TriggerData>, logger: &Logger) -> Self {
+    pub fn new(block: C::Block, trigger_data: Vec<C::TriggerData>, logger: &Logger) -> Self {
+        Self::new_with_triggers(
+            block,
+            trigger_data.into_iter().map(Trigger::Chain).collect(),
+            logger,
+        )
+    }
+
+    pub fn new_with_subgraph_triggers(
+        block: C::Block,
+        trigger_data: Vec<subgraph::TriggerData>,
+        logger: &Logger,
+    ) -> Self {
+        Self::new_with_triggers(
+            block,
+            trigger_data.into_iter().map(Trigger::Subgraph).collect(),
+            logger,
+        )
+    }
+
+    fn new_with_triggers(
+        block: C::Block,
+        mut trigger_data: Vec<Trigger<C>>,
+        logger: &Logger,
+    ) -> Self {
         // This is where triggers get sorted.
         trigger_data.sort();
 
@@ -256,6 +308,147 @@ impl<C: Blockchain> BlockWithTriggers<C> {
     pub fn parent_ptr(&self) -> Option<BlockPtr> {
         self.block.parent_ptr()
     }
+
+    pub fn extend_triggers(&mut self, triggers: Vec<Trigger<C>>) {
+        self.trigger_data.extend(triggers);
+        self.trigger_data.sort();
+    }
+}
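[Reviewer note] Both public constructors now funnel into `new_with_triggers`, which sorts the combined list, so the ordering invariant lives in one place. A usage sketch for the new `extend_triggers` (an assumed caller merging extra triggers into an existing block):

```rust
use graph::blockchain::{block_stream::BlockWithTriggers, Blockchain, Trigger};

fn merge_triggers<C: Blockchain>(
    mut block: BlockWithTriggers<C>,
    extra: Vec<Trigger<C>>,
) -> BlockWithTriggers<C> {
    // Appends and re-sorts, so chain and subgraph triggers end up interleaved
    // according to `Trigger`'s `Ord` impl (subgraph triggers sort last).
    block.extend_triggers(extra);
    block
}
```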
+
+/// The `TriggersAdapterWrapper` wraps the chain-specific `TriggersAdapter`, enabling chain-agnostic
+/// handling of subgraph datasource triggers. Without this wrapper, we would have to duplicate the same
+/// logic for each chain, increasing code repetition.
+pub struct TriggersAdapterWrapper<C: Blockchain> {
+    pub adapter: Arc<dyn TriggersAdapter<C>>,
+    pub source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+}
+
+impl<C: Blockchain> TriggersAdapterWrapper<C> {
+    pub fn new(
+        adapter: Arc<dyn TriggersAdapter<C>>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+    ) -> Self {
+        Self {
+            adapter,
+            source_subgraph_stores,
+        }
+    }
+}
+
+impl<C: Blockchain> TriggersAdapterWrapper<C> {
+    pub async fn ancestor_block(
+        &self,
+        ptr: BlockPtr,
+        offset: BlockNumber,
+        root: Option<BlockHash>,
+    ) -> Result<Option<C::Block>, Error> {
+        self.adapter.ancestor_block(ptr, offset, root).await
+    }
+
+    // TODO: Do a proper implementation, this is a complete mock implementation
+    pub async fn scan_triggers(
+        &self,
+        from: BlockNumber,
+        to: BlockNumber,
+        filter: &Arc<TriggerFilterWrapper<C>>,
+    ) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
+        if !filter.subgraph_filter.is_empty() {
+            return self
+                .subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter)
+                .await;
+        }
+
+        self.adapter
+            .scan_triggers(from, to, &filter.chain_filter)
+            .await
+    }
+
+    pub async fn triggers_in_block(
+        &self,
+        logger: &Logger,
+        block: C::Block,
+        filter: &C::TriggerFilter,
+    ) -> Result<BlockWithTriggers<C>, Error> {
+        self.adapter.triggers_in_block(logger, block, filter).await
+    }
+
+    pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
+        self.adapter.is_on_main_chain(ptr).await
+    }
+
+    pub async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
+        self.adapter.parent_ptr(block).await
+    }
+
+    pub async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        self.adapter.chain_head_ptr().await
+    }
+
+    // TODO(krishna): Currently this is a mock implementation of subgraph triggers.
+    // This will be replaced with the actual implementation which will use the filters to
+    // query the database of the source subgraph and return the entity triggers.
+    async fn subgraph_triggers(
+        &self,
+        logger: Logger,
+        from: BlockNumber,
+        to: BlockNumber,
+        filter: &Arc<TriggerFilterWrapper<C>>,
+    ) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
+        let logger2 = logger.cheap_clone();
+        let adapter = self.adapter.clone();
+        // let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
+        // let to = to_ptr.block_number();
+
+        let first_filter = filter.subgraph_filter.first().unwrap();
+
+        let blocks = adapter
+            .load_blocks_by_numbers(logger, HashSet::from_iter(from..=to))
+            .await?
+            .into_iter()
+            .map(|block| {
+                let trigger_data = vec![Self::create_mock_subgraph_trigger(first_filter, &block)];
+                BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2)
+            })
+            .collect();
+
+        Ok((blocks, to))
+    }
+
+    fn create_mock_subgraph_trigger(
+        filter: &SubgraphFilter,
+        block: &C::Block,
+    ) -> subgraph::TriggerData {
+        let mock_entity = Self::create_mock_entity(block);
+        subgraph::TriggerData {
+            source: filter.subgraph.clone(),
+            entity: mock_entity,
+            entity_type: filter.entities.first().unwrap().clone(),
+        }
+    }
+
+    fn create_mock_entity(block: &C::Block) -> Entity {
+        let id = DeploymentHash::new("test").unwrap();
+        let data_schema = InputSchema::parse_latest(
+            "type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }",
+            id.clone(),
+        )
+        .unwrap();
+
+        let block = block.ptr();
+        let hash = Value::Bytes(scalar::Bytes::from(block.hash_slice().to_vec()));
+        let data = data_schema
+            .make_entity(vec![
+                ("id".into(), hash.clone()),
+                (
+                    "number".into(),
+                    Value::BigInt(scalar::BigInt::from(block.block_number())),
+                ),
+                ("hash".into(), hash),
+            ])
+            .unwrap();
+
+        data
+    }
+}
 
 #[async_trait]
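[Reviewer note] The TODO above says the mock will be replaced by a real query against the source subgraph's store. Under that assumption, a rough sketch of the lookup using `SourceableStore::get_range` (added in `graph/src/components/store/traits.rs` below; the return shape follows the trait as declared there) might look like:

```rust
use std::collections::BTreeMap;

use graph::blockchain::SubgraphFilter;
use graph::components::store::SourceableStore;
use graph::prelude::{BlockNumber, Entity};

async fn entities_in_range(
    store: &dyn SourceableStore,
    filter: &SubgraphFilter,
    from: BlockNumber,
    to: BlockNumber,
) -> anyhow::Result<BTreeMap<BlockNumber, Vec<Entity>>> {
    let schema = store.input_schema();
    // Hypothetical: take the first entity type named by the filter.
    let entity_type = schema.entity_type(&filter.entities[0])?;
    Ok(store.get_range(&entity_type, from..to)?)
}
```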
@@ -298,6 +491,15 @@ pub trait TriggersAdapter<C: Blockchain>: Send + Sync {
 
     /// Get pointer to parent of `block`. This is called when reverting `block`.
     async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error>;
+
+    /// Get pointer to the chain head block.
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
+
+    async fn load_blocks_by_numbers(
+        &self,
+        logger: Logger,
+        block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<C::Block>>;
 }
 
 #[async_trait]
diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs
index c89eca95727..18f1de92546 100644
--- a/graph/src/blockchain/mock.rs
+++ b/graph/src/blockchain/mock.rs
@@ -2,22 +2,23 @@ use crate::{
     bail,
     components::{
         link_resolver::LinkResolver,
-        store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator},
+        store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, SourceableStore},
         subgraph::InstanceDSTemplateInfo,
     },
     data::subgraph::UnifiedMappingApiVersion,
     prelude::{BlockHash, DataSourceTemplateInfo},
 };
-use anyhow::Error;
+use anyhow::{Error, Result};
 use async_trait::async_trait;
 use serde::Deserialize;
+use slog::Logger;
 use std::{collections::HashSet, convert::TryFrom, sync::Arc};
 
 use super::{
     block_stream::{self, BlockStream, FirehoseCursor},
     client::ChainClient,
     BlockIngestor, BlockTime, EmptyNodeCapabilities, HostFn, IngestorError, MappingTriggerTrait,
-    NoopDecoderHook, TriggerWithHandler,
+    NoopDecoderHook, Trigger, TriggerFilterWrapper, TriggerWithHandler,
 };
 
 use super::{
@@ -218,31 +219,49 @@ impl UnresolvedDataSourceTemplate for MockUnresolvedDataSource
 
 pub struct MockTriggersAdapter;
 
 #[async_trait]
-impl<C: Blockchain> TriggersAdapter<C> for MockTriggersAdapter {
+impl TriggersAdapter<MockBlockchain> for MockTriggersAdapter {
     async fn ancestor_block(
         &self,
         _ptr: BlockPtr,
         _offset: BlockNumber,
         _root: Option<BlockHash>,
-    ) -> Result<Option<C::Block>, Error> {
+    ) -> Result<Option<MockBlock>, Error> {
         todo!()
     }
 
+    async fn load_blocks_by_numbers(
+        &self,
+        _logger: Logger,
+        _block_numbers: HashSet<BlockNumber>,
+    ) -> Result<Vec<MockBlock>> {
+        unimplemented!()
+    }
+
+    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
+        unimplemented!()
+    }
+
     async fn scan_triggers(
         &self,
-        _from: crate::components::store::BlockNumber,
-        _to: crate::components::store::BlockNumber,
-        _filter: &C::TriggerFilter,
-    ) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
-        todo!()
+        from: crate::components::store::BlockNumber,
+        to: crate::components::store::BlockNumber,
+        filter: &MockTriggerFilter,
+    ) -> Result<
+        (
+            Vec<BlockWithTriggers<MockBlockchain>>,
+            BlockNumber,
+        ),
+        Error,
+    > {
+        blocks_with_triggers(from, to, filter).await
     }
 
     async fn triggers_in_block(
         &self,
         _logger: &slog::Logger,
-        _block: C::Block,
-        _filter: &C::TriggerFilter,
-    ) -> Result<BlockWithTriggers<C>, Error> {
+        _block: MockBlock,
+        _filter: &MockTriggerFilter,
+    ) -> Result<BlockWithTriggers<MockBlockchain>, Error> {
         todo!()
     }
 
@@ -255,6 +274,26 @@ impl TriggersAdapter for MockTriggersAdapter {
     }
 }
 
+async fn blocks_with_triggers(
+    _from: crate::components::store::BlockNumber,
+    to: crate::components::store::BlockNumber,
+    _filter: &MockTriggerFilter,
+) -> Result<
+    (
+        Vec<BlockWithTriggers<MockBlockchain>>,
+        BlockNumber,
+    ),
+    Error,
+> {
+    Ok((
+        vec![BlockWithTriggers {
+            block: MockBlock { number: 0 },
+            trigger_data: vec![Trigger::Chain(MockTriggerData)],
+        }],
+        to,
+    ))
+}
+
 #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
 pub struct MockTriggerData;
 
@@ -347,7 +386,8 @@ impl Blockchain for MockBlockchain {
         _deployment: DeploymentLocator,
         _store: impl DeploymentCursorTracker,
         _start_blocks: Vec<BlockNumber>,
-        _filter: Arc<Self::TriggerFilter>,
+        _source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        _filter: Arc<TriggerFilterWrapper<Self>>,
         _unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error> {
         todo!()
diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs
index 9f3df60fe5f..e81618a740d 100644
--- a/graph/src/blockchain/mod.rs
+++ b/graph/src/blockchain/mod.rs
@@ -19,13 +19,15 @@ use crate::{
     cheap_clone::CheapClone,
     components::{
         metrics::subgraph::SubgraphInstanceMetrics,
-        store::{DeploymentCursorTracker, DeploymentLocator, StoredDynamicDataSource},
+        store::{
+            DeploymentCursorTracker, DeploymentLocator, SourceableStore, StoredDynamicDataSource,
+        },
         subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError},
         trigger_processor::RunnableTriggers,
     },
     data::subgraph::{UnifiedMappingApiVersion, MIN_SPEC_VERSION},
-    data_source::{self, DataSourceTemplateInfo},
-    prelude::DataSourceContext,
+    data_source::{self, subgraph, DataSourceTemplateInfo},
+    prelude::{DataSourceContext, DeploymentHash},
     runtime::{gas::GasCounter, AscHeap, HostExportError},
 };
 use crate::{
@@ -189,7 +191,8 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
         deployment: DeploymentLocator,
         store: impl DeploymentCursorTracker,
         start_blocks: Vec<BlockNumber>,
-        filter: Arc<Self::TriggerFilter>,
+        source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
+        filter: Arc<TriggerFilterWrapper<Self>>,
         unified_api_version: UnifiedMappingApiVersion,
     ) -> Result<Box<dyn BlockStream<Self>>, Error>;
 
@@ -247,6 +250,42 @@ impl From<Error> for IngestorError {
     }
 }
 
+/// The `TriggerFilterWrapper` is a higher-level wrapper around the chain-specific `TriggerFilter`,
+/// enabling subgraph-based trigger filtering for subgraph datasources. This abstraction is necessary
+/// because subgraph filtering operates at a higher level than chain-based filtering. By using this wrapper,
+/// we reduce code duplication, allowing subgraph-based filtering to be implemented once, instead of
+/// duplicating it across different chains.
+#[derive(Debug)]
+pub struct TriggerFilterWrapper<C: Blockchain> {
+    pub chain_filter: Arc<C::TriggerFilter>,
+    pub subgraph_filter: Vec<SubgraphFilter>,
+}
+
+#[derive(Clone, Debug)]
+pub struct SubgraphFilter {
+    pub subgraph: DeploymentHash,
+    pub start_block: BlockNumber,
+    pub entities: Vec<String>,
+}
+
+impl<C: Blockchain> TriggerFilterWrapper<C> {
+    pub fn new(filter: C::TriggerFilter, subgraph_filter: Vec<SubgraphFilter>) -> Self {
+        Self {
+            chain_filter: Arc::new(filter),
+            subgraph_filter,
+        }
+    }
+}
+
+impl<C: Blockchain> Clone for TriggerFilterWrapper<C> {
+    fn clone(&self) -> Self {
+        Self {
+            chain_filter: self.chain_filter.cheap_clone(),
+            subgraph_filter: self.subgraph_filter.clone(),
+        }
+    }
+}
+
 pub trait TriggerFilter<C: Blockchain>: Default + Clone + Send + Sync {
     fn from_data_sources<'a>(
         data_sources: impl Iterator<Item = &'a C::DataSource> + Clone,
@@ -370,6 +409,76 @@ pub trait UnresolvedDataSource<C: Blockchain>:
     ) -> Result<C::DataSource, anyhow::Error>;
 }
 
+#[derive(Debug)]
+pub enum Trigger<C: Blockchain> {
+    Chain(C::TriggerData),
+    Subgraph(subgraph::TriggerData),
+}
+
+impl<C: Blockchain> Trigger<C> {
+    pub fn as_chain(&self) -> Option<&C::TriggerData> {
+        match self {
+            Trigger::Chain(data) => Some(data),
+            _ => None,
+        }
+    }
+
+    pub fn as_subgraph(&self) -> Option<&subgraph::TriggerData> {
+        match self {
+            Trigger::Subgraph(data) => Some(data),
+            _ => None,
+        }
+    }
+}
+
+impl<C: Blockchain> Eq for Trigger<C> where C::TriggerData: Eq {}
+
+impl<C: Blockchain> PartialEq for Trigger<C>
+where
+    C::TriggerData: PartialEq,
+{
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (Trigger::Chain(data1), Trigger::Chain(data2)) => data1 == data2,
+            (Trigger::Subgraph(a), Trigger::Subgraph(b)) => a == b,
+            _ => false,
+        }
+    }
+}
+
+impl<C: Blockchain> Clone for Trigger<C>
+where
+    C::TriggerData: Clone,
+{
+    fn clone(&self) -> Self {
+        match self {
+            Trigger::Chain(data) => Trigger::Chain(data.clone()),
+            Trigger::Subgraph(data) => Trigger::Subgraph(data.clone()),
+        }
+    }
+}
+
+// TODO(krishna): Proper ordering for triggers
+impl<C: Blockchain> Ord for Trigger<C>
+where
+    C::TriggerData: Ord,
+{
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        match (self, other) {
+            (Trigger::Chain(data1), Trigger::Chain(data2)) => data1.cmp(data2),
+            (Trigger::Subgraph(_), Trigger::Chain(_)) => std::cmp::Ordering::Greater,
+            (Trigger::Chain(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Less,
+            (Trigger::Subgraph(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Equal,
+        }
+    }
+}
+
+impl<C: Blockchain> PartialOrd for Trigger<C>
+where
+    C::TriggerData: Ord,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
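[Reviewer note] As the TODO admits, this ordering is provisional: all subgraph triggers sort after chain triggers and compare `Equal` among themselves, even when `PartialEq` would say they differ, so their relative order after a (stable) sort is just insertion order. The variant-level rule, isolated as a sketch:

```rust
use graph::blockchain::{Blockchain, Trigger};

// Rank used between variants; Chain/Chain falls through to `C::TriggerData: Ord`.
fn variant_rank<C: Blockchain>(t: &Trigger<C>) -> u8 {
    match t {
        Trigger::Chain(_) => 0,    // sorts first
        Trigger::Subgraph(_) => 1, // sorts last; ties compare `Equal`
    }
}
```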
 
 pub trait TriggerData {
     /// If there is an error when processing this trigger, this will be called to add relevant context.
     /// For example a useful return is: `"block #<number> (<hash>), transaction <tx_hash>"`.
@@ -485,6 +594,7 @@ impl FromStr for BlockchainKind {
             "near" => Ok(BlockchainKind::Near),
             "cosmos" => Ok(BlockchainKind::Cosmos),
             "substreams" => Ok(BlockchainKind::Substreams),
+            "subgraph" => Ok(BlockchainKind::Ethereum), // TODO(krishna): We should detect the blockchain kind from the source subgraph
             _ => Err(anyhow!("unknown blockchain kind {}", s)),
         }
     }
diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs
index ce3fdf2a4ef..5b37cd303b4 100644
--- a/graph/src/blockchain/polling_block_stream.rs
+++ b/graph/src/blockchain/polling_block_stream.rs
@@ -9,9 +9,9 @@ use std::time::Duration;
 
 use super::block_stream::{
     BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream,
-    FirehoseCursor, TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE,
+    FirehoseCursor, TriggersAdapterWrapper, BUFFERED_BLOCK_STREAM_SIZE,
 };
-use super::{Block, BlockPtr, Blockchain};
+use super::{Block, BlockPtr, Blockchain, TriggerFilterWrapper};
 use crate::components::store::BlockNumber;
 use crate::data::subgraph::UnifiedMappingApiVersion;
 
@@ -79,13 +79,13 @@ where
     C: Blockchain,
 {
     chain_store: Arc<dyn ChainStore>,
-    adapter: Arc<dyn TriggersAdapter<C>>,
+    adapter: Arc<TriggersAdapterWrapper<C>>,
     node_id: NodeId,
     subgraph_id: DeploymentHash,
     // This is not really a block number, but the (unsigned) difference
     // between two block numbers
     reorg_threshold: BlockNumber,
-    filter: Arc<C::TriggerFilter>,
+    filter: Arc<TriggerFilterWrapper<C>>,
     start_blocks: Vec<BlockNumber>,
     logger: Logger,
     previous_triggers_per_block: f64,
@@ -146,10 +146,10 @@ where
     pub fn new(
         chain_store: Arc<dyn ChainStore>,
         chain_head_update_stream: ChainHeadUpdateStream,
-        adapter: Arc<dyn TriggersAdapter<C>>,
+        adapter: Arc<TriggersAdapterWrapper<C>>,
         node_id: NodeId,
         subgraph_id: DeploymentHash,
-        filter: Arc<C::TriggerFilter>,
+        filter: Arc<TriggerFilterWrapper<C>>,
         start_blocks: Vec<BlockNumber>,
         reorg_threshold: BlockNumber,
         logger: Logger,
@@ -218,7 +218,7 @@ where
         let max_block_range_size = self.max_block_range_size;
 
         // Get pointers from database for comparison
-        let head_ptr_opt = ctx.chain_store.chain_head_ptr().await?;
+        let head_ptr_opt = ctx.adapter.chain_head_ptr().await?;
         let subgraph_ptr = self.current_block.clone();
 
         // If chain head ptr is not set yet
@@ -469,7 +469,11 @@ where
             // Note that head_ancestor is a child of subgraph_ptr.
             let block = self
                 .adapter
-                .triggers_in_block(&self.logger, head_ancestor, &self.filter)
+                .triggers_in_block(
+                    &self.logger,
+                    head_ancestor,
+                    &self.filter.chain_filter.clone(),
+                )
                 .await?;
             Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1))
         } else {
diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs
index 89c3e12039d..824956ea2d1 100644
--- a/graph/src/blockchain/types.rs
+++ b/graph/src/blockchain/types.rs
@@ -31,6 +31,10 @@ impl BlockHash {
         &self.0
     }
 
+    pub fn as_h256(&self) -> H256 {
+        H256::from_slice(self.as_slice())
+    }
+
     /// Encodes the block hash into a hexadecimal string **without** a "0x"
     /// prefix. Hashes are stored in the database in this format when the
     /// schema uses `text` columns, which is a legacy and such columns
diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs
index 0ac80902a66..7ed6a4bc36e 100644
--- a/graph/src/components/store/traits.rs
+++ b/graph/src/components/store/traits.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::ops::Range;
 
 use anyhow::Error;
 use async_trait::async_trait;
@@ -185,6 +186,11 @@ pub trait SubgraphStore: Send + Sync + 'static {
         manifest_idx_and_name: Arc<Vec<(u32, String)>>,
     ) -> Result<Arc<dyn WritableStore>, StoreError>;
 
+    async fn sourceable(
+        self: Arc<Self>,
+        deployment: DeploymentId,
+    ) -> Result<Arc<dyn SourceableStore>, StoreError>;
+
     /// Initiate a graceful shutdown of the writable that a previous call to
     /// `writable` might have started
     async fn stop_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>;
@@ -287,6 +293,42 @@ impl DeploymentCursorTracker for Arc<dyn WritableStore> {
     }
 }
 
+#[async_trait]
+pub trait SourceableStore: Sync + Send + 'static {
+    /// Returns all versions of entities of the given entity_type that were
+    /// changed in the given block_range.
+    fn get_range(
+        &self,
+        entity_type: &EntityType,
+        block_range: Range<BlockNumber>,
+    ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
+
+    fn input_schema(&self) -> InputSchema;
+
+    /// Get a pointer to the most recently processed block in the subgraph.
+    async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
+}
+
+// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
+#[async_trait]
+impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
+    fn get_range(
+        &self,
+        entity_type: &EntityType,
+        block_range: Range<BlockNumber>,
+    ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
+        (**self).get_range(entity_type, block_range)
+    }
+
+    fn input_schema(&self) -> InputSchema {
+        (**self).input_schema()
+    }
+
+    async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
+        (**self).block_ptr().await
+    }
+}
+
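[Reviewer note] The blanket impl exists because call sites in this PR hold stores as `Arc<dyn SourceableStore>` (see `IndexingInputs` and `TriggersAdapterWrapper`) while generic code wants a plain `S: SourceableStore` bound. A usage sketch, assuming both forms:

```rust
use graph::blockchain::BlockPtr;
use graph::components::store::{SourceableStore, StoreError};

// Accepts a concrete store, an Arc'd store, or an Arc<dyn SourceableStore> alike.
async fn source_head<S: SourceableStore>(store: &S) -> Result<Option<BlockPtr>, StoreError> {
    store.block_ptr().await
}
```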
 /// A view of the store for indexing. All indexing-related operations need
 /// to go through this trait. Methods in this trait will never return a
 /// `StoreError::DatabaseUnavailable`. Instead, they will retry the
diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs
index 5609b2ac8f4..11b473a878d 100644
--- a/graph/src/components/subgraph/instance.rs
+++ b/graph/src/components/subgraph/instance.rs
@@ -20,6 +20,7 @@ impl<C: Blockchain> From<&DataSourceTemplate<C>> for InstanceDSTemplate {
         match value {
             DataSourceTemplate::Onchain(ds) => Self::Onchain(ds.info()),
             DataSourceTemplate::Offchain(ds) => Self::Offchain(ds.clone()),
+            DataSourceTemplate::Subgraph(_) => todo!(), // TODO(krishna)
         }
     }
 }
diff --git a/graph/src/components/subgraph/proof_of_indexing/online.rs b/graph/src/components/subgraph/proof_of_indexing/online.rs
index caaa76f0a76..f90fac969cf 100644
--- a/graph/src/components/subgraph/proof_of_indexing/online.rs
+++ b/graph/src/components/subgraph/proof_of_indexing/online.rs
@@ -146,8 +146,8 @@ impl BlockEventStream {
     fn write(&mut self, event: &ProofOfIndexingEvent<'_>) {
         let children = &[
             1,                // kvp -> v
-            0,                // PoICausalityRegion.blocks: Vec<Block>
-            self.block_index, // Vec<Block> -> [i]
+            0,                // PoICausalityRegion.blocks: Result<Vec<Block>>
+            self.block_index, // Result<Vec<Block>> -> [i]
             0,                // Block.events -> Vec<ProofOfIndexingEvent>
             self.vec_length,
         ];
diff --git a/graph/src/data/subgraph/api_version.rs b/graph/src/data/subgraph/api_version.rs
index e626e9f1dbc..43ee639007c 100644
--- a/graph/src/data/subgraph/api_version.rs
+++ b/graph/src/data/subgraph/api_version.rs
@@ -54,6 +54,9 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0);
 // Enables eth call declarations and indexed arguments(topics) filtering in manifest
 pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0);
 
+// Enables subgraphs as datasource
+pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0);
+
 // The latest spec version available
 pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0;
diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs
index df379845c00..a1d1156dccf 100644
--- a/graph/src/data/subgraph/mod.rs
+++ b/graph/src/data/subgraph/mod.rs
@@ -33,7 +33,7 @@ use web3::types::Address;
 
 use crate::{
     bail,
-    blockchain::{BlockPtr, Blockchain, DataSource as _},
+    blockchain::{BlockPtr, Blockchain},
     components::{
         link_resolver::LinkResolver,
         store::{StoreError, SubgraphStore},
@@ -140,6 +140,10 @@ impl DeploymentHash {
             link: format!("/ipfs/{}", self),
         }
     }
+
+    pub fn to_bytes(&self) -> Vec<u8> {
+        self.0.as_bytes().to_vec()
+    }
 }
 
 impl Deref for DeploymentHash {
@@ -713,7 +717,7 @@ impl<C: Blockchain> UnvalidatedSubgraphManifest<C> {
             .0
             .data_sources
             .iter()
-            .filter_map(|d| Some(d.as_onchain()?.network()?.to_string()))
+            .filter_map(|d| Some(d.network()?.to_string()))
             .collect::<Vec<_>>();
         networks.sort();
         networks.dedup();
@@ -759,11 +763,9 @@ impl<C: Blockchain> SubgraphManifest<C> {
         max_spec_version: semver::Version,
     ) -> Result<Self, SubgraphManifestResolveError> {
         let unresolved = UnresolvedSubgraphManifest::parse(id, raw)?;
-
         let resolved = unresolved
             .resolve(resolver, logger, max_spec_version)
             .await?;
-
         Ok(resolved)
     }
 
@@ -771,14 +773,14 @@ impl<C: Blockchain> SubgraphManifest<C> {
         // Assume the manifest has been validated, ensuring network names are homogenous
         self.data_sources
             .iter()
-            .find_map(|d| Some(d.as_onchain()?.network()?.to_string()))
+            .find_map(|d| Some(d.network()?.to_string()))
             .expect("Validated manifest does not have a network defined on any datasource")
     }
 
     pub fn start_blocks(&self) -> Vec<BlockNumber> {
         self.data_sources
             .iter()
-            .filter_map(|d| Some(d.as_onchain()?.start_block()))
+            .filter_map(|d| d.start_block())
             .collect()
     }
 
diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs
index a38148b25fe..1d255b1563c 100644
--- a/graph/src/data_source/mod.rs
a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -1,6 +1,8 @@ pub mod causality_region; pub mod offchain; +pub mod subgraph; +pub use self::DataSource as DataSourceEnum; pub use causality_region::CausalityRegion; #[cfg(test)] @@ -17,6 +19,7 @@ use crate::{ store::{BlockNumber, StoredDynamicDataSource}, }, data_source::offchain::OFFCHAIN_KINDS, + data_source::subgraph::SUBGRAPH_DS_KIND, prelude::{CheapClone as _, DataSourceContext}, schema::{EntityType, InputSchema}, }; @@ -35,6 +38,7 @@ use thiserror::Error; pub enum DataSource { Onchain(C::DataSource), Offchain(offchain::DataSource), + Subgraph(subgraph::DataSource), } #[derive(Error, Debug)] @@ -89,6 +93,23 @@ impl DataSource { match self { Self::Onchain(ds) => Some(ds), Self::Offchain(_) => None, + Self::Subgraph(_) => None, + } + } + + pub fn as_subgraph(&self) -> Option<&subgraph::DataSource> { + match self { + Self::Onchain(_) => None, + Self::Offchain(_) => None, + Self::Subgraph(ds) => Some(ds), + } + } + + pub fn is_chain_based(&self) -> bool { + match self { + Self::Onchain(_) => true, + Self::Offchain(_) => false, + Self::Subgraph(_) => true, } } @@ -96,6 +117,23 @@ impl DataSource { match self { Self::Onchain(_) => None, Self::Offchain(ds) => Some(ds), + Self::Subgraph(_) => None, + } + } + + pub fn network(&self) -> Option<&str> { + match self { + DataSourceEnum::Onchain(ds) => ds.network(), + DataSourceEnum::Offchain(_) => None, + DataSourceEnum::Subgraph(ds) => ds.network(), + } + } + + pub fn start_block(&self) -> Option { + match self { + DataSourceEnum::Onchain(ds) => Some(ds.start_block()), + DataSourceEnum::Offchain(_) => None, + DataSourceEnum::Subgraph(ds) => Some(ds.source.start_block), } } @@ -111,6 +149,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.address().map(ToOwned::to_owned), Self::Offchain(ds) => ds.address(), + Self::Subgraph(ds) => ds.address(), } } @@ -118,6 +157,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.name(), Self::Offchain(ds) => &ds.name, + Self::Subgraph(ds) => &ds.name, } } @@ -125,6 +165,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.kind().to_owned(), Self::Offchain(ds) => ds.kind.to_string(), + Self::Subgraph(ds) => ds.kind.clone(), } } @@ -132,6 +173,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.min_spec_version(), Self::Offchain(ds) => ds.min_spec_version(), + Self::Subgraph(ds) => ds.min_spec_version(), } } @@ -139,6 +181,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.end_block(), Self::Offchain(_) => None, + Self::Subgraph(_) => None, } } @@ -146,6 +189,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.creation_block(), Self::Offchain(ds) => ds.creation_block, + Self::Subgraph(ds) => ds.creation_block, } } @@ -153,6 +197,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.context(), Self::Offchain(ds) => ds.context.clone(), + Self::Subgraph(ds) => ds.context.clone(), } } @@ -160,6 +205,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.api_version(), Self::Offchain(ds) => ds.mapping.api_version.clone(), + Self::Subgraph(ds) => ds.mapping.api_version.clone(), } } @@ -167,6 +213,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.runtime(), Self::Offchain(ds) => Some(ds.mapping.runtime.cheap_clone()), + Self::Subgraph(ds) => Some(ds.mapping.runtime.cheap_clone()), } } @@ -176,6 +223,7 @@ impl DataSource { // been enforced. 
Self::Onchain(_) => EntityTypeAccess::Any, Self::Offchain(ds) => EntityTypeAccess::Restriced(ds.mapping.entities.clone()), + Self::Subgraph(_) => EntityTypeAccess::Any, } } @@ -183,6 +231,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.handler_kinds(), Self::Offchain(ds) => vec![ds.handler_kind()].into_iter().collect(), + Self::Subgraph(ds) => vec![ds.handler_kind()].into_iter().collect(), } } @@ -190,6 +239,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.has_declared_calls(), Self::Offchain(_) => false, + Self::Subgraph(_) => false, } } @@ -207,8 +257,15 @@ impl DataSource { (Self::Offchain(ds), TriggerData::Offchain(trigger)) => { Ok(ds.match_and_decode(trigger)) } + (Self::Subgraph(ds), TriggerData::Subgraph(trigger)) => { + Ok(ds.match_and_decode(block, trigger)) + } (Self::Onchain(_), TriggerData::Offchain(_)) - | (Self::Offchain(_), TriggerData::Onchain(_)) => Ok(None), + | (Self::Offchain(_), TriggerData::Onchain(_)) + | (Self::Onchain(_), TriggerData::Subgraph(_)) + | (Self::Offchain(_), TriggerData::Subgraph(_)) + | (Self::Subgraph(_), TriggerData::Onchain(_)) + | (Self::Subgraph(_), TriggerData::Offchain(_)) => Ok(None), } } @@ -224,6 +281,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.as_stored_dynamic_data_source(), Self::Offchain(ds) => ds.as_stored_dynamic_data_source(), + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -240,6 +298,7 @@ impl DataSource { offchain::DataSource::from_stored_dynamic_data_source(template, stored) .map(DataSource::Offchain) } + DataSourceTemplate::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -247,6 +306,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.validate(spec_version), Self::Offchain(_) => vec![], + Self::Subgraph(_) => vec![], // TODO(krishna) } } @@ -254,6 +314,7 @@ impl DataSource { match self { Self::Onchain(_) => CausalityRegion::ONCHAIN, Self::Offchain(ds) => ds.causality_region, + Self::Subgraph(_) => CausalityRegion::ONCHAIN, } } } @@ -262,6 +323,7 @@ impl DataSource { pub enum UnresolvedDataSource { Onchain(C::UnresolvedDataSource), Offchain(offchain::UnresolvedDataSource), + Subgraph(subgraph::UnresolvedDataSource), } impl UnresolvedDataSource { @@ -276,6 +338,10 @@ impl UnresolvedDataSource { .resolve(resolver, logger, manifest_idx) .await .map(DataSource::Onchain), + Self::Subgraph(unresolved) => unresolved + .resolve(resolver, logger, manifest_idx) + .await + .map(DataSource::Subgraph), Self::Offchain(_unresolved) => { anyhow::bail!( "static file data sources are not yet supported, \\ @@ -299,6 +365,7 @@ pub struct DataSourceTemplateInfo { pub enum DataSourceTemplate { Onchain(C::DataSourceTemplate), Offchain(offchain::DataSourceTemplate), + Subgraph(subgraph::DataSourceTemplate), } impl DataSourceTemplate { @@ -306,6 +373,7 @@ impl DataSourceTemplate { match self { DataSourceTemplate::Onchain(template) => template.info(), DataSourceTemplate::Offchain(template) => template.clone().into(), + DataSourceTemplate::Subgraph(template) => template.clone().into(), } } @@ -313,6 +381,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => Some(ds), Self::Offchain(_) => None, + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -320,6 +389,7 @@ impl DataSourceTemplate { match self { Self::Onchain(_) => None, Self::Offchain(t) => Some(t), + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -327,6 +397,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => Some(ds), Self::Offchain(_) => None, + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -334,6 
+405,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => &ds.name(), Self::Offchain(ds) => &ds.name, + Self::Subgraph(ds) => &ds.name, } } @@ -341,6 +413,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.api_version(), Self::Offchain(ds) => ds.mapping.api_version.clone(), + Self::Subgraph(ds) => ds.mapping.api_version.clone(), } } @@ -348,6 +421,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.runtime(), Self::Offchain(ds) => Some(ds.mapping.runtime.clone()), + Self::Subgraph(ds) => Some(ds.mapping.runtime.clone()), } } @@ -355,6 +429,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.manifest_idx(), Self::Offchain(ds) => ds.manifest_idx, + Self::Subgraph(ds) => ds.manifest_idx, } } @@ -362,6 +437,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.kind().to_string(), Self::Offchain(ds) => ds.kind.to_string(), + Self::Subgraph(ds) => ds.kind.clone(), } } } @@ -370,6 +446,7 @@ impl DataSourceTemplate { pub enum UnresolvedDataSourceTemplate { Onchain(C::UnresolvedDataSourceTemplate), Offchain(offchain::UnresolvedDataSourceTemplate), + Subgraph(subgraph::UnresolvedDataSourceTemplate), } impl Default for UnresolvedDataSourceTemplate { @@ -395,6 +472,10 @@ impl UnresolvedDataSourceTemplate { .resolve(resolver, logger, manifest_idx, schema) .await .map(DataSourceTemplate::Offchain), + Self::Subgraph(ds) => ds + .resolve(resolver, logger, manifest_idx) + .await + .map(DataSourceTemplate::Subgraph), } } } @@ -475,6 +556,7 @@ impl TriggerWithHandler { pub enum TriggerData { Onchain(C::TriggerData), Offchain(offchain::TriggerData), + Subgraph(subgraph::TriggerData), } impl TriggerData { @@ -482,6 +564,7 @@ impl TriggerData { match self { Self::Onchain(trigger) => trigger.error_context(), Self::Offchain(trigger) => format!("{:?}", trigger.source), + Self::Subgraph(trigger) => format!("{:?}", trigger.source), } } } @@ -490,6 +573,7 @@ impl TriggerData { pub enum MappingTrigger { Onchain(C::MappingTrigger), Offchain(offchain::TriggerData), + Subgraph(subgraph::TriggerData), } impl MappingTrigger { @@ -497,6 +581,7 @@ impl MappingTrigger { match self { Self::Onchain(trigger) => Some(trigger.error_context()), Self::Offchain(_) => None, // TODO: Add error context for offchain triggers + Self::Subgraph(_) => None, // TODO(krishna) } } @@ -504,6 +589,7 @@ impl MappingTrigger { match self { Self::Onchain(trigger) => Some(trigger), Self::Offchain(_) => None, + Self::Subgraph(_) => None, // TODO(krishna) } } } @@ -515,6 +601,7 @@ macro_rules! clone_data_source { match self { Self::Onchain(ds) => Self::Onchain(ds.clone()), Self::Offchain(ds) => Self::Offchain(ds.clone()), + Self::Subgraph(ds) => Self::Subgraph(ds.clone()), } } } @@ -541,6 +628,10 @@ macro_rules! 
deserialize_data_source {
                 offchain::$t::deserialize(map.into_deserializer())
                     .map_err(serde::de::Error::custom)
                     .map($t::Offchain)
+            } else if SUBGRAPH_DS_KIND == kind {
+                subgraph::$t::deserialize(map.into_deserializer())
+                    .map_err(serde::de::Error::custom)
+                    .map($t::Subgraph)
             } else if (&C::KIND.to_string() == kind) || C::ALIASES.contains(&kind) {
                 C::$t::deserialize(map.into_deserializer())
                     .map_err(serde::de::Error::custom)
diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs
new file mode 100644
index 00000000000..dba43786438
--- /dev/null
+++ b/graph/src/data_source/subgraph.rs
@@ -0,0 +1,305 @@
+use crate::{
+    blockchain::{Block, Blockchain},
+    components::{
+        link_resolver::LinkResolver,
+        store::{BlockNumber, Entity},
+    },
+    data::{subgraph::SPEC_VERSION_1_3_0, value::Word},
+    data_source,
+    prelude::{DataSourceContext, DeploymentHash, Link},
+};
+use anyhow::{Context, Error};
+use serde::Deserialize;
+use slog::{info, Logger};
+use std::{fmt, sync::Arc};
+
+use super::{DataSourceTemplateInfo, TriggerWithHandler};
+
+pub const SUBGRAPH_DS_KIND: &str = "subgraph";
+
+const ENTITY_HANDLER_KINDS: &str = "entity";
+
+#[derive(Debug, Clone)]
+pub struct DataSource {
+    pub kind: String,
+    pub name: String,
+    pub network: String,
+    pub manifest_idx: u32,
+    pub source: Source,
+    pub mapping: Mapping,
+    pub context: Arc<Option<DataSourceContext>>,
+    pub creation_block: Option<BlockNumber>,
+}
+
+impl DataSource {
+    pub fn new(
+        kind: String,
+        name: String,
+        network: String,
+        manifest_idx: u32,
+        source: Source,
+        mapping: Mapping,
+        context: Arc<Option<DataSourceContext>>,
+        creation_block: Option<BlockNumber>,
+    ) -> Self {
+        Self {
+            kind,
+            name,
+            network,
+            manifest_idx,
+            source,
+            mapping,
+            context,
+            creation_block,
+        }
+    }
+
+    pub fn min_spec_version(&self) -> semver::Version {
+        SPEC_VERSION_1_3_0
+    }
+
+    pub fn handler_kind(&self) -> &str {
+        ENTITY_HANDLER_KINDS
+    }
+
+    pub fn network(&self) -> Option<&str> {
+        Some(&self.network)
+    }
+
+    pub fn match_and_decode<C: Blockchain>(
+        &self,
+        block: &Arc<C::Block>,
+        trigger: &TriggerData,
+    ) -> Option<TriggerWithHandler<data_source::MappingTrigger<C>>> {
+        if self.source.address != trigger.source {
+            return None;
+        }
+
+        self.mapping.handlers.iter().find_map(|handler| {
+            if handler.entity != trigger.entity_type {
+                return None;
+            }
+
+            Some(TriggerWithHandler::new(
+                data_source::MappingTrigger::Subgraph(trigger.clone()),
+                handler.handler.clone(),
+                block.ptr(),
+                block.timestamp(),
+            ))
+        })
+    }
+
+    pub fn address(&self) -> Option<Vec<u8>> {
+        Some(self.source.address().to_bytes())
+    }
+
+    pub fn source_subgraph(&self) -> DeploymentHash {
+        self.source.address()
+    }
+}
+
+pub type Base64 = Word;
+
+#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
+pub struct Source {
+    pub address: DeploymentHash,
+    #[serde(default)]
+    pub start_block: BlockNumber,
+}
+
+impl Source {
+    /// The concept of an address may or may not make sense for a subgraph data source, but graph node
+    /// will use this in a few places where some sort of not necessarily unique id is useful:
+    /// 1. This is used as the value to be returned to mappings from the `dataSource.address()` host
+    ///    function, so changing this is a breaking change.
+    /// 2. This is used to match triggers with hosts in `fn hosts_for_trigger`, so make sure
+    ///    the `source` of the data source is equal to the `source` of the `TriggerData`.
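(Aside: the matching rule that `match_and_decode` implements above is worth seeing end to end. The following is an illustrative, self-contained sketch, not code from the patch; `SourceHash`, `matching_handler`, and the `Qmmanifest` hash are invented for the example, while the rule itself mirrors the real `DataSource`/`TriggerData`/`EntityHandler` types in this file: a trigger decodes iff the deployment hashes match and a handler is registered for the trigger's entity type.)

```rust
// Standalone model of the matching rule above; all names are hypothetical.
#[derive(PartialEq)]
struct SourceHash(&'static str);

struct Handler {
    entity: &'static str,  // entity type this handler subscribes to
    handler: &'static str, // mapping function to invoke
}

fn matching_handler<'a>(
    ds_source: &SourceHash,
    ds_handlers: &'a [Handler],
    trigger_source: &SourceHash,
    trigger_entity: &str,
) -> Option<&'a str> {
    if ds_source != trigger_source {
        return None; // trigger came from a different source subgraph
    }
    ds_handlers
        .iter()
        .find(|h| h.entity == trigger_entity)
        .map(|h| h.handler)
}

fn main() {
    let src = SourceHash("Qmmanifest"); // hypothetical deployment hash
    let handlers = [Handler { entity: "User", handler: "handleEntity" }];
    assert_eq!(
        matching_handler(&src, &handlers, &src, "User"),
        Some("handleEntity")
    );
    // No handler registered for this entity type: the trigger is skipped.
    assert_eq!(matching_handler(&src, &handlers, &src, "Gravatar"), None);
}
```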
+ pub fn address(&self) -> DeploymentHash { + self.address.clone() + } +} + +#[derive(Clone, Debug)] +pub struct Mapping { + pub language: String, + pub api_version: semver::Version, + pub entities: Vec, + pub handlers: Vec, + pub runtime: Arc>, + pub link: Link, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct EntityHandler { + pub handler: String, + pub entity: String, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize)] +pub struct UnresolvedDataSource { + pub kind: String, + pub name: String, + pub network: String, + pub source: UnresolvedSource, + pub mapping: UnresolvedMapping, +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +pub struct UnresolvedSource { + address: DeploymentHash, + #[serde(default)] + start_block: BlockNumber, +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnresolvedMapping { + pub api_version: String, + pub language: String, + pub file: Link, + pub handlers: Vec, + pub entities: Vec, +} + +impl UnresolvedDataSource { + #[allow(dead_code)] + pub(super) async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + manifest_idx: u32, + ) -> Result { + info!(logger, "Resolve subgraph data source"; + "name" => &self.name, + "kind" => &self.kind, + "source" => format_args!("{:?}", &self.source), + ); + + let kind = self.kind; + let source = Source { + address: self.source.address, + start_block: self.source.start_block, + }; + + Ok(DataSource { + manifest_idx, + kind, + name: self.name, + network: self.network, + source, + mapping: self.mapping.resolve(resolver, logger).await?, + context: Arc::new(None), + creation_block: None, + }) + } +} + +impl UnresolvedMapping { + pub async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + ) -> Result { + info!(logger, "Resolve subgraph ds mapping"; "link" => &self.file.link); + + Ok(Mapping { + language: self.language, + api_version: semver::Version::parse(&self.api_version)?, + entities: self.entities, + handlers: self.handlers, + runtime: Arc::new(resolver.cat(logger, &self.file).await?), + link: self.file, + }) + } +} + +#[derive(Clone, Debug, Deserialize)] +pub struct UnresolvedDataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub mapping: UnresolvedMapping, +} + +#[derive(Clone, Debug)] +pub struct DataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub manifest_idx: u32, + pub mapping: Mapping, +} + +impl Into for DataSourceTemplate { + fn into(self) -> DataSourceTemplateInfo { + let DataSourceTemplate { + kind, + network: _, + name, + manifest_idx, + mapping, + } = self; + + DataSourceTemplateInfo { + api_version: mapping.api_version.clone(), + runtime: Some(mapping.runtime), + name, + manifest_idx: Some(manifest_idx), + kind: kind.to_string(), + } + } +} + +impl UnresolvedDataSourceTemplate { + pub async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + manifest_idx: u32, + ) -> Result { + let kind = self.kind; + + let mapping = self + .mapping + .resolve(resolver, logger) + .await + .with_context(|| format!("failed to resolve data source template {}", self.name))?; + + Ok(DataSourceTemplate { + kind, + network: self.network, + name: self.name, + manifest_idx, + mapping, + }) + } +} + +#[derive(Clone, PartialEq, Eq)] +pub struct TriggerData { + pub source: DeploymentHash, + pub entity: Entity, + pub entity_type: String, +} + +impl TriggerData { + pub fn new(source: DeploymentHash, entity: Entity, 
entity_type: String) -> Self { + Self { + source, + entity, + entity_type, + } + } +} + +impl fmt::Debug for TriggerData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "TriggerData {{ source: {:?}, entity: {:?} }}", + self.source, self.entity, + ) + } +} diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index b97e44ef9a1..66ab846ab1a 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -390,7 +390,7 @@ struct Inner { default = "false" )] allow_non_deterministic_fulltext_search: EnvVarBoolean, - #[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "1.2.0")] + #[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "1.3.0")] max_spec_version: Version, #[envconfig(from = "GRAPH_LOAD_WINDOW_SIZE", default = "300")] load_window_size_in_secs: u64, diff --git a/runtime/wasm/src/host.rs b/runtime/wasm/src/host.rs index 3ecee7ba753..ebf107fb3ec 100644 --- a/runtime/wasm/src/host.rs +++ b/runtime/wasm/src/host.rs @@ -366,6 +366,7 @@ impl RuntimeHostTrait for RuntimeHost { match self.data_source() { DataSource::Onchain(_) => None, DataSource::Offchain(ds) => ds.done_at(), + DataSource::Subgraph(_) => None, } } @@ -373,6 +374,7 @@ impl RuntimeHostTrait for RuntimeHost { match self.data_source() { DataSource::Onchain(_) => {} DataSource::Offchain(ds) => ds.set_done_at(block), + DataSource::Subgraph(_) => {} } } diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index ffe4f7aba8e..532f75d2660 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -4,6 +4,7 @@ use std::mem::MaybeUninit; use anyhow::anyhow; use anyhow::Error; use graph::blockchain::Blockchain; +use graph::data_source::subgraph; use graph::util::mem::init_slice; use semver::Version; use wasmtime::AsContext; @@ -69,6 +70,16 @@ impl ToAscPtr for offchain::TriggerData { } } +impl ToAscPtr for subgraph::TriggerData { + fn to_asc_ptr( + self, + heap: &mut H, + gas: &GasCounter, + ) -> Result, HostExportError> { + asc_new(heap, &self.entity.sorted_ref(), gas).map(|ptr| ptr.erase()) + } +} + impl ToAscPtr for MappingTrigger where C::MappingTrigger: ToAscPtr, @@ -81,6 +92,7 @@ where match self { MappingTrigger::Onchain(trigger) => trigger.to_asc_ptr(heap, gas), MappingTrigger::Offchain(trigger) => trigger.to_asc_ptr(heap, gas), + MappingTrigger::Subgraph(trigger) => trigger.to_asc_ptr(heap, gas), } } } diff --git a/store/postgres/src/block_range.rs b/store/postgres/src/block_range.rs index f05c4e73869..5f3c9f014bc 100644 --- a/store/postgres/src/block_range.rs +++ b/store/postgres/src/block_range.rs @@ -50,7 +50,7 @@ lazy_static! { /// The range of blocks for which an entity is valid. We need this struct /// to bind ranges into Diesel queries. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Copy)] pub struct BlockRange(Bound, Bound); pub(crate) fn first_block_in_range( @@ -132,6 +132,66 @@ impl<'a> QueryFragment for BlockRangeUpperBoundClause<'a> { } } +/// Helper for generating SQL fragments for selecting entities in a specific block range +#[derive(Debug, Clone, Copy)] +pub enum EntityBlockRange { + Mutable(BlockRange), // TODO: check if this is a proper type here (maybe Range?) 
+    Immutable(BlockRange),
+}
+
+impl EntityBlockRange {
+    pub fn new(table: &Table, block_range: std::ops::Range<BlockNumber>) -> Self {
+        let start: Bound<BlockNumber> = Bound::Included(block_range.start);
+        let end: Bound<BlockNumber> = Bound::Excluded(block_range.end);
+        let block_range: BlockRange = BlockRange(start, end);
+        if table.immutable {
+            Self::Immutable(block_range)
+        } else {
+            Self::Mutable(block_range)
+        }
+    }
+
+    /// Output SQL that matches only rows whose version-creation block falls
+    /// inside the block range: `lower(block_range)` for mutable tables,
+    /// `block$` for immutable ones.
+    pub fn contains<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> {
+        out.unsafe_to_cache_prepared();
+        let block_range = match self {
+            EntityBlockRange::Mutable(br) => br,
+            EntityBlockRange::Immutable(br) => br,
+        };
+        let BlockRange(start, finish) = block_range;
+
+        self.compare_column(out);
+        out.push_sql(" >= ");
+        match start {
+            Bound::Included(block) => out.push_bind_param::<Integer, _>(block)?,
+            Bound::Excluded(block) => {
+                out.push_bind_param::<Integer, _>(block)?;
+                out.push_sql("+1");
+            }
+            Bound::Unbounded => unimplemented!(),
+        };
+        out.push_sql(" AND ");
+        self.compare_column(out);
+        out.push_sql(" <= ");
+        match finish {
+            Bound::Included(block) => {
+                out.push_bind_param::<Integer, _>(block)?;
+                out.push_sql("+1");
+            }
+            Bound::Excluded(block) => out.push_bind_param::<Integer, _>(block)?,
+            Bound::Unbounded => unimplemented!(),
+        };
+        Ok(())
+    }
+
+    pub fn compare_column(&self, out: &mut AstPass<'_, '_, Pg>) {
+        match self {
+            EntityBlockRange::Mutable(_) => out.push_sql(" lower(block_range) "),
+            EntityBlockRange::Immutable(_) => out.push_sql(" block$ "),
+        }
+    }
+}
+
 /// Helper for generating various SQL fragments for handling the block range
 /// of entity versions
 #[allow(unused)]
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index f5b2825f63f..d0ca873009a 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -29,8 +29,8 @@ use lru_time_cache::LruCache;
 use rand::{seq::SliceRandom, thread_rng};
 use std::collections::{BTreeMap, HashMap};
 use std::convert::Into;
-use std::ops::Deref;
 use std::ops::{Bound, DerefMut};
+use std::ops::{Deref, Range};
 use std::str::FromStr;
 use std::sync::{atomic::AtomicUsize, Arc, Mutex};
 use std::time::{Duration, Instant};
@@ -1063,6 +1063,17 @@ impl DeploymentStore {
         layout.find_many(&mut conn, ids_for_type, block)
     }
 
+    pub(crate) fn get_range(
+        &self,
+        site: Arc<Site>,
+        entity_type: &EntityType,
+        block_range: Range<BlockNumber>,
+    ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
+        let mut conn = self.get_conn()?;
+        let layout = self.layout(&mut conn, site)?;
+        layout.find_range(&mut conn, entity_type, block_range)
+    }
+
     pub(crate) fn get_derived(
         &self,
         site: Arc<Site>,
diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs
index be9f889c84a..13a81876325 100644
--- a/store/postgres/src/relational.rs
+++ b/store/postgres/src/relational.rs
@@ -50,6 +50,7 @@ use std::borrow::Borrow;
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::convert::{From, TryFrom};
 use std::fmt::{self, Write};
+use std::ops::Range;
 use std::str::FromStr;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
@@ -63,7 +64,7 @@ use crate::{
     primary::{Namespace, Site},
     relational_queries::{
         ClampRangeQuery, EntityData, EntityDeletion, FilterCollection, FilterQuery, FindManyQuery,
-        InsertQuery, RevertClampQuery, RevertRemoveQuery,
+        FindRangeQuery, InsertQuery, RevertClampQuery, RevertRemoveQuery,
     },
 };
 use graph::components::store::{AttributeNames, DerivedEntityQuery};
@@ -541,6 +542,27 @@ impl Layout {
         Ok(entities)
     }
 
+    pub fn find_range(
+        &self,
+        conn: &mut PgConnection,
+        entity_type: &EntityType,
+        block_range: Range<BlockNumber>,
+    ) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
+        let table = self.table_for_entity(entity_type)?;
+        let mut entities: BTreeMap<BlockNumber, Vec<Entity>> = BTreeMap::new();
+        if let Some(vec) = FindRangeQuery::new(table.as_ref(), block_range)
+            .get_results::<EntityData>(conn)
+            .optional()?
+        {
+            for e in vec {
+                let block = e.clone().deserialize_block_number()?;
+                let en = e.deserialize_with_layout::<Entity>(self, None)?;
+                entities.entry(block).or_default().push(en);
+            }
+        }
+        Ok(entities)
+    }
+
     pub fn find_derived(
         &self,
         conn: &mut PgConnection,
diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs
index 56ad1aafacb..55ade522dd7 100644
--- a/store/postgres/src/relational_queries.rs
+++ b/store/postgres/src/relational_queries.rs
@@ -18,6 +18,7 @@ use graph::data::store::{Id, IdType, NULL};
 use graph::data::store::{IdList, IdRef, QueryObject};
 use graph::data::value::{Object, Word};
 use graph::data_source::CausalityRegion;
+use graph::prelude::regex::Regex;
 use graph::prelude::{
     anyhow, r, serde_json, BlockNumber, ChildMultiplicity, Entity, EntityCollection, EntityFilter,
     EntityLink, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityRange, EntityWindow,
@@ -31,9 +32,11 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
 use std::convert::TryFrom;
 use std::fmt::{self, Display};
 use std::iter::FromIterator;
+use std::ops::Range;
 use std::str::FromStr;
 use std::string::ToString;
 
+use crate::block_range::EntityBlockRange;
 use crate::relational::dsl::AtBlock;
 use crate::relational::{
     dsl, Column, ColumnType, Layout, SqlName, Table, BYTE_ARRAY_PREFIX_SIZE, PRIMARY_KEY_COLUMN,
@@ -442,7 +445,7 @@ pub fn parse_id(id_type: IdType, json: serde_json::Value) -> Result
+    pub fn deserialize_block_number(self) -> Result<BlockNumber, StoreError> {
+        use serde_json::Value as j;
+        match self.data {
+            j::Object(map) => {
+                let mut entries = map.into_iter().filter_map(move |(key, json)| {
+                    if key == "block_range" {
+                        let r = json.as_str().unwrap();
+                        let rx = Regex::new("\\[(?P<start>[0-9]+),([0-9]+)?\\)").unwrap();
+                        let cap = rx.captures(r).unwrap();
+                        let start = cap
+                            .name("start")
+                            .map(|mtch| mtch.as_str().to_string())
+                            .unwrap();
+                        let n = start.parse::<BlockNumber>().unwrap();
+                        Some(n)
+                    } else if key == "block$" {
+                        let block = json.as_i64().unwrap() as i32;
+                        Some(block)
+                    } else {
+                        None
+                    }
+                });
+                let en = entries.next().unwrap();
+                assert!(entries.next().is_none()); // there should be just one block_range field
+                Ok(en)
+            }
+            _ => unreachable!(
+                "we use `to_json` in our queries, and will therefore always get an object back"
+            ),
+        }
+    }
+
     /// Map the `EntityData` using the schema information in `Layout`
     pub fn deserialize_with_layout(
         self,
@@ -1802,6 +1837,54 @@ impl<'a> QueryFragment<Pg> for Filter<'a> {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct FindRangeQuery<'a> {
+    table: &'a Table,
+    eb_range: EntityBlockRange,
+}
+
+impl<'a> FindRangeQuery<'a> {
+    pub fn new(table: &'a Table, block_range: Range<BlockNumber>) -> Self {
+        let eb_range = EntityBlockRange::new(&table, block_range);
+        Self { table, eb_range }
+    }
+}
+
+impl<'a> QueryFragment<Pg> for FindRangeQuery<'a> {
+    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
+        out.unsafe_to_cache_prepared();
+
+        // Generate
+        //    select '..' as entity, to_jsonb(e.*) as data
+        //      from schema.table e
+        //     where <block-range condition>
+        out.push_sql("select ");
+        out.push_bind_param::<Text, _>(self.table.object.as_str())?;
+        out.push_sql(" as entity, to_jsonb(e.*) as data\n");
+        out.push_sql(" from ");
+        out.push_sql(self.table.qualified_name.as_str());
+        out.push_sql(" e\n where ");
+        // TODO: do we need to handle causality regions here?
+        // if self.table.has_causality_region {
+        //     out.push_sql("causality_region = ");
+        //     out.push_bind_param::<Integer, _>(&self.key.causality_region)?;
+        //     out.push_sql(" and ");
+        // }
+        self.eb_range.contains(&mut out)
+    }
+}
+
+impl<'a> QueryId for FindRangeQuery<'a> {
+    type QueryId = ();
+
+    const HAS_STATIC_QUERY_ID: bool = false;
+}
+
+impl<'a> Query for FindRangeQuery<'a> {
+    type SqlType = Untyped;
+}
+
+impl<'a, Conn> RunQueryDsl<Conn> for FindRangeQuery<'a> {}
+
 /// Builds a query over a given set of [`Table`]s in an attempt to find updated
 /// and/or newly inserted entities at a given block number; i.e. such that the
 /// block range's lower bound is equal to said block number.
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index 41cbef15982..f6544a79e0d 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -44,7 +44,7 @@ use crate::{
         index::{IndexList, Method},
         Layout,
     },
-    writable::WritableStore,
+    writable::{SourceableStore, WritableStore},
     NotificationSender,
 };
 use crate::{
@@ -268,6 +268,50 @@ impl SubgraphStore {
     pub fn for_site(&self, site: &Site) -> Result<&Arc<DeploymentStore>, StoreError> {
         self.inner.for_site(site)
     }
+
+    async fn get_or_create_writable_store(
+        self: Arc<Self>,
+        logger: Logger,
+        deployment: graph::components::store::DeploymentId,
+        manifest_idx_and_name: Arc<Vec<(u32, String)>>,
+    ) -> Result<Arc<WritableStore>, StoreError> {
+        let deployment = deployment.into();
+        // We cache writables to make sure calls to this method are
+        // idempotent and there is ever only one `WritableStore` for any
+        // deployment
+        if let Some(writable) = self.writables.lock().unwrap().get(&deployment) {
+            // A poisoned writable will not write anything anymore; we
+            // discard it and create a new one that is properly initialized
+            // according to the state in the database.
+            if !writable.poisoned() {
+                return Ok(writable.cheap_clone());
+            }
+        }
+
+        // Ideally the lower level functions would be asyncified.
+        let this = self.clone();
+        let site = graph::spawn_blocking_allow_panic(move || -> Result<_, StoreError> {
+            this.find_site(deployment)
+        })
+        .await
+        .unwrap()?; // Propagate panics, there shouldn't be any.
+
+        let writable = Arc::new(
+            WritableStore::new(
+                self.as_ref().clone(),
+                logger,
+                site,
+                manifest_idx_and_name,
+                self.registry.clone(),
+            )
+            .await?,
+        );
+        self.writables
+            .lock()
+            .unwrap()
+            .insert(deployment, writable.cheap_clone());
+        Ok(writable)
+    }
 }
 
 impl std::ops::Deref for SubgraphStore {
@@ -1488,42 +1532,25 @@ impl SubgraphStoreTrait for SubgraphStore {
         deployment: graph::components::store::DeploymentId,
         manifest_idx_and_name: Arc<Vec<(u32, String)>>,
     ) -> Result<Arc<dyn store::WritableStore>, StoreError> {
-        let deployment = deployment.into();
-        // We cache writables to make sure calls to this method are
-        // idempotent and there is ever only one `WritableStore` for any
-        // deployment
-        if let Some(writable) = self.writables.lock().unwrap().get(&deployment) {
-            // A poisoned writable will not write anything anymore; we
-            // discard it and create a new one that is properly initialized
-            // according to the state in the database.
- if !writable.poisoned() { - return Ok(writable.cheap_clone()); - } - } + self.get_or_create_writable_store(logger, deployment, manifest_idx_and_name) + .await + .map(|store| store as Arc) + } - // Ideally the lower level functions would be asyncified. - let this = self.clone(); - let site = graph::spawn_blocking_allow_panic(move || -> Result<_, StoreError> { - this.find_site(deployment) - }) - .await - .unwrap()?; // Propagate panics, there shouldn't be any. + async fn sourceable( + self: Arc, + deployment: graph::components::store::DeploymentId, + ) -> Result, StoreError> { + let deployment = deployment.into(); + let site = self.find_site(deployment)?; + let store = self.for_site(&site)?; + let input_schema = self.input_schema(&site.deployment)?; - let writable = Arc::new( - WritableStore::new( - self.as_ref().clone(), - logger, - site, - manifest_idx_and_name, - self.registry.clone(), - ) - .await?, - ); - self.writables - .lock() - .unwrap() - .insert(deployment, writable.cheap_clone()); - Ok(writable) + Ok(Arc::new(SourceableStore::new( + site, + store.clone(), + input_schema, + ))) } async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> { diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 99ccfd02217..e1273bfe763 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -1,10 +1,11 @@ use std::collections::BTreeSet; -use std::ops::Deref; +use std::ops::{Deref, Range}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Mutex, RwLock, TryLockError as RwLockError}; use std::time::Instant; use std::{collections::BTreeMap, sync::Arc}; +use async_trait::async_trait; use graph::blockchain::block_stream::FirehoseCursor; use graph::blockchain::BlockTime; use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore}; @@ -1571,6 +1572,42 @@ impl ReadStore for WritableStore { } } +pub struct SourceableStore { + site: Arc, + store: Arc, + input_schema: InputSchema, +} + +impl SourceableStore { + pub fn new(site: Arc, store: Arc, input_schema: InputSchema) -> Self { + Self { + site, + store, + input_schema, + } + } +} + +#[async_trait] +impl store::SourceableStore for SourceableStore { + fn get_range( + &self, + entity_type: &EntityType, + block_range: Range, + ) -> Result>, StoreError> { + self.store + .get_range(self.site.clone(), entity_type, block_range) + } + + fn input_schema(&self) -> InputSchema { + self.input_schema.cheap_clone() + } + + async fn block_ptr(&self) -> Result, StoreError> { + self.store.block_ptr(self.site.cheap_clone()).await + } +} + impl DeploymentCursorTracker for WritableStore { fn block_ptr(&self) -> Option { self.block_ptr.lock().unwrap().clone() diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 9089ec4f572..34eaf110f77 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -11,10 +11,10 @@ use graph::data::store::Value; use graph::data::subgraph::schema::SubgraphError; use graph::data::subgraph::{ Prune, LATEST_VERSION, SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, - SPEC_VERSION_0_0_9, SPEC_VERSION_1_0_0, SPEC_VERSION_1_2_0, + SPEC_VERSION_0_0_9, SPEC_VERSION_1_0_0, SPEC_VERSION_1_2_0, SPEC_VERSION_1_3_0, }; use graph::data_source::offchain::OffchainDataSourceKind; -use graph::data_source::DataSourceTemplate; +use graph::data_source::{DataSourceEnum, DataSourceTemplate}; use graph::entity; 
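(Aside: putting `SubgraphStore::sourceable` and the `SourceableStore` trait together, a consumer is expected to follow roughly this call pattern. This is a hedged sketch, not code from the patch: the `"User"` entity type and the function name are hypothetical, and the import paths assume the re-exports used by the tests below.)

```rust
use std::{ops::Range, sync::Arc};

use graph::components::store::{BlockNumber, SourceableStore, StoreError};

// Count how many entity versions of a hypothetical "User" type the source
// subgraph wrote in `block_range`. `sourceable` is the handle returned by
// `SubgraphStore::sourceable(deployment_id)`.
fn changed_user_versions(
    sourceable: Arc<dyn SourceableStore>,
    block_range: Range<BlockNumber>,
) -> Result<usize, StoreError> {
    // The source subgraph's schema travels with the store handle, so the
    // caller does not have to resolve it separately.
    let user_type = sourceable.input_schema().entity_type("User").unwrap();
    // get_range returns the changed entity versions grouped by the block
    // in which they were written.
    let versions_by_block = sourceable.get_range(&user_type, block_range)?;
    Ok(versions_by_block.values().map(Vec::len).sum())
}
```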
use graph::env::ENV_VARS; use graph::prelude::web3::types::H256; @@ -166,10 +166,52 @@ specVersion: 0.0.7 let data_source = match &manifest.templates[0] { DataSourceTemplate::Offchain(ds) => ds, DataSourceTemplate::Onchain(_) => unreachable!(), + DataSourceTemplate::Subgraph(_) => unreachable!(), }; assert_eq!(data_source.kind, OffchainDataSourceKind::Ipfs); } +#[tokio::test] +async fn subgraph_ds_manifest() { + let yaml = " +schema: + file: + /: /ipfs/Qmschema +dataSources: + - name: SubgraphSource + kind: subgraph + entities: + - Gravatar + network: mainnet + source: + address: 'QmUVaWpdKgcxBov1jHEa8dr46d2rkVzfHuZFu4fXJ4sFse' + startBlock: 0 + mapping: + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - TestEntity + file: + /: /ipfs/Qmmapping + handlers: + - handler: handleEntity + entity: User +specVersion: 1.3.0 +"; + + let manifest = resolve_manifest(yaml, SPEC_VERSION_1_3_0).await; + + assert_eq!("Qmmanifest", manifest.id.as_str()); + assert_eq!(manifest.data_sources.len(), 1); + let data_source = &manifest.data_sources[0]; + match data_source { + DataSourceEnum::Subgraph(ds) => { + assert_eq!(ds.name, "SubgraphSource"); + } + _ => panic!("Expected a subgraph data source"), + } +} + #[tokio::test] async fn graft_manifest() { const YAML: &str = " diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index df04615898a..d9e9ee989af 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -6,9 +6,12 @@ use graph::schema::{EntityKey, EntityType, InputSchema}; use lazy_static::lazy_static; use std::collections::BTreeSet; use std::marker::PhantomData; +use std::ops::Range; use test_store::*; -use graph::components::store::{DeploymentLocator, DerivedEntityQuery, WritableStore}; +use graph::components::store::{ + DeploymentLocator, DerivedEntityQuery, SourceableStore, WritableStore, +}; use graph::data::subgraph::*; use graph::semver::Version; use graph::{entity, prelude::*}; @@ -21,9 +24,14 @@ const SCHEMA_GQL: &str = " id: ID!, count: Int!, } + type Counter2 @entity(immutable: true) { + id: ID!, + count: Int!, + } "; const COUNTER: &str = "Counter"; +const COUNTER2: &str = "Counter2"; lazy_static! { static ref TEST_SUBGRAPH_ID_STRING: String = String::from("writableSubgraph"); @@ -33,6 +41,7 @@ lazy_static! { InputSchema::parse_latest(SCHEMA_GQL, TEST_SUBGRAPH_ID.clone()) .expect("Failed to parse user schema"); static ref COUNTER_TYPE: EntityType = TEST_SUBGRAPH_SCHEMA.entity_type(COUNTER).unwrap(); + static ref COUNTER2_TYPE: EntityType = TEST_SUBGRAPH_SCHEMA.entity_type(COUNTER2).unwrap(); } /// Inserts test data into the store. @@ -80,7 +89,14 @@ fn remove_test_data(store: Arc) { /// Test harness for running database integration tests. 
fn run_test(test: F) where - F: FnOnce(Arc, Arc, DeploymentLocator) -> R + Send + 'static, + F: FnOnce( + Arc, + Arc, + Arc, + DeploymentLocator, + ) -> R + + Send + + 'static, R: std::future::Future + Send + 'static, { run_test_sequentially(|store| async move { @@ -95,10 +111,15 @@ where .writable(LOGGER.clone(), deployment.id, Arc::new(Vec::new())) .await .expect("we can get a writable store"); + let sourceable = store + .subgraph_store() + .sourceable(deployment.id) + .await + .expect("we can get a writable store"); // Run test and wait for the background writer to finish its work so // it won't conflict with the next test - test(store, writable, deployment).await; + test(store, writable, sourceable, deployment).await; }); } @@ -111,18 +132,67 @@ fn count_key(id: &str) -> EntityKey { COUNTER_TYPE.parse_key(id).unwrap() } -async fn insert_count(store: &Arc, deployment: &DeploymentLocator, count: u8) { +async fn insert_count( + store: &Arc, + deployment: &DeploymentLocator, + block: u8, + count: u8, + counter_type: &EntityType, + id: &str, + id2: &str, +) { + let count_key_local = |id: &str| counter_type.parse_key(id).unwrap(); let data = entity! { TEST_SUBGRAPH_SCHEMA => - id: "1", - count: count as i32 + id: id, + count :count as i32, }; let entity_op = EntityOperation::Set { - key: count_key(&data.get("id").unwrap().to_string()), + key: count_key_local(&data.get("id").unwrap().to_string()), data, }; - transact_entity_operations(store, deployment, block_pointer(count), vec![entity_op]) - .await - .unwrap(); + let data = entity! { TEST_SUBGRAPH_SCHEMA => + id: id2, + count :count as i32, + }; + let entity_op2 = EntityOperation::Set { + key: count_key_local(&data.get("id").unwrap().to_string()), + data, + }; + transact_entity_operations( + store, + deployment, + block_pointer(block), + vec![entity_op, entity_op2], + ) + .await + .unwrap(); +} + +async fn insert_count_mutable( + store: &Arc, + deployment: &DeploymentLocator, + block: u8, + count: u8, +) { + insert_count(store, deployment, block, count, &COUNTER_TYPE, "1", "2").await; +} + +async fn insert_count_immutable( + store: &Arc, + deployment: &DeploymentLocator, + block: u8, + count: u8, +) { + insert_count( + store, + deployment, + block, + count, + &COUNTER2_TYPE, + &(block).to_string(), + &(block + 1).to_string(), + ) + .await; } async fn pause_writer(deployment: &DeploymentLocator) { @@ -140,7 +210,7 @@ fn get_with_pending(batch: bool, read_count: F) where F: Send + Fn(&dyn WritableStore) -> i32 + Sync + 'static, { - run_test(move |store, writable, deployment| async move { + run_test(move |store, writable, _, deployment| async move { let subgraph_store = store.subgraph_store(); let read_count = || read_count(writable.as_ref()); @@ -150,13 +220,13 @@ where } for count in 1..4 { - insert_count(&subgraph_store, &deployment, count).await; + insert_count_mutable(&subgraph_store, &deployment, count, count).await; } // Test reading back with pending writes to the same entity pause_writer(&deployment).await; for count in 4..7 { - insert_count(&subgraph_store, &deployment, count).await; + insert_count_mutable(&subgraph_store, &deployment, count, count).await; } assert_eq!(6, read_count()); @@ -165,7 +235,7 @@ where // Test reading back with pending writes and a pending revert for count in 7..10 { - insert_count(&subgraph_store, &deployment, count).await; + insert_count_mutable(&subgraph_store, &deployment, count, count).await; } writable .revert_block_operations(block_pointer(2), FirehoseCursor::None) @@ -238,7 +308,7 @@ fn 
get_derived_nobatch() { #[test] fn restart() { - run_test(|store, writable, deployment| async move { + run_test(|store, writable, _, deployment| async move { let subgraph_store = store.subgraph_store(); let schema = subgraph_store.input_schema(&deployment.hash).unwrap(); @@ -286,3 +356,52 @@ fn restart() { writable.flush().await.unwrap(); }) } + +async fn read_range( + store: Arc, + writable: Arc, + sourceable: Arc, + deployment: DeploymentLocator, + mutable: bool, +) -> usize { + let subgraph_store = store.subgraph_store(); + writable.deployment_synced(block_pointer(0)).unwrap(); + + for count in 1..=7 { + if mutable { + insert_count_mutable(&subgraph_store, &deployment, 2 * count, 4 * count).await + } else { + insert_count_immutable(&subgraph_store, &deployment, 2 * count, 4 * count).await + } + } + writable.flush().await.unwrap(); + + let br: Range = 4..8; + let et: &EntityType = if mutable { + &COUNTER_TYPE + } else { + &COUNTER2_TYPE + }; + let e = sourceable.get_range(et, br).unwrap(); + e.iter().map(|(_, v)| v.iter()).flatten().count() +} + +#[test] +fn read_range_mutable() { + run_test( + |store, writable, sourceable: Arc, deployment| async move { + let num_entities = read_range(store, writable, sourceable, deployment, true).await; + assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open + }, + ) +} + +#[test] +fn read_range_immutable() { + run_test( + |store, writable, sourceable: Arc, deployment| async move { + let num_entities = read_range(store, writable, sourceable, deployment, false).await; + assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open + }, + ) +} diff --git a/tests/runner-tests/subgraph-data-sources/abis/Contract.abi b/tests/runner-tests/subgraph-data-sources/abis/Contract.abi new file mode 100644 index 00000000000..9d9f56b9263 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/abis/Contract.abi @@ -0,0 +1,15 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "testCommand", + "type": "string" + } + ], + "name": "TestEvent", + "type": "event" + } +] diff --git a/tests/runner-tests/subgraph-data-sources/package.json b/tests/runner-tests/subgraph-data-sources/package.json new file mode 100644 index 00000000000..87537290ad2 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/package.json @@ -0,0 +1,13 @@ +{ + "name": "subgraph-data-sources", + "version": "0.1.0", + "scripts": { + "codegen": "graph codegen --skip-migrations", + "create:test": "graph create test/subgraph-data-sources --node $GRAPH_NODE_ADMIN_URI", + "deploy:test": "graph deploy test/subgraph-data-sources --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.79.0-alpha-20240711124603-49edf22", + "@graphprotocol/graph-ts": "0.31.0" + } +} diff --git a/tests/runner-tests/subgraph-data-sources/schema.graphql b/tests/runner-tests/subgraph-data-sources/schema.graphql new file mode 100644 index 00000000000..6f97fa65c43 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/schema.graphql @@ -0,0 +1,6 @@ +type Data @entity { + id: ID! 
+ foo: String + bar: Int + isTest: Boolean +} diff --git a/tests/runner-tests/subgraph-data-sources/src/mapping.ts b/tests/runner-tests/subgraph-data-sources/src/mapping.ts new file mode 100644 index 00000000000..2e1a5382af3 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/src/mapping.ts @@ -0,0 +1,6 @@ +import { Entity, log } from '@graphprotocol/graph-ts'; + +export function handleBlock(content: Entity): void { + let stringContent = content.getString('val'); + log.info('Content: {}', [stringContent]); +} diff --git a/tests/runner-tests/subgraph-data-sources/subgraph.yaml b/tests/runner-tests/subgraph-data-sources/subgraph.yaml new file mode 100644 index 00000000000..1c666e3417e --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/subgraph.yaml @@ -0,0 +1,19 @@ +specVersion: 1.3.0 +schema: + file: ./schema.graphql +dataSources: + - kind: subgraph + name: Contract + network: test + source: + address: 'QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ' + startBlock: 6082461 + mapping: + apiVersion: 0.0.7 + language: wasm/assemblyscript + entities: + - Gravatar + handlers: + - handler: handleBlock + entity: User + file: ./src/mapping.ts diff --git a/tests/runner-tests/yarn.lock b/tests/runner-tests/yarn.lock index 50e0c2b471f..9f3bdae834d 100644 --- a/tests/runner-tests/yarn.lock +++ b/tests/runner-tests/yarn.lock @@ -349,6 +349,40 @@ which "2.0.2" yaml "1.10.2" +"@graphprotocol/graph-cli@0.79.0-alpha-20240711124603-49edf22": + version "0.79.0-alpha-20240711124603-49edf22" + resolved "https://registry.yarnpkg.com/@graphprotocol/graph-cli/-/graph-cli-0.79.0-alpha-20240711124603-49edf22.tgz#4e3f6201932a0b68ce64d6badd8432cf2bead3c2" + integrity sha512-fZrdPiFbbbBVMnvsjfKA+j48WzzquaHQIpozBqnUKRPCV1n1NenIaq2nH16mlMwovRIS7AAIVCpa0QYQuPzw7Q== + dependencies: + "@float-capital/float-subgraph-uncrashable" "^0.0.0-alpha.4" + "@oclif/core" "2.8.6" + "@oclif/plugin-autocomplete" "^2.3.6" + "@oclif/plugin-not-found" "^2.4.0" + "@whatwg-node/fetch" "^0.8.4" + assemblyscript "0.19.23" + binary-install-raw "0.0.13" + chalk "3.0.0" + chokidar "3.5.3" + debug "4.3.4" + docker-compose "0.23.19" + dockerode "2.5.8" + fs-extra "9.1.0" + glob "9.3.5" + gluegun "5.1.6" + graphql "15.5.0" + immutable "4.2.1" + ipfs-http-client "55.0.0" + jayson "4.0.0" + js-yaml "3.14.1" + open "8.4.2" + prettier "3.0.3" + semver "7.4.0" + sync-request "6.1.0" + tmp-promise "3.0.3" + web3-eth-abi "1.7.0" + which "2.0.2" + yaml "1.10.2" + "@graphprotocol/graph-ts@0.30.0": version "0.30.0" resolved "https://registry.npmjs.org/@graphprotocol/graph-ts/-/graph-ts-0.30.0.tgz" @@ -1473,6 +1507,11 @@ defaults@^1.0.3: dependencies: clone "^1.0.2" +define-lazy-prop@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/define-lazy-prop/-/define-lazy-prop-2.0.0.tgz#3f7ae421129bcaaac9bc74905c98a0009ec9ee7f" + integrity sha512-Ds09qNh8yw3khSjiJjiUInaGX9xlqZDY7JVryGxdxV7NPeuqQfplOpQ66yJFZut3jLa5zOwkXw1g9EI2uKh4Og== + delay@^5.0.0: version "5.0.0" resolved "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz" @@ -1545,6 +1584,13 @@ ejs@3.1.6: dependencies: jake "^10.6.1" +ejs@3.1.8: + version "3.1.8" + resolved "https://registry.yarnpkg.com/ejs/-/ejs-3.1.8.tgz#758d32910c78047585c7ef1f92f9ee041c1c190b" + integrity sha512-/sXZeMlhS0ArkfX2Aw780gJzXSMPnKjtspYZv+f3NiKLlubezAHDU5+9xz6gd3/NhG3txQCo6xlglmTS+oTGEQ== + dependencies: + jake "^10.8.5" + ejs@^3.1.8: version "3.1.9" resolved "https://registry.npmjs.org/ejs/-/ejs-3.1.9.tgz" @@ -1996,6 +2042,42 @@ gluegun@5.1.2: which "2.0.2" yargs-parser "^21.0.0" +gluegun@5.1.6: + 
version "5.1.6" + resolved "https://registry.yarnpkg.com/gluegun/-/gluegun-5.1.6.tgz#74ec13193913dc610f5c1a4039972c70c96a7bad" + integrity sha512-9zbi4EQWIVvSOftJWquWzr9gLX2kaDgPkNR5dYWbM53eVvCI3iKuxLlnKoHC0v4uPoq+Kr/+F569tjoFbA4DSA== + dependencies: + apisauce "^2.1.5" + app-module-path "^2.2.0" + cli-table3 "0.6.0" + colors "1.4.0" + cosmiconfig "7.0.1" + cross-spawn "7.0.3" + ejs "3.1.8" + enquirer "2.3.6" + execa "5.1.1" + fs-jetpack "4.3.1" + lodash.camelcase "^4.3.0" + lodash.kebabcase "^4.1.1" + lodash.lowercase "^4.3.0" + lodash.lowerfirst "^4.3.1" + lodash.pad "^4.5.1" + lodash.padend "^4.6.1" + lodash.padstart "^4.6.1" + lodash.repeat "^4.1.0" + lodash.snakecase "^4.1.1" + lodash.startcase "^4.4.0" + lodash.trim "^4.5.1" + lodash.trimend "^4.5.1" + lodash.trimstart "^4.5.1" + lodash.uppercase "^4.3.0" + lodash.upperfirst "^4.3.1" + ora "4.0.2" + pluralize "^8.0.0" + semver "7.3.5" + which "2.0.2" + yargs-parser "^21.0.0" + graceful-fs@^4.1.6, graceful-fs@^4.2.0: version "4.2.11" resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.11.tgz#4183e4e8bf08bb6e05bbb2f7d2e0c8f712ca40e3" @@ -2282,7 +2364,7 @@ is-binary-path@~2.1.0: dependencies: binary-extensions "^2.0.0" -is-docker@^2.0.0: +is-docker@^2.0.0, is-docker@^2.1.1: version "2.2.1" resolved "https://registry.npmjs.org/is-docker/-/is-docker-2.2.1.tgz" integrity sha512-F+i2BKsFrH66iaUFc0woD8sLy8getkwTwtOBjvs56Cx4CgJDeKQeqfz8wAYiSb8JOprWhHH5p77PbmYCvvUuXQ== @@ -2922,6 +3004,15 @@ onetime@^5.1.0, onetime@^5.1.2: dependencies: mimic-fn "^2.1.0" +open@8.4.2: + version "8.4.2" + resolved "https://registry.yarnpkg.com/open/-/open-8.4.2.tgz#5b5ffe2a8f793dcd2aad73e550cb87b59cb084f9" + integrity sha512-7x81NCL719oNbsq/3mh+hVrAWmFuEYUqrq/Iw3kUzH8ReypT9QQ0BLoJS7/G9k6N81XjW4qHWtjWwe/9eLy1EQ== + dependencies: + define-lazy-prop "^2.0.0" + is-docker "^2.1.1" + is-wsl "^2.2.0" + ora@4.0.2: version "4.0.2" resolved "https://registry.npmjs.org/ora/-/ora-4.0.2.tgz" @@ -3042,6 +3133,11 @@ prettier@1.19.1: resolved "https://registry.npmjs.org/prettier/-/prettier-1.19.1.tgz" integrity sha512-s7PoyDv/II1ObgQunCbB9PdLmUcBZcnWOcxDh7O0N/UwDEsHyqkW+Qh28jW+mVuCdx7gLB0BotYI1Y6uI9iyew== +prettier@3.0.3: + version "3.0.3" + resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.0.3.tgz#432a51f7ba422d1469096c0fdc28e235db8f9643" + integrity sha512-L/4pUDMxcNa8R/EthV08Zt42WBO4h1rarVtK0K+QJG0X187OLo7l699jWw0GKuwzkPQ//jMFA/8Xm6Fh3J/DAg== + process-nextick-args@~2.0.0: version "2.0.1" resolved "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz" diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index b20672ce563..5381a530148 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -7,11 +7,12 @@ use super::{ NoopRuntimeAdapterBuilder, StaticBlockRefetcher, StaticStreamBuilder, Stores, TestChain, }; use graph::blockchain::client::ChainClient; -use graph::blockchain::{BlockPtr, TriggersAdapterSelector}; +use graph::blockchain::{BlockPtr, Trigger, TriggersAdapterSelector}; use graph::cheap_clone::CheapClone; +use graph::data_source::subgraph; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::web3::types::{Address, Log, Transaction, H160}; -use graph::prelude::{ethabi, tiny_keccak, LightEthereumBlock, ENV_VARS}; +use graph::prelude::{ethabi, tiny_keccak, DeploymentHash, Entity, LightEthereumBlock, ENV_VARS}; use graph::{blockchain::block_stream::BlockWithTriggers, prelude::ethabi::ethereum_types::U64}; use 
graph_chain_ethereum::network::EthereumNetworkAdapters; use graph_chain_ethereum::trigger::LogRef; @@ -81,7 +82,10 @@ pub fn genesis() -> BlockWithTriggers { number: Some(U64::from(ptr.number)), ..Default::default() })), - trigger_data: vec![EthereumTrigger::Block(ptr, EthereumBlockTriggerType::End)], + trigger_data: vec![Trigger::Chain(EthereumTrigger::Block( + ptr, + EthereumBlockTriggerType::End, + ))], } } @@ -128,7 +132,10 @@ pub fn empty_block(parent_ptr: BlockPtr, ptr: BlockPtr) -> BlockWithTriggers, payload: impl Into, + source: DeploymentHash, + entity: Entity, + entity_type: &str, +) { + block + .trigger_data + .push(Trigger::Subgraph(subgraph::TriggerData { + source, + entity: entity, + entity_type: entity_type.to_string(), + })); } pub fn push_test_command( @@ -175,12 +199,16 @@ pub fn push_test_command( }); block .trigger_data - .push(EthereumTrigger::Log(LogRef::FullLog(log, None))) + .push(Trigger::Chain(EthereumTrigger::Log(LogRef::FullLog( + log, None, + )))) } pub fn push_test_polling_trigger(block: &mut BlockWithTriggers) { - block.trigger_data.push(EthereumTrigger::Block( - block.ptr(), - EthereumBlockTriggerType::End, - )) + block + .trigger_data + .push(Trigger::Chain(EthereumTrigger::Block( + block.ptr(), + EthereumBlockTriggerType::End, + ))) } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 89184e0164b..c21d3198272 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -1,7 +1,7 @@ pub mod ethereum; pub mod substreams; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Mutex; use std::time::{Duration, Instant}; @@ -14,13 +14,13 @@ use graph::blockchain::block_stream::{ }; use graph::blockchain::{ Block, BlockHash, BlockPtr, Blockchain, BlockchainMap, ChainIdentifier, RuntimeAdapter, - TriggersAdapter, TriggersAdapterSelector, + TriggerFilterWrapper, TriggersAdapter, TriggersAdapterSelector, }; use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit}; use graph::components::metrics::MetricsRegistry; use graph::components::network_provider::ChainName; -use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache}; +use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache, SourceableStore}; use graph::components::subgraph::Settings; use graph::data::graphql::load_manager::LoadManager; use graph::data::query::{Query, QueryTarget}; @@ -214,7 +214,7 @@ impl TestContext { .new_deployment_status_metric(&deployment); self.instance_manager - .build_subgraph_runner( + .build_subgraph_runner_inner( logger, self.env_vars.cheap_clone(), deployment, @@ -222,6 +222,7 @@ impl TestContext { Some(stop_block.block_number()), tp, deployment_status_metric, + true, ) .await .unwrap() @@ -244,7 +245,7 @@ impl TestContext { .new_deployment_status_metric(&deployment); self.instance_manager - .build_subgraph_runner( + .build_subgraph_runner_inner( logger, self.env_vars.cheap_clone(), deployment, @@ -252,6 +253,7 @@ impl TestContext { Some(stop_block.block_number()), tp, deployment_status_metric, + true, ) .await .unwrap() @@ -735,14 +737,27 @@ impl BlockStreamBuilder for MutexBlockStreamBuilder { async fn build_polling( &self, - _chain: &C, - _deployment: DeploymentLocator, - _start_blocks: Vec, - _subgraph_current_block: Option, - _filter: Arc<::TriggerFilter>, - _unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion, + chain: &C, + deployment: DeploymentLocator, 
+ start_blocks: Vec, + source_subgraph_stores: Vec>, + subgraph_current_block: Option, + filter: Arc>, + unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion, ) -> anyhow::Result>> { - unimplemented!("only firehose mode should be used for tests") + let builder = self.0.lock().unwrap().clone(); + + builder + .build_polling( + chain, + deployment, + start_blocks, + source_subgraph_stores, + subgraph_current_block, + filter, + unified_api_version, + ) + .await } } @@ -800,11 +815,22 @@ where _chain: &C, _deployment: DeploymentLocator, _start_blocks: Vec, - _subgraph_current_block: Option, - _filter: Arc, + _source_subgraph_stores: Vec>, + subgraph_current_block: Option, + _filter: Arc>, _unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion, ) -> anyhow::Result>> { - unimplemented!("only firehose mode should be used for tests") + let current_idx = subgraph_current_block.map(|current_block| { + self.chain + .iter() + .enumerate() + .find(|(_, b)| b.ptr() == current_block) + .unwrap() + .0 + }); + Ok(Box::new(StaticStream { + stream: Box::pin(stream_events(self.chain.clone(), current_idx)), + })) } } @@ -969,11 +995,23 @@ impl TriggersAdapter for MockTriggersAdapter { todo!() } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result, Error> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + todo!() + } + async fn scan_triggers( &self, _from: BlockNumber, _to: BlockNumber, - _filter: &::TriggerFilter, + _filter: &C::TriggerFilter, ) -> Result<(Vec>, BlockNumber), Error> { todo!() } diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index caeb67e9adf..f03eb67681d 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -19,11 +19,12 @@ use graph::object; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::web3::types::Address; use graph::prelude::{ - hex, CheapClone, DeploymentHash, SubgraphAssignmentProvider, SubgraphName, SubgraphStore, + hex, CheapClone, DeploymentHash, SubgraphAssignmentProvider, SubgraphName, SubgraphStore, Value, }; +use graph::schema::InputSchema; use graph_tests::fixture::ethereum::{ chain, empty_block, generate_empty_blocks_for_range, genesis, push_test_command, push_test_log, - push_test_polling_trigger, + push_test_polling_trigger, push_test_subgraph_trigger, }; use graph_tests::fixture::substreams::chain as substreams_chain; @@ -501,10 +502,19 @@ async fn substreams_trigger_filter_construction() -> anyhow::Result<()> { let runner = ctx.runner_substreams(test_ptr(0)).await; let filter = runner.build_filter_for_test(); - assert_eq!(filter.module_name(), "graph_out"); - assert_eq!(filter.modules().as_ref().unwrap().modules.len(), 2); - assert_eq!(filter.start_block().unwrap(), 0); - assert_eq!(filter.data_sources_len(), 1); + assert_eq!(filter.chain_filter.module_name(), "graph_out"); + assert_eq!( + filter + .chain_filter + .modules() + .as_ref() + .unwrap() + .modules + .len(), + 2 + ); + assert_eq!(filter.chain_filter.start_block().unwrap(), 0); + assert_eq!(filter.chain_filter.data_sources_len(), 1); Ok(()) } @@ -526,7 +536,11 @@ async fn end_block() -> anyhow::Result<()> { let runner = ctx.runner(block_ptr.clone()).await; let runner = runner.run_for_test(false).await.unwrap(); let filter = runner.context().filter.as_ref().unwrap(); - let addresses = filter.log().contract_addresses().collect::>(); + let addresses = filter + .chain_filter + .log() + .contract_addresses() + .collect::>(); if 
should_contain_addr { assert!(addresses.contains(&addr)); @@ -1078,6 +1092,49 @@ async fn parse_data_source_context() { ); } +#[tokio::test] +async fn subgraph_data_sources() { + let RunnerTestRecipe { stores, test_info } = + RunnerTestRecipe::new("subgraph-data-sources", "subgraph-data-sources").await; + + let schema = InputSchema::parse_latest( + "type User @entity { id: String!, val: String! }", + DeploymentHash::new("test").unwrap(), + ) + .unwrap(); + + let entity = schema + .make_entity(vec![ + ("id".into(), Value::String("id".to_owned())), + ("val".into(), Value::String("DATA".to_owned())), + ]) + .unwrap(); + + let blocks = { + let block_0 = genesis(); + let mut block_1 = empty_block(block_0.ptr(), test_ptr(1)); + push_test_subgraph_trigger( + &mut block_1, + DeploymentHash::new("QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ").unwrap(), + entity, + "User", + ); + + let block_2 = empty_block(block_1.ptr(), test_ptr(2)); + vec![block_0, block_1, block_2] + }; + let stop_block = blocks.last().unwrap().block.ptr(); + let chain = chain(&test_info.test_name, blocks, &stores, None).await; + + let ctx = fixture::setup(&test_info, &stores, &chain, None, None).await; + let _ = ctx + .runner(stop_block) + .await + .run_for_test(true) + .await + .unwrap(); +} + #[tokio::test] async fn retry_create_ds() { let RunnerTestRecipe { stores, test_info } =
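A closing note on the `// TODO: fix it` assertions in the `read_range_mutable`/`read_range_immutable` tests above: the count of 6 follows directly from how `EntityBlockRange::contains` renders an excluded upper bound. A worked sketch for the range `4..8` on the mutable `Counter` table, with bind parameters inlined for readability:

```rust
// EntityBlockRange::new(table, 4..8) stores Included(4)..Excluded(8), and
// contains() renders it, per the Bound match shown earlier, as
//
//     lower(block_range) >= 4 AND lower(block_range) <= 8
//
// i.e. the Excluded(8) arm still emits "<=", so versions written exactly at
// block 8 match. The tests write two entities at each of blocks 2, 4, 6, 8,
// 10, 12, 14; the query over 4..8 therefore sees blocks 4, 6 and 8 (three
// blocks x two entities = 6 rows) instead of the 4 rows a half-open range
// implies. Emitting "< 8" (or "<= 7") for an excluded bound would let the
// tests assert the expected 4.
```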