Skip to content

Commit dd795c1

Browse files
committed
graph: refactor TriggersAdapterWrapper.scan_triggers
1 parent 928cf9c commit dd795c1

File tree

3 files changed

+78
-76
lines changed

3 files changed

+78
-76
lines changed

core/src/subgraph/runner.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use graph::schema::EntityKey;
3333
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
3434
use std::sync::Arc;
3535
use std::time::{Duration, Instant};
36+
use std::vec;
3637

3738
const MINUTE: Duration = Duration::from_secs(60);
3839

@@ -447,9 +448,12 @@ where
447448
let (data_sources, runtime_hosts) =
448449
self.create_dynamic_data_sources(block_state.drain_created_data_sources())?;
449450

450-
let filter = C::TriggerFilter::from_data_sources(
451-
data_sources.iter().filter_map(DataSource::as_onchain),
452-
);
451+
let filter = &Arc::new(TriggerFilterWrapper::new(
452+
C::TriggerFilter::from_data_sources(
453+
data_sources.iter().filter_map(DataSource::as_onchain),
454+
),
455+
vec![],
456+
));
453457

454458
let block: Arc<C::Block> = if self.inputs.chain.is_refetch_block_required() {
455459
let cur = firehose_cursor.clone();
@@ -478,7 +482,7 @@ where
478482
let block_with_triggers = self
479483
.inputs
480484
.triggers_adapter
481-
.triggers_in_block(&logger, block.as_ref().clone(), &filter)
485+
.triggers_in_block(&logger, block.as_ref().clone(), filter)
482486
.await?;
483487

484488
let triggers = block_with_triggers.trigger_data;

graph/src/blockchain/block_stream.rs

Lines changed: 65 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use futures03::Stream;
99
use prost_types::Any;
1010
use std::collections::{BTreeMap, HashSet};
1111
use std::fmt;
12-
use std::ops::Range;
1312
use std::sync::Arc;
1413
use std::time::Instant;
1514
use thiserror::Error;
@@ -340,52 +339,54 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
340339
) -> Result<Option<C::Block>, Error> {
341340
self.adapter.ancestor_block(ptr, offset, root).await
342341
}
343-
344-
// TODO: Do a proper implementation, this is a complete mock implementation
345342
pub async fn scan_triggers(
346343
&self,
344+
logger: &Logger,
347345
from: BlockNumber,
348346
to: BlockNumber,
349347
filter: &Arc<TriggerFilterWrapper<C>>,
350348
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
351-
if !filter.subgraph_filter.is_empty() {
352-
// TODO: handle empty range, or empty entity set bellow
353-
354-
if let Some(SubgraphFilter {
355-
subgraph: dh,
356-
start_block: _sb,
357-
entities: ent,
358-
}) = filter.subgraph_filter.first()
359-
{
360-
if let Some((dh2, store)) = self.source_subgraph_stores.first() {
361-
if dh == dh2 {
362-
let schema = crate::components::store::ReadStore::input_schema(store);
363-
if let Some(entity_type) = ent.first() {
364-
let et = schema.entity_type(entity_type).unwrap();
365-
366-
let br: Range<BlockNumber> = from..to;
367-
let entities = store.get_range(&et, br)?;
368-
let block_numbers = entities
369-
.iter()
370-
.map(|(bn, _)| bn)
371-
.cloned()
372-
.collect::<HashSet<_>>();
373-
374-
return self
375-
.subgraph_triggers(
376-
Logger::root(slog::Discard, o!()),
377-
block_numbers,
378-
from,
379-
to,
380-
filter,
381-
entities,
382-
)
383-
.await;
384-
}
385-
}
349+
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
350+
let (stored_subgraph, store) = self.source_subgraph_stores.first().unwrap();
351+
assert_eq!(stored_subgraph, &subgraph_filter.subgraph);
352+
353+
let schema = crate::components::store::ReadStore::input_schema(store);
354+
let entity_type_name = subgraph_filter.entities.first().unwrap();
355+
let entity_type = schema.entity_type(entity_type_name).unwrap();
356+
357+
let entities = store.get_range(&entity_type, from..to)?;
358+
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
359+
360+
// Ensure the 'to' block is included in the block_numbers
361+
block_numbers.insert(to);
362+
363+
let mut blocks_with_triggers = self
364+
.subgraph_triggers(
365+
Logger::root(slog::Discard, o!()),
366+
block_numbers,
367+
filter,
368+
entities,
369+
)
370+
.await?;
371+
372+
// Ensure the 'to' block is present even if it has no triggers
373+
if !blocks_with_triggers.iter().any(|b| b.block.number() == to) {
374+
let to_block_numbers: HashSet<BlockNumber> = vec![to].into_iter().collect();
375+
let to_blocks = self
376+
.adapter
377+
.load_blocks_by_numbers(logger.clone(), to_block_numbers)
378+
.await?;
379+
if let Some(to_block) = to_blocks.into_iter().next() {
380+
blocks_with_triggers.push(BlockWithTriggers::new_with_subgraph_triggers(
381+
to_block,
382+
vec![],
383+
&Logger::root(slog::Discard, o!()),
384+
));
386385
}
387386
}
387+
return Ok((blocks_with_triggers, to));
388388
}
389+
389390
self.adapter
390391
.scan_triggers(from, to, &filter.chain_filter)
391392
.await
@@ -395,9 +396,18 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
395396
&self,
396397
logger: &Logger,
397398
block: C::Block,
398-
filter: &C::TriggerFilter,
399+
filter: &Arc<TriggerFilterWrapper<C>>,
399400
) -> Result<BlockWithTriggers<C>, Error> {
400-
self.adapter.triggers_in_block(logger, block, filter).await
401+
let block_number = block.number();
402+
if filter.subgraph_filter.is_empty() {
403+
return self
404+
.adapter
405+
.triggers_in_block(logger, block, &filter.chain_filter)
406+
.await;
407+
}
408+
self.scan_triggers(logger, block_number, block_number, filter)
409+
.await
410+
.map(|(mut blocks, _)| blocks.pop().unwrap())
401411
}
402412

403413
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
@@ -411,49 +421,38 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
411421
pub async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
412422
self.adapter.chain_head_ptr().await
413423
}
414-
415424
async fn subgraph_triggers(
416425
&self,
417426
logger: Logger,
418427
block_numbers: HashSet<BlockNumber>,
419-
_from: BlockNumber,
420-
to: BlockNumber,
421428
filter: &Arc<TriggerFilterWrapper<C>>,
422429
entities: BTreeMap<BlockNumber, Vec<Entity>>,
423-
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
424-
let logger2 = logger.cheap_clone();
430+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
431+
let logger = logger.cheap_clone();
425432
let adapter = self.adapter.clone();
426-
let first_filter = filter.subgraph_filter.first().unwrap();
427-
let blocks = adapter
428-
.load_blocks_by_numbers(logger, block_numbers)
433+
let first_filter = filter.subgraph_filter.first().unwrap(); //TODO(krishna): Avoid unwrap
434+
435+
let blocks_with_triggers = adapter
436+
.load_blocks_by_numbers(logger.clone(), block_numbers)
429437
.await?
430438
.into_iter()
431-
.map(|block| {
439+
.filter_map(|block| {
432440
let key = block.number();
433-
match entities.get(&key) {
434-
Some(e) => {
435-
let trigger_data =
436-
Self::create_subgraph_trigger_from_entity(first_filter, e);
437-
Some(BlockWithTriggers::new_with_subgraph_triggers(
438-
block,
439-
trigger_data,
440-
&logger2,
441-
))
442-
}
443-
None => None,
444-
}
441+
entities.get(&key).map(|e| {
442+
let trigger_data = Self::create_subgraph_trigger_from_entities(first_filter, e);
443+
BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger)
444+
})
445445
})
446-
.flatten()
447446
.collect();
448447

449-
Ok((blocks, to))
448+
Ok(blocks_with_triggers)
450449
}
451450

452-
fn create_subgraph_trigger_from_entity(
451+
fn create_subgraph_trigger_from_entities(
453452
filter: &SubgraphFilter,
454-
entity: &Vec<Entity>,
453+
entities: &Vec<Entity>,
455454
) -> Vec<subgraph::TriggerData> {
456-
entity
455+
entities
457456
.iter()
458457
.map(|e| subgraph::TriggerData {
459458
source: filter.subgraph.clone(),

graph/src/blockchain/polling_block_stream.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,10 @@ where
379379
);
380380

381381
// Update with actually scanned range, to account for any skipped null blocks.
382-
let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?;
382+
let (blocks, to) = self
383+
.adapter
384+
.scan_triggers(&self.logger, from, to, &self.filter)
385+
.await?;
383386
let range_size = to - from + 1;
384387

385388
// If the target block (`to`) is within the reorg threshold, indicating no non-null finalized blocks are
@@ -469,11 +472,7 @@ where
469472
// Note that head_ancestor is a child of subgraph_ptr.
470473
let block = self
471474
.adapter
472-
.triggers_in_block(
473-
&self.logger,
474-
head_ancestor,
475-
&self.filter.chain_filter.clone(),
476-
)
475+
.triggers_in_block(&self.logger, head_ancestor, &self.filter)
477476
.await?;
478477
Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1))
479478
} else {

0 commit comments

Comments
 (0)