Skip to content

Commit d48f6b8

Browse files
committed
Subgraph composition: TriggersAdapterWrapper refactor
1 parent afcbb3d commit d48f6b8

File tree

12 files changed

+270
-117
lines changed

12 files changed

+270
-117
lines changed

core/src/subgraph/runner.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use graph::schema::EntityKey;
3333
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
3434
use std::sync::Arc;
3535
use std::time::{Duration, Instant};
36+
use std::vec;
3637

3738
const MINUTE: Duration = Duration::from_secs(60);
3839

@@ -493,9 +494,12 @@ where
493494
let (data_sources, runtime_hosts) =
494495
self.create_dynamic_data_sources(block_state.drain_created_data_sources())?;
495496

496-
let filter = C::TriggerFilter::from_data_sources(
497-
data_sources.iter().filter_map(DataSource::as_onchain),
498-
);
497+
let filter = &Arc::new(TriggerFilterWrapper::new(
498+
C::TriggerFilter::from_data_sources(
499+
data_sources.iter().filter_map(DataSource::as_onchain),
500+
),
501+
vec![],
502+
));
499503

500504
let block: Arc<C::Block> = if self.inputs.chain.is_refetch_block_required() {
501505
let cur = firehose_cursor.clone();
@@ -524,7 +528,7 @@ where
524528
let block_with_triggers = self
525529
.inputs
526530
.triggers_adapter
527-
.triggers_in_block(&logger, block.as_ref().clone(), &filter)
531+
.triggers_in_block(&logger, block.as_ref().clone(), filter)
528532
.await?;
529533

530534
let triggers = block_with_triggers.trigger_data;

graph/src/blockchain/block_stream.rs

Lines changed: 190 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ use anyhow::Error;
77
use async_stream::stream;
88
use futures03::Stream;
99
use prost_types::Any;
10-
use std::collections::{BTreeMap, HashSet};
10+
use std::collections::{BTreeMap, HashMap, HashSet};
1111
use std::fmt;
12-
use std::ops::Range;
1312
use std::sync::Arc;
1413
use std::time::Instant;
1514
use thiserror::Error;
@@ -22,7 +21,7 @@ use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
2221
use crate::data::subgraph::UnifiedMappingApiVersion;
2322
use crate::firehose::{self, FirehoseEndpoint};
2423
use crate::futures03::stream::StreamExt as _;
25-
use crate::schema::InputSchema;
24+
use crate::schema::{EntityType, InputSchema};
2625
use crate::substreams_rpc::response::Message;
2726
use crate::{prelude::*, prometheus::labels};
2827

@@ -319,19 +318,159 @@ impl<C: Blockchain> BlockWithTriggers<C> {
319318
/// logic for each chain, increasing code repetition.
320319
pub struct TriggersAdapterWrapper<C: Blockchain> {
321320
pub adapter: Arc<dyn TriggersAdapter<C>>,
322-
pub source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
321+
pub source_subgraph_stores: HashMap<DeploymentHash, Arc<dyn SourceableStore>>,
323322
}
324323

325324
impl<C: Blockchain> TriggersAdapterWrapper<C> {
326325
pub fn new(
327326
adapter: Arc<dyn TriggersAdapter<C>>,
328327
source_subgraph_stores: Vec<Arc<dyn SourceableStore>>,
329328
) -> Self {
329+
let stores_map: HashMap<_, _> = source_subgraph_stores
330+
.iter()
331+
.map(|store| (store.input_schema().id().clone(), store.clone()))
332+
.collect();
330333
Self {
331334
adapter,
332-
source_subgraph_stores,
335+
source_subgraph_stores: stores_map,
333336
}
334337
}
338+
339+
pub async fn blocks_with_subgraph_triggers(
340+
&self,
341+
logger: &Logger,
342+
subgraph_filter: &SubgraphFilter,
343+
range: SubgraphTriggerScanRange<C>,
344+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
345+
let store = self
346+
.source_subgraph_stores
347+
.get(&subgraph_filter.subgraph)
348+
.ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?;
349+
350+
let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
351+
352+
let adapter = self.adapter.clone();
353+
354+
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
355+
}
356+
}
357+
358+
fn create_subgraph_trigger_from_entities(
359+
filter: &SubgraphFilter,
360+
entities: &Vec<EntityWithType>,
361+
) -> Vec<subgraph::TriggerData> {
362+
entities
363+
.iter()
364+
.map(|e| subgraph::TriggerData {
365+
source: filter.subgraph.clone(),
366+
entity: e.entity.clone(),
367+
entity_type: e.entity_type.as_str().to_string(),
368+
})
369+
.collect()
370+
}
371+
372+
async fn create_subgraph_triggers<C: Blockchain>(
373+
logger: Logger,
374+
blocks: Vec<C::Block>,
375+
filter: &SubgraphFilter,
376+
entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
377+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
378+
let logger_clone = logger.cheap_clone();
379+
380+
let blocks: Vec<BlockWithTriggers<C>> = blocks
381+
.into_iter()
382+
.map(|block| {
383+
let block_number = block.number();
384+
match entities.get(&block_number) {
385+
Some(e) => {
386+
let trigger_data = create_subgraph_trigger_from_entities(filter, e);
387+
BlockWithTriggers::new_with_subgraph_triggers(
388+
block,
389+
trigger_data,
390+
&logger_clone,
391+
)
392+
}
393+
None => BlockWithTriggers::new_with_subgraph_triggers(block, vec![], &logger_clone),
394+
}
395+
})
396+
.collect();
397+
398+
Ok(blocks)
399+
}
400+
401+
pub enum SubgraphTriggerScanRange<C: Blockchain> {
402+
Single(C::Block),
403+
Range(BlockNumber, BlockNumber),
404+
}
405+
406+
async fn scan_subgraph_triggers<C: Blockchain>(
407+
logger: &Logger,
408+
store: &Arc<dyn SourceableStore>,
409+
adapter: &Arc<dyn TriggersAdapter<C>>,
410+
schema: &InputSchema,
411+
filter: &SubgraphFilter,
412+
range: SubgraphTriggerScanRange<C>,
413+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
414+
match range {
415+
SubgraphTriggerScanRange::Single(block) => {
416+
let entities =
417+
get_entities_for_range(store, filter, schema, block.number(), block.number())
418+
.await?;
419+
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
420+
}
421+
SubgraphTriggerScanRange::Range(from, to) => {
422+
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
423+
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
424+
// Ensure the 'to' block is included in the block_numbers
425+
block_numbers.insert(to);
426+
427+
let blocks = adapter
428+
.load_blocks_by_numbers(logger.clone(), block_numbers)
429+
.await?;
430+
431+
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
432+
}
433+
}
434+
}
435+
436+
pub struct EntityWithType {
437+
pub entity_type: EntityType,
438+
pub entity: Entity,
439+
}
440+
441+
async fn get_entities_for_range(
442+
store: &Arc<dyn SourceableStore>,
443+
filter: &SubgraphFilter,
444+
schema: &InputSchema,
445+
from: BlockNumber,
446+
to: BlockNumber,
447+
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, Error> {
448+
let mut entities_by_block = BTreeMap::new();
449+
450+
for entity_name in &filter.entities {
451+
let entity_type = schema.entity_type(entity_name)?;
452+
453+
let entity_ranges = store.get_range(&entity_type, from..to)?;
454+
455+
for (block_number, entity_vec) in entity_ranges {
456+
let mut entity_vec = entity_vec
457+
.into_iter()
458+
.map(|e| EntityWithType {
459+
entity_type: entity_type.clone(),
460+
entity: e,
461+
})
462+
.collect();
463+
464+
entities_by_block
465+
.entry(block_number)
466+
.and_modify(|existing_vec: &mut Vec<EntityWithType>| {
467+
existing_vec.append(&mut entity_vec);
468+
})
469+
.or_insert(entity_vec);
470+
}
471+
}
472+
473+
Ok(entities_by_block)
335474
}
336475

337476
impl<C: Blockchain> TriggersAdapterWrapper<C> {
@@ -344,45 +483,25 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
344483
self.adapter.ancestor_block(ptr, offset, root).await
345484
}
346485

347-
// TODO: Do a proper implementation, this is a complete mock implementation
348486
pub async fn scan_triggers(
349487
&self,
488+
logger: &Logger,
350489
from: BlockNumber,
351490
to: BlockNumber,
352491
filter: &Arc<TriggerFilterWrapper<C>>,
353492
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
354-
if !filter.subgraph_filter.is_empty() {
355-
// TODO: handle empty range, or empty entity set bellow
356-
357-
if let Some(SubgraphFilter {
358-
subgraph: dh,
359-
start_block: _sb,
360-
entities: ent,
361-
}) = filter.subgraph_filter.first()
362-
{
363-
if let Some(store) = self.source_subgraph_stores.first() {
364-
let schema = store.input_schema();
365-
let dh2 = schema.id();
366-
if dh == dh2 {
367-
if let Some(entity_type) = ent.first() {
368-
let et = schema.entity_type(entity_type).unwrap();
369-
370-
let br: Range<BlockNumber> = from..to;
371-
let entities = store.get_range(&et, br)?;
372-
return self
373-
.subgraph_triggers(
374-
Logger::root(slog::Discard, o!()),
375-
from,
376-
to,
377-
filter,
378-
entities,
379-
)
380-
.await;
381-
}
382-
}
383-
}
384-
}
493+
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
494+
let blocks_with_triggers = self
495+
.blocks_with_subgraph_triggers(
496+
logger,
497+
subgraph_filter,
498+
SubgraphTriggerScanRange::Range(from, to),
499+
)
500+
.await?;
501+
502+
return Ok((blocks_with_triggers, to));
385503
}
504+
386505
self.adapter
387506
.scan_triggers(from, to, &filter.chain_filter)
388507
.await
@@ -392,9 +511,30 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
392511
&self,
393512
logger: &Logger,
394513
block: C::Block,
395-
filter: &C::TriggerFilter,
514+
filter: &Arc<TriggerFilterWrapper<C>>,
396515
) -> Result<BlockWithTriggers<C>, Error> {
397-
self.adapter.triggers_in_block(logger, block, filter).await
516+
trace!(
517+
logger,
518+
"triggers_in_block";
519+
"block_number" => block.number(),
520+
"block_hash" => block.hash().hash_hex(),
521+
);
522+
523+
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
524+
let blocks_with_triggers = self
525+
.blocks_with_subgraph_triggers(
526+
logger,
527+
subgraph_filter,
528+
SubgraphTriggerScanRange::Single(block),
529+
)
530+
.await?;
531+
532+
return Ok(blocks_with_triggers.into_iter().next().unwrap());
533+
}
534+
535+
self.adapter
536+
.triggers_in_block(logger, block, &filter.chain_filter)
537+
.await
398538
}
399539

400540
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
@@ -406,57 +546,20 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
406546
}
407547

408548
pub async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
409-
self.adapter.chain_head_ptr().await
410-
}
549+
if self.source_subgraph_stores.is_empty() {
550+
return self.adapter.chain_head_ptr().await;
551+
}
411552

412-
async fn subgraph_triggers(
413-
&self,
414-
logger: Logger,
415-
from: BlockNumber,
416-
to: BlockNumber,
417-
filter: &Arc<TriggerFilterWrapper<C>>,
418-
entities: BTreeMap<BlockNumber, Vec<Entity>>,
419-
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
420-
let logger2 = logger.cheap_clone();
421-
let adapter = self.adapter.clone();
422-
let first_filter = filter.subgraph_filter.first().unwrap();
423-
let blocks = adapter
424-
.load_blocks_by_numbers(logger, HashSet::from_iter(from..to))
425-
.await?
426-
.into_iter()
427-
.map(|block| {
428-
let key = block.number();
429-
match entities.get(&key) {
430-
Some(e) => {
431-
let trigger_data =
432-
Self::create_subgraph_trigger_from_entity(first_filter, e);
433-
Some(BlockWithTriggers::new_with_subgraph_triggers(
434-
block,
435-
trigger_data,
436-
&logger2,
437-
))
438-
}
439-
None => None,
440-
}
441-
})
442-
.flatten()
443-
.collect();
553+
let ptrs = futures03::future::try_join_all(
554+
self.source_subgraph_stores
555+
.iter()
556+
.map(|(_, store)| store.block_ptr()),
557+
)
558+
.await?;
444559

445-
Ok((blocks, to))
446-
}
560+
let min_ptr = ptrs.into_iter().flatten().min_by_key(|ptr| ptr.number);
447561

448-
fn create_subgraph_trigger_from_entity(
449-
filter: &SubgraphFilter,
450-
entity: &Vec<Entity>,
451-
) -> Vec<subgraph::TriggerData> {
452-
entity
453-
.iter()
454-
.map(|e| subgraph::TriggerData {
455-
source: filter.subgraph.clone(),
456-
entity: e.clone(),
457-
entity_type: filter.entities.first().unwrap().clone(),
458-
})
459-
.collect()
562+
Ok(min_ptr)
460563
}
461564
}
462565

graph/src/blockchain/polling_block_stream.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,10 @@ where
379379
);
380380

381381
// Update with actually scanned range, to account for any skipped null blocks.
382-
let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?;
382+
let (blocks, to) = self
383+
.adapter
384+
.scan_triggers(&self.logger, from, to, &self.filter)
385+
.await?;
383386
let range_size = to - from + 1;
384387

385388
// If the target block (`to`) is within the reorg threshold, indicating no non-null finalized blocks are
@@ -469,11 +472,7 @@ where
469472
// Note that head_ancestor is a child of subgraph_ptr.
470473
let block = self
471474
.adapter
472-
.triggers_in_block(
473-
&self.logger,
474-
head_ancestor,
475-
&self.filter.chain_filter.clone(),
476-
)
475+
.triggers_in_block(&self.logger, head_ancestor, &self.filter)
477476
.await?;
478477
Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1))
479478
} else {

0 commit comments

Comments
 (0)