diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index dfaae80f76a..24493343b0f 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -453,6 +453,7 @@ impl EntityCache { for (key, update) in self.updates { use EntityModification::*; + let is_poi = key.entity_type.is_poi(); let current = self.current.remove(&key).and_then(|entity| entity); let modification = match (current, update) { // Entity was created @@ -461,11 +462,13 @@ impl EntityCache { updates.remove_null_fields(); let data = Arc::new(updates); self.current.insert(key.clone(), Some(data.cheap_clone())); + let vid = if is_poi { 0 } else { data.vid() }; Some(Insert { key, data, block, end: None, + vid, }) } // Entity may have been changed @@ -476,11 +479,13 @@ impl EntityCache { let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); if current != data { + let vid = if is_poi { 0 } else { data.vid() }; Some(Overwrite { key, data, block, end: None, + vid, }) } else { None @@ -491,11 +496,13 @@ impl EntityCache { let data = Arc::new(data); self.current.insert(key.clone(), Some(data.clone())); if current != data { + let vid = if is_poi { 0 } else { data.vid() }; Some(Overwrite { key, data, block, end: None, + vid, }) } else { None diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 721e3d80bc1..6dd2cda472b 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -45,6 +45,7 @@ pub enum EntityModification { data: Arc<Entity>, block: BlockNumber, end: Option<BlockNumber>, + vid: i64, }, /// Update the entity by overwriting it Overwrite { key: EntityKey, data: Arc<Entity>, block: BlockNumber, end: Option<BlockNumber>, + vid: i64, }, /// Remove the entity Remove { key: EntityKey, block: BlockNumber }, @@ -67,6 +69,7 @@ pub struct EntityWrite<'a> { // The end of the block range for which this write is valid. The value // of `end` itself is not included in the range pub end: Option<BlockNumber>, + pub vid: i64, } impl std::fmt::Display for EntityWrite<'_> { @@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> { data, block, end, + vid, } => Ok(EntityWrite { id: &key.entity_id, entity: data, causality_region: key.causality_region, block: *block, end: *end, + vid: *vid, }), EntityModification::Overwrite { key, data, block, end, + vid, } => Ok(EntityWrite { id: &key.entity_id, entity: &data, causality_region: key.causality_region, block: *block, end: *end, + vid: *vid, }), EntityModification::Remove { .. } => Err(()), @@ -213,11 +220,13 @@ impl EntityModification { data, block, end, + vid, } => Ok(Insert { key, data, block, end, + vid, }), Remove { key, ..
} => { return Err(constraint_violation!( @@ -271,21 +280,23 @@ impl EntityModification { } impl EntityModification { - pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self { + pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self { EntityModification::Insert { key, data: Arc::new(data), block, end: None, + vid, } } - pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self { + pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self { EntityModification::Overwrite { key, data: Arc::new(data), block, end: None, + vid, } } @@ -1017,18 +1028,21 @@ mod test { let value = value.clone(); let key = THING_TYPE.parse_key("one").unwrap(); + let vid = 0; match value { Ins(block) => EntityModification::Insert { key, data: Arc::new(entity! { SCHEMA => id: "one", count: block }), block, end: None, + vid, }, Ovw(block) => EntityModification::Overwrite { key, data: Arc::new(entity! { SCHEMA => id: "one", count: block }), block, end: None, + vid, }, Rem(block) => EntityModification::Remove { key, block }, InsC(block, end) => EntityModification::Insert { @@ -1036,12 +1050,14 @@ data: Arc::new(entity! { SCHEMA => id: "one", count: block }), block, end: Some(end), + vid, }, OvwC(block, end) => EntityModification::Overwrite { key, data: Arc::new(entity! { SCHEMA => id: "one", count: block }), block, end: Some(end), + vid, }, } } diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 470e50334d3..81a483fae77 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -77,6 +77,8 @@ pub struct BlockState { // data source that have been processed. pub processed_data_sources: Vec<StoredDynamicDataSource>, + pub vid_seq: i32, + // Marks whether a handler is currently executing. in_handler: bool, @@ -92,6 +94,7 @@ impl BlockState { persisted_data_sources: Vec::new(), handler_created_data_sources: Vec::new(), processed_data_sources: Vec::new(), + vid_seq: 0, in_handler: false, metrics: BlockStateMetrics::new(), } @@ -109,6 +112,7 @@ impl BlockState { persisted_data_sources, handler_created_data_sources, processed_data_sources, + vid_seq: _, in_handler, metrics, } = self; @@ -178,4 +182,9 @@ impl BlockState { pub fn persist_data_source(&mut self, ds: StoredDynamicDataSource) { self.persisted_data_sources.push(ds) } + pub fn next_vid(&mut self, block_number: BlockNumber) -> i64 { + let vid = ((block_number as i64) << 32) + self.vid_seq as i64; + self.vid_seq += 1; + vid + } } diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index 33d9286ceec..9707468146a 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -735,6 +735,9 @@ where lazy_static! { /// The name of the id attribute, `"id"` pub static ref ID: Word = Word::from("id"); + + /// The name of the vid attribute, `"vid"` + pub static ref VID: Word = Word::from("vid"); } /// An entity is represented as a map of attribute names to values. @@ -910,6 +913,13 @@ impl Entity { Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value") } + pub fn vid(&self) -> i64 { + self.get("vid") + .expect("the vid is set") + .as_int8() + .expect("the vid is set to a valid value") + } + /// Merges an entity update `update` into this entity. /// /// If a key exists in both entities, the value from `update` is chosen.
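For reference, a minimal standalone sketch of the vid scheme that BlockState::next_vid above implements. The free function and main below are illustrative only; in the patch, next_vid is a method on BlockState, block_number is graph's BlockNumber (an i32), and vid_seq is the new counter field:

    // The block number fills the upper 32 bits of the vid and a per-block
    // sequence counter the lower 32 bits, so vids are unique within a
    // deployment and strictly increase from one block to the next.
    fn next_vid(block_number: i32, vid_seq: &mut i32) -> i64 {
        let vid = ((block_number as i64) << 32) + *vid_seq as i64;
        *vid_seq += 1;
        vid
    }

    fn main() {
        let mut seq = 0;
        let first = next_vid(7, &mut seq); // first entity written at block 7
        let second = next_vid(7, &mut seq); // second entity at the same block
        assert_eq!(first, 7i64 << 32);
        assert_eq!(second, first + 1);
        // Any vid assigned at block 8 sorts after every vid from block 7.
        let mut seq = 0;
        assert!(next_vid(8, &mut seq) > second);
    }

This also suggests why the PoI branch in entity_cache.rs can hard-code vid 0: per the ddl.rs change further down, the Poi$ table keeps its database-assigned bigserial vid, so the value carried on the modification is never written for it.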
diff --git a/graph/src/schema/input/mod.rs b/graph/src/schema/input/mod.rs index 84897299785..a71be5a500f 100644 --- a/graph/src/schema/input/mod.rs +++ b/graph/src/schema/input/mod.rs @@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$"; const POI_DIGEST: &str = "digest"; /// The name of the PoI attribute for storing the block time const POI_BLOCK_TIME: &str = "blockTime"; +const VID: &str = "vid"; pub mod kw { pub const ENTITY: &str = "entity"; @@ -1487,6 +1488,10 @@ impl InputSchema { } pub fn has_field_with_name(&self, entity_type: &EntityType, field: &str) -> bool { + // TODO: check if it is needed + if field == VID { + return true; + } let field = self.inner.pool.lookup(field); match field { @@ -1597,6 +1602,8 @@ fn atom_pool(document: &s::Document) -> AtomPool { pool.intern(POI_DIGEST); pool.intern(POI_BLOCK_TIME); + pool.intern(VID); + for definition in &document.definitions { match definition { s::Definition::TypeDefinition(typedef) => match typedef { diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index 561820760d4..77082a0a9d6 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -477,17 +477,18 @@ async fn test_ipfs_block() { // The user_data value we use with calls to ipfs_map const USER_DATA: &str = "user_data"; -fn make_thing(id: &str, value: &str) -> (String, EntityModification) { - const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }"; +fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) { + const DOCUMENT: &str = + " type Thing @entity { id: String!, value: String!, extra: String, vid: Int8 }"; lazy_static! { static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter"); static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap(); } - let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA }; + let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid:vid }; let key = THING_TYPE.parse_key(id).unwrap(); ( format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value), - EntityModification::insert(key, data, 0), + EntityModification::insert(key, data, 0, vid), ) } @@ -553,8 +554,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) { let subgraph_id = "ipfsMap"; // Try it with two valid objects - let (str1, thing1) = make_thing("one", "eins"); - let (str2, thing2) = make_thing("two", "zwei"); + let (str1, thing1) = make_thing("one", "eins", 0); + let (str2, thing2) = make_thing("two", "zwei", 0); let ops = run_ipfs_map( subgraph_id, format!("{}\n{}", str1, str2), @@ -1001,8 +1002,8 @@ async fn test_entity_store(api_version: Version) { let schema = store.input_schema(&deployment.hash).unwrap(); - let alex = entity! { schema => id: "alex", name: "Alex" }; - let steve = entity! { schema => id: "steve", name: "Steve" }; + let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64}; + let steve = entity! 
{ schema => id: "steve", name: "Steve", vid: 1i64}; let user_type = schema.entity_type("User").unwrap(); test_store::insert_entities( &deployment, diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 4d050db23de..bc0071c3372 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -248,6 +248,7 @@ impl HostExports { gas: &GasCounter, ) -> Result<(), HostExportError> { let entity_type = state.entity_cache.schema.entity_type(&entity_type)?; + let vid = state.next_vid(block); Self::expect_object_type(&entity_type, "set")?; @@ -314,6 +315,7 @@ impl HostExports { data.insert(store::ID.clone(), value); } } + data.insert(store::VID.clone(), Value::Int8(vid)); self.check_invalid_fields( self.data_source.api_version.clone(), diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index aa3aefd3561..dd459735e0f 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -116,12 +116,18 @@ impl Table { Ok(cols) } + let vid_type = if self.object.is_poi() || !self.object.is_object_type() { + "bigserial" + } else { + "bigint" + }; + if self.immutable { writeln!( out, " create table {qname} ( - {vid} bigserial primary key, + {vid} {vid_type} primary key, {block} int not null,\n\ {cols}, unique({id}) @@ -129,6 +135,7 @@ impl Table { qname = self.qualified_name, cols = columns_ddl(self)?, vid = VID_COLUMN, + vid_type = vid_type, block = BLOCK_COLUMN, id = self.primary_key().name ) @@ -137,13 +144,14 @@ impl Table { out, r#" create table {qname} ( - {vid} bigserial primary key, + {vid} {vid_type} primary key, {block_range} int4range not null, {cols} );"#, qname = self.qualified_name, cols = columns_ddl(self)?, vid = VID_COLUMN, + vid_type = vid_type, block_range = BLOCK_RANGE_COLUMN )?; diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index b7f9b44afac..86e9f232d49 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -384,7 +384,7 @@ create type sgd0815."size" as enum ('large', 'medium', 'small'); create table "sgd0815"."thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "big_thing" text not null @@ -405,7 +405,7 @@ create index attr_0_1_thing_big_thing create table "sgd0815"."scalar" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "bool" boolean, @@ -444,7 +444,7 @@ create index attr_1_7_scalar_color create table "sgd0815"."file_thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, causality_region int not null, "id" text not null @@ -469,7 +469,7 @@ create type sgd0815."size" as enum ('large', 'medium', 'small'); create table "sgd0815"."thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "big_thing" text not null @@ -490,7 +490,7 @@ create index attr_0_1_thing_big_thing create table "sgd0815"."scalar" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "bool" boolean, @@ -515,7 +515,7 @@ create index attr_1_0_scalar_id create table "sgd0815"."file_thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, causality_region int not null, "id" text not null @@ -575,7 +575,7 @@ type SongStat @entity { played: Int! 
}"#; const MUSIC_DDL: &str = r#"create table "sgd0815"."musician" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, @@ -598,7 +598,7 @@ create index attr_0_2_musician_main_band on "sgd0815"."musician" using gist("main_band", block_range); create table "sgd0815"."band" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, @@ -618,8 +618,8 @@ create index attr_1_1_band_name on "sgd0815"."band" using btree(left("name", 256)); create table "sgd0815"."song" ( - vid bigserial primary key, - block$ int not null, + vid bigint primary key, + block$ int not null, "id" text not null, "title" text not null, "written_by" text not null, @@ -634,7 +634,7 @@ create index attr_2_1_song_written_by on "sgd0815"."song" using btree("written_by", block$); create table "sgd0815"."song_stat" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "played" int4 not null @@ -676,7 +676,7 @@ type Habitat @entity { }"#; const FOREST_DDL: &str = r#"create table "sgd0815"."animal" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "forest" text @@ -695,8 +695,8 @@ create index attr_0_1_animal_forest on "sgd0815"."animal" using gist("forest", block_range); create table "sgd0815"."forest" ( - vid bigserial primary key, - block_range int4range not null, + vid bigint primary key, + block_range int4range not null, "id" text not null ); alter table "sgd0815"."forest" @@ -711,7 +711,7 @@ create index attr_1_0_forest_id on "sgd0815"."forest" using btree("id"); create table "sgd0815"."habitat" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "most_common" text not null, @@ -763,7 +763,7 @@ type Habitat @entity { }"#; const FULLTEXT_DDL: &str = r#"create table "sgd0815"."animal" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, @@ -791,7 +791,7 @@ create index attr_0_4_animal_search on "sgd0815"."animal" using gin("search"); create table "sgd0815"."forest" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null ); @@ -808,7 +808,7 @@ create index attr_1_0_forest_id on "sgd0815"."forest" using btree("id"); create table "sgd0815"."habitat" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "most_common" text not null, @@ -843,7 +843,7 @@ enum Orientation { const FORWARD_ENUM_SQL: &str = r#"create type sgd0815."orientation" as enum ('DOWN', 'UP'); create table "sgd0815"."thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "orientation" "sgd0815"."orientation" not null @@ -880,8 +880,8 @@ type Stats @aggregation(intervals: ["hour", "day"], source: "Data") { const TS_SQL: &str = r#" create table "sgd0815"."data" ( - vid bigserial primary key, - block$ int not null, + vid bigint primary key, + block$ int not null, "id" int8 not null, "timestamp" timestamptz not null, "amount" numeric not null, @@ -896,7 +896,7 @@ create index attr_0_1_data_amount create table "sgd0815"."stats_hour" ( vid bigserial primary key, - block$ int not null, + block$ int not null, "id" int8 not null, "timestamp" timestamptz not null, "volume" numeric not null, @@ -971,9 +971,9 @@ const 
LIFETIME_GQL: &str = r#" const LIFETIME_SQL: &str = r#" create table "sgd0815"."data" ( - vid bigserial primary key, - block$ int not null, -"id" int8 not null, + vid bigint primary key, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_1" int4 not null, "group_2" int4 not null, @@ -993,8 +993,8 @@ on "sgd0815"."data" using btree("amount"); create table "sgd0815"."stats_1_hour" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "volume" numeric not null, unique(id) @@ -1009,8 +1009,8 @@ on "sgd0815"."stats_1_hour" using btree("volume"); create table "sgd0815"."stats_1_day" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "volume" numeric not null, unique(id) @@ -1025,8 +1025,8 @@ on "sgd0815"."stats_1_day" using btree("volume"); create table "sgd0815"."stats_2_hour" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_1" int4 not null, "volume" numeric not null, @@ -1045,8 +1045,8 @@ on "sgd0815"."stats_2_hour"(group_1, timestamp); create table "sgd0815"."stats_2_day" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_1" int4 not null, "volume" numeric not null, @@ -1065,8 +1065,8 @@ on "sgd0815"."stats_2_day"(group_1, timestamp); create table "sgd0815"."stats_3_hour" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_2" int4 not null, "group_1" int4 not null, @@ -1088,8 +1088,8 @@ on "sgd0815"."stats_3_hour"(group_2, group_1, timestamp); create table "sgd0815"."stats_3_day" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_2" int4 not null, "group_1" int4 not null, diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 6b5fcdc6940..c72d5ddd774 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -17,12 +17,7 @@ use graph::{ }; use itertools::Itertools; -use crate::{ - catalog, - copy::AdaptiveBatchSize, - deployment, - relational::{Table, VID_COLUMN}, -}; +use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table}; use super::{Catalog, Layout, Namespace}; @@ -73,7 +68,6 @@ struct TablePair { // has the same name as `src` but is in a different namespace dst: Arc, src_nsp: Namespace, - dst_nsp: Namespace, } impl TablePair { @@ -100,12 +94,7 @@ impl TablePair { } conn.batch_execute(&query)?; - Ok(TablePair { - src, - dst, - src_nsp, - dst_nsp, - }) + Ok(TablePair { src, dst, src_nsp }) } /// Copy all entity versions visible between `earliest_block` and @@ -239,10 +228,6 @@ impl TablePair { let src_qname = &self.src.qualified_name; let dst_qname = &self.dst.qualified_name; let src_nsp = &self.src_nsp; - let dst_nsp = &self.dst_nsp; - - let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name); - let mut query = String::new(); // What we are about to do would get blocked by autovacuum on our @@ -252,12 +237,13 @@ impl TablePair { "src" => src_nsp.as_str(), "error" => e.to_string()); } + // TODO: check if this is needed // Make 
sure the vid sequence // continues from where it was - writeln!( - query, - "select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));" - )?; + // writeln!( + // query, + // "select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));" + // )?; writeln!(query, "drop table {src_qname};")?; writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?; diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 56ad1aafacb..4c2acf9ea86 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -16,6 +16,7 @@ use graph::components::store::write::{EntityWrite, RowGroup, WriteChunk}; use graph::components::store::{Child as StoreChild, DerivedEntityQuery}; use graph::data::store::{Id, IdType, NULL}; use graph::data::store::{IdList, IdRef, QueryObject}; +use graph::data::subgraph::schema::POI_TABLE; use graph::data::value::{Object, Word}; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -2095,6 +2096,7 @@ struct InsertRow<'a> { values: Vec>, br_value: BlockRangeValue, causality_region: CausalityRegion, + vid: i64, } impl<'a> InsertRow<'a> { @@ -2131,10 +2133,12 @@ impl<'a> InsertRow<'a> { } let br_value = BlockRangeValue::new(table, row.block, row.end); let causality_region = row.causality_region; + let vid = row.vid; Ok(Self { values, br_value, causality_region, + vid, }) } } @@ -2220,6 +2224,8 @@ impl<'a> QueryFragment for InsertQuery<'a> { let out = &mut out; out.unsafe_to_cache_prepared(); + let not_poi = self.table.name.as_str() != POI_TABLE; + // Construct a query // insert into schema.table(column, ...) // values @@ -2245,6 +2251,9 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(CAUSALITY_REGION_COLUMN); }; + if not_poi { + out.push_sql(", vid"); + } out.push_sql(") values\n"); for (i, row) in self.rows.iter().enumerate() { @@ -2262,6 +2271,10 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(", "); out.push_bind_param::(&row.causality_region)?; }; + if not_poi { + out.push_sql(", "); + out.push_bind_param::(&row.vid)?; + } out.push_sql(")"); } @@ -4661,6 +4674,8 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); + let not_poi = self.dst.name.as_str() != POI_TABLE; + // Construct a query // insert into {dst}({columns}) // select {columns} from {src} @@ -4681,6 +4696,9 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_sql(", "); out.push_sql(CAUSALITY_REGION_COLUMN); }; + if not_poi { + out.push_sql(", vid"); + } out.push_sql(")\nselect "); for column in &self.columns { @@ -4746,6 +4764,10 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { )); } } + if not_poi { + out.push_sql(", vid"); + } + out.push_sql(" from "); out.push_sql(self.src.qualified_name.as_str()); out.push_sql(" where vid >= "); diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 59a65535cf3..6a70d898326 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -105,6 +105,16 @@ lazy_static! { }; } +pub fn filter_vid(arr: Vec) -> Vec { + arr.into_iter() + .map(|mut e| { + e.remove("vid"); + e.remove_null_fields(); + e + }) + .collect() +} + /// Run the `test` after performing `setup`. The result of `setup` is passed /// into `test`. All tests using `run_test_sequentially` are run in sequence, /// never in parallel. 
The `test` is passed a `Store`, but it is permissible diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 9089ec4f572..ef23dca54b5 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -272,7 +272,7 @@ specVersion: 0.0.2 .unwrap(); // Adds an example entity. - let thing = entity! { schema => id: "datthing" }; + let thing = entity! { schema => id: "datthing", vid : 0i64 }; test_store::insert_entities( &deployment, vec![(schema.entity_type("Thing").unwrap(), thing)], @@ -372,7 +372,7 @@ specVersion: 0.0.2 msg ); - let thing = entity! { schema => id: "datthing" }; + let thing = entity! { schema => id: "datthing", vid : 1i64 }; test_store::insert_entities( &deployment, vec![(schema.entity_type("Thing").unwrap(), thing)], diff --git a/store/test-store/tests/core/interfaces.rs b/store/test-store/tests/core/interfaces.rs index 78eb2fda390..7f3718e8563 100644 --- a/store/test-store/tests/core/interfaces.rs +++ b/store/test-store/tests/core/interfaces.rs @@ -69,7 +69,7 @@ async fn one_interface_one_entity() { type Animal implements Legged @entity { id: ID!, legs: Int }"; let schema = InputSchema::raw(document, subgraph_id); - let entity = ("Animal", entity! { schema => id: "1", legs: 3 }); + let entity = ("Animal", entity! { schema => id: "1", legs: 3, vid: 0i64 }); // Collection query. let query = "query { leggeds(first: 100) { legs } }"; @@ -97,7 +97,7 @@ async fn one_interface_one_entity_typename() { type Animal implements Legged @entity { id: ID!, legs: Int }"; let schema = InputSchema::raw(document, subgraph_id); - let entity = ("Animal", entity! { schema => id: "1", legs: 3 }); + let entity = ("Animal", entity! { schema => id: "1", legs: 3, vid: 0i64 }); let query = "query { leggeds(first: 100) { __typename } }"; @@ -118,8 +118,11 @@ async fn one_interface_multiple_entities() { "; let schema = InputSchema::raw(document, subgraph_id); - let animal = ("Animal", entity! { schema => id: "1", legs: 3 }); - let furniture = ("Furniture", entity! { schema => id: "2", legs: 4 }); + let animal = ("Animal", entity! { schema => id: "1", legs: 3, vid: 0i64 }); + let furniture = ( + "Furniture", + entity! { schema => id: "2", legs: 4, vid: 0i64 }, + ); let query = "query { leggeds(first: 100, orderBy: legs) { legs } }"; @@ -150,8 +153,8 @@ async fn reference_interface() { let query = "query { leggeds(first: 100) { leg { id } } }"; - let leg = ("Leg", entity! { schema => id: "1" }); - let animal = ("Animal", entity! { schema => id: "1", leg: 1 }); + let leg = ("Leg", entity! { schema => id: "1", vid: 0i64 }); + let animal = ("Animal", entity! { schema => id: "1", leg: 1, vid: 0i64 }); let res = insert_and_query(subgraph_id, document, vec![leg, animal], query) .await @@ -201,16 +204,16 @@ async fn reference_interface_derived() { let query = "query { events { id transaction { id } } }"; - let buy = ("BuyEvent", entity! { schema => id: "buy" }); - let sell1 = ("SellEvent", entity! { schema => id: "sell1" }); - let sell2 = ("SellEvent", entity! { schema => id: "sell2" }); + let buy = ("BuyEvent", entity! { schema => id: "buy", vid: 0i64 }); + let sell1 = ("SellEvent", entity! { schema => id: "sell1", vid: 0i64 }); + let sell2 = ("SellEvent", entity! { schema => id: "sell2", vid: 1i64 }); let gift = ( "GiftEvent", - entity! { schema => id: "gift", transaction: "txn" }, + entity! { schema => id: "gift", transaction: "txn", vid: 0i64 }, ); let txn = ( "Transaction", - entity! 
{ schema => id: "txn", buyEvent: "buy", sellEvents: vec!["sell1", "sell2"] }, + entity! { schema => id: "txn", buyEvent: "buy", sellEvents: vec!["sell1", "sell2"], vid: 0i64 }, ); let entities = vec![buy, sell1, sell2, gift, txn]; @@ -278,11 +281,11 @@ async fn follow_interface_reference() { let parent = ( "Animal", - entity! { schema => id: "parent", legs: 4, parent: Value::Null }, + entity! { schema => id: "parent", legs: 4, parent: Value::Null, vid: 0i64}, ); let child = ( "Animal", - entity! { schema => id: "child", legs: 3, parent: "parent" }, + entity! { schema => id: "child", legs: 3, parent: "parent" , vid: 1i64}, ); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) @@ -305,8 +308,11 @@ async fn conflicting_implementors_id() { "; let schema = InputSchema::raw(document, subgraph_id); - let animal = ("Animal", entity! { schema => id: "1", legs: 3 }); - let furniture = ("Furniture", entity! { schema => id: "1", legs: 3 }); + let animal = ("Animal", entity! { schema => id: "1", legs: 3, vid: 0i64 }); + let furniture = ( + "Furniture", + entity! { schema => id: "1", legs: 3, vid: 0i64 }, + ); let query = "query { leggeds(first: 100) { legs } }"; @@ -334,8 +340,11 @@ async fn derived_interface_relationship() { "; let schema = InputSchema::raw(document, subgraph_id); - let forest = ("Forest", entity! { schema => id: "1" }); - let animal = ("Animal", entity! { schema => id: "1", forest: "1" }); + let forest = ("Forest", entity! { schema => id: "1", vid: 0i64 }); + let animal = ( + "Animal", + entity! { schema => id: "1", forest: "1", vid: 0i64 }, + ); let query = "query { forests(first: 100) { dwellers(first: 100) { id } } }"; @@ -362,9 +371,12 @@ async fn two_interfaces() { "; let schema = InputSchema::raw(document, subgraph_id); - let a = ("A", entity! { schema => id: "1", foo: "bla" }); - let b = ("B", entity! { schema => id: "1", bar: 100 }); - let ab = ("AB", entity! { schema => id: "2", foo: "ble", bar: 200 }); + let a = ("A", entity! { schema => id: "1", foo: "bla", vid: 0i64 }); + let b = ("B", entity! { schema => id: "1", bar: 100, vid: 0i64 }); + let ab = ( + "AB", + entity! { schema => id: "2", foo: "ble", bar: 200, vid: 0i64 }, + ); let query = "query { ibars(first: 100, orderBy: bar) { bar } @@ -390,7 +402,7 @@ async fn interface_non_inline_fragment() { let entity = ( "Animal", - entity! { schema => id: "1", name: "cow", legs: 3 }, + entity! { schema => id: "1", name: "cow", legs: 3, vid: 0i64 }, ); // Query only the fragment. @@ -422,9 +434,12 @@ async fn interface_inline_fragment() { let animal = ( "Animal", - entity! { schema => id: "1", name: "cow", legs: 4 }, + entity! { schema => id: "1", name: "cow", legs: 4, vid: 0i64 }, + ); + let bird = ( + "Bird", + entity! { schema => id: "2", airspeed: 24, legs: 2, vid: 0i64 }, ); - let bird = ("Bird", entity! { schema => id: "2", airspeed: 24, legs: 2 }); let query = "query { leggeds(orderBy: legs) { ... on Animal { name } ...on Bird { airspeed } } }"; @@ -459,16 +474,16 @@ async fn interface_inline_fragment_with_subquery() { "; let schema = InputSchema::raw(document, subgraph_id); - let mama_cow = ("Parent", entity! { schema => id: "mama_cow" }); + let mama_cow = ("Parent", entity! { schema => id: "mama_cow", vid: 0i64 }); let cow = ( "Animal", - entity! { schema => id: "1", name: "cow", legs: 4, parent: "mama_cow" }, + entity! { schema => id: "1", name: "cow", legs: 4, parent: "mama_cow", vid: 0i64 }, ); - let mama_bird = ("Parent", entity! 
{ schema => id: "mama_bird" }); + let mama_bird = ("Parent", entity! { schema => id: "mama_bird", vid: 1i64 }); let bird = ( "Bird", - entity! { schema => id: "2", airspeed: 5, legs: 2, parent: "mama_bird" }, + entity! { schema => id: "2", airspeed: 5, legs: 2, parent: "mama_bird", vid: 1i64 }, ); let query = "query { leggeds(orderBy: legs) { legs ... on Bird { airspeed parent { id } } } }"; @@ -545,11 +560,11 @@ async fn alias() { let parent = ( "Animal", - entity! { schema => id: "parent", legs: 4, parent: Value::Null }, + entity! { schema => id: "parent", legs: 4, parent: Value::Null, vid: 0i64 }, ); let child = ( "Animal", - entity! { schema => id: "child", legs: 3, parent: "parent" }, + entity! { schema => id: "child", legs: 3, parent: "parent", vid: 1i64 }, ); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) @@ -608,9 +623,15 @@ async fn fragments_dont_panic() { "; // The panic manifests if two parents exist. - let parent = ("Parent", entity! { schema => id: "p", child: "c" }); - let parent2 = ("Parent", entity! { schema => id: "p2", child: Value::Null }); - let child = ("Child", entity! { schema => id:"c" }); + let parent = ( + "Parent", + entity! { schema => id: "p", child: "c", vid: 0i64 }, + ); + let parent2 = ( + "Parent", + entity! { schema => id: "p2", child: Value::Null, vid: 1i64 }, + ); + let child = ("Child", entity! { schema => id:"c", vid: 2i64 }); let res = insert_and_query(subgraph_id, document, vec![parent, parent2, child], query) .await @@ -668,12 +689,15 @@ async fn fragments_dont_duplicate_data() { "; // This bug manifests if two parents exist. - let parent = ("Parent", entity! { schema => id: "p", children: vec!["c"] }); + let parent = ( + "Parent", + entity! { schema => id: "p", children: vec!["c"], vid: 0i64 }, + ); let parent2 = ( "Parent", - entity! { schema => id: "b", children: Vec::::new() }, + entity! { schema => id: "b", children: Vec::::new(), vid: 1i64 }, ); - let child = ("Child", entity! { schema => id:"c" }); + let child = ("Child", entity! { schema => id:"c", vid: 2i64 }); let res = insert_and_query(subgraph_id, document, vec![parent, parent2, child], query) .await @@ -721,11 +745,11 @@ async fn redundant_fields() { let parent = ( "Animal", - entity! { schema => id: "parent", parent: Value::Null }, + entity! { schema => id: "parent", parent: Value::Null, vid: 0i64 }, ); let child = ( "Animal", - entity! { schema => id: "child", parent: "parent" }, + entity! { schema => id: "child", parent: "parent", vid: 1i64 }, ); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) @@ -783,8 +807,11 @@ async fn fragments_merge_selections() { } "; - let parent = ("Parent", entity! { schema => id: "p", children: vec!["c"] }); - let child = ("Child", entity! { schema => id: "c", foo: 1 }); + let parent = ( + "Parent", + entity! { schema => id: "p", children: vec!["c"], vid: 0i64 }, + ); + let child = ("Child", entity! { schema => id: "c", foo: 1, vid: 1i64 }); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) .await @@ -840,8 +867,14 @@ async fn merge_fields_not_in_interface() { } }"; - let animal = ("Animal", entity! { schema => id: "cow", human: "fred" }); - let human = ("Human", entity! { schema => id: "fred", animal: "cow" }); + let animal = ( + "Animal", + entity! { schema => id: "cow", human: "fred", vid: 0i64 }, + ); + let human = ( + "Human", + entity! 
{ schema => id: "fred", animal: "cow", vid: 0i64 }, + ); let res = insert_and_query(subgraph_id, document, vec![animal, human], query) .await @@ -914,15 +947,15 @@ async fn nested_interface_fragments() { } }"; - let foo = ("Foo", entity! { schema => id: "foo" }); - let one = ("One", entity! { schema => id: "1", foo1: "foo" }); + let foo = ("Foo", entity! { schema => id: "foo", vid: 0i64 }); + let one = ("One", entity! { schema => id: "1", foo1: "foo", vid: 0i64 }); let two = ( "Two", - entity! { schema => id: "2", foo1: "foo", foo2: "foo" }, + entity! { schema => id: "2", foo1: "foo", foo2: "foo", vid: 0i64 }, ); let three = ( "Three", - entity! { schema => id: "3", foo1: "foo", foo2: "foo", foo3: "foo" }, + entity! { schema => id: "3", foo1: "foo", foo2: "foo", foo3: "foo", vid: 0i64 }, ); let res = insert_and_query(subgraph_id, document, vec![foo, one, two, three], query) @@ -995,9 +1028,9 @@ async fn nested_interface_fragments_overlapping() { } }"; - let foo = ("Foo", entity! { schema => id: "foo" }); - let one = ("One", entity! { schema => id: "1", foo1: "foo" }); - let two = ("Two", entity! { schema => id: "2", foo1: "foo" }); + let foo = ("Foo", entity! { schema => id: "foo", vid: 0i64 }); + let one = ("One", entity! { schema => id: "1", foo1: "foo", vid: 0i64 }); + let two = ("Two", entity! { schema => id: "2", foo1: "foo", vid: 0i64 }); let res = insert_and_query(subgraph_id, document, vec![foo, one, two], query) .await .unwrap(); @@ -1081,11 +1114,11 @@ async fn enums() { let entities = vec![ ( "Trajectory", - entity! { schema => id: "1", direction: "EAST", meters: 10 }, + entity! { schema => id: "1", direction: "EAST", meters: 10, vid: 0i64}, ), ( "Trajectory", - entity! { schema => id: "2", direction: "NORTH", meters: 15 }, + entity! { schema => id: "2", direction: "NORTH", meters: 15, vid: 1i64}, ), ]; let query = "query { trajectories { id, direction, meters } }"; @@ -1134,15 +1167,15 @@ async fn enum_list_filters() { let entities = vec![ ( "Trajectory", - entity! { schema => id: "1", direction: "EAST", meters: 10 }, + entity! { schema => id: "1", direction: "EAST", meters: 10, vid: 0i64 }, ), ( "Trajectory", - entity! { schema => id: "2", direction: "NORTH", meters: 15 }, + entity! { schema => id: "2", direction: "NORTH", meters: 15, vid: 1i64 }, ), ( "Trajectory", - entity! { schema => id: "3", direction: "WEST", meters: 20 }, + entity! { schema => id: "3", direction: "WEST", meters: 20, vid: 2i64 }, ), ]; @@ -1272,10 +1305,13 @@ async fn mixed_mutability() { let query = "query { events { id } }"; let entities = vec![ - ("Mutable", entity! { schema => id: "mut0", name: "mut0" }), + ( + "Mutable", + entity! { schema => id: "mut0", name: "mut0", vid: 0i64 }, + ), ( "Immutable", - entity! { schema => id: "immo0", name: "immo0" }, + entity! { schema => id: "immo0", name: "immo0", vid: 0i64 }, ), ]; @@ -1326,9 +1362,15 @@ async fn derived_interface_bytes() { let query = "query { pools { trades { id } } }"; let entities = vec![ - ("Pool", entity! { schema => id: b("0xf001") }), - ("Sell", entity! { schema => id: b("0xc0"), pool: "0xf001"}), - ("Buy", entity! { schema => id: b("0xb0"), pool: "0xf001"}), + ("Pool", entity! { schema => id: b("0xf001"), vid: 0i64 }), + ( + "Sell", + entity! { schema => id: b("0xc0"), pool: "0xf001", vid: 0i64}, + ), + ( + "Buy", + entity! 
{ schema => id: b("0xb0"), pool: "0xf001", vid: 0i64}, + ), ]; let res = insert_and_query(subgraph_id, document, entities, query) diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index b90283f6c93..3af0d232e3f 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use web3::types::H256; use graph_store_postgres::SubgraphStore as DieselSubgraphStore; +use test_store::store::filter_vid; use test_store::*; lazy_static! { @@ -207,11 +208,11 @@ fn insert_modifications() { let store = Arc::new(store); let mut cache = EntityCache::new(store); - let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" }; + let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", vid: 0i64 }; let mogwai_key = make_band_key("mogwai"); cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap(); - let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }; + let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", vid: 0i64 }; let sigurros_key = make_band_key("sigurros"); cache .set(sigurros_key.clone(), sigurros_data.clone()) @@ -221,8 +222,8 @@ fn insert_modifications() { assert_eq!( sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![ - EntityModification::insert(mogwai_key, mogwai_data, 0), - EntityModification::insert(sigurros_key, sigurros_data, 0) + EntityModification::insert(mogwai_key, mogwai_data, 0, 0), + EntityModification::insert(sigurros_key, sigurros_data, 0, 0) ]) ); } @@ -242,8 +243,8 @@ fn overwrite_modifications() { // every set operation as an overwrite. let store = { let entities = vec![ - entity! { SCHEMA => id: "mogwai", name: "Mogwai" }, - entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }, + entity! { SCHEMA => id: "mogwai", name: "Mogwai", vid: 0i64 }, + entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", vid: 0i64 }, ]; MockStore::new(entity_version_map("Band", entities)) }; @@ -251,11 +252,12 @@ fn overwrite_modifications() { let store = Arc::new(store); let mut cache = EntityCache::new(store); - let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; + let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995, vid: 0i64 }; let mogwai_key = make_band_key("mogwai"); cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap(); - let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 }; + let sigurros_data = + entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994, vid: 0i64 }; let sigurros_key = make_band_key("sigurros"); cache .set(sigurros_key.clone(), sigurros_data.clone()) @@ -265,8 +267,8 @@ fn overwrite_modifications() { assert_eq!( sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![ - EntityModification::overwrite(mogwai_key, mogwai_data, 0), - EntityModification::overwrite(sigurros_key, sigurros_data, 0) + EntityModification::overwrite(mogwai_key, mogwai_data, 0, 0), + EntityModification::overwrite(sigurros_key, sigurros_data, 0, 0) ]) ); } @@ -276,8 +278,9 @@ fn consecutive_modifications() { // Pre-populate the store with data so that we can test setting a field to // `Value::Null`. let store = { - let entities = - vec![entity! { SCHEMA => id: "mogwai", name: "Mogwai", label: "Chemikal Underground" }]; + let entities = vec![ + entity! 
{ SCHEMA => id: "mogwai", name: "Mogwai", label: "Chemikal Underground", vid: 0i64 }, + ]; MockStore::new(entity_version_map("Band", entities)) }; @@ -303,7 +306,8 @@ fn consecutive_modifications() { sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![EntityModification::overwrite( update_key, - entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }, + entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995, vid: 0i64 }, + 0, 0 )]) ); @@ -428,17 +432,17 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator .unwrap(); // 1 account 3 wallets - let test_entity_1 = create_account_entity("1", "Johnton", "tonofjohn@email.com", 67_i32); + let test_entity_1 = create_account_entity("1", "Johnton", "tonofjohn@email.com", 67_i32, 1); let id_one = WALLET_TYPE.parse_id("1").unwrap(); - let wallet_entity_1 = create_wallet_operation("1", &id_one, 67_i32); - let wallet_entity_2 = create_wallet_operation("2", &id_one, 92_i32); - let wallet_entity_3 = create_wallet_operation("3", &id_one, 192_i32); + let wallet_entity_1 = create_wallet_operation("1", &id_one, 67_i32, 1); + let wallet_entity_2 = create_wallet_operation("2", &id_one, 92_i32, 2); + let wallet_entity_3 = create_wallet_operation("3", &id_one, 192_i32, 3); // 1 account 1 wallet - let test_entity_2 = create_account_entity("2", "Cindini", "dinici@email.com", 42_i32); + let test_entity_2 = create_account_entity("2", "Cindini", "dinici@email.com", 42_i32, 2); let id_two = WALLET_TYPE.parse_id("2").unwrap(); - let wallet_entity_4 = create_wallet_operation("4", &id_two, 32_i32); + let wallet_entity_4 = create_wallet_operation("4", &id_two, 32_i32, 4); // 1 account 0 wallets - let test_entity_3 = create_account_entity("3", "Shaqueeena", "queensha@email.com", 28_i32); + let test_entity_3 = create_account_entity("3", "Shaqueeena", "queensha@email.com", 28_i32, 3); transact_entity_operations( &store, &deployment, @@ -458,9 +462,9 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator deployment } -fn create_account_entity(id: &str, name: &str, email: &str, age: i32) -> EntityOperation { +fn create_account_entity(id: &str, name: &str, email: &str, age: i32, vid: i64) -> EntityOperation { let test_entity = - entity! { LOAD_RELATED_SUBGRAPH => id: id, name: name, email: email, age: age }; + entity! { LOAD_RELATED_SUBGRAPH => id: id, name: name, email: email, age: age, vid: vid }; EntityOperation::Set { key: ACCOUNT_TYPE.parse_key(id).unwrap(), @@ -468,12 +472,12 @@ fn create_account_entity(id: &str, name: &str, email: &str, age: i32) -> EntityO } } -fn create_wallet_entity(id: &str, account_id: &Id, balance: i32) -> Entity { +fn create_wallet_entity(id: &str, account_id: &Id, balance: i32, vid: i64) -> Entity { let account_id = Value::from(account_id.clone()); - entity! { LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance } + entity! 
{ LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance, vid: vid} } -fn create_wallet_operation(id: &str, account_id: &Id, balance: i32) -> EntityOperation { - let test_wallet = create_wallet_entity(id, account_id, balance); +fn create_wallet_operation(id: &str, account_id: &Id, balance: i32, vid: i64) -> EntityOperation { + let test_wallet = create_wallet_entity(id, account_id, balance, vid); EntityOperation::Set { key: WALLET_TYPE.parse_key(id).unwrap(), data: test_wallet, @@ -491,12 +495,12 @@ fn check_for_account_with_multiple_wallets() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("1", &account_id, 67_i32); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_1 = create_wallet_entity("1", &account_id, 67_i32, 0); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 1); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 2); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; - assert_eq!(result, expeted_vec); + assert_eq!(result, filter_vid(expeted_vec)); }); } @@ -511,10 +515,10 @@ fn check_for_account_with_single_wallet() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("4", &account_id, 32_i32); + let wallet_1 = create_wallet_entity("4", &account_id, 32_i32, 1); let expeted_vec = vec![wallet_1]; - assert_eq!(result, expeted_vec); + assert_eq!(result, filter_vid(expeted_vec)); }); } @@ -577,8 +581,8 @@ fn check_for_insert_async_store() { run_store_test(|mut cache, store, deployment, _writable| async move { let account_id = ACCOUNT_TYPE.parse_id("2").unwrap(); // insert a new wallet - let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32); - let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32); + let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32, 12); + let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32, 13); transact_entity_operations( &store, @@ -595,21 +599,22 @@ fn check_for_insert_async_store() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("4", &account_id, 32_i32); - let wallet_2 = create_wallet_entity("5", &account_id, 79_i32); - let wallet_3 = create_wallet_entity("6", &account_id, 200_i32); + let wallet_1 = create_wallet_entity("4", &account_id, 32_i32, 21); + let wallet_2 = create_wallet_entity("5", &account_id, 79_i32, 22); + let wallet_3 = create_wallet_entity("6", &account_id, 200_i32, 23); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; - assert_eq!(result, expeted_vec); + assert_eq!(filter_vid(result), filter_vid(expeted_vec)); }); } + #[test] fn check_for_insert_async_not_related() { run_store_test(|mut cache, store, deployment, _writable| async move { let account_id = ACCOUNT_TYPE.parse_id("2").unwrap(); // insert a new wallet - let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32); - let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32); + let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32, 5); + let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32, 6); transact_entity_operations( &store, @@ -627,12 +632,12 @@ fn check_for_insert_async_not_related() { causality_region: CausalityRegion::ONCHAIN, }; let 
result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("1", &account_id, 67_i32); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_1 = create_wallet_entity("1", &account_id, 67_i32, 1); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; - assert_eq!(result, expeted_vec); + assert_eq!(result, filter_vid(expeted_vec)); }); } @@ -641,7 +646,7 @@ fn check_for_update_async_related() { run_store_test(|mut cache, store, deployment, writable| async move { let entity_key = WALLET_TYPE.parse_key("1").unwrap(); let account_id = entity_key.entity_id.clone(); - let wallet_entity_update = create_wallet_operation("1", &account_id, 79_i32); + let wallet_entity_update = create_wallet_operation("1", &account_id, 79_i32, 11); let new_data = match wallet_entity_update { EntityOperation::Set { ref data, .. } => data.clone(), @@ -665,11 +670,11 @@ fn check_for_update_async_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 12); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 13); let expeted_vec = vec![new_data, wallet_2, wallet_3]; - assert_eq!(result, expeted_vec); + assert_eq!(filter_vid(result), filter_vid(expeted_vec)); }); } @@ -695,11 +700,11 @@ fn check_for_delete_async_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_2, wallet_3]; - assert_eq!(result, expeted_vec); + assert_eq!(result, filter_vid(expeted_vec)); }); } @@ -709,11 +714,11 @@ fn scoped_get() { // Key for an existing entity that is in the store let account1 = ACCOUNT_TYPE.parse_id("1").unwrap(); let key1 = WALLET_TYPE.parse_key("1").unwrap(); - let wallet1 = create_wallet_entity("1", &account1, 67); + let wallet1 = create_wallet_entity("1", &account1, 67, 1); // Create a new entity that is not in the store let account5 = ACCOUNT_TYPE.parse_id("5").unwrap(); - let wallet5 = create_wallet_entity("5", &account5, 100); + let wallet5 = create_wallet_entity("5", &account5, 100, 5); let key5 = WALLET_TYPE.parse_key("5").unwrap(); cache.set(key5.clone(), wallet5.clone()).unwrap(); @@ -728,7 +733,10 @@ fn scoped_get() { let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); assert_eq!(None, act1); let act1 = cache.get(&key1, GetScope::Store).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!( + filter_vid(vec![wallet1.clone()]), + vec![act1.as_ref().map(|e| e.as_ref()).unwrap().clone()] + ); // Even after reading from the store, the entity is not visible with // `InBlock` let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); diff --git a/store/test-store/tests/graphql/query.rs b/store/test-store/tests/graphql/query.rs index 08ad26ef9b9..e1fafe0f915 100644 --- a/store/test-store/tests/graphql/query.rs +++ 
b/store/test-store/tests/graphql/query.rs @@ -468,115 +468,118 @@ async fn insert_test_entities( ( "Musician", vec![ - entity! { is => id: "m1", name: "John", mainBand: "b1", bands: vec!["b1", "b2"], favoriteCount: 10, birthDate: timestamp.clone() }, - entity! { is => id: "m2", name: "Lisa", mainBand: "b1", bands: vec!["b1"], favoriteCount: 100, birthDate: timestamp.clone() }, + entity! { is => id: "m1", name: "John", mainBand: "b1", bands: vec!["b1", "b2"], favoriteCount: 10, birthDate: timestamp.clone(), vid: 0i64 }, + entity! { is => id: "m2", name: "Lisa", mainBand: "b1", bands: vec!["b1"], favoriteCount: 100, birthDate: timestamp.clone(), vid: 1i64 }, ], ), - ("Publisher", vec![entity! { is => id: pub1 }]), + ("Publisher", vec![entity! { is => id: pub1, vid: 0i64 }]), ( "Band", vec![ - entity! { is => id: "b1", name: "The Musicians", originalSongs: vec![s[1], s[2]] }, - entity! { is => id: "b2", name: "The Amateurs", originalSongs: vec![s[1], s[3], s[4]] }, + entity! { is => id: "b1", name: "The Musicians", originalSongs: vec![s[1], s[2]], vid: 0i64 }, + entity! { is => id: "b2", name: "The Amateurs", originalSongs: vec![s[1], s[3], s[4]], vid: 1i64 }, ], ), ( "Song", vec![ - entity! { is => id: s[1], sid: "s1", title: "Cheesy Tune", publisher: pub1, writtenBy: "m1", media: vec![md[1], md[2]] }, - entity! { is => id: s[2], sid: "s2", title: "Rock Tune", publisher: pub1, writtenBy: "m2", media: vec![md[3], md[4]] }, - entity! { is => id: s[3], sid: "s3", title: "Pop Tune", publisher: pub1, writtenBy: "m1", media: vec![md[5]] }, - entity! { is => id: s[4], sid: "s4", title: "Folk Tune", publisher: pub1, writtenBy: "m3", media: vec![md[6]] }, + entity! { is => id: s[1], sid: "s1", title: "Cheesy Tune", publisher: pub1, writtenBy: "m1", media: vec![md[1], md[2]], vid: 0i64 }, + entity! { is => id: s[2], sid: "s2", title: "Rock Tune", publisher: pub1, writtenBy: "m2", media: vec![md[3], md[4]], vid: 1i64 }, + entity! { is => id: s[3], sid: "s3", title: "Pop Tune", publisher: pub1, writtenBy: "m1", media: vec![md[5]], vid: 2i64 }, + entity! { is => id: s[4], sid: "s4", title: "Folk Tune", publisher: pub1, writtenBy: "m3", media: vec![md[6]], vid: 3i64 }, ], ), ( "User", vec![ - entity! { is => id: "u1", name: "User 1", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r3" }, + entity! { is => id: "u1", name: "User 1", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r3", vid: 0i64 }, ], ), ( "SongStat", vec![ - entity! { is => id: s[1], played: 10 }, - entity! { is => id: s[2], played: 15 }, + entity! { is => id: s[1], played: 10, vid: 0i64 }, + entity! { is => id: s[2], played: 15, vid: 1i64 }, ], ), ( "BandReview", vec![ - entity! { is => id: "r1", body: "Bad musicians", band: "b1", author: "u1" }, - entity! { is => id: "r2", body: "Good amateurs", band: "b2", author: "u2" }, - entity! { is => id: "r5", body: "Very Bad musicians", band: "b1", author: "u3" }, + entity! { is => id: "r1", body: "Bad musicians", band: "b1", author: "u1", vid: 0i64 }, + entity! { is => id: "r2", body: "Good amateurs", band: "b2", author: "u2", vid: 1i64 }, + entity! { is => id: "r5", body: "Very Bad musicians", band: "b1", author: "u3", vid: 2i64 }, ], ), ( "SongReview", vec![ - entity! { is => id: "r3", body: "Bad", song: s[2], author: "u1" }, - entity! { is => id: "r4", body: "Good", song: s[3], author: "u2" }, - entity! { is => id: "r6", body: "Very Bad", song: s[2], author: "u3" }, + entity! { is => id: "r3", body: "Bad", song: s[2], author: "u1", vid: 0i64 }, + entity! 
{ is => id: "r4", body: "Good", song: s[3], author: "u2", vid: 1i64 }, + entity! { is => id: "r6", body: "Very Bad", song: s[2], author: "u3", vid: 2i64 }, ], ), ( "User", vec![ - entity! { is => id: "u1", name: "Baden", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r1" }, - entity! { is => id: "u2", name: "Goodwill", latestSongReview: "r4", latestBandReview: "r2", latestReview: "r2" }, + entity! { is => id: "u1", name: "Baden", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r1", vid: 0i64 }, + entity! { is => id: "u2", name: "Goodwill", latestSongReview: "r4", latestBandReview: "r2", latestReview: "r2", vid: 1i64 }, ], ), ( "AnonymousUser", vec![ - entity! { is => id: "u3", name: "Anonymous 3", latestSongReview: "r6", latestBandReview: "r5", latestReview: "r5" }, + entity! { is => id: "u3", name: "Anonymous 3", latestSongReview: "r6", latestBandReview: "r5", latestReview: "r5", vid: 0i64 }, ], ), ( "Photo", vec![ - entity! { is => id: md[1], title: "Cheesy Tune Single Cover", author: "u1" }, - entity! { is => id: md[3], title: "Rock Tune Single Cover", author: "u1" }, - entity! { is => id: md[5], title: "Pop Tune Single Cover", author: "u1" }, + entity! { is => id: md[1], title: "Cheesy Tune Single Cover", author: "u1", vid: 0i64 }, + entity! { is => id: md[3], title: "Rock Tune Single Cover", author: "u1", vid: 1i64 }, + entity! { is => id: md[5], title: "Pop Tune Single Cover", author: "u1", vid: 2i64 }, ], ), ( "Video", vec![ - entity! { is => id: md[2], title: "Cheesy Tune Music Video", author: "u2" }, - entity! { is => id: md[4], title: "Rock Tune Music Video", author: "u2" }, - entity! { is => id: md[6], title: "Folk Tune Music Video", author: "u2" }, + entity! { is => id: md[2], title: "Cheesy Tune Music Video", author: "u2", vid: 0i64 }, + entity! { is => id: md[4], title: "Rock Tune Music Video", author: "u2", vid: 1i64 }, + entity! { is => id: md[6], title: "Folk Tune Music Video", author: "u2", vid: 2i64 }, ], ), ( "Album", - vec![entity! { is => id: "rl1", title: "Pop and Folk", songs: vec![s[3], s[4]] }], + vec![ + entity! { is => id: "rl1", title: "Pop and Folk", songs: vec![s[3], s[4]], vid: 0i64 }, + ], ), ( "Single", vec![ - entity! { is => id: "rl2", title: "Rock", songs: vec![s[2]] }, - entity! { is => id: "rl3", title: "Cheesy", songs: vec![s[1]] }, - entity! { is => id: "rl4", title: "Silence", songs: Vec::::new() }, + entity! { is => id: "rl2", title: "Rock", songs: vec![s[2]], vid: 0i64 }, + entity! { is => id: "rl3", title: "Cheesy", songs: vec![s[1]], vid: 1i64 }, + entity! { is => id: "rl4", title: "Silence", songs: Vec::::new(), vid: 2i64 }, ], ), ( "Plays", vec![ - entity! { is => id: 1i64, timestamp: ts0, song: s[1], user: "u1"}, - entity! { is => id: 2i64, timestamp: ts0, song: s[1], user: "u2"}, - entity! { is => id: 3i64, timestamp: ts0, song: s[2], user: "u1"}, - entity! { is => id: 4i64, timestamp: ts0, song: s[1], user: "u1"}, - entity! { is => id: 5i64, timestamp: ts0, song: s[1], user: "u1"}, + entity! { is => id: 1i64, timestamp: ts0, song: s[1], user: "u1", vid: 0i64 }, + entity! { is => id: 2i64, timestamp: ts0, song: s[1], user: "u2", vid: 1i64 }, + entity! { is => id: 3i64, timestamp: ts0, song: s[2], user: "u1", vid: 2i64 }, + entity! { is => id: 4i64, timestamp: ts0, song: s[1], user: "u1", vid: 3i64 }, + entity! { is => id: 5i64, timestamp: ts0, song: s[1], user: "u1", vid: 4i64 }, ], ), ]; + let entities0 = insert_ops(&manifest.schema, entities0); let entities1 = vec![( "Musician", vec![ - entity! 
{ is => id: "m3", name: "Tom", mainBand: "b2", bands: vec!["b1", "b2"], favoriteCount: 5, birthDate: timestamp.clone() }, - entity! { is => id: "m4", name: "Valerie", bands: Vec::::new(), favoriteCount: 20, birthDate: timestamp.clone() }, + entity! { is => id: "m3", name: "Tom", mainBand: "b2", bands: vec!["b1", "b2"], favoriteCount: 5, birthDate: timestamp.clone(), vid: 2i64 }, + entity! { is => id: "m4", name: "Valerie", bands: Vec::::new(), favoriteCount: 20, birthDate: timestamp.clone(), vid: 3i64 }, ], )]; let entities1 = insert_ops(&manifest.schema, entities1); diff --git a/store/test-store/tests/postgres/aggregation.rs b/store/test-store/tests/postgres/aggregation.rs index 432bc685a62..223d62c40ef 100644 --- a/store/test-store/tests/postgres/aggregation.rs +++ b/store/test-store/tests/postgres/aggregation.rs @@ -125,8 +125,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL let ts64 = TIMES[0]; let entities = vec![ - entity! { schema => id: 1i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(1), amount: bd(10) }, - entity! { schema => id: 2i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(1), amount: bd(1) }, + entity! { schema => id: 1i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(1), amount: bd(10), vid: 11i64 }, + entity! { schema => id: 2i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(1), amount: bd(1), vid: 12i64 }, ]; insert(&store, &deployment, BLOCKS[0].clone(), TIMES[0], entities) @@ -135,8 +135,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL let ts64 = TIMES[1]; let entities = vec![ - entity! { schema => id: 11i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(2), amount: bd(2) }, - entity! { schema => id: 12i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(2), amount: bd(20) }, + entity! { schema => id: 11i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(2), amount: bd(2), vid: 21i64 }, + entity! { schema => id: 12i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(2), amount: bd(20), vid: 22i64 }, ]; insert(&store, &deployment, BLOCKS[1].clone(), TIMES[1], entities) .await @@ -144,8 +144,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL let ts64 = TIMES[2]; let entities = vec![ - entity! { schema => id: 21i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(3), amount: bd(30) }, - entity! { schema => id: 22i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(3), amount: bd(3) }, + entity! { schema => id: 21i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(3), amount: bd(30), vid: 31i64 }, + entity! { schema => id: 22i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(3), amount: bd(3), vid: 32i64 }, ]; insert(&store, &deployment, BLOCKS[2].clone(), TIMES[2], entities) .await @@ -153,8 +153,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL let ts64 = TIMES[3]; let entities = vec![ - entity! { schema => id: 31i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(4), amount: bd(4) }, - entity! { schema => id: 32i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(4), amount: bd(40) }, + entity! { schema => id: 31i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(4), amount: bd(4), vid: 41i64 }, + entity! 
{ schema => id: 32i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(4), amount: bd(40), vid: 42i64 }, ]; insert(&store, &deployment, BLOCKS[3].clone(), TIMES[3], entities) .await diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index 1580a62b1aa..4848e0ba1ee 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -175,6 +175,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 184.4, false, None, + 0, ); transact_entity_operations(&store, &deployment, BLOCKS[0].clone(), vec![test_entity_1]) .await @@ -189,6 +190,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 159.1, true, Some("red"), + 1, ); let test_entity_3_1 = create_test_entity( "3", @@ -199,6 +201,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 111.7, false, Some("blue"), + 2, ); transact_entity_operations( &store, @@ -218,6 +221,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 111.7, false, None, + 3, ); transact_entity_operations( &store, @@ -241,6 +245,7 @@ fn create_test_entity( weight: f64, coffee: bool, favorite_color: Option<&str>, + vid: i64, ) -> EntityOperation { let bin_name = scalar::Bytes::from_str(&hex::encode(name)).unwrap(); let test_entity = entity! { TEST_SUBGRAPH_SCHEMA => @@ -252,7 +257,8 @@ fn create_test_entity( seconds_age: age * 31557600, weight: Value::BigDecimal(weight.into()), coffee: coffee, - favorite_color: favorite_color + favorite_color: favorite_color, + vid: vid, }; let entity_type = TEST_SUBGRAPH_SCHEMA.entity_type(entity_type).unwrap(); @@ -324,6 +330,7 @@ async fn check_graft( // Make our own entries for block 2 shaq.set("email", "shaq@gmail.com").unwrap(); + shaq.set("vid", 5i64).unwrap(); let op = EntityOperation::Set { key: user_type.parse_key("3").unwrap(), data: shaq, @@ -601,6 +608,7 @@ fn prune() { 157.1, true, Some("red"), + 4, ); transact_and_wait(&store, &src, BLOCKS[5].clone(), vec![user2]) .await diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs index fe366b34509..8c47c464fe0 100644 --- a/store/test-store/tests/postgres/relational.rs +++ b/store/test-store/tests/postgres/relational.rs @@ -205,11 +205,13 @@ lazy_static! { bigInt: big_int.clone(), bigIntArray: vec![big_int.clone(), (big_int + 1.into())], color: "yellow", + vid: 0i64, } }; static ref EMPTY_NULLABLESTRINGS_ENTITY: Entity = { entity! 
{ THINGS_SCHEMA => id: "one", + vid: 0i64, } }; static ref SCALAR_TYPE: EntityType = THINGS_SCHEMA.entity_type("Scalar").unwrap(); @@ -318,6 +320,7 @@ fn insert_user_entity( drinks: Option>, visits: i64, block: BlockNumber, + vid: i64, ) { let user = make_user( &layout.input_schema, @@ -330,6 +333,7 @@ fn insert_user_entity( favorite_color, drinks, visits, + vid, ); insert_entity_at(conn, layout, entity_type, vec![user], block); @@ -346,6 +350,7 @@ fn make_user( favorite_color: Option<&str>, drinks: Option>, visits: i64, + vid: i64, ) -> Entity { let favorite_color = favorite_color .map(|s| Value::String(s.to_owned())) @@ -361,7 +366,8 @@ fn make_user( weight: BigDecimal::from(weight), coffee: coffee, favorite_color: favorite_color, - visits: visits + visits: visits, + vid: vid, }; if let Some(drinks) = drinks { user.insert("drinks", drinks.into()).unwrap(); @@ -384,6 +390,7 @@ fn insert_users(conn: &mut PgConnection, layout: &Layout) { None, 60, 0, + 0, ); insert_user_entity( conn, @@ -399,6 +406,7 @@ fn insert_users(conn: &mut PgConnection, layout: &Layout) { Some(vec!["beer", "wine"]), 50, 0, + 1, ); insert_user_entity( conn, @@ -414,6 +422,7 @@ fn insert_users(conn: &mut PgConnection, layout: &Layout) { Some(vec!["coffee", "tea"]), 22, 0, + 2, ); } @@ -431,6 +440,7 @@ fn update_user_entity( drinks: Option>, visits: i64, block: BlockNumber, + vid: i64, ) { let user = make_user( &layout.input_schema, @@ -443,6 +453,7 @@ fn update_user_entity( favorite_color, drinks, visits, + vid, ); update_entity_at(conn, layout, entity_type, vec![user], block); } @@ -454,17 +465,19 @@ fn insert_pet( id: &str, name: &str, block: BlockNumber, + vid: i64, ) { let pet = entity! { layout.input_schema => id: id, - name: name + name: name, + vid: vid, }; insert_entity_at(conn, layout, entity_type, vec![pet], block); } fn insert_pets(conn: &mut PgConnection, layout: &Layout) { - insert_pet(conn, layout, &*DOG_TYPE, "pluto", "Pluto", 0); - insert_pet(conn, layout, &*CAT_TYPE, "garfield", "Garfield", 0); + insert_pet(conn, layout, &*DOG_TYPE, "pluto", "Pluto", 0, 0); + insert_pet(conn, layout, &*CAT_TYPE, "garfield", "Garfield", 0, 1); } fn create_schema(conn: &mut PgConnection) -> Layout { @@ -484,6 +497,7 @@ fn create_schema(conn: &mut PgConnection) -> Layout { fn scrub(entity: &Entity) -> Entity { let mut scrubbed = entity.clone(); scrubbed.remove_null_fields(); + scrubbed.remove("vid"); scrubbed } @@ -597,6 +611,7 @@ fn update() { entity.set("string", "updated").unwrap(); entity.remove("strings"); entity.set("bool", Value::Null).unwrap(); + entity.set("vid", 1i64).unwrap(); let key = SCALAR_TYPE.key(entity.id()); let entity_type = layout.input_schema.entity_type("Scalar").unwrap(); @@ -624,8 +639,10 @@ fn update_many() { let mut one = SCALAR_ENTITY.clone(); let mut two = SCALAR_ENTITY.clone(); two.set("id", "two").unwrap(); + two.set("vid", 1i64).unwrap(); let mut three = SCALAR_ENTITY.clone(); three.set("id", "three").unwrap(); + three.set("vid", 2i64).unwrap(); insert_entity( conn, layout, @@ -647,6 +664,10 @@ fn update_many() { three.remove("strings"); three.set("color", "red").unwrap(); + one.set("vid", 3i64).unwrap(); + two.set("vid", 4i64).unwrap(); + three.set("vid", 5i64).unwrap(); + // generate keys let entity_type = layout.input_schema.entity_type("Scalar").unwrap(); let keys: Vec = ["one", "two", "three"] @@ -713,10 +734,13 @@ fn serialize_bigdecimal() { // Update with overwrite let mut entity = SCALAR_ENTITY.clone(); + let mut vid = 1i64; for d in &["50", "50.00", "5000", "0.5000", "0.050", 
"0.5", "0.05"] { let d = BigDecimal::from_str(d).unwrap(); entity.set("bigDecimal", d).unwrap(); + entity.set("vid", vid).unwrap(); + vid += 1; let key = SCALAR_TYPE.key(entity.id()); let entity_type = layout.input_schema.entity_type("Scalar").unwrap(); @@ -734,6 +758,7 @@ fn serialize_bigdecimal() { ) .expect("Failed to read Scalar[one]") .unwrap(); + entity.remove("vid"); assert_entity_eq!(entity, actual); } }); @@ -761,6 +786,7 @@ fn delete() { insert_entity(conn, layout, &*SCALAR_TYPE, vec![SCALAR_ENTITY.clone()]); let mut two = SCALAR_ENTITY.clone(); two.set("id", "two").unwrap(); + two.set("vid", 1i64).unwrap(); insert_entity(conn, layout, &*SCALAR_TYPE, vec![two]); // Delete where nothing is getting deleted @@ -795,8 +821,10 @@ fn insert_many_and_delete_many() { let one = SCALAR_ENTITY.clone(); let mut two = SCALAR_ENTITY.clone(); two.set("id", "two").unwrap(); + two.set("vid", 1i64).unwrap(); let mut three = SCALAR_ENTITY.clone(); three.set("id", "three").unwrap(); + three.set("vid", 2i64).unwrap(); insert_entity(conn, layout, &*SCALAR_TYPE, vec![one, two, three]); // confidence test: there should be 3 scalar entities in store right now @@ -877,6 +905,7 @@ fn conflicting_entity() { cat: &str, dog: &str, ferret: &str, + vid: i64, ) { let conflicting = |conn: &mut PgConnection, entity_type: &EntityType, types: Vec<&EntityType>| { @@ -891,6 +920,7 @@ fn conflicting_entity() { data: fred, block: 2, end: None, + vid: 0, }, 2, ) @@ -902,7 +932,7 @@ fn conflicting_entity() { let dog_type = layout.input_schema.entity_type(dog).unwrap(); let ferret_type = layout.input_schema.entity_type(ferret).unwrap(); - let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone() }; + let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone(), vid: vid }; insert_entity(conn, layout, &cat_type, vec![fred]); // If we wanted to create Fred the dog, which is forbidden, we'd run this: @@ -916,10 +946,10 @@ fn conflicting_entity() { run_test(|mut conn, layout| { let id = Value::String("fred".to_string()); - check(&mut conn, layout, id, "Cat", "Dog", "Ferret"); + check(&mut conn, layout, id, "Cat", "Dog", "Ferret", 0); let id = Value::Bytes(scalar::Bytes::from_str("0xf1ed").unwrap()); - check(&mut conn, layout, id, "ByteCat", "ByteDog", "ByteFerret"); + check(&mut conn, layout, id, "ByteCat", "ByteDog", "ByteFerret", 1); }) } @@ -931,7 +961,8 @@ fn revert_block() { let set_fred = |conn: &mut PgConnection, name, block| { let fred = entity! { layout.input_schema => id: id, - name: name + name: name, + vid: block as i64, }; if block == 0 { insert_entity_at(conn, layout, &*CAT_TYPE, vec![fred], block); @@ -971,6 +1002,7 @@ fn revert_block() { let marty = entity! 
{ layout.input_schema => id: id, order: block, + vid: (block + 10) as i64 }; insert_entity_at(conn, layout, &*MINK_TYPE, vec![marty], block); } @@ -1049,6 +1081,7 @@ impl<'a> QueryChecker<'a> { None, 23, 0, + 3, ); insert_pets(conn, layout); @@ -1161,6 +1194,7 @@ fn check_block_finds() { None, 55, 1, + 4, ); checker @@ -1703,10 +1737,10 @@ struct FilterChecker<'a> { impl<'a> FilterChecker<'a> { fn new(conn: &'a mut PgConnection, layout: &'a Layout) -> Self { let (a1, a2, a2b, a3) = ferrets(); - insert_pet(conn, layout, &*FERRET_TYPE, "a1", &a1, 0); - insert_pet(conn, layout, &*FERRET_TYPE, "a2", &a2, 0); - insert_pet(conn, layout, &*FERRET_TYPE, "a2b", &a2b, 0); - insert_pet(conn, layout, &*FERRET_TYPE, "a3", &a3, 0); + insert_pet(conn, layout, &*FERRET_TYPE, "a1", &a1, 0, 0); + insert_pet(conn, layout, &*FERRET_TYPE, "a2", &a2, 0, 1); + insert_pet(conn, layout, &*FERRET_TYPE, "a2b", &a2b, 0, 2); + insert_pet(conn, layout, &*FERRET_TYPE, "a3", &a3, 0, 3); Self { conn, layout } } @@ -1850,7 +1884,8 @@ fn check_filters() { &*FERRET_TYPE, vec![entity! { layout.input_schema => id: "a1", - name: "Test" + name: "Test", + vid: 5i64 }], 1, ); diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs index b7b8f36b7d7..41aa79bf9b7 100644 --- a/store/test-store/tests/postgres/relational_bytes.rs +++ b/store/test-store/tests/postgres/relational_bytes.rs @@ -57,6 +57,7 @@ lazy_static! { static ref BEEF_ENTITY: Entity = entity! { THINGS_SCHEMA => id: scalar::Bytes::from_str("deadbeef").unwrap(), name: "Beef", + vid: 0i64 }; static ref NAMESPACE: Namespace = Namespace::new("sgd0815".to_string()).unwrap(); static ref THING_TYPE: EntityType = THINGS_SCHEMA.entity_type("Thing").unwrap(); @@ -83,8 +84,9 @@ pub fn row_group_update( ) -> RowGroup { let mut group = RowGroup::new(entity_type.clone(), false); for (key, data) in data { + let vid = data.vid(); group - .push(EntityModification::overwrite(key, data, block), block) + .push(EntityModification::overwrite(key, data, block, vid), block) .unwrap(); } group @@ -97,8 +99,9 @@ pub fn row_group_insert( ) -> RowGroup { let mut group = RowGroup::new(entity_type.clone(), false); for (key, data) in data { + let vid = data.vid(); group - .push(EntityModification::insert(key, data, block), block) + .push(EntityModification::insert(key, data, block, vid), block) .unwrap(); } group @@ -128,14 +131,15 @@ fn insert_entity(conn: &mut PgConnection, layout: &Layout, entity_type: &str, en layout.insert(conn, &group, &MOCK_STOPWATCH).expect(&errmsg); } -fn insert_thing(conn: &mut PgConnection, layout: &Layout, id: &str, name: &str) { +fn insert_thing(conn: &mut PgConnection, layout: &Layout, id: &str, name: &str, vid: i64) { insert_entity( conn, layout, "Thing", entity! 
{ layout.input_schema => id: id, - name: name + name: name, + vid: vid, }, ); } @@ -158,6 +162,7 @@ fn create_schema(conn: &mut PgConnection) -> Layout { fn scrub(entity: &Entity) -> Entity { let mut scrubbed = entity.clone(); scrubbed.remove_null_fields(); + scrubbed.remove("vid"); scrubbed } @@ -265,7 +270,7 @@ fn find() { const ID: &str = "deadbeef"; const NAME: &str = "Beef"; - insert_thing(&mut conn, layout, ID, NAME); + insert_thing(&mut conn, layout, ID, NAME, 0); // Happy path: find existing entity let entity = find_entity(conn, layout, ID).unwrap(); @@ -285,8 +290,8 @@ fn find_many() { const NAME: &str = "Beef"; const ID2: &str = "0xdeadbeef02"; const NAME2: &str = "Moo"; - insert_thing(&mut conn, layout, ID, NAME); - insert_thing(&mut conn, layout, ID2, NAME2); + insert_thing(&mut conn, layout, ID, NAME, 0); + insert_thing(&mut conn, layout, ID2, NAME2, 1); let mut id_map = BTreeMap::default(); let ids = IdList::try_from_iter( @@ -318,6 +323,7 @@ fn update() { // Update the entity let mut entity = BEEF_ENTITY.clone(); entity.set("name", "Moo").unwrap(); + entity.set("vid", 1i64).unwrap(); let key = THING_TYPE.key(entity.id()); let entity_id = entity.id(); @@ -333,7 +339,7 @@ fn update() { .expect("Failed to read Thing[deadbeef]") .unwrap(); - assert_entity_eq!(entity, actual); + assert_entity_eq!(scrub(&entity), actual); }); } @@ -345,6 +351,7 @@ fn delete() { insert_entity(&mut conn, layout, "Thing", BEEF_ENTITY.clone()); let mut two = BEEF_ENTITY.clone(); two.set("id", TWO_ID).unwrap(); + two.set("vid", 1i64).unwrap(); insert_entity(&mut conn, layout, "Thing", two); // Delete where nothing is getting deleted @@ -392,29 +399,34 @@ fn make_thing_tree(conn: &mut PgConnection, layout: &Layout) -> (Entity, Entity, let root = entity! { layout.input_schema => id: ROOT, name: "root", - children: vec!["babe01", "babe02"] + children: vec!["babe01", "babe02"], + vid: 0i64, }; let child1 = entity! { layout.input_schema => id: CHILD1, name: "child1", parent: "dead00", - children: vec![GRANDCHILD1] + children: vec![GRANDCHILD1], + vid: 1i64, }; let child2 = entity! { layout.input_schema => id: CHILD2, name: "child2", parent: "dead00", - children: vec![GRANDCHILD1] + children: vec![GRANDCHILD1], + vid: 2i64, }; let grand_child1 = entity! { layout.input_schema => id: GRANDCHILD1, name: "grandchild1", - parent: CHILD1 + parent: CHILD1, + vid: 3i64, }; let grand_child2 = entity! 
{ layout.input_schema => id: GRANDCHILD2, name: "grandchild2", - parent: CHILD2 + parent: CHILD2, + vid: 4i64, }; insert_entity(conn, layout, "Thing", root.clone()); diff --git a/store/test-store/tests/postgres/store.rs b/store/test-store/tests/postgres/store.rs index aba953975a3..b73efa87ad8 100644 --- a/store/test-store/tests/postgres/store.rs +++ b/store/test-store/tests/postgres/store.rs @@ -200,6 +200,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 184.4, false, None, + 0, ); transact_entity_operations( &store, @@ -219,6 +220,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 159.1, true, Some("red"), + 1, ); let test_entity_3_1 = create_test_entity( "3", @@ -229,6 +231,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 111.7, false, Some("blue"), + 2, ); transact_entity_operations( &store, @@ -248,6 +251,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator 111.7, false, None, + 3, ); transact_and_wait( &store, @@ -271,6 +275,7 @@ fn create_test_entity( weight: f64, coffee: bool, favorite_color: Option<&str>, + vid: i64, ) -> EntityOperation { let bin_name = scalar::Bytes::from_str(&hex::encode(name)).unwrap(); let test_entity = entity! { TEST_SUBGRAPH_SCHEMA => @@ -283,6 +288,7 @@ fn create_test_entity( weight: Value::BigDecimal(weight.into()), coffee: coffee, favorite_color: favorite_color, + vid: vid, }; EntityOperation::Set { @@ -397,6 +403,7 @@ fn insert_entity() { 111.7, true, Some("green"), + 5, ); let count = get_entity_count(store.clone(), &deployment.hash); transact_and_wait( @@ -428,6 +435,7 @@ fn update_existing() { 111.7, true, Some("green"), + 6, ); let mut new_data = match op { EntityOperation::Set { ref data, .. } => data.clone(), @@ -466,7 +474,8 @@ fn partially_update_existing() { let entity_key = USER_TYPE.parse_key("1").unwrap(); let schema = writable.input_schema(); - let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null }; + let partial_entity = + entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 11i64 }; let original_entity = writable .get(&entity_key) @@ -1076,7 +1085,8 @@ fn revert_block_with_partial_update() { let entity_key = USER_TYPE.parse_key("1").unwrap(); let schema = writable.input_schema(); - let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null }; + let partial_entity = + entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 5i64 }; let original_entity = writable.get(&entity_key).unwrap().expect("missing entity"); @@ -1171,7 +1181,8 @@ fn revert_block_with_dynamic_data_source_operations() { // Create operations to add a user let user_key = USER_TYPE.parse_key("1").unwrap(); - let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null }; + let partial_entity = + entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 5i64 }; // Get the original user for comparisons let original_user = writable.get(&user_key).unwrap().expect("missing entity"); @@ -1290,9 +1301,12 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { let added_entities = vec![ ( "1".to_owned(), - entity! { schema => id: "1", name: "Johnny Boy" }, + entity! { schema => id: "1", name: "Johnny Boy", vid: 5i64 }, + ), + ( + "2".to_owned(), + entity! { schema => id: "2", name: "Tessa", vid: 6i64 }, ), - ("2".to_owned(), entity! 
{ schema => id: "2", name: "Tessa" }), ]; transact_and_wait( &store.subgraph_store(), @@ -1310,7 +1324,7 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { .unwrap(); // Update an entity in the store - let updated_entity = entity! { schema => id: "1", name: "Johnny" }; + let updated_entity = entity! { schema => id: "1", name: "Johnny", vid: 7i64 }; let update_op = EntityOperation::Set { key: USER_TYPE.parse_key("1").unwrap(), data: updated_entity.clone(), @@ -1386,6 +1400,7 @@ fn throttle_subscription_delivers() { 120.7, false, None, + 7, ); transact_entity_operations( @@ -1431,6 +1446,7 @@ fn throttle_subscription_throttles() { 120.7, false, None, + 8, ); transact_entity_operations( @@ -1504,12 +1520,13 @@ fn handle_large_string_with_index() { name: &str, schema: &InputSchema, block: BlockNumber, + vid: i64, ) -> EntityModification { - let data = entity! { schema => id: id, name: name }; + let data = entity! { schema => id: id, name: name, vid: vid }; let key = USER_TYPE.parse_key(id).unwrap(); - EntityModification::insert(key, data, block) + EntityModification::insert(key, data, block, vid) } run_test(|store, writable, deployment| async move { @@ -1538,8 +1555,8 @@ fn handle_large_string_with_index() { BlockTime::for_test(&*TEST_BLOCK_3_PTR), FirehoseCursor::None, vec![ - make_insert_op(ONE, &long_text, &schema, block), - make_insert_op(TWO, &other_text, &schema, block), + make_insert_op(ONE, &long_text, &schema, block, 11), + make_insert_op(TWO, &other_text, &schema, block, 12), ], &stopwatch_metrics, Vec::new(), @@ -1603,12 +1620,13 @@ fn handle_large_bytea_with_index() { name: &[u8], schema: &InputSchema, block: BlockNumber, + vid: i64, ) -> EntityModification { - let data = entity! { schema => id: id, bin_name: scalar::Bytes::from(name) }; + let data = entity! { schema => id: id, bin_name: scalar::Bytes::from(name), vid: vid }; let key = USER_TYPE.parse_key(id).unwrap(); - EntityModification::insert(key, data, block) + EntityModification::insert(key, data, block, vid) } run_test(|store, writable, deployment| async move { @@ -1642,8 +1660,8 @@ fn handle_large_bytea_with_index() { BlockTime::for_test(&*TEST_BLOCK_3_PTR), FirehoseCursor::None, vec![ - make_insert_op(ONE, &long_bytea, &schema, block), - make_insert_op(TWO, &other_bytea, &schema, block), + make_insert_op(ONE, &long_bytea, &schema, block, 10), + make_insert_op(TWO, &other_bytea, &schema, block, 11), ], &stopwatch_metrics, Vec::new(), @@ -1811,8 +1829,10 @@ fn window() { id: &str, color: &str, age: i32, + vid: i64, ) -> EntityOperation { - let entity = entity! { TEST_SUBGRAPH_SCHEMA => id: id, age: age, favorite_color: color }; + let entity = + entity! 
{ TEST_SUBGRAPH_SCHEMA => id: id, age: age, favorite_color: color, vid: vid }; EntityOperation::Set { key: entity_type.parse_key(id).unwrap(), @@ -1820,25 +1840,25 @@ fn window() { } } - fn make_user(id: &str, color: &str, age: i32) -> EntityOperation { - make_color_and_age(&*USER_TYPE, id, color, age) + fn make_user(id: &str, color: &str, age: i32, vid: i64) -> EntityOperation { + make_color_and_age(&*USER_TYPE, id, color, age, vid) } - fn make_person(id: &str, color: &str, age: i32) -> EntityOperation { - make_color_and_age(&*PERSON_TYPE, id, color, age) + fn make_person(id: &str, color: &str, age: i32, vid: i64) -> EntityOperation { + make_color_and_age(&*PERSON_TYPE, id, color, age, vid) } let ops = vec![ - make_user("4", "green", 34), - make_user("5", "green", 17), - make_user("6", "green", 41), - make_user("7", "red", 25), - make_user("8", "red", 45), - make_user("9", "yellow", 37), - make_user("10", "blue", 27), - make_user("11", "blue", 19), - make_person("p1", "green", 12), - make_person("p2", "red", 15), + make_user("4", "green", 34, 11), + make_user("5", "green", 17, 12), + make_user("6", "green", 41, 13), + make_user("7", "red", 25, 14), + make_user("8", "red", 45, 15), + make_user("9", "yellow", 37, 16), + make_user("10", "blue", 27, 17), + make_user("11", "blue", 19, 18), + make_person("p1", "green", 12, 19), + make_person("p2", "red", 15, 20), ]; run_test(|store, _, deployment| async move { @@ -2076,6 +2096,7 @@ fn reorg_tracking() { deployment: &DeploymentLocator, age: i32, block: &BlockPtr, + vid: i64, ) { let test_entity_1 = create_test_entity( "1", @@ -2086,6 +2107,7 @@ fn reorg_tracking() { 184.4, false, None, + vid, ); transact_and_wait(store, deployment, block.clone(), vec![test_entity_1]) .await @@ -2136,15 +2158,15 @@ fn reorg_tracking() { check_state!(store, 2, 2, 2); // Forward to block 3 - update_john(&subgraph_store, &deployment, 70, &TEST_BLOCK_3_PTR).await; + update_john(&subgraph_store, &deployment, 70, &TEST_BLOCK_3_PTR, 30).await; check_state!(store, 2, 2, 3); // Forward to block 4 - update_john(&subgraph_store, &deployment, 71, &TEST_BLOCK_4_PTR).await; + update_john(&subgraph_store, &deployment, 71, &TEST_BLOCK_4_PTR, 40).await; check_state!(store, 2, 2, 4); // Forward to block 5 - update_john(&subgraph_store, &deployment, 72, &TEST_BLOCK_5_PTR).await; + update_john(&subgraph_store, &deployment, 72, &TEST_BLOCK_5_PTR, 50).await; check_state!(store, 2, 2, 5); // Revert all the way back to block 2 diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index df04615898a..4b231d556b5 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -114,7 +114,8 @@ fn count_key(id: &str) -> EntityKey { async fn insert_count(store: &Arc, deployment: &DeploymentLocator, count: u8) { let data = entity! { TEST_SUBGRAPH_SCHEMA => id: "1", - count: count as i32 + count: count as i32, + vid: count as i64, }; let entity_op = EntityOperation::Set { key: count_key(&data.get("id").unwrap().to_string()), @@ -245,7 +246,7 @@ fn restart() { // Cause an error by leaving out the non-nullable `count` attribute let entity_ops = vec![EntityOperation::Set { key: count_key("1"), - data: entity! { schema => id: "1" }, + data: entity! { schema => id: "1", vid: 0i64 }, }]; transact_entity_operations( &subgraph_store, @@ -269,7 +270,7 @@ fn restart() { // Retry our write with correct data let entity_ops = vec![EntityOperation::Set { key: count_key("1"), - data: entity! 
{ schema => id: "1", count: 1 }, + data: entity! { schema => id: "1", count: 1, vid: 0i64 }, }]; // `SubgraphStore` caches the correct writable so that this call // uses the restarted writable, and is equivalent to using