Skip to content

Commit fe363f3

Browse files
committed
Subgraph composition: sql more entities
1 parent d48f6b8 commit fe363f3

File tree

8 files changed

+412
-220
lines changed

8 files changed

+412
-220
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::blockchain::SubgraphFilter;
2-
use crate::data_source::subgraph;
2+
use crate::data_source::{subgraph, CausalityRegion};
33
use crate::substreams::Clock;
44
use crate::substreams_rpc::response::Message as SubstreamsMessage;
55
use crate::substreams_rpc::BlockScopedData;
@@ -433,9 +433,19 @@ async fn scan_subgraph_triggers<C: Blockchain>(
433433
}
434434
}
435435

436+
#[derive(Debug)]
437+
pub enum EntitySubgraphOperation {
438+
Create,
439+
Modify,
440+
Delete,
441+
}
442+
443+
#[derive(Debug)]
436444
pub struct EntityWithType {
445+
pub entity_op: EntitySubgraphOperation,
437446
pub entity_type: EntityType,
438447
pub entity: Entity,
448+
pub vid: i64,
439449
}
440450

441451
async fn get_entities_for_range(
@@ -445,32 +455,12 @@ async fn get_entities_for_range(
445455
from: BlockNumber,
446456
to: BlockNumber,
447457
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, Error> {
448-
let mut entities_by_block = BTreeMap::new();
449-
450-
for entity_name in &filter.entities {
451-
let entity_type = schema.entity_type(entity_name)?;
452-
453-
let entity_ranges = store.get_range(&entity_type, from..to)?;
454-
455-
for (block_number, entity_vec) in entity_ranges {
456-
let mut entity_vec = entity_vec
457-
.into_iter()
458-
.map(|e| EntityWithType {
459-
entity_type: entity_type.clone(),
460-
entity: e,
461-
})
462-
.collect();
463-
464-
entities_by_block
465-
.entry(block_number)
466-
.and_modify(|existing_vec: &mut Vec<EntityWithType>| {
467-
existing_vec.append(&mut entity_vec);
468-
})
469-
.or_insert(entity_vec);
470-
}
471-
}
472-
473-
Ok(entities_by_block)
458+
let entity_types: Result<Vec<EntityType>> = filter
459+
.entities
460+
.iter()
461+
.map(|name| schema.entity_type(name))
462+
.collect();
463+
Ok(store.get_range(entity_types?, CausalityRegion::ONCHAIN, from..to)?)
474464
}
475465

476466
impl<C: Blockchain> TriggersAdapterWrapper<C> {

graph/src/components/store/traits.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use async_trait::async_trait;
66
use web3::types::{Address, H256};
77

88
use super::*;
9-
use crate::blockchain::block_stream::FirehoseCursor;
9+
use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor};
1010
use crate::blockchain::{BlockTime, ChainIdentifier};
1111
use crate::components::metrics::stopwatch::StopwatchMetrics;
1212
use crate::components::server::index_node::VersionInfo;
@@ -299,9 +299,10 @@ pub trait SourceableStore: Sync + Send + 'static {
299299
/// changed in the given block_range.
300300
fn get_range(
301301
&self,
302-
entity_type: &EntityType,
302+
entity_types: Vec<EntityType>,
303+
causality_region: CausalityRegion,
303304
block_range: Range<BlockNumber>,
304-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
305+
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError>;
305306

306307
fn input_schema(&self) -> InputSchema;
307308

@@ -314,10 +315,11 @@ pub trait SourceableStore: Sync + Send + 'static {
314315
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
315316
fn get_range(
316317
&self,
317-
entity_type: &EntityType,
318+
entity_types: Vec<EntityType>,
319+
causality_region: CausalityRegion,
318320
block_range: Range<BlockNumber>,
319-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
320-
(**self).get_range(entity_type, block_range)
321+
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
322+
(**self).get_range(entity_types, causality_region, block_range)
321323
}
322324

323325
fn input_schema(&self) -> InputSchema {

store/postgres/src/block_range.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,36 +132,52 @@ impl<'a> QueryFragment<Pg> for BlockRangeUpperBoundClause<'a> {
132132
}
133133
}
134134

135+
#[derive(Debug, Clone, Copy)]
136+
pub enum BoundSide {
137+
Lower,
138+
Upper,
139+
}
140+
135141
/// Helper for generating SQL fragments for selecting entities in a specific block range
136142
#[derive(Debug, Clone, Copy)]
137143
pub enum EntityBlockRange {
138-
Mutable(BlockRange), // TODO: check if this is a proper type here (maybe Range<BlockNumber>?)
144+
Mutable((BlockRange, BoundSide)),
139145
Immutable(BlockRange),
140146
}
141147

142148
impl EntityBlockRange {
143-
pub fn new(table: &Table, block_range: std::ops::Range<BlockNumber>) -> Self {
149+
pub fn new(
150+
immutable: bool,
151+
block_range: std::ops::Range<BlockNumber>,
152+
bound_side: BoundSide,
153+
) -> Self {
144154
let start: Bound<BlockNumber> = Bound::Included(block_range.start);
145155
let end: Bound<BlockNumber> = Bound::Excluded(block_range.end);
146156
let block_range: BlockRange = BlockRange(start, end);
147-
if table.immutable {
157+
if immutable {
148158
Self::Immutable(block_range)
149159
} else {
150-
Self::Mutable(block_range)
160+
Self::Mutable((block_range, bound_side))
151161
}
152162
}
153163

154-
/// Output SQL that matches only rows whose block range contains `block`.
164+
/// Outputs SQL that matches only rows whose entities would trigger a change
165+
/// event (Create, Modify, Delete) in a given interval of blocks. Otherwise said
166+
/// a block_range border is contained in an interval of blocks. For instance
167+
/// one of the following:
168+
/// lower(block_range) >= $1 and lower(block_range) <= $2
169+
/// upper(block_range) >= $1 and upper(block_range) <= $2
170+
/// block$ >= $1 and block$ <= $2
155171
pub fn contains<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> {
156172
out.unsafe_to_cache_prepared();
157173
let block_range = match self {
158-
EntityBlockRange::Mutable(br) => br,
174+
EntityBlockRange::Mutable((br, _)) => br,
159175
EntityBlockRange::Immutable(br) => br,
160176
};
161177
let BlockRange(start, finish) = block_range;
162178

163179
self.compare_column(out);
164-
out.push_sql(" >= ");
180+
out.push_sql(">= ");
165181
match start {
166182
Bound::Included(block) => out.push_bind_param::<Integer, _>(block)?,
167183
Bound::Excluded(block) => {
@@ -170,9 +186,9 @@ impl EntityBlockRange {
170186
}
171187
Bound::Unbounded => unimplemented!(),
172188
};
173-
out.push_sql(" AND ");
189+
out.push_sql(" and");
174190
self.compare_column(out);
175-
out.push_sql(" <= ");
191+
out.push_sql("<= ");
176192
match finish {
177193
Bound::Included(block) => {
178194
out.push_bind_param::<Integer, _>(block)?;
@@ -186,7 +202,12 @@ impl EntityBlockRange {
186202

187203
pub fn compare_column(&self, out: &mut AstPass<Pg>) {
188204
match self {
189-
EntityBlockRange::Mutable(_) => out.push_sql(" lower(block_range) "),
205+
EntityBlockRange::Mutable((_, BoundSide::Lower)) => {
206+
out.push_sql(" lower(block_range) ")
207+
}
208+
EntityBlockRange::Mutable((_, BoundSide::Upper)) => {
209+
out.push_sql(" upper(block_range) ")
210+
}
190211
EntityBlockRange::Immutable(_) => out.push_sql(" block$ "),
191212
}
192213
}

store/postgres/src/deployment_store.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use diesel::pg::PgConnection;
44
use diesel::r2d2::{ConnectionManager, PooledConnection};
55
use diesel::{prelude::*, sql_query};
66
use graph::anyhow::Context;
7-
use graph::blockchain::block_stream::FirehoseCursor;
7+
use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor};
88
use graph::blockchain::BlockTime;
99
use graph::components::store::write::RowGroup;
1010
use graph::components::store::{
@@ -1066,12 +1066,13 @@ impl DeploymentStore {
10661066
pub(crate) fn get_range(
10671067
&self,
10681068
site: Arc<Site>,
1069-
entity_type: &EntityType,
1069+
entity_types: Vec<EntityType>,
1070+
causality_region: CausalityRegion,
10701071
block_range: Range<BlockNumber>,
1071-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
1072+
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
10721073
let mut conn = self.get_conn()?;
10731074
let layout = self.layout(&mut conn, site)?;
1074-
layout.find_range(&mut conn, entity_type, block_range)
1075+
layout.find_range(&mut conn, entity_types, causality_region, block_range)
10751076
}
10761077

10771078
pub(crate) fn get_derived(

store/postgres/src/relational.rs

Lines changed: 125 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use diesel::{connection::SimpleConnection, Connection};
2828
use diesel::{
2929
debug_query, sql_query, OptionalExtension, PgConnection, QueryDsl, QueryResult, RunQueryDsl,
3030
};
31+
use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
3132
use graph::blockchain::BlockTime;
3233
use graph::cheap_clone::CheapClone;
3334
use graph::components::store::write::{RowGroup, WriteChunk};
@@ -57,8 +58,8 @@ use std::time::{Duration, Instant};
5758

5859
use crate::relational::value::{FromOidRow, OidRow};
5960
use crate::relational_queries::{
60-
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
61-
FindPossibleDeletionsQuery, ReturnedEntityData,
61+
ConflictingEntitiesData, ConflictingEntitiesQuery, EntityDataExt, FindChangesQuery,
62+
FindDerivedQuery, FindPossibleDeletionsQuery, ReturnedEntityData,
6263
};
6364
use crate::{
6465
primary::{Namespace, Site},
@@ -75,7 +76,7 @@ use graph::prelude::{
7576
QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX,
7677
};
7778

78-
use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
79+
use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
7980
pub use crate::catalog::Catalog;
8081
use crate::connection_pool::ForeignServer;
8182
use crate::{catalog, deployment};
@@ -545,21 +546,129 @@ impl Layout {
545546
pub fn find_range(
546547
&self,
547548
conn: &mut PgConnection,
548-
entity_type: &EntityType,
549+
entity_types: Vec<EntityType>,
550+
causality_region: CausalityRegion,
549551
block_range: Range<BlockNumber>,
550-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
551-
let table = self.table_for_entity(entity_type)?;
552-
let mut entities: BTreeMap<BlockNumber, Vec<Entity>> = BTreeMap::new();
553-
if let Some(vec) = FindRangeQuery::new(table.as_ref(), block_range)
554-
.get_results::<EntityData>(conn)
555-
.optional()?
556-
{
557-
for e in vec {
558-
let block = e.clone().deserialize_block_number::<Entity>()?;
559-
let en = e.deserialize_with_layout::<Entity>(self, None)?;
560-
entities.entry(block).or_default().push(en);
561-
}
552+
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
553+
let mut tables = vec![];
554+
for et in entity_types {
555+
tables.push(self.table_for_entity(&et)?.as_ref());
562556
}
557+
let mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>> = BTreeMap::new();
558+
559+
// Collect all entities that have their 'lower(block_range)' attribute in the
560+
// interval of blocks defined by the variable block_range. For the immutable
561+
// entities the respective attribute is 'block$'.
562+
// Here are all entities that are created or modified in the block_range.
563+
let lower_vec = FindRangeQuery::new(
564+
&tables,
565+
causality_region,
566+
BoundSide::Lower,
567+
block_range.clone(),
568+
)
569+
.get_results::<EntityDataExt>(conn)
570+
.optional()?
571+
.unwrap_or_default();
572+
// Collect all entities that have their 'upper(block_range)' attribute in the
573+
// interval of blocks defined by the variable block_range. For the immutable
574+
// entities no entries are returned.
575+
// Here are all entities that are modified or deleted in the block_range,
576+
// but will have the previous versions, i.e. in the case of an update, it's
577+
// the version before the update, and lower_vec will have a corresponding
578+
// entry with the new version.
579+
let upper_vec =
580+
FindRangeQuery::new(&tables, causality_region, BoundSide::Upper, block_range)
581+
.get_results::<EntityDataExt>(conn)
582+
.optional()?
583+
.unwrap_or_default();
584+
let mut lower_iter = lower_vec.iter().fuse().peekable();
585+
let mut upper_iter = upper_vec.iter().fuse().peekable();
586+
let mut lower_now = lower_iter.next();
587+
let mut upper_now = upper_iter.next();
588+
// A closure to convert the entity data from the database into entity operation.
589+
let transform = |ede: &EntityDataExt,
590+
entity_op: EntitySubgraphOperation|
591+
-> Result<(EntityWithType, BlockNumber), StoreError> {
592+
let e = EntityData::new(ede.entity.clone(), ede.data.clone());
593+
let block = ede.block_number;
594+
let entity_type = e.entity_type(&self.input_schema);
595+
let entity = e.deserialize_with_layout::<Entity>(self, None)?;
596+
let vid = ede.vid;
597+
let ewt = EntityWithType {
598+
entity_op,
599+
entity_type,
600+
entity,
601+
vid,
602+
};
603+
Ok((ewt, block))
604+
};
605+
606+
// The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors
607+
// are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from
608+
// both lower_vec and upper_vec and tries to match entities that have entries in both vectors for
609+
// a particular block. The match is successful if an entry in one array has the same values in the
610+
// other one for the number of the block, entity type and the entity id. The comparison operation
611+
// over the EntityDataExt implements that check. If there is a match it’s a modification operation,
612+
// since both sides of a range are present for that block, entity type and id. If one side of the
613+
// range exists and the other is missing it is a creation or deletion depending on which side is
614+
// present. For immutable entities the entries in upper_vec are missing, hence they are considered
615+
// having a lower bound at particular block and upper bound at infinity.
616+
while lower_now.is_some() || upper_now.is_some() {
617+
let (ewt, block) = match (lower_now, upper_now) {
618+
(Some(lower), Some(upper)) => {
619+
match lower.cmp(&upper) {
620+
std::cmp::Ordering::Greater => {
621+
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
622+
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
623+
// advance upper_vec pointer
624+
upper_now = upper_iter.next();
625+
(ewt, block)
626+
}
627+
std::cmp::Ordering::Less => {
628+
// we have lower bound at this block but no upper bound at the same block so its creation
629+
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
630+
// advance lower_vec pointer
631+
lower_now = lower_iter.next();
632+
(ewt, block)
633+
}
634+
std::cmp::Ordering::Equal => {
635+
let (ewt, block) = transform(lower, EntitySubgraphOperation::Modify)?;
636+
// advance both lower_vec and upper_vec pointers
637+
lower_now = lower_iter.next();
638+
upper_now = upper_iter.next();
639+
(ewt, block)
640+
}
641+
}
642+
}
643+
(Some(lower), None) => {
644+
// we have lower bound at this block but no upper bound at the same block so its creation
645+
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
646+
// advance lower_vec pointer
647+
lower_now = lower_iter.next();
648+
(ewt, block)
649+
}
650+
(None, Some(upper)) => {
651+
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
652+
// advance upper_vec pointer
653+
upper_now = upper_iter.next();
654+
(ewt, block)
655+
}
656+
_ => panic!("Imposible case to happen"),
657+
};
658+
659+
match entities.get_mut(&block) {
660+
Some(vec) => vec.push(ewt),
661+
None => {
662+
let _ = entities.insert(block, vec![ewt]);
663+
}
664+
};
665+
}
666+
667+
// sort the elements in each blocks bucket by vid
668+
for (_, vec) in &mut entities {
669+
vec.sort_by(|a, b| a.vid.cmp(&b.vid));
670+
}
671+
563672
Ok(entities)
564673
}
565674

0 commit comments

Comments
 (0)