Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ impl EntityCache {
for (key, update) in self.updates {
use EntityModification::*;

let is_poi = key.entity_type.is_poi();
let current = self.current.remove(&key).and_then(|entity| entity);
let modification = match (current, update) {
// Entity was created
Expand All @@ -461,11 +462,13 @@ impl EntityCache {
updates.remove_null_fields();
let data = Arc::new(updates);
self.current.insert(key.clone(), Some(data.cheap_clone()));
let vid = if is_poi { 0 } else { data.vid() };
Some(Insert {
key,
data,
block,
end: None,
vid,
})
}
// Entity may have been changed
Expand All @@ -476,11 +479,13 @@ impl EntityCache {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
let vid = if is_poi { 0 } else { data.vid() };
Some(Overwrite {
key,
data,
block,
end: None,
vid,
})
} else {
None
Expand All @@ -491,11 +496,13 @@ impl EntityCache {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
if current != data {
let vid = if is_poi { 0 } else { data.vid() };
Some(Overwrite {
key,
data,
block,
end: None,
vid,
})
} else {
None
Expand Down
20 changes: 18 additions & 2 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ pub enum EntityModification {
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Update the entity by overwriting it
Overwrite {
key: EntityKey,
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Remove the entity
Remove { key: EntityKey, block: BlockNumber },
Expand All @@ -67,6 +69,7 @@ pub struct EntityWrite<'a> {
// The end of the block range for which this write is valid. The value
// of `end` itself is not included in the range
pub end: Option<BlockNumber>,
pub vid: i64,
}

impl std::fmt::Display for EntityWrite<'_> {
Expand All @@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> {
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),
EntityModification::Overwrite {
key,
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: &data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),

EntityModification::Remove { .. } => Err(()),
Expand Down Expand Up @@ -213,11 +220,13 @@ impl EntityModification {
data,
block,
end,
vid,
} => Ok(Insert {
key,
data,
block,
end,
vid,
}),
Remove { key, .. } => {
return Err(constraint_violation!(
Expand Down Expand Up @@ -271,21 +280,23 @@ impl EntityModification {
}

impl EntityModification {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Insert {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Overwrite {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

Expand Down Expand Up @@ -1017,31 +1028,36 @@ mod test {

let value = value.clone();
let key = THING_TYPE.parse_key("one").unwrap();
let vid = 0;
match value {
Ins(block) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: None,
vid,
},
Ovw(block) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: None,
vid,
},
Rem(block) => EntityModification::Remove { key, block },
InsC(block, end) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: Some(end),
vid,
},
OvwC(block, end) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: Some(end),
vid,
},
}
}
Expand Down
9 changes: 9 additions & 0 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub struct BlockState {
// data source that have been processed.
pub processed_data_sources: Vec<StoredDynamicDataSource>,

pub vid_seq: i32,

// Marks whether a handler is currently executing.
in_handler: bool,

Expand All @@ -92,6 +94,7 @@ impl BlockState {
persisted_data_sources: Vec::new(),
handler_created_data_sources: Vec::new(),
processed_data_sources: Vec::new(),
vid_seq: 0,
in_handler: false,
metrics: BlockStateMetrics::new(),
}
Expand All @@ -109,6 +112,7 @@ impl BlockState {
persisted_data_sources,
handler_created_data_sources,
processed_data_sources,
vid_seq: _,
in_handler,
metrics,
} = self;
Expand Down Expand Up @@ -178,4 +182,9 @@ impl BlockState {
pub fn persist_data_source(&mut self, ds: StoredDynamicDataSource) {
self.persisted_data_sources.push(ds)
}
pub fn next_vid(&mut self, block_number: BlockNumber) -> i64 {
let vid = ((block_number as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
vid
}
}
10 changes: 10 additions & 0 deletions graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,9 @@ where
lazy_static! {
/// The name of the id attribute, `"id"`
pub static ref ID: Word = Word::from("id");

/// The name of the vid attribute, `"vid"`
pub static ref VID: Word = Word::from("vid");
}

/// An entity is represented as a map of attribute names to values.
Expand Down Expand Up @@ -910,6 +913,13 @@ impl Entity {
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
}

pub fn vid(&self) -> i64 {
self.get("vid")
.expect("the vid is set")
.as_int8()
.expect("the vid is set to a valid value")
}

/// Merges an entity update `update` into this entity.
///
/// If a key exists in both entities, the value from `update` is chosen.
Expand Down
7 changes: 7 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
const POI_DIGEST: &str = "digest";
/// The name of the PoI attribute for storing the block time
const POI_BLOCK_TIME: &str = "blockTime";
const VID: &str = "vid";

pub mod kw {
pub const ENTITY: &str = "entity";
Expand Down Expand Up @@ -1487,6 +1488,10 @@ impl InputSchema {
}

pub fn has_field_with_name(&self, entity_type: &EntityType, field: &str) -> bool {
// TODO: check if it is needed
if field == VID {
return true;
}
let field = self.inner.pool.lookup(field);

match field {
Expand Down Expand Up @@ -1597,6 +1602,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
pool.intern(POI_DIGEST);
pool.intern(POI_BLOCK_TIME);

pool.intern(VID);

for definition in &document.definitions {
match definition {
s::Definition::TypeDefinition(typedef) => match typedef {
Expand Down
17 changes: 9 additions & 8 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,17 +477,18 @@ async fn test_ipfs_block() {
// The user_data value we use with calls to ipfs_map
const USER_DATA: &str = "user_data";

fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }";
fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
const DOCUMENT: &str =
" type Thing @entity { id: String!, value: String!, extra: String, vid: Int8 }";
lazy_static! {
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
}
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA };
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid:vid };
let key = THING_TYPE.parse_key(id).unwrap();
(
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
EntityModification::insert(key, data, 0),
EntityModification::insert(key, data, 0, vid),
)
}

Expand Down Expand Up @@ -553,8 +554,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) {
let subgraph_id = "ipfsMap";

// Try it with two valid objects
let (str1, thing1) = make_thing("one", "eins");
let (str2, thing2) = make_thing("two", "zwei");
let (str1, thing1) = make_thing("one", "eins", 0);
let (str2, thing2) = make_thing("two", "zwei", 0);
let ops = run_ipfs_map(
subgraph_id,
format!("{}\n{}", str1, str2),
Expand Down Expand Up @@ -1001,8 +1002,8 @@ async fn test_entity_store(api_version: Version) {

let schema = store.input_schema(&deployment.hash).unwrap();

let alex = entity! { schema => id: "alex", name: "Alex" };
let steve = entity! { schema => id: "steve", name: "Steve" };
let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64};
let steve = entity! { schema => id: "steve", name: "Steve", vid: 1i64};
let user_type = schema.entity_type("User").unwrap();
test_store::insert_entities(
&deployment,
Expand Down
2 changes: 2 additions & 0 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl HostExports {
gas: &GasCounter,
) -> Result<(), HostExportError> {
let entity_type = state.entity_cache.schema.entity_type(&entity_type)?;
let vid = state.next_vid(block);

Self::expect_object_type(&entity_type, "set")?;

Expand Down Expand Up @@ -314,6 +315,7 @@ impl HostExports {
data.insert(store::ID.clone(), value);
}
}
data.insert(store::VID.clone(), Value::Int8(vid));

self.check_invalid_fields(
self.data_source.api_version.clone(),
Expand Down
12 changes: 10 additions & 2 deletions store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,26 @@ impl Table {
Ok(cols)
}

let vid_type = if self.object.is_poi() || !self.object.is_object_type() {
"bigserial"
} else {
"bigint"
};

if self.immutable {
writeln!(
out,
"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block} int not null,\n\
{cols},
unique({id})
);",
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block = BLOCK_COLUMN,
id = self.primary_key().name
)
Expand All @@ -137,13 +144,14 @@ impl Table {
out,
r#"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block_range} int4range not null,
{cols}
);"#,
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block_range = BLOCK_RANGE_COLUMN
)?;

Expand Down
Loading
Loading