Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use graph::schema::EntityKey;
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec;

const MINUTE: Duration = Duration::from_secs(60);

Expand Down Expand Up @@ -493,9 +494,12 @@ where
let (data_sources, runtime_hosts) =
self.create_dynamic_data_sources(block_state.drain_created_data_sources())?;

let filter = C::TriggerFilter::from_data_sources(
data_sources.iter().filter_map(DataSource::as_onchain),
);
let filter = &Arc::new(TriggerFilterWrapper::new(
C::TriggerFilter::from_data_sources(
data_sources.iter().filter_map(DataSource::as_onchain),
),
vec![],
));

let block: Arc<C::Block> = if self.inputs.chain.is_refetch_block_required() {
let cur = firehose_cursor.clone();
Expand Down Expand Up @@ -524,7 +528,7 @@ where
let block_with_triggers = self
.inputs
.triggers_adapter
.triggers_in_block(&logger, block.as_ref().clone(), &filter)
.triggers_in_block(&logger, block.as_ref().clone(), filter)
.await?;

let triggers = block_with_triggers.trigger_data;
Expand Down
277 changes: 190 additions & 87 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use anyhow::Error;
use async_stream::stream;
use futures03::Stream;
use prost_types::Any;
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
Expand All @@ -22,7 +21,7 @@ use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
use crate::data::subgraph::UnifiedMappingApiVersion;
use crate::firehose::{self, FirehoseEndpoint};
use crate::futures03::stream::StreamExt as _;
use crate::schema::InputSchema;
use crate::schema::{EntityType, InputSchema};
use crate::substreams_rpc::response::Message;
use crate::{prelude::*, prometheus::labels};

Expand Down Expand Up @@ -319,19 +318,159 @@ impl<C: Blockchain> BlockWithTriggers<C> {
/// logic for each chain, increasing code repetition.
pub struct TriggersAdapterWrapper<C: Blockchain> {
pub adapter: Arc<dyn TriggersAdapter<C>>,
pub source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
pub source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn SourceableStore>>,
}

impl<C: Blockchain> TriggersAdapterWrapper<C> {
pub fn new(
adapter: Arc<dyn TriggersAdapter<C>>,
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
) -> Self {
let stores_map: HashMap<_, _> = source_subgraph_stores
.iter()
.map(|store| (store.input_schema().id().clone(), store.clone()))
.collect();
Self {
adapter,
source_subgraph_stores,
source_subgraph_stores: stores_map,
}
}

pub async fn blocks_with_subgraph_triggers(
&self,
logger: &Logger,
subgraph_filter: &SubgraphFilter,
range: SubgraphTriggerScanRange<C>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
let store = self
.source_subgraph_stores
.get(&subgraph_filter.subgraph)
.ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?;

let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);

let adapter = self.adapter.clone();

scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
}
}

/// Builds one subgraph trigger per entity change, attributing each trigger to
/// the source subgraph named in `filter`.
///
/// Takes a slice instead of `&Vec` (clippy `ptr_arg`); callers passing
/// `&Vec<EntityWithType>` still work via deref coercion.
fn create_subgraph_trigger_from_entities(
    filter: &SubgraphFilter,
    entities: &[EntityWithType],
) -> Vec<subgraph::TriggerData> {
    entities
        .iter()
        .map(|e| subgraph::TriggerData {
            source: filter.subgraph.clone(),
            entity: e.entity.clone(),
            entity_type: e.entity_type.as_str().to_string(),
        })
        .collect()
}

/// Pairs each block with the subgraph triggers derived from the entity
/// changes recorded at that block's number. Blocks with no recorded entity
/// changes still flow through with an empty trigger set so the stream can
/// advance the subgraph's block pointer.
async fn create_subgraph_triggers<C: Blockchain>(
    logger: Logger,
    blocks: Vec<C::Block>,
    filter: &SubgraphFilter,
    entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
    let blocks: Vec<BlockWithTriggers<C>> = blocks
        .into_iter()
        .map(|block| {
            // Empty trigger list when no entities changed at this block;
            // `unwrap_or_default` collapses the former two-armed match.
            let trigger_data = entities
                .get(&block.number())
                .map(|e| create_subgraph_trigger_from_entities(filter, e))
                .unwrap_or_default();
            BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger)
        })
        .collect();

    Ok(blocks)
}

/// The block extent over which subgraph-sourced triggers are scanned.
pub enum SubgraphTriggerScanRange<C: Blockchain> {
    /// A single, already-loaded block.
    Single(C::Block),
    /// A numeric block range `(from, to)`.
    Range(BlockNumber, BlockNumber),
}

async fn scan_subgraph_triggers<C: Blockchain>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any testing on these, specifically to ensure the comments on the code hold true like the range is inclusive etc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will dedicate a PR to more intensive testing in the end of this PR chain so that i have all the relevant changes ready before the testing

logger: &Logger,
store: &Arc<dyn SourceableStore>,
adapter: &Arc<dyn TriggersAdapter<C>>,
schema: &InputSchema,
filter: &SubgraphFilter,
range: SubgraphTriggerScanRange<C>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
match range {
SubgraphTriggerScanRange::Single(block) => {
let entities =
get_entities_for_range(store, filter, schema, block.number(), block.number())
.await?;
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
}
SubgraphTriggerScanRange::Range(from, to) => {
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
// Ensure the 'to' block is included in the block_numbers
block_numbers.insert(to);

let blocks = adapter
.load_blocks_by_numbers(logger.clone(), block_numbers)
.await?;

create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
}
}
}

/// An entity change read from a source subgraph's store, paired with its
/// schema entity type.
pub struct EntityWithType {
    /// The schema type of `entity`.
    pub entity_type: EntityType,
    /// The entity value at the block it was read for.
    pub entity: Entity,
}

/// Fetches, for every entity type named in `filter`, the entity changes in
/// the given block range from the source subgraph's store, grouped by block
/// number.
///
/// NOTE(review): the store is queried with `from..to`, which excludes `to`
/// itself — confirm this matches the intended endpoint semantics of
/// `SubgraphTriggerScanRange::Range`, whose callers treat `to` as included.
async fn get_entities_for_range(
    store: &Arc<dyn SourceableStore>,
    filter: &SubgraphFilter,
    schema: &InputSchema,
    from: BlockNumber,
    to: BlockNumber,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, Error> {
    let mut entities_by_block = BTreeMap::new();

    for entity_name in &filter.entities {
        let entity_type = schema.entity_type(entity_name)?;

        let entity_ranges = store.get_range(&entity_type, from..to)?;

        for (block_number, entity_vec) in entity_ranges {
            let mut entity_vec: Vec<EntityWithType> = entity_vec
                .into_iter()
                .map(|e| EntityWithType {
                    entity_type: entity_type.clone(),
                    entity: e,
                })
                .collect();

            // Merge changes from multiple entity types landing on the same
            // block; the entry API does a single map lookup.
            entities_by_block
                .entry(block_number)
                .or_insert_with(Vec::new)
                .append(&mut entity_vec);
        }
    }

    Ok(entities_by_block)
}

impl<C: Blockchain> TriggersAdapterWrapper<C> {
Expand All @@ -344,45 +483,25 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
self.adapter.ancestor_block(ptr, offset, root).await
}

// TODO: Do a proper implementation, this is a complete mock implementation
pub async fn scan_triggers(
&self,
logger: &Logger,
from: BlockNumber,
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<C>>,
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
if !filter.subgraph_filter.is_empty() {
// TODO: handle empty range, or empty entity set bellow

if let Some(SubgraphFilter {
subgraph: dh,
start_block: _sb,
entities: ent,
}) = filter.subgraph_filter.first()
{
if let Some(store) = self.source_subgraph_stores.first() {
let schema = store.input_schema();
let dh2 = schema.id();
if dh == dh2 {
if let Some(entity_type) = ent.first() {
let et = schema.entity_type(entity_type).unwrap();

let br: Range<BlockNumber> = from..to;
let entities = store.get_range(&et, br)?;
return self
.subgraph_triggers(
Logger::root(slog::Discard, o!()),
from,
to,
filter,
entities,
)
.await;
}
}
}
}
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
let blocks_with_triggers = self
.blocks_with_subgraph_triggers(
logger,
subgraph_filter,
SubgraphTriggerScanRange::Range(from, to),
)
.await?;

return Ok((blocks_with_triggers, to));
}

self.adapter
.scan_triggers(from, to, &filter.chain_filter)
.await
Expand All @@ -392,9 +511,30 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
&self,
logger: &Logger,
block: C::Block,
filter: &C::TriggerFilter,
filter: &Arc<TriggerFilterWrapper<C>>,
) -> Result<BlockWithTriggers<C>, Error> {
self.adapter.triggers_in_block(logger, block, filter).await
trace!(
logger,
"triggers_in_block";
"block_number" => block.number(),
"block_hash" => block.hash().hash_hex(),
);

if let Some(subgraph_filter) = filter.subgraph_filter.first() {
let blocks_with_triggers = self
.blocks_with_subgraph_triggers(
logger,
subgraph_filter,
SubgraphTriggerScanRange::Single(block),
)
.await?;

return Ok(blocks_with_triggers.into_iter().next().unwrap());
}

self.adapter
.triggers_in_block(logger, block, &filter.chain_filter)
.await
}

pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
Expand All @@ -406,57 +546,20 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
}

pub async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
self.adapter.chain_head_ptr().await
}
if self.source_subgraph_stores.is_empty() {
return self.adapter.chain_head_ptr().await;
}

async fn subgraph_triggers(
&self,
logger: Logger,
from: BlockNumber,
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<C>>,
entities: BTreeMap<BlockNumber, Vec<Entity>>,
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
let logger2 = logger.cheap_clone();
let adapter = self.adapter.clone();
let first_filter = filter.subgraph_filter.first().unwrap();
let blocks = adapter
.load_blocks_by_numbers(logger, HashSet::from_iter(from..to))
.await?
.into_iter()
.map(|block| {
let key = block.number();
match entities.get(&key) {
Some(e) => {
let trigger_data =
Self::create_subgraph_trigger_from_entity(first_filter, e);
Some(BlockWithTriggers::new_with_subgraph_triggers(
block,
trigger_data,
&logger2,
))
}
None => None,
}
})
.flatten()
.collect();
let ptrs = futures03::future::try_join_all(
self.source_subgraph_stores
.iter()
.map(|(_, store)| store.block_ptr()),
)
.await?;

Ok((blocks, to))
}
let min_ptr = ptrs.into_iter().flatten().min_by_key(|ptr| ptr.number);

fn create_subgraph_trigger_from_entity(
filter: &SubgraphFilter,
entity: &Vec<Entity>,
) -> Vec<subgraph::TriggerData> {
entity
.iter()
.map(|e| subgraph::TriggerData {
source: filter.subgraph.clone(),
entity: e.clone(),
entity_type: filter.entities.first().unwrap().clone(),
})
.collect()
Ok(min_ptr)
}
}

Expand Down
11 changes: 5 additions & 6 deletions graph/src/blockchain/polling_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,10 @@ where
);

// Update with actually scanned range, to account for any skipped null blocks.
let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?;
let (blocks, to) = self
.adapter
.scan_triggers(&self.logger, from, to, &self.filter)
.await?;
let range_size = to - from + 1;

// If the target block (`to`) is within the reorg threshold, indicating no non-null finalized blocks are
Expand Down Expand Up @@ -469,11 +472,7 @@ where
// Note that head_ancestor is a child of subgraph_ptr.
let block = self
.adapter
.triggers_in_block(
&self.logger,
head_ancestor,
&self.filter.chain_filter.clone(),
)
.triggers_in_block(&self.logger, head_ancestor, &self.filter)
.await?;
Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1))
} else {
Expand Down
Loading