Skip to content

Commit a0d12dd

Browse files
Zoran Cvetkovincrypto32
authored andcommitted
Add casuality region to the SQL querry
1 parent af722c6 commit a0d12dd

File tree

5 files changed

+43
-66
lines changed

5 files changed

+43
-66
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::blockchain::SubgraphFilter;
2-
use crate::data_source::subgraph;
2+
use crate::data_source::{subgraph, CausalityRegion};
33
use crate::substreams::Clock;
44
use crate::substreams_rpc::response::Message as SubstreamsMessage;
55
use crate::substreams_rpc::BlockScopedData;
@@ -456,7 +456,7 @@ async fn get_entities_for_range(
456456
let entity_type = schema.entity_type(entity_name)?;
457457
entity_types.push(entity_type);
458458
}
459-
Ok(store.get_range(entity_types, from..to)?)
459+
Ok(store.get_range(entity_types, CausalityRegion::ONCHAIN, from..to)?)
460460
}
461461

462462
impl<C: Blockchain> TriggersAdapterWrapper<C> {

store/postgres/src/deployment_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,11 +1060,12 @@ impl DeploymentStore {
10601060
&self,
10611061
site: Arc<Site>,
10621062
entity_types: Vec<EntityType>,
1063+
causality_region: CausalityRegion,
10631064
block_range: Range<BlockNumber>,
10641065
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
10651066
let mut conn = self.get_conn()?;
10661067
let layout = self.layout(&mut conn, site)?;
1067-
layout.find_range(&mut conn, entity_types, block_range)
1068+
layout.find_range(&mut conn, entity_types, causality_region, block_range)
10681069
}
10691070

10701071
pub(crate) fn get_derived(

store/postgres/src/relational.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ impl Layout {
520520
&self,
521521
conn: &mut PgConnection,
522522
entity_types: Vec<EntityType>,
523+
causality_region: CausalityRegion,
523524
block_range: Range<BlockNumber>,
524525
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
525526
let mut tables = vec![];
@@ -529,11 +530,11 @@ impl Layout {
529530
et_map.insert(et.to_string(), Arc::new(et));
530531
}
531532
let mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>> = BTreeMap::new();
532-
let lower_vec = FindRangeQuery::new(&tables, false, block_range.clone())
533+
let lower_vec = FindRangeQuery::new(&tables, causality_region, false, block_range.clone())
533534
.get_results::<EntityDataExt>(conn)
534535
.optional()?
535536
.unwrap_or_default();
536-
let upper_vec = FindRangeQuery::new(&tables, true, block_range)
537+
let upper_vec = FindRangeQuery::new(&tables, causality_region, true, block_range)
537538
.get_results::<EntityDataExt>(conn)
538539
.optional()?
539540
.unwrap_or_default();

store/postgres/src/relational_queries.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,6 +2024,7 @@ impl<'a, Conn> RunQueryDsl<Conn> for FindQuery<'a> {}
20242024
#[derive(Debug, Clone)]
20252025
pub struct FindRangeQuery<'a> {
20262026
tables: &'a Vec<&'a Table>,
2027+
causality_region: CausalityRegion,
20272028
is_upper_range: bool,
20282029
imm_range: EntityBlockRange,
20292030
mut_range: EntityBlockRange,
@@ -2032,13 +2033,15 @@ pub struct FindRangeQuery<'a> {
20322033
impl<'a> FindRangeQuery<'a> {
20332034
pub fn new(
20342035
tables: &'a Vec<&Table>,
2036+
causality_region: CausalityRegion,
20352037
is_upper_range: bool,
20362038
block_range: Range<BlockNumber>,
20372039
) -> Self {
20382040
let imm_range = EntityBlockRange::new(true, block_range.clone(), false);
20392041
let mut_range = EntityBlockRange::new(false, block_range, is_upper_range);
20402042
Self {
20412043
tables,
2044+
causality_region,
20422045
is_upper_range,
20432046
imm_range,
20442047
mut_range,
@@ -2075,12 +2078,12 @@ impl<'a> QueryFragment<Pg> for FindRangeQuery<'a> {
20752078
out.push_sql(" from ");
20762079
out.push_sql(table.qualified_name.as_str());
20772080
out.push_sql(" e\n where");
2078-
// TODO: add casuality region to the query
2079-
// if self.table.has_causality_region {
2080-
// out.push_sql("causality_region = ");
2081-
// out.push_bind_param::<Integer, _>(&self.key.causality_region)?;
2082-
// out.push_sql(" and ");
2083-
// }
2081+
// add casuality region to the query
2082+
if table.has_causality_region {
2083+
out.push_sql("causality_region = ");
2084+
out.push_bind_param::<Integer, _>(&self.causality_region)?;
2085+
out.push_sql(" and ");
2086+
}
20842087
if table.immutable {
20852088
self.imm_range.contains(&mut out)?;
20862089
} else {

store/test-store/tests/postgres/writable.rs

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -376,58 +376,30 @@ fn read_range_test() {
376376
let subgraph_store = store.subgraph_store();
377377
writable.deployment_synced().unwrap();
378378

379-
// First test sourceable store with individual types
380-
let mutable_count = read_range(
381-
store.clone(),
382-
writable.clone(),
383-
sourceable.clone(),
384-
deployment.clone(),
385-
true,
386-
)
387-
.await;
388-
let immutable_count = read_range(
389-
store.clone(),
390-
writable.clone(),
391-
sourceable.clone(),
392-
deployment.clone(),
393-
false,
394-
)
395-
.await;
396-
assert_eq!(mutable_count, 4); // Fixed: range is half-open
397-
assert_eq!(immutable_count, 4); // Fixed: range is half-open
398-
399-
// Then test writable store with the detailed entity checks
400-
for count in 1..=5 {
401-
insert_count(&subgraph_store, &deployment, count, 2 * count, true).await;
402-
}
403-
writable.flush().await.unwrap();
404-
writable.deployment_synced().unwrap();
405-
406-
let br: Range<BlockNumber> = 0..18;
407-
let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()];
408-
let e: BTreeMap<i32, Vec<EntityWithType>> = writable
409-
.get_range(entity_types.clone(), br.clone())
410-
.unwrap();
411-
assert_eq!(e.len(), 5);
412-
for en in &e {
413-
let index = *en.0 - 1;
414-
let a = result_entities[index as usize];
415-
assert_eq!(a, format!("{:?}", en));
416-
}
417-
418-
for count in 6..=7 {
419-
insert_count(&subgraph_store, &deployment, count, 2 * count, true).await;
420-
}
421-
writable.flush().await.unwrap();
422-
writable.deployment_synced().unwrap();
423-
424-
let e: BTreeMap<i32, Vec<EntityWithType>> = writable.get_range(entity_types, br).unwrap();
425-
assert_eq!(e.len(), 7);
426-
for en in &e {
427-
let index = *en.0 - 1;
428-
let a = result_entities[index as usize];
429-
assert_eq!(a, format!("{:?}", en));
430-
}
431-
},
432-
)
433-
}
379+
let br: Range<BlockNumber> = 0..18;
380+
let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()];
381+
let e: BTreeMap<i32, Vec<EntityWithType>> = writable
382+
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
383+
.unwrap();
384+
assert_eq!(e.len(), 5);
385+
for en in &e {
386+
let index = *en.0 - 1;
387+
let a = result_entities[index as usize];
388+
assert_eq!(a, format!("{:?}", en));
389+
}
390+
for count in 6..=7 {
391+
insert_count(&subgraph_store, &deployment, count, 2 * count, true).await;
392+
}
393+
writable.flush().await.unwrap();
394+
writable.deployment_synced().unwrap();
395+
let e: BTreeMap<i32, Vec<EntityWithType>> = writable
396+
.get_range(entity_types, CausalityRegion::ONCHAIN, br)
397+
.unwrap();
398+
assert_eq!(e.len(), 7);
399+
for en in &e {
400+
let index = *en.0 - 1;
401+
let a = result_entities[index as usize];
402+
assert_eq!(a, format!("{:?}", en));
403+
}
404+
})
405+
}

0 commit comments

Comments
 (0)