Skip to content

Commit 48d1b1f

Browse files
committed
graph: refactor TriggersAdapterWrapper.triggers_in_block to not rely on scan_triggers
1 parent c25dd9b commit 48d1b1f

File tree

1 file changed

+78
-51
lines changed

1 file changed

+78
-51
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,24 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
331331
source_subgraph_stores,
332332
}
333333
}
334+
335+
pub async fn blocks_with_subgraph_triggers(
336+
&self,
337+
logger: &Logger,
338+
subgraph_filter: &SubgraphFilter,
339+
range: SubgraphTriggerScanRange<C>,
340+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
341+
let store = self
342+
.source_subgraph_stores
343+
.get(&subgraph_filter.subgraph)
344+
.unwrap(); // TODO(krishna): Avoid unwrap
345+
346+
let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
347+
348+
let adapter = self.adapter.clone();
349+
350+
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
351+
}
334352
}
335353

336354
fn create_subgraph_trigger_from_entities(
@@ -376,34 +394,60 @@ async fn create_subgraph_triggers<C: Blockchain>(
376394
Ok(blocks)
377395
}
378396

397+
pub enum SubgraphTriggerScanRange<C: Blockchain> {
398+
Single(C::Block),
399+
Range(BlockNumber, BlockNumber),
400+
}
401+
379402
async fn scan_subgraph_triggers<C: Blockchain>(
380403
logger: &Logger,
381404
store: &Arc<dyn SourceableStore>,
382405
adapter: &Arc<dyn TriggersAdapter<C>>,
383406
schema: &InputSchema,
384407
filter: &SubgraphFilter,
408+
range: SubgraphTriggerScanRange<C>,
409+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
410+
match range {
411+
SubgraphTriggerScanRange::Single(block) => {
412+
let entities =
413+
get_entities_for_range(store, filter, schema, block.number(), block.number())
414+
.await?;
415+
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
416+
}
417+
SubgraphTriggerScanRange::Range(from, to) => {
418+
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
419+
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
420+
// Ensure the 'to' block is included in the block_numbers
421+
block_numbers.insert(to);
422+
423+
let blocks = adapter
424+
.load_blocks_by_numbers(logger.clone(), block_numbers)
425+
.await?;
426+
427+
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
428+
}
429+
}
430+
}
431+
432+
async fn get_entities_for_range(
433+
store: &Arc<dyn SourceableStore>,
434+
filter: &SubgraphFilter,
435+
schema: &InputSchema,
385436
from: BlockNumber,
386437
to: BlockNumber,
387-
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
438+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, Error> {
388439
let entity_types: Vec<EntityType> = filter
389440
.entities
390441
.iter()
391442
.map(|e| schema.entity_type(e).unwrap())
392443
.collect();
393-
394-
let entity_type = entity_types.first().unwrap();
395-
let range = from..to;
396-
let entities = store.get_range(&entity_type, range)?;
397-
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
398-
399-
// Ensure the 'to' block is included in the block_numbers
400-
block_numbers.insert(to);
401-
402-
let blocks = adapter
403-
.load_blocks_by_numbers(logger.clone(), block_numbers)
404-
.await?;
405-
406-
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
444+
let mut entities = BTreeMap::new();
445+
for entity_type in entity_types {
446+
let range = from..to;
447+
let mut entities_for_type = store.get_range(&entity_type, range)?;
448+
entities.append(&mut entities_for_type);
449+
}
450+
Ok(entities)
407451
}
408452

409453
impl<C: Blockchain> TriggersAdapterWrapper<C> {
@@ -424,32 +468,13 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
424468
filter: &Arc<TriggerFilterWrapper<C>>,
425469
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
426470
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
427-
let store = self
428-
.source_subgraph_stores
429-
.get(&subgraph_filter.subgraph)
430-
.unwrap(); // TODO(krishna): Avoid unwrap
431-
432-
let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
433-
let adapter = self.adapter.clone();
434-
435-
let blocks_with_triggers = scan_subgraph_triggers::<C>(
436-
logger,
437-
store,
438-
&adapter,
439-
&schema,
440-
&subgraph_filter,
441-
from,
442-
to,
443-
)
444-
.await?;
445-
446-
debug!(
447-
logger,
448-
"Scanned subgraph triggers";
449-
"from" => from,
450-
"to" => to,
451-
"blocks_with_triggers" => blocks_with_triggers.len(),
452-
);
471+
let blocks_with_triggers = self
472+
.blocks_with_subgraph_triggers(
473+
logger,
474+
subgraph_filter,
475+
SubgraphTriggerScanRange::Range(from, to),
476+
)
477+
.await?;
453478

454479
return Ok((blocks_with_triggers, to));
455480
}
@@ -472,19 +497,21 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
472497
"block_hash" => block.hash().hash_hex(),
473498
);
474499

475-
let block_number = block.number();
476-
477-
if filter.subgraph_filter.is_empty() {
478-
trace!(logger, "No subgraph filters, scanning triggers in block");
479-
return self
480-
.adapter
481-
.triggers_in_block(logger, block, &filter.chain_filter)
482-
.await;
500+
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
501+
let blocks_with_triggers = self
502+
.blocks_with_subgraph_triggers(
503+
logger,
504+
subgraph_filter,
505+
SubgraphTriggerScanRange::Single(block),
506+
)
507+
.await?;
508+
509+
return Ok(blocks_with_triggers.into_iter().next().unwrap());
483510
}
484511

485-
self.scan_triggers(logger, block_number, block_number, filter)
512+
self.adapter
513+
.triggers_in_block(logger, block, &filter.chain_filter)
486514
.await
487-
.map(|(mut blocks, _)| blocks.pop().unwrap())
488515
}
489516

490517
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {

0 commit comments

Comments
 (0)