Skip to content

Commit 5049ac7

Browse files
author
Zoran Cvetkov
committed
add vid
1 parent 90e949d commit 5049ac7

File tree

8 files changed

+79
-4
lines changed

8 files changed

+79
-4
lines changed

graph/src/components/store/entity_cache.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,11 +461,13 @@ impl EntityCache {
461461
updates.remove_null_fields();
462462
let data = Arc::new(updates);
463463
self.current.insert(key.clone(), Some(data.cheap_clone()));
464+
let vid = data.vid_opt().unwrap_or_default();
464465
Some(Insert {
465466
key,
466467
data,
467468
block,
468469
end: None,
470+
vid,
469471
})
470472
}
471473
// Entity may have been changed
@@ -476,11 +478,13 @@ impl EntityCache {
476478
let data = Arc::new(data);
477479
self.current.insert(key.clone(), Some(data.cheap_clone()));
478480
if current != data {
481+
let vid = data.vid_opt().unwrap_or_default();
479482
Some(Overwrite {
480483
key,
481484
data,
482485
block,
483486
end: None,
487+
vid,
484488
})
485489
} else {
486490
None
@@ -491,11 +495,13 @@ impl EntityCache {
491495
let data = Arc::new(data);
492496
self.current.insert(key.clone(), Some(data.clone()));
493497
if current != data {
498+
let vid = data.vid();
494499
Some(Overwrite {
495500
key,
496501
data,
497502
block,
498503
end: None,
504+
vid,
499505
})
500506
} else {
501507
None

graph/src/components/store/write.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,15 @@ pub enum EntityModification {
4545
data: Arc<Entity>,
4646
block: BlockNumber,
4747
end: Option<BlockNumber>,
48+
vid: i64,
4849
},
4950
/// Update the entity by overwriting it
5051
Overwrite {
5152
key: EntityKey,
5253
data: Arc<Entity>,
5354
block: BlockNumber,
5455
end: Option<BlockNumber>,
56+
vid: i64,
5557
},
5658
/// Remove the entity
5759
Remove { key: EntityKey, block: BlockNumber },
@@ -67,6 +69,7 @@ pub struct EntityWrite<'a> {
6769
// The end of the block range for which this write is valid. The value
6870
// of `end` itself is not included in the range
6971
pub end: Option<BlockNumber>,
72+
pub vid: i64,
7073
}
7174

7275
impl std::fmt::Display for EntityWrite<'_> {
@@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> {
8992
data,
9093
block,
9194
end,
95+
vid,
9296
} => Ok(EntityWrite {
9397
id: &key.entity_id,
9498
entity: data,
9599
causality_region: key.causality_region,
96100
block: *block,
97101
end: *end,
102+
vid: *vid,
98103
}),
99104
EntityModification::Overwrite {
100105
key,
101106
data,
102107
block,
103108
end,
109+
vid,
104110
} => Ok(EntityWrite {
105111
id: &key.entity_id,
106112
entity: &data,
107113
causality_region: key.causality_region,
108114
block: *block,
109115
end: *end,
116+
vid: *vid,
110117
}),
111118

112119
EntityModification::Remove { .. } => Err(()),
@@ -213,11 +220,13 @@ impl EntityModification {
213220
data,
214221
block,
215222
end,
223+
vid,
216224
} => Ok(Insert {
217225
key,
218226
data,
219227
block,
220228
end,
229+
vid,
221230
}),
222231
Remove { key, .. } => {
223232
return Err(constraint_violation!(
@@ -271,21 +280,23 @@ impl EntityModification {
271280
}
272281

273282
impl EntityModification {
274-
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
283+
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
275284
EntityModification::Insert {
276285
key,
277286
data: Arc::new(data),
278287
block,
279288
end: None,
289+
vid,
280290
}
281291
}
282292

283-
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
293+
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
284294
EntityModification::Overwrite {
285295
key,
286296
data: Arc::new(data),
287297
block,
288298
end: None,
299+
vid,
289300
}
290301
}
291302

@@ -1017,31 +1028,36 @@ mod test {
10171028

10181029
let value = value.clone();
10191030
let key = THING_TYPE.parse_key("one").unwrap();
1031+
let vid = 0;
10201032
match value {
10211033
Ins(block) => EntityModification::Insert {
10221034
key,
10231035
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
10241036
block,
10251037
end: None,
1038+
vid,
10261039
},
10271040
Ovw(block) => EntityModification::Overwrite {
10281041
key,
10291042
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
10301043
block,
10311044
end: None,
1045+
vid,
10321046
},
10331047
Rem(block) => EntityModification::Remove { key, block },
10341048
InsC(block, end) => EntityModification::Insert {
10351049
key,
10361050
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
10371051
block,
10381052
end: Some(end),
1053+
vid,
10391054
},
10401055
OvwC(block, end) => EntityModification::Overwrite {
10411056
key,
10421057
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
10431058
block,
10441059
end: Some(end),
1060+
vid,
10451061
},
10461062
}
10471063
}

graph/src/components/subgraph/instance.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub struct BlockState {
7777
// data source that have been processed.
7878
pub processed_data_sources: Vec<StoredDynamicDataSource>,
7979

80+
pub vid_seq: i32,
81+
8082
// Marks whether a handler is currently executing.
8183
in_handler: bool,
8284

@@ -92,6 +94,7 @@ impl BlockState {
9294
persisted_data_sources: Vec::new(),
9395
handler_created_data_sources: Vec::new(),
9496
processed_data_sources: Vec::new(),
97+
vid_seq: 0,
9598
in_handler: false,
9699
metrics: BlockStateMetrics::new(),
97100
}
@@ -109,6 +112,7 @@ impl BlockState {
109112
persisted_data_sources,
110113
handler_created_data_sources,
111114
processed_data_sources,
115+
vid_seq: _,
112116
in_handler,
113117
metrics,
114118
} = self;
@@ -178,4 +182,9 @@ impl BlockState {
178182
pub fn persist_data_source(&mut self, ds: StoredDynamicDataSource) {
179183
self.persisted_data_sources.push(ds)
180184
}
185+
pub fn next_vid(&mut self, block_number: BlockNumber) -> i64 {
186+
let vid = ((block_number as i64) << 32) + self.vid_seq as i64;
187+
self.vid_seq += 1;
188+
vid
189+
}
181190
}

graph/src/data/store/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,9 @@ where
735735
lazy_static! {
736736
/// The name of the id attribute, `"id"`
737737
pub static ref ID: Word = Word::from("id");
738+
739+
/// The name of the vid attribute, `"vid"`
740+
pub static ref VID: Word = Word::from("vid");
738741
}
739742

740743
/// An entity is represented as a map of attribute names to values.
@@ -910,6 +913,17 @@ impl Entity {
910913
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
911914
}
912915

916+
pub fn vid(&self) -> i64 {
917+
self.get("vid")
918+
.expect("the vid is set")
919+
.as_int8()
920+
.expect("the vid is set to a valid value")
921+
}
922+
923+
pub fn vid_opt(&self) -> Option<i64> {
924+
self.get("vid").map(|vid| vid.as_int8()).unwrap_or_default()
925+
}
926+
913927
/// Merges an entity update `update` into this entity.
914928
///
915929
/// If a key exists in both entities, the value from `update` is chosen.

graph/src/schema/input/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
3535
const POI_DIGEST: &str = "digest";
3636
/// The name of the PoI attribute for storing the block time
3737
const POI_BLOCK_TIME: &str = "blockTime";
38+
const VID: &str = "vid";
3839

3940
pub mod kw {
4041
pub const ENTITY: &str = "entity";
@@ -1487,6 +1488,9 @@ impl InputSchema {
14871488
}
14881489

14891490
pub fn has_field_with_name(&self, entity_type: &EntityType, field: &str) -> bool {
1491+
if field == VID {
1492+
return true;
1493+
}
14901494
let field = self.inner.pool.lookup(field);
14911495

14921496
match field {
@@ -1597,6 +1601,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
15971601
pool.intern(POI_DIGEST);
15981602
pool.intern(POI_BLOCK_TIME);
15991603

1604+
pool.intern(VID);
1605+
16001606
for definition in &document.definitions {
16011607
match definition {
16021608
s::Definition::TypeDefinition(typedef) => match typedef {

runtime/wasm/src/host_exports.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ impl HostExports {
248248
gas: &GasCounter,
249249
) -> Result<(), HostExportError> {
250250
let entity_type = state.entity_cache.schema.entity_type(&entity_type)?;
251+
let vid = state.next_vid(block);
251252

252253
Self::expect_object_type(&entity_type, "set")?;
253254

@@ -314,6 +315,7 @@ impl HostExports {
314315
data.insert(store::ID.clone(), value);
315316
}
316317
}
318+
data.insert(store::VID.clone(), Value::Int8(vid));
317319

318320
self.check_invalid_fields(
319321
self.data_source.api_version.clone(),

store/postgres/src/relational/ddl.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
};
55

66
use graph::{
7+
data::subgraph::schema::POI_TABLE,
78
prelude::{BLOCK_NUMBER_MAX, ENV_VARS},
89
schema::InputSchema,
910
};
@@ -116,19 +117,26 @@ impl Table {
116117
Ok(cols)
117118
}
118119

120+
let vid_type = if self.name.as_str() == POI_TABLE {
121+
"bigserial"
122+
} else {
123+
"bigint"
124+
};
125+
119126
if self.immutable {
120127
writeln!(
121128
out,
122129
"
123130
create table {qname} (
124-
{vid} bigserial primary key,
131+
{vid} {vid_type} primary key,
125132
{block} int not null,\n\
126133
{cols},
127134
unique({id})
128135
);",
129136
qname = self.qualified_name,
130137
cols = columns_ddl(self)?,
131138
vid = VID_COLUMN,
139+
vid_type = vid_type,
132140
block = BLOCK_COLUMN,
133141
id = self.primary_key().name
134142
)
@@ -137,13 +145,14 @@ impl Table {
137145
out,
138146
r#"
139147
create table {qname} (
140-
{vid} bigserial primary key,
148+
{vid} {vid_type} primary key,
141149
{block_range} int4range not null,
142150
{cols}
143151
);"#,
144152
qname = self.qualified_name,
145153
cols = columns_ddl(self)?,
146154
vid = VID_COLUMN,
155+
vid_type = vid_type,
147156
block_range = BLOCK_RANGE_COLUMN
148157
)?;
149158

store/postgres/src/relational_queries.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use graph::components::store::write::{EntityWrite, RowGroup, WriteChunk};
1515
use graph::components::store::{Child as StoreChild, DerivedEntityQuery};
1616
use graph::data::store::{Id, IdType, NULL};
1717
use graph::data::store::{IdList, IdRef, QueryObject};
18+
use graph::data::subgraph::schema::POI_TABLE;
1819
use graph::data::value::{Object, Word};
1920
use graph::data_source::CausalityRegion;
2021
use graph::prelude::{
@@ -2266,6 +2267,7 @@ struct InsertRow<'a> {
22662267
values: Vec<InsertValue<'a>>,
22672268
br_value: BlockRangeValue,
22682269
causality_region: CausalityRegion,
2270+
vid: i64,
22692271
}
22702272

22712273
impl<'a> InsertRow<'a> {
@@ -2302,10 +2304,12 @@ impl<'a> InsertRow<'a> {
23022304
}
23032305
let br_value = BlockRangeValue::new(table, row.block, row.end);
23042306
let causality_region = row.causality_region;
2307+
let vid = row.vid;
23052308
Ok(Self {
23062309
values,
23072310
br_value,
23082311
causality_region,
2312+
vid,
23092313
})
23102314
}
23112315
}
@@ -2391,6 +2395,8 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
23912395
let out = &mut out;
23922396
out.unsafe_to_cache_prepared();
23932397

2398+
let not_poi = self.table.name.as_str() != POI_TABLE;
2399+
23942400
// Construct a query
23952401
// insert into schema.table(column, ...)
23962402
// values
@@ -2416,6 +2422,9 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
24162422
out.push_sql(CAUSALITY_REGION_COLUMN);
24172423
};
24182424

2425+
if not_poi {
2426+
out.push_sql(", vid");
2427+
}
24192428
out.push_sql(") values\n");
24202429

24212430
for (i, row) in self.rows.iter().enumerate() {
@@ -2433,6 +2442,10 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
24332442
out.push_sql(", ");
24342443
out.push_bind_param::<Integer, _>(&row.causality_region)?;
24352444
};
2445+
if not_poi {
2446+
out.push_sql(", ");
2447+
out.push_bind_param::<BigInt, _>(&row.vid)?;
2448+
}
24362449
out.push_sql(")");
24372450
}
24382451

0 commit comments

Comments
 (0)