Skip to content

Commit 472f270

Browse files
author
Zoran Cvetkov
committed
deduce the types of operation create, modify or delete
1 parent 5de07ca commit 472f270

File tree

2 files changed

+124
-55
lines changed

2 files changed

+124
-55
lines changed

store/postgres/src/relational.rs

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ use std::sync::{Arc, Mutex};
5353
use std::time::{Duration, Instant};
5454

5555
use crate::relational_queries::{
56-
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
57-
FindPossibleDeletionsQuery, ReturnedEntityData,
56+
ConflictingEntitiesData, ConflictingEntitiesQuery, EntityDataExt, FindChangesQuery,
57+
FindDerivedQuery, FindPossibleDeletionsQuery, ReturnedEntityData,
5858
};
5959
use crate::{
6060
primary::{Namespace, Site},
@@ -529,28 +529,85 @@ impl Layout {
529529
et_map.insert(et.to_string(), Arc::new(et));
530530
}
531531
let mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>> = BTreeMap::new();
532-
if let Some(vec) = FindRangeQuery::new(&tables, false, block_range)
533-
.get_results::<EntityData>(conn)
532+
let lower_vec = FindRangeQuery::new(&tables, false, block_range.clone())
533+
.get_results::<EntityDataExt>(conn)
534534
.optional()?
535-
{
536-
// TODO: issue query with upper range and find modifictions and deletions
537-
for e in vec {
538-
let block = e.clone().deserialize_block_number::<Entity>()?;
539-
let entity_type = e.entity_type(&self.input_schema);
540-
let entity = e.deserialize_with_layout::<Entity>(self, None)?;
541-
let entity_op = EntitySubgraphOperation::Create;
542-
let ewt = EntityWithType {
543-
entity_op,
544-
entity_type,
545-
entity,
546-
};
547-
match entities.get_mut(&block) {
548-
Some(vec) => vec.push(ewt),
549-
None => {
550-
let _ = entities.insert(block, vec![ewt]);
535+
.unwrap_or_default();
536+
let upper_vec = FindRangeQuery::new(&tables, true, block_range)
537+
.get_results::<EntityDataExt>(conn)
538+
.optional()?
539+
.unwrap_or_default();
540+
let mut lower_iter = lower_vec.iter().fuse().peekable();
541+
let mut upper_iter = upper_vec.iter().fuse().peekable();
542+
let mut lower_now = lower_iter.next();
543+
let mut upper_now = upper_iter.next();
544+
let mut lower = lower_now.unwrap_or(&EntityDataExt::default()).clone();
545+
let mut upper = upper_now.unwrap_or(&EntityDataExt::default()).clone();
546+
let transform = |ede: EntityDataExt,
547+
entity_op: EntitySubgraphOperation|
548+
-> Result<(EntityWithType, BlockNumber), StoreError> {
549+
let e = EntityData::new(ede.entity, ede.data);
550+
let block = ede.block_number;
551+
let entity_type = e.entity_type(&self.input_schema);
552+
let entity = e.deserialize_with_layout::<Entity>(self, None)?;
553+
let ewt = EntityWithType {
554+
entity_op,
555+
entity_type,
556+
entity,
557+
};
558+
Ok((ewt, block))
559+
};
560+
while lower_now.is_some() || upper_now.is_some() {
561+
let (ewt, block) = if lower_now.is_some() {
562+
if upper_now.is_some() {
563+
if lower > upper {
564+
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
565+
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
566+
// advance upper
567+
upper_now = upper_iter.next();
568+
upper = upper_now.unwrap_or(&EntityDataExt::default()).clone();
569+
(ewt, block)
570+
} else if lower < upper {
571+
// we have lower bound at this block but no upper bound at the same block so its creation
572+
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
573+
// advance lower
574+
lower_now = lower_iter.next();
575+
lower = lower_now.unwrap_or(&EntityDataExt::default()).clone();
576+
(ewt, block)
577+
} else {
578+
assert!(upper == lower);
579+
let (ewt, block) = transform(lower, EntitySubgraphOperation::Modify)?;
580+
// advance both
581+
lower_now = lower_iter.next();
582+
lower = lower_now.unwrap_or(&EntityDataExt::default()).clone();
583+
upper_now = upper_iter.next();
584+
upper = upper_now.unwrap_or(&EntityDataExt::default()).clone();
585+
(ewt, block)
551586
}
552-
};
553-
}
587+
} else {
588+
// we have lower bound at this block but no upper bound at the same block so its creation
589+
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
590+
// advance lower
591+
lower_now = lower_iter.next();
592+
lower = lower_now.unwrap_or(&EntityDataExt::default()).clone();
593+
(ewt, block)
594+
}
595+
} else {
596+
// we have upper bound at this block, but no lower bounds at all so it's deletion
597+
assert!(upper_now.is_some());
598+
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
599+
// advance upper
600+
upper_now = upper_iter.next();
601+
upper = upper_now.unwrap_or(&EntityDataExt::default()).clone();
602+
(ewt, block)
603+
};
604+
605+
match entities.get_mut(&block) {
606+
Some(vec) => vec.push(ewt),
607+
None => {
608+
let _ = entities.insert(block, vec![ewt]);
609+
}
610+
};
554611
}
555612
Ok(entities)
556613
}

store/postgres/src/relational_queries.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use graph::data::store::{Id, IdType, NULL};
1717
use graph::data::store::{IdList, IdRef, QueryObject};
1818
use graph::data::value::{Object, Word};
1919
use graph::data_source::CausalityRegion;
20-
use graph::prelude::regex::Regex;
2120
use graph::prelude::{
2221
anyhow, r, serde_json, BlockNumber, ChildMultiplicity, Entity, EntityCollection, EntityFilter,
2322
EntityLink, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityRange, EntityWindow,
@@ -27,6 +26,7 @@ use graph::schema::{EntityKey, EntityType, FulltextAlgorithm, FulltextConfig, In
2726
use graph::{components::store::AttributeNames, data::store::scalar};
2827
use inflector::Inflector;
2928
use itertools::Itertools;
29+
use std::cmp::Ordering;
3030
use std::collections::{BTreeMap, BTreeSet, HashSet};
3131
use std::convert::TryFrom;
3232
use std::fmt::{self, Display};
@@ -478,40 +478,12 @@ pub struct EntityData {
478478
}
479479

480480
impl EntityData {
481-
pub fn entity_type(&self, schema: &InputSchema) -> EntityType {
482-
schema.entity_type(&self.entity).unwrap()
481+
pub fn new(entity: String, data: serde_json::Value) -> EntityData {
482+
EntityData { entity, data }
483483
}
484484

485-
pub fn deserialize_block_number<T: FromEntityData>(self) -> Result<BlockNumber, StoreError> {
486-
use serde_json::Value as j;
487-
match self.data {
488-
j::Object(map) => {
489-
let mut entries = map.into_iter().filter_map(move |(key, json)| {
490-
if key == "block_range" {
491-
let r = json.as_str().unwrap();
492-
let rx = Regex::new("\\[(?P<start>[0-9]+),([0-9]+)?\\)").unwrap();
493-
let cap = rx.captures(r).unwrap();
494-
let start = cap
495-
.name("start")
496-
.map(|mtch| mtch.as_str().to_string())
497-
.unwrap();
498-
let n = start.parse::<i32>().unwrap();
499-
Some(n)
500-
} else if key == "block$" {
501-
let block = json.as_i64().unwrap() as i32;
502-
Some(block)
503-
} else {
504-
None
505-
}
506-
});
507-
let en = entries.next().unwrap();
508-
assert!(entries.next().is_none()); // there should be just one block_range field
509-
Ok(en)
510-
}
511-
_ => unreachable!(
512-
"we use `to_json` in our queries, and will therefore always get an object back"
513-
),
514-
}
485+
pub fn entity_type(&self, schema: &InputSchema) -> EntityType {
486+
schema.entity_type(&self.entity).unwrap()
515487
}
516488

517489
/// Map the `EntityData` using the schema information in `Layout`
@@ -586,6 +558,46 @@ impl EntityData {
586558
}
587559
}
588560

561+
#[derive(QueryableByName, Clone, Debug, Default, Eq)]
562+
pub struct EntityDataExt {
563+
#[diesel(sql_type = Text)]
564+
pub entity: String,
565+
#[diesel(sql_type = Jsonb)]
566+
pub data: serde_json::Value,
567+
#[diesel(sql_type = Integer)]
568+
pub block_number: i32,
569+
#[diesel(sql_type = Text)]
570+
pub id: String,
571+
}
572+
573+
impl Ord for EntityDataExt {
574+
fn cmp(&self, other: &Self) -> Ordering {
575+
let ord = self.block_number.cmp(&other.block_number);
576+
if ord != Ordering::Equal {
577+
ord
578+
} else {
579+
let ord = self.entity.cmp(&other.entity);
580+
if ord != Ordering::Equal {
581+
ord
582+
} else {
583+
self.id.cmp(&other.id)
584+
}
585+
}
586+
}
587+
}
588+
589+
impl PartialOrd for EntityDataExt {
590+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
591+
Some(self.cmp(other))
592+
}
593+
}
594+
595+
impl PartialEq for EntityDataExt {
596+
fn eq(&self, other: &Self) -> bool {
597+
self.cmp(other) == Ordering::Equal
598+
}
599+
}
600+
589601
/// The equivalent of `graph::data::store::Value` but in a form that does
590602
/// not require further transformation during `walk_ast`. This form takes
591603
/// the idiosyncrasies of how we serialize values into account (e.g., that

0 commit comments

Comments
 (0)