Skip to content

Commit afcbb3d

Browse files
committed
Subgraph composition interfacing
1 parent eb5e46a commit afcbb3d

File tree

2 files changed

+68
-60
lines changed

2 files changed

+68
-60
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::data::store::scalar;
1+
use crate::blockchain::SubgraphFilter;
22
use crate::data_source::subgraph;
33
use crate::substreams::Clock;
44
use crate::substreams_rpc::response::Message as SubstreamsMessage;
@@ -7,17 +7,16 @@ use anyhow::Error;
77
use async_stream::stream;
88
use futures03::Stream;
99
use prost_types::Any;
10-
use std::collections::HashSet;
10+
use std::collections::{BTreeMap, HashSet};
1111
use std::fmt;
12+
use std::ops::Range;
1213
use std::sync::Arc;
1314
use std::time::Instant;
1415
use thiserror::Error;
1516
use tokio::sync::mpsc::{self, Receiver, Sender};
1617

1718
use super::substreams_block_stream::SubstreamsLogData;
18-
use super::{
19-
Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
20-
};
19+
use super::{Block, BlockPtr, BlockTime, Blockchain, Trigger, TriggerFilterWrapper};
2120
use crate::anyhow::Result;
2221
use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
2322
use crate::data::subgraph::UnifiedMappingApiVersion;
@@ -353,11 +352,37 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
353352
filter: &Arc<TriggerFilterWrapper<C>>,
354353
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
355354
if !filter.subgraph_filter.is_empty() {
356-
return self
357-
.subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter)
358-
.await;
355+
// TODO: handle empty range, or empty entity set bellow
356+
357+
if let Some(SubgraphFilter {
358+
subgraph: dh,
359+
start_block: _sb,
360+
entities: ent,
361+
}) = filter.subgraph_filter.first()
362+
{
363+
if let Some(store) = self.source_subgraph_stores.first() {
364+
let schema = store.input_schema();
365+
let dh2 = schema.id();
366+
if dh == dh2 {
367+
if let Some(entity_type) = ent.first() {
368+
let et = schema.entity_type(entity_type).unwrap();
369+
370+
let br: Range<BlockNumber> = from..to;
371+
let entities = store.get_range(&et, br)?;
372+
return self
373+
.subgraph_triggers(
374+
Logger::root(slog::Discard, o!()),
375+
from,
376+
to,
377+
filter,
378+
entities,
379+
)
380+
.await;
381+
}
382+
}
383+
}
384+
}
359385
}
360-
361386
self.adapter
362387
.scan_triggers(from, to, &filter.chain_filter)
363388
.await
@@ -384,70 +409,54 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
384409
self.adapter.chain_head_ptr().await
385410
}
386411

387-
// TODO(krishna): Currently this is a mock implementation of subgraph triggers.
388-
// This will be replaced with the actual implementation which will use the filters to
389-
// query the database of the source subgraph and return the entity triggers.
390412
async fn subgraph_triggers(
391413
&self,
392414
logger: Logger,
393415
from: BlockNumber,
394416
to: BlockNumber,
395417
filter: &Arc<TriggerFilterWrapper<C>>,
418+
entities: BTreeMap<BlockNumber, Vec<Entity>>,
396419
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
397420
let logger2 = logger.cheap_clone();
398421
let adapter = self.adapter.clone();
399-
// let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
400-
// let to = to_ptr.block_number();
401-
402422
let first_filter = filter.subgraph_filter.first().unwrap();
403-
404423
let blocks = adapter
405-
.load_blocks_by_numbers(logger, HashSet::from_iter(from..=to))
424+
.load_blocks_by_numbers(logger, HashSet::from_iter(from..to))
406425
.await?
407426
.into_iter()
408427
.map(|block| {
409-
let trigger_data = vec![Self::create_mock_subgraph_trigger(first_filter, &block)];
410-
BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2)
428+
let key = block.number();
429+
match entities.get(&key) {
430+
Some(e) => {
431+
let trigger_data =
432+
Self::create_subgraph_trigger_from_entity(first_filter, e);
433+
Some(BlockWithTriggers::new_with_subgraph_triggers(
434+
block,
435+
trigger_data,
436+
&logger2,
437+
))
438+
}
439+
None => None,
440+
}
411441
})
442+
.flatten()
412443
.collect();
413444

414445
Ok((blocks, to))
415446
}
416447

417-
fn create_mock_subgraph_trigger(
448+
fn create_subgraph_trigger_from_entity(
418449
filter: &SubgraphFilter,
419-
block: &C::Block,
420-
) -> subgraph::TriggerData {
421-
let mock_entity = Self::create_mock_entity(block);
422-
subgraph::TriggerData {
423-
source: filter.subgraph.clone(),
424-
entity: mock_entity,
425-
entity_type: filter.entities.first().unwrap().clone(),
426-
}
427-
}
428-
429-
fn create_mock_entity(block: &C::Block) -> Entity {
430-
let id = DeploymentHash::new("test").unwrap();
431-
let data_schema = InputSchema::parse_latest(
432-
"type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }",
433-
id.clone(),
434-
)
435-
.unwrap();
436-
437-
let block = block.ptr();
438-
let hash = Value::Bytes(scalar::Bytes::from(block.hash_slice().to_vec()));
439-
let data = data_schema
440-
.make_entity(vec![
441-
("id".into(), hash.clone()),
442-
(
443-
"number".into(),
444-
Value::BigInt(scalar::BigInt::from(block.block_number())),
445-
),
446-
("hash".into(), hash),
447-
])
448-
.unwrap();
449-
450-
data
450+
entity: &Vec<Entity>,
451+
) -> Vec<subgraph::TriggerData> {
452+
entity
453+
.iter()
454+
.map(|e| subgraph::TriggerData {
455+
source: filter.subgraph.clone(),
456+
entity: e.clone(),
457+
entity_type: filter.entities.first().unwrap().clone(),
458+
})
459+
.collect()
451460
}
452461
}
453462

tests/tests/integration_tests.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -522,16 +522,15 @@ async fn subgraph_data_sources(ctx: TestContext) -> anyhow::Result<()> {
522522
assert!(subgraph.healthy);
523523
let expected_response = json!({
524524
"mirrorBlocks": [
525-
{ "number": "0" },
526-
{ "number": "1" },
525+
{ "number": "1" },
527526
{ "number": "2" },
528-
{ "number": "3" },
529-
{ "number": "4" },
530-
{ "number": "5" },
527+
{ "number": "3" },
528+
{ "number": "4" },
529+
{ "number": "5" },
531530
{ "number": "6" },
532-
{ "number": "7" },
533-
{ "number": "8" },
534-
{ "number": "9" },
531+
{ "number": "7" },
532+
{ "number": "8" },
533+
{ "number": "9" },
535534
]
536535
});
537536

0 commit comments

Comments
 (0)