diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 5724fef150c..e582cc3e1e6 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -33,6 +33,7 @@ use graph::schema::EntityKey; use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache}; use std::sync::Arc; use std::time::{Duration, Instant}; +use std::vec; const MINUTE: Duration = Duration::from_secs(60); @@ -493,9 +494,12 @@ where let (data_sources, runtime_hosts) = self.create_dynamic_data_sources(block_state.drain_created_data_sources())?; - let filter = C::TriggerFilter::from_data_sources( - data_sources.iter().filter_map(DataSource::as_onchain), - ); + let filter = &Arc::new(TriggerFilterWrapper::new( + C::TriggerFilter::from_data_sources( + data_sources.iter().filter_map(DataSource::as_onchain), + ), + vec![], + )); let block: Arc = if self.inputs.chain.is_refetch_block_required() { let cur = firehose_cursor.clone(); @@ -524,7 +528,7 @@ where let block_with_triggers = self .inputs .triggers_adapter - .triggers_in_block(&logger, block.as_ref().clone(), &filter) + .triggers_in_block(&logger, block.as_ref().clone(), filter) .await?; let triggers = block_with_triggers.trigger_data; diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index fd7f9b411a7..324cdbafb2f 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -7,9 +7,8 @@ use anyhow::Error; use async_stream::stream; use futures03::Stream; use prost_types::Any; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; -use std::ops::Range; use std::sync::Arc; use std::time::Instant; use thiserror::Error; @@ -22,7 +21,7 @@ use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; use crate::firehose::{self, FirehoseEndpoint}; use crate::futures03::stream::StreamExt as _; -use crate::schema::InputSchema; +use crate::schema::{EntityType, InputSchema}; use crate::substreams_rpc::response::Message; use crate::{prelude::*, prometheus::labels}; @@ -319,7 +318,7 @@ impl BlockWithTriggers { /// logic for each chain, increasing code repetition. pub struct TriggersAdapterWrapper { pub adapter: Arc>, - pub source_subgraph_stores: Vec>, + pub source_subgraph_stores: HashMap>, } impl TriggersAdapterWrapper { @@ -327,11 +326,151 @@ impl TriggersAdapterWrapper { adapter: Arc>, source_subgraph_stores: Vec>, ) -> Self { + let stores_map: HashMap<_, _> = source_subgraph_stores + .iter() + .map(|store| (store.input_schema().id().clone(), store.clone())) + .collect(); Self { adapter, - source_subgraph_stores, + source_subgraph_stores: stores_map, } } + + pub async fn blocks_with_subgraph_triggers( + &self, + logger: &Logger, + subgraph_filter: &SubgraphFilter, + range: SubgraphTriggerScanRange, + ) -> Result>, Error> { + let store = self + .source_subgraph_stores + .get(&subgraph_filter.subgraph) + .ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?; + + let schema = ::input_schema(store); + + let adapter = self.adapter.clone(); + + scan_subgraph_triggers::(logger, store, &adapter, &schema, &subgraph_filter, range).await + } +} + +fn create_subgraph_trigger_from_entities( + filter: &SubgraphFilter, + entities: &Vec, +) -> Vec { + entities + .iter() + .map(|e| subgraph::TriggerData { + source: filter.subgraph.clone(), + entity: e.entity.clone(), + entity_type: e.entity_type.as_str().to_string(), + }) + .collect() +} + +async fn create_subgraph_triggers( + logger: Logger, + blocks: Vec, + filter: &SubgraphFilter, + entities: BTreeMap>, +) -> Result>, Error> { + let logger_clone = logger.cheap_clone(); + + let blocks: Vec> = blocks + .into_iter() + .map(|block| { + let block_number = block.number(); + match entities.get(&block_number) { + Some(e) => { + let trigger_data = create_subgraph_trigger_from_entities(filter, e); + BlockWithTriggers::new_with_subgraph_triggers( + block, + trigger_data, + &logger_clone, + ) + } + None => BlockWithTriggers::new_with_subgraph_triggers(block, vec![], &logger_clone), + } + }) + .collect(); + + Ok(blocks) +} + +pub enum SubgraphTriggerScanRange { + Single(C::Block), + Range(BlockNumber, BlockNumber), +} + +async fn scan_subgraph_triggers( + logger: &Logger, + store: &Arc, + adapter: &Arc>, + schema: &InputSchema, + filter: &SubgraphFilter, + range: SubgraphTriggerScanRange, +) -> Result>, Error> { + match range { + SubgraphTriggerScanRange::Single(block) => { + let entities = + get_entities_for_range(store, filter, schema, block.number(), block.number()) + .await?; + create_subgraph_triggers::(logger.clone(), vec![block], filter, entities).await + } + SubgraphTriggerScanRange::Range(from, to) => { + let entities = get_entities_for_range(store, filter, schema, from, to).await?; + let mut block_numbers: HashSet = entities.keys().cloned().collect(); + // Ensure the 'to' block is included in the block_numbers + block_numbers.insert(to); + + let blocks = adapter + .load_blocks_by_numbers(logger.clone(), block_numbers) + .await?; + + create_subgraph_triggers::(logger.clone(), blocks, filter, entities).await + } + } +} + +pub struct EntityWithType { + pub entity_type: EntityType, + pub entity: Entity, +} + +async fn get_entities_for_range( + store: &Arc, + filter: &SubgraphFilter, + schema: &InputSchema, + from: BlockNumber, + to: BlockNumber, +) -> Result>, Error> { + let mut entities_by_block = BTreeMap::new(); + + for entity_name in &filter.entities { + let entity_type = schema.entity_type(entity_name)?; + + let entity_ranges = store.get_range(&entity_type, from..to)?; + + for (block_number, entity_vec) in entity_ranges { + let mut entity_vec = entity_vec + .into_iter() + .map(|e| EntityWithType { + entity_type: entity_type.clone(), + entity: e, + }) + .collect(); + + entities_by_block + .entry(block_number) + .and_modify(|existing_vec: &mut Vec| { + existing_vec.append(&mut entity_vec); + }) + .or_insert(entity_vec); + } + } + + Ok(entities_by_block) } impl TriggersAdapterWrapper { @@ -344,45 +483,25 @@ impl TriggersAdapterWrapper { self.adapter.ancestor_block(ptr, offset, root).await } - // TODO: Do a proper implementation, this is a complete mock implementation pub async fn scan_triggers( &self, + logger: &Logger, from: BlockNumber, to: BlockNumber, filter: &Arc>, ) -> Result<(Vec>, BlockNumber), Error> { - if !filter.subgraph_filter.is_empty() { - // TODO: handle empty range, or empty entity set bellow - - if let Some(SubgraphFilter { - subgraph: dh, - start_block: _sb, - entities: ent, - }) = filter.subgraph_filter.first() - { - if let Some(store) = self.source_subgraph_stores.first() { - let schema = store.input_schema(); - let dh2 = schema.id(); - if dh == dh2 { - if let Some(entity_type) = ent.first() { - let et = schema.entity_type(entity_type).unwrap(); - - let br: Range = from..to; - let entities = store.get_range(&et, br)?; - return self - .subgraph_triggers( - Logger::root(slog::Discard, o!()), - from, - to, - filter, - entities, - ) - .await; - } - } - } - } + if let Some(subgraph_filter) = filter.subgraph_filter.first() { + let blocks_with_triggers = self + .blocks_with_subgraph_triggers( + logger, + subgraph_filter, + SubgraphTriggerScanRange::Range(from, to), + ) + .await?; + + return Ok((blocks_with_triggers, to)); } + self.adapter .scan_triggers(from, to, &filter.chain_filter) .await @@ -392,9 +511,30 @@ impl TriggersAdapterWrapper { &self, logger: &Logger, block: C::Block, - filter: &C::TriggerFilter, + filter: &Arc>, ) -> Result, Error> { - self.adapter.triggers_in_block(logger, block, filter).await + trace!( + logger, + "triggers_in_block"; + "block_number" => block.number(), + "block_hash" => block.hash().hash_hex(), + ); + + if let Some(subgraph_filter) = filter.subgraph_filter.first() { + let blocks_with_triggers = self + .blocks_with_subgraph_triggers( + logger, + subgraph_filter, + SubgraphTriggerScanRange::Single(block), + ) + .await?; + + return Ok(blocks_with_triggers.into_iter().next().unwrap()); + } + + self.adapter + .triggers_in_block(logger, block, &filter.chain_filter) + .await } pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result { @@ -406,57 +546,20 @@ impl TriggersAdapterWrapper { } pub async fn chain_head_ptr(&self) -> Result, Error> { - self.adapter.chain_head_ptr().await - } + if self.source_subgraph_stores.is_empty() { + return self.adapter.chain_head_ptr().await; + } - async fn subgraph_triggers( - &self, - logger: Logger, - from: BlockNumber, - to: BlockNumber, - filter: &Arc>, - entities: BTreeMap>, - ) -> Result<(Vec>, BlockNumber), Error> { - let logger2 = logger.cheap_clone(); - let adapter = self.adapter.clone(); - let first_filter = filter.subgraph_filter.first().unwrap(); - let blocks = adapter - .load_blocks_by_numbers(logger, HashSet::from_iter(from..to)) - .await? - .into_iter() - .map(|block| { - let key = block.number(); - match entities.get(&key) { - Some(e) => { - let trigger_data = - Self::create_subgraph_trigger_from_entity(first_filter, e); - Some(BlockWithTriggers::new_with_subgraph_triggers( - block, - trigger_data, - &logger2, - )) - } - None => None, - } - }) - .flatten() - .collect(); + let ptrs = futures03::future::try_join_all( + self.source_subgraph_stores + .iter() + .map(|(_, store)| store.block_ptr()), + ) + .await?; - Ok((blocks, to)) - } + let min_ptr = ptrs.into_iter().flatten().min_by_key(|ptr| ptr.number); - fn create_subgraph_trigger_from_entity( - filter: &SubgraphFilter, - entity: &Vec, - ) -> Vec { - entity - .iter() - .map(|e| subgraph::TriggerData { - source: filter.subgraph.clone(), - entity: e.clone(), - entity_type: filter.entities.first().unwrap().clone(), - }) - .collect() + Ok(min_ptr) } } diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index 5b37cd303b4..fa774261227 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -379,7 +379,10 @@ where ); // Update with actually scanned range, to account for any skipped null blocks. - let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?; + let (blocks, to) = self + .adapter + .scan_triggers(&self.logger, from, to, &self.filter) + .await?; let range_size = to - from + 1; // If the target block (`to`) is within the reorg threshold, indicating no non-null finalized blocks are @@ -469,11 +472,7 @@ where // Note that head_ancestor is a child of subgraph_ptr. let block = self .adapter - .triggers_in_block( - &self.logger, - head_ancestor, - &self.filter.chain_filter.clone(), - ) + .triggers_in_block(&self.logger, head_ancestor, &self.filter) .await?; Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1)) } else { diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index dba43786438..24bc34b9b94 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -147,6 +147,7 @@ pub struct UnresolvedDataSource { } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct UnresolvedSource { address: DeploymentHash, #[serde(default)] diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 34eaf110f77..0bd682ebb20 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -184,8 +184,8 @@ dataSources: - Gravatar network: mainnet source: - address: 'QmUVaWpdKgcxBov1jHEa8dr46d2rkVzfHuZFu4fXJ4sFse' - startBlock: 0 + address: 'QmSWWT2yrTFDZSL8tRyoHEVrcEKAUsY2hj2TMQDfdDZU8h' + startBlock: 9562480 mapping: apiVersion: 0.0.6 language: wasm/assemblyscript @@ -207,6 +207,8 @@ specVersion: 1.3.0 match data_source { DataSourceEnum::Subgraph(ds) => { assert_eq!(ds.name, "SubgraphSource"); + assert_eq!(ds.kind, "subgraph"); + assert_eq!(ds.source.start_block, 9562480); } _ => panic!("Expected a subgraph data source"), } diff --git a/tests/integration-tests/source-subgraph/schema.graphql b/tests/integration-tests/source-subgraph/schema.graphql index 4855d8c2976..39af3e96105 100644 --- a/tests/integration-tests/source-subgraph/schema.graphql +++ b/tests/integration-tests/source-subgraph/schema.graphql @@ -1,7 +1,12 @@ - type Block @entity { id: ID! number: BigInt! hash: Bytes! -} \ No newline at end of file +} + +type Block2 @entity { + id: ID! + number: BigInt! + hash: Bytes! +} diff --git a/tests/integration-tests/source-subgraph/src/mapping.ts b/tests/integration-tests/source-subgraph/src/mapping.ts index 5ca859affdc..d978f870cda 100644 --- a/tests/integration-tests/source-subgraph/src/mapping.ts +++ b/tests/integration-tests/source-subgraph/src/mapping.ts @@ -1,10 +1,25 @@ import { ethereum, log } from '@graphprotocol/graph-ts'; -import { Block } from '../generated/schema'; +import { Block, Block2 } from '../generated/schema'; +import { BigInt } from '@graphprotocol/graph-ts'; export function handleBlock(block: ethereum.Block): void { log.info('handleBlock {}', [block.number.toString()]); - let blockEntity = new Block(block.number.toString()); + + let id = block.number.toString().concat('-v1'); + let blockEntity = new Block(id); blockEntity.number = block.number; blockEntity.hash = block.hash; blockEntity.save(); + + let id2 = block.number.toString().concat('-v2'); + let blockEntity2 = new Block(id2); + blockEntity2.number = block.number; + blockEntity2.hash = block.hash; + blockEntity2.save(); + + let id3 = block.number.toString().concat('-v3'); + let blockEntity3 = new Block2(id3); + blockEntity3.number = block.number; + blockEntity3.hash = block.hash; + blockEntity3.save(); } diff --git a/tests/integration-tests/subgraph-data-sources/schema.graphql b/tests/integration-tests/subgraph-data-sources/schema.graphql index 97f651ec409..4fd00d5a59b 100644 --- a/tests/integration-tests/subgraph-data-sources/schema.graphql +++ b/tests/integration-tests/subgraph-data-sources/schema.graphql @@ -1,5 +1,5 @@ type MirrorBlock @entity { - id: Bytes! + id: String! number: BigInt! hash: Bytes! } diff --git a/tests/integration-tests/subgraph-data-sources/src/mapping.ts b/tests/integration-tests/subgraph-data-sources/src/mapping.ts index 5842b51b21d..0f2df0e4783 100644 --- a/tests/integration-tests/subgraph-data-sources/src/mapping.ts +++ b/tests/integration-tests/subgraph-data-sources/src/mapping.ts @@ -4,10 +4,11 @@ import { MirrorBlock } from '../generated/schema'; export function handleEntity(blockEntity: Entity): void { let blockNumber = blockEntity.getBigInt('number'); let blockHash = blockEntity.getBytes('hash'); + let id = blockEntity.getString('id'); log.info('Block number: {}', [blockNumber.toString()]); - let block = new MirrorBlock(blockHash); + let block = new MirrorBlock(id); block.number = blockNumber; block.hash = blockHash; block.save(); diff --git a/tests/integration-tests/subgraph-data-sources/subgraph.yaml b/tests/integration-tests/subgraph-data-sources/subgraph.yaml index eca534d501c..46af96b1d34 100644 --- a/tests/integration-tests/subgraph-data-sources/subgraph.yaml +++ b/tests/integration-tests/subgraph-data-sources/subgraph.yaml @@ -6,7 +6,7 @@ dataSources: name: Contract network: test source: - address: 'QmUVaWpdKgcxBov1jHEa8dr46d2rkVzfHuZFu4fXJ4sFse' + address: 'QmeZhEiJuBusu7GxCe6AytvqSsgwV8QxkbSYx5ojSFB28a' startBlock: 0 mapping: apiVersion: 0.0.7 @@ -16,4 +16,6 @@ dataSources: handlers: - handler: handleEntity entity: Block + - handler: handleEntity + entity: Block2 file: ./src/mapping.ts diff --git a/tests/runner-tests/subgraph-data-sources/subgraph.yaml b/tests/runner-tests/subgraph-data-sources/subgraph.yaml index 1c666e3417e..01f719d069f 100644 --- a/tests/runner-tests/subgraph-data-sources/subgraph.yaml +++ b/tests/runner-tests/subgraph-data-sources/subgraph.yaml @@ -7,7 +7,7 @@ dataSources: network: test source: address: 'QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ' - startBlock: 6082461 + startBlock: 0 mapping: apiVersion: 0.0.7 language: wasm/assemblyscript diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index fdc82b03510..2841dcda5d6 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -522,22 +522,43 @@ async fn subgraph_data_sources(ctx: TestContext) -> anyhow::Result<()> { assert!(subgraph.healthy); let expected_response = json!({ "mirrorBlocks": [ - { "number": "1" }, - { "number": "2" }, - { "number": "3" }, - { "number": "4" }, - { "number": "5" }, - { "number": "6" }, - { "number": "7" }, - { "number": "8" }, - { "number": "9" }, + { "id": "1-v1", "number": "1" }, + { "id": "1-v2", "number": "1" }, + { "id": "1-v3", "number": "1" }, + { "id": "2-v1", "number": "2" }, + { "id": "2-v2", "number": "2" }, + { "id": "2-v3", "number": "2" }, + { "id": "3-v1", "number": "3" }, + { "id": "3-v2", "number": "3" }, + { "id": "3-v3", "number": "3" }, + { "id": "4-v1", "number": "4" }, + { "id": "4-v2", "number": "4" }, + { "id": "4-v3", "number": "4" }, + { "id": "5-v1", "number": "5" }, + { "id": "5-v2", "number": "5" }, + { "id": "5-v3", "number": "5" }, + { "id": "6-v1", "number": "6" }, + { "id": "6-v2", "number": "6" }, + { "id": "6-v3", "number": "6" }, + { "id": "7-v1", "number": "7" }, + { "id": "7-v2", "number": "7" }, + { "id": "7-v3", "number": "7" }, + { "id": "8-v1", "number": "8" }, + { "id": "8-v2", "number": "8" }, + { "id": "8-v3", "number": "8" }, + { "id": "9-v1", "number": "9" }, + { "id": "9-v2", "number": "9" }, + { "id": "9-v3", "number": "9" }, + { "id": "10-v1", "number": "10" }, + { "id": "10-v2", "number": "10" }, + { "id": "10-v3", "number": "10" }, ] }); query_succeeds( "Blocks should be right", &subgraph, - "{ mirrorBlocks(where: {number_lt: 10}, orderBy: number) { number } }", + "{ mirrorBlocks(where: {number_lte: 10}, orderBy: number) { id, number } }", expected_response, ) .await?;