Skip to content

Commit adaab9f

Browse files
committed
all: Add an EntityVersion struct that contains an Entity and its vid
1 parent 39d9c3a commit adaab9f

File tree

10 files changed

+105
-29
lines changed

10 files changed

+105
-29
lines changed

chain/ethereum/src/network_indexer/network_indexer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ fn load_parent_block_from_store(context: &Context, block_ptr: BlockPtr) -> Block
311311
.and_then(move |block| {
312312
future::result(
313313
block
314+
.data
314315
.get("parent")
315316
.ok_or_else(move || {
316317
anyhow!("block {} has no parent", block_ptr_for_missing_parent,)

graph/src/components/store.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,7 @@ pub trait WritableStore: Send + Sync + 'static {
10341034
async fn supports_proof_of_indexing(&self) -> Result<bool, StoreError>;
10351035

10361036
/// Looks up an entity using the given store key at the latest block.
1037-
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError>;
1037+
fn get(&self, key: &EntityKey) -> Result<Option<EntityVersion>, StoreError>;
10381038

10391039
/// Transact the entity changes from a single block atomically into the store, and update the
10401040
/// subgraph block pointer to `block_ptr_to`, and update the firehose cursor to `firehose_cursor`
@@ -1222,7 +1222,7 @@ impl WritableStore for MockStore {
12221222
unimplemented!()
12231223
}
12241224

1225-
fn get(&self, _: &EntityKey) -> Result<Option<Entity>, StoreError> {
1225+
fn get(&self, _: &EntityKey) -> Result<Option<EntityVersion>, StoreError> {
12261226
unimplemented!()
12271227
}
12281228

@@ -1769,7 +1769,7 @@ impl LfuCache<EntityKey, Option<Entity>> {
17691769
) -> Result<Option<Entity>, QueryExecutionError> {
17701770
match self.get(key) {
17711771
None => {
1772-
let mut entity = store.get(key)?;
1772+
let mut entity = store.get(key)?.map(|ev| ev.data);
17731773
if let Some(entity) = &mut entity {
17741774
// `__typename` is for queries not for mappings.
17751775
entity.remove("__typename");

graph/src/data/store/mod.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use itertools::Itertools;
88
use serde::de;
99
use serde::{Deserialize, Serialize};
1010
use stable_hash::prelude::*;
11-
use std::convert::TryFrom;
1211
use std::fmt;
1312
use std::iter::FromIterator;
1413
use std::str::FromStr;
1514
use std::{
1615
borrow::Cow,
1716
collections::{BTreeMap, HashMap},
1817
};
18+
use std::{convert::TryFrom, num::NonZeroU64};
1919
use strum::AsStaticRef as _;
2020
use strum_macros::AsStaticStr;
2121

@@ -622,6 +622,33 @@ impl CacheWeight for Entity {
622622
}
623623
}
624624

625+
pub type Vid = Option<NonZeroU64>;
626+
627+
#[derive(Default, Debug)]
628+
pub struct EntityVersion {
629+
pub data: Entity,
630+
/// The `vid` of the entity if it exists in the store
631+
pub vid: Vid,
632+
}
633+
634+
impl EntityVersion {
635+
pub fn new(data: Entity, vid: Vid) -> Self {
636+
EntityVersion { data, vid }
637+
}
638+
}
639+
640+
impl From<EntityVersion> for Entity {
641+
fn from(ev: EntityVersion) -> Self {
642+
ev.data
643+
}
644+
}
645+
646+
impl From<Entity> for EntityVersion {
647+
fn from(data: Entity) -> Self {
648+
Self { data, vid: None }
649+
}
650+
}
651+
625652
/// A value that can (maybe) be converted to an `Entity`.
626653
pub trait TryIntoEntity {
627654
fn try_into_entity(self) -> Result<Entity, Error>;

store/postgres/src/deployment_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use diesel::pg::PgConnection;
44
use diesel::prelude::*;
55
use diesel::r2d2::{ConnectionManager, PooledConnection};
66
use graph::components::store::{EntityType, StoredDynamicDataSource};
7+
use graph::data::store::EntityVersion;
78
use graph::data::subgraph::status;
89
use graph::prelude::{
910
tokio, CancelHandle, CancelToken, CancelableError, PoolWaitStats, SubgraphDeploymentEntity,
@@ -801,7 +802,7 @@ impl DeploymentStore {
801802
&self,
802803
site: Arc<Site>,
803804
key: &EntityKey,
804-
) -> Result<Option<Entity>, StoreError> {
805+
) -> Result<Option<EntityVersion>, StoreError> {
805806
let conn = self.get_conn()?;
806807
let layout = self.layout(&conn, site)?;
807808

store/postgres/src/relational.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
use graph::components::store::EntityType;
3333
use graph::data::graphql::ext::{DirectiveFinder, DocumentExt, ObjectTypeExt};
3434
use graph::data::schema::{FulltextConfig, FulltextDefinition, Schema, SCHEMA_TYPE_NAME};
35-
use graph::data::store::BYTES_SCALAR;
35+
use graph::data::store::{EntityVersion, BYTES_SCALAR};
3636
use graph::data::subgraph::schema::{POI_OBJECT, POI_TABLE};
3737
use graph::prelude::{
3838
anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityCollection,
@@ -522,7 +522,7 @@ impl Layout {
522522
entity: &EntityType,
523523
id: &str,
524524
block: BlockNumber,
525-
) -> Result<Option<Entity>, StoreError> {
525+
) -> Result<Option<EntityVersion>, StoreError> {
526526
let table = self.table_for_entity(entity)?;
527527
FindQuery::new(table.as_ref(), id, block)
528528
.get_result::<EntityData>(conn)

store/postgres/src/relational_queries.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use diesel::query_dsl::{LoadQuery, RunQueryDsl};
1111
use diesel::result::{Error as DieselError, QueryResult};
1212
use diesel::sql_types::{Array, BigInt, Binary, Bool, Integer, Jsonb, Range, Text};
1313
use diesel::Connection;
14+
use graph::data::store::{EntityVersion, Vid};
1415
use lazy_static::lazy_static;
1516

1617
use graph::prelude::{
@@ -29,6 +30,7 @@ use std::convert::TryFrom;
2930
use std::env;
3031
use std::fmt::{self, Display};
3132
use std::iter::FromIterator;
33+
use std::num::NonZeroU64;
3234
use std::str::FromStr;
3335

3436
use crate::relational::{
@@ -273,6 +275,8 @@ pub trait FromEntityData: Default {
273275
type Value: FromColumnValue;
274276

275277
fn insert_entity_data(&mut self, key: String, v: Self::Value);
278+
279+
fn set_vid(&mut self, vid: Vid);
276280
}
277281

278282
impl FromEntityData for Entity {
@@ -281,6 +285,10 @@ impl FromEntityData for Entity {
281285
fn insert_entity_data(&mut self, key: String, v: Self::Value) {
282286
self.insert(key, v);
283287
}
288+
289+
fn set_vid(&mut self, _vid: Vid) {
290+
/* ignore */
291+
}
284292
}
285293

286294
impl FromEntityData for BTreeMap<String, r::Value> {
@@ -289,6 +297,22 @@ impl FromEntityData for BTreeMap<String, r::Value> {
289297
fn insert_entity_data(&mut self, key: String, v: Self::Value) {
290298
self.insert(key, v);
291299
}
300+
301+
fn set_vid(&mut self, _vid: Vid) {
302+
/* ignore */
303+
}
304+
}
305+
306+
impl FromEntityData for EntityVersion {
307+
type Value = graph::prelude::Value;
308+
309+
fn insert_entity_data(&mut self, key: String, v: Self::Value) {
310+
self.data.insert(key, v);
311+
}
312+
313+
fn set_vid(&mut self, vid: Vid) {
314+
self.vid = vid
315+
}
292316
}
293317

294318
pub trait FromColumnValue: Sized {
@@ -461,6 +485,8 @@ impl FromColumnValue for graph::prelude::Value {
461485
pub struct EntityData {
462486
#[sql_type = "Text"]
463487
entity: String,
488+
#[sql_type = "BigInt"]
489+
vid: i64,
464490
#[sql_type = "Jsonb"]
465491
data: serde_json::Value,
466492
}
@@ -486,6 +512,7 @@ impl EntityData {
486512
"__typename".to_owned(),
487513
T::Value::from_string(entity_type.into_string()),
488514
);
515+
out.set_vid(NonZeroU64::new(self.vid as u64));
489516
for (key, json) in map {
490517
// Simply ignore keys that do not have an underlying table
491518
// column; those will be things like the block_range that
@@ -1169,7 +1196,7 @@ impl<'a> QueryFragment<Pg> for FindQuery<'a> {
11691196
// from schema.table e where id = $1
11701197
out.push_sql("select ");
11711198
out.push_bind_param::<Text, _>(&self.table.object.as_str())?;
1172-
out.push_sql(" as entity, to_jsonb(e.*) as data\n");
1199+
out.push_sql(" as entity, e.vid, to_jsonb(e.*) as data\n");
11731200
out.push_sql(" from ");
11741201
out.push_sql(self.table.qualified_name.as_str());
11751202
out.push_sql(" e\n where ");
@@ -1221,7 +1248,7 @@ impl<'a> QueryFragment<Pg> for FindManyQuery<'a> {
12211248
}
12221249
out.push_sql("select ");
12231250
out.push_bind_param::<Text, _>(&table.object.as_str())?;
1224-
out.push_sql(" as entity, to_jsonb(e.*) as data\n");
1251+
out.push_sql(" as entity, e.vid, to_jsonb(e.*) as data\n");
12251252
out.push_sql(" from ");
12261253
out.push_sql(table.qualified_name.as_str());
12271254
out.push_sql(" e\n where ");
@@ -2296,7 +2323,7 @@ impl<'a> FilterQuery<'a> {
22962323
fn select_entity_and_data(table: &Table, out: &mut AstPass<Pg>) {
22972324
out.push_sql("select '");
22982325
out.push_sql(table.object.as_str());
2299-
out.push_sql("' as entity, to_jsonb(c.*) as data");
2326+
out.push_sql("' as entity, c.vid, to_jsonb(c.*) as data");
23002327
}
23012328

23022329
/// Only one table/filter pair, and no window
@@ -2412,7 +2439,7 @@ impl<'a> FilterQuery<'a> {
24122439
if i > 0 {
24132440
out.push_sql("\nunion all\n");
24142441
}
2415-
out.push_sql("select m.entity, ");
2442+
out.push_sql("select m.entity, m.vid, ");
24162443
jsonb_build_object(column_names, "c", &table, &mut out)?;
24172444
out.push_sql(" as data, c.id");
24182445
self.sort_key.select(&mut out)?;

store/postgres/src/subgraph_store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use graph::{
1717
constraint_violation,
1818
data::query::QueryTarget,
1919
data::subgraph::schema::SubgraphError,
20-
data::subgraph::status,
20+
data::{store::EntityVersion, subgraph::status},
2121
prelude::StoreEvent,
2222
prelude::SubgraphDeploymentEntity,
2323
prelude::{
@@ -1228,7 +1228,7 @@ impl WritableStoreTrait for WritableStore {
12281228
.await
12291229
}
12301230

1231-
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError> {
1231+
fn get(&self, key: &EntityKey) -> Result<Option<EntityVersion>, StoreError> {
12321232
self.retry("get", || self.writable.get(self.site.cheap_clone(), key))
12331233
}
12341234

store/postgres/tests/relational.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ macro_rules! assert_entity_eq {
437437
let mut pass = true;
438438

439439
for (key, left_value) in left.clone().sorted() {
440-
match right.get(&key) {
440+
match right.data.get(&key) {
441441
None => {
442442
pass = false;
443443
println!("key '{}' missing from right", key);
@@ -453,7 +453,7 @@ macro_rules! assert_entity_eq {
453453
}
454454
}
455455
}
456-
for (key, _) in right.clone().sorted() {
456+
for (key, _) in right.data.clone().sorted() {
457457
if left.get(&key).is_none() {
458458
pass = false;
459459
println!("key '{}' missing from left", key);
@@ -619,6 +619,7 @@ fn update_many() {
619619
.find(conn, &*SCALAR, id, BLOCK_NUMBER_MAX)
620620
.expect(&format!("Failed to read Scalar[{}]", id))
621621
.unwrap()
622+
.data
622623
})
623624
.collect();
624625
let new_one = &updated[0];

store/postgres/tests/relational_bytes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ macro_rules! assert_entity_eq {
145145
let mut pass = true;
146146

147147
for (key, left_value) in left.clone().sorted() {
148-
match right.get(&key) {
148+
match right.data.get(&key) {
149149
None => {
150150
pass = false;
151151
println!("key '{}' missing from right", key);
@@ -161,7 +161,7 @@ macro_rules! assert_entity_eq {
161161
}
162162
}
163163
}
164-
for (key, _) in right.clone().sorted() {
164+
for (key, _) in right.data.clone().sorted() {
165165
if left.get(&key).is_none() {
166166
pass = false;
167167
println!("key '{}' missing from left", key);

0 commit comments

Comments
 (0)