Skip to content

Commit 4396f8a

Browse files
committed
graph, store: Include vid in entity modifications use in ClampRangeQuery
1 parent d8a89d0 commit 4396f8a

File tree

7 files changed

+213
-98
lines changed

7 files changed

+213
-98
lines changed

graph/src/components/store.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ impl<'a> FromIterator<&'a EntityModification> for StoreEvent {
553553
.map(|op| {
554554
use self::EntityModification::*;
555555
match op {
556-
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => {
556+
Insert { key, .. } | Overwrite { key, .. } | Remove { key, .. } => {
557557
EntityChange::for_data(key.clone())
558558
}
559559
}
@@ -1451,16 +1451,20 @@ pub enum EntityModification {
14511451
/// Insert the entity
14521452
Insert { key: EntityKey, data: Entity },
14531453
/// Update the entity by overwriting it
1454-
Overwrite { key: EntityKey, data: Entity },
1454+
Overwrite {
1455+
key: EntityKey,
1456+
data: Entity,
1457+
prev_vid: Vid,
1458+
},
14551459
/// Remove the entity
1456-
Remove { key: EntityKey },
1460+
Remove { key: EntityKey, prev_vid: Vid },
14571461
}
14581462

14591463
impl EntityModification {
14601464
pub fn entity_key(&self) -> &EntityKey {
14611465
use EntityModification::*;
14621466
match self {
1463-
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => key,
1467+
Insert { key, .. } | Overwrite { key, .. } | Remove { key, .. } => key,
14641468
}
14651469
}
14661470

@@ -1731,7 +1735,11 @@ impl EntityCache {
17311735
let ev = EntityVersion::new(data.clone(), current.vid);
17321736
self.current.insert(key.clone(), Some(ev));
17331737
if current.data != data {
1734-
Some(Overwrite { key, data })
1738+
Some(Overwrite {
1739+
key,
1740+
data,
1741+
prev_vid: current.vid,
1742+
})
17351743
} else {
17361744
None
17371745
}
@@ -1741,15 +1749,22 @@ impl EntityCache {
17411749
let ev = EntityVersion::new(data.clone(), current.vid);
17421750
self.current.insert(key.clone(), Some(ev));
17431751
if current.data != data {
1744-
Some(Overwrite { key, data })
1752+
Some(Overwrite {
1753+
key,
1754+
data,
1755+
prev_vid: current.vid,
1756+
})
17451757
} else {
17461758
None
17471759
}
17481760
}
17491761
// Existing entity was deleted
1750-
(Some(_), EntityOp::Remove) => {
1762+
(Some(current), EntityOp::Remove) => {
17511763
self.current.insert(key.clone(), None);
1752-
Some(Remove { key })
1764+
Some(Remove {
1765+
key,
1766+
prev_vid: current.vid,
1767+
})
17531768
}
17541769
// Entity was deleted, but it doesn't exist in the store
17551770
(None, EntityOp::Remove) => None,

graph/tests/entity_cache.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,18 +144,22 @@ fn overwrite_modifications() {
144144
cache.set(sigurros_key.clone(), sigurros_data.clone());
145145

146146
let result = cache.as_modifications();
147+
let one = NonZeroU64::new(1);
148+
let two = NonZeroU64::new(2);
147149
assert_eq!(
148150
sort_by_entity_key(result.unwrap().modifications),
149-
sort_by_entity_key(vec![
151+
vec![
150152
EntityModification::Overwrite {
151153
key: mogwai_key,
152154
data: mogwai_data,
155+
prev_vid: one,
153156
},
154157
EntityModification::Overwrite {
155158
key: sigurros_key,
156159
data: sigurros_data,
160+
prev_vid: two,
157161
}
158-
])
162+
]
159163
);
160164
}
161165

@@ -205,15 +209,17 @@ fn consecutive_modifications() {
205209
// We expect a single overwrite modification for the above that leaves "id"
206210
// and "name" untouched, sets "founded" and removes the "label" field.
207211
let result = cache.as_modifications();
212+
let one = NonZeroU64::new(1);
208213
assert_eq!(
209-
sort_by_entity_key(result.unwrap().modifications),
210-
sort_by_entity_key(vec![EntityModification::Overwrite {
214+
result.unwrap().modifications,
215+
vec![EntityModification::Overwrite {
211216
key: update_key,
212217
data: Entity::from(vec![
213218
("id", "mogwai".into()),
214219
("name", "Mogwai".into()),
215220
("founded", 1995.into()),
216221
]),
217-
},])
222+
prev_vid: one
223+
},]
218224
);
219225
}

store/postgres/src/deployment_store.rs

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use diesel::pg::PgConnection;
44
use diesel::prelude::*;
55
use diesel::r2d2::{ConnectionManager, PooledConnection};
66
use graph::components::store::{EntityType, StoredDynamicDataSource};
7-
use graph::data::store::EntityVersion;
7+
use graph::data::store::{EntityVersion, Vid};
88
use graph::data::subgraph::status;
99
use graph::prelude::{
1010
tokio, CancelHandle, CancelToken, CancelableError, PoolWaitStats, SubgraphDeploymentEntity,
@@ -327,17 +327,23 @@ impl DeploymentStore {
327327
.or_insert_with(Vec::new)
328328
.push((key, Cow::from(data)));
329329
}
330-
Overwrite { key, data } => {
331-
overwrites
330+
Overwrite {
331+
key,
332+
data,
333+
prev_vid,
334+
} => {
335+
let (entities, vids) = overwrites
332336
.entry(key.entity_type.clone())
333-
.or_insert_with(Vec::new)
334-
.push((key, Cow::from(data)));
337+
.or_insert_with(|| (Vec::new(), Vec::new()));
338+
339+
entities.push((key, Cow::from(data)));
340+
vids.push(*prev_vid);
335341
}
336-
Remove { key } => {
342+
Remove { key, prev_vid } => {
337343
removals
338344
.entry(key.entity_type.clone())
339345
.or_insert_with(Vec::new)
340-
.push(key.entity_id.as_str());
346+
.push(*prev_vid);
341347
}
342348
}
343349
}
@@ -351,21 +357,24 @@ impl DeploymentStore {
351357
}
352358

353359
// Overwrites:
354-
for (entity_type, mut entities) in overwrites.into_iter() {
360+
for (entity_type, (mut entities, vids)) in overwrites.into_iter() {
355361
// we do not update the count since the number of entities remains the same
356-
self.overwrite_entities(&entity_type, &mut entities, conn, layout, ptr, &stopwatch)?;
357-
}
358-
359-
// Removals
360-
for (entity_type, entity_keys) in removals.into_iter() {
361-
count -= self.remove_entities(
362+
self.overwrite_entities(
362363
&entity_type,
363-
entity_keys.as_slice(),
364+
&mut entities,
365+
vids.as_slice(),
364366
conn,
365367
layout,
366368
ptr,
367369
&stopwatch,
368-
)? as i32;
370+
)?;
371+
}
372+
373+
// Removals
374+
for (entity_type, vids) in removals.into_iter() {
375+
count -=
376+
self.remove_entities(&entity_type, vids.as_slice(), conn, layout, ptr, &stopwatch)?
377+
as i32;
369378
}
370379
Ok(count)
371380
}
@@ -394,6 +403,7 @@ impl DeploymentStore {
394403
&'a self,
395404
entity_type: &'a EntityType,
396405
data: &'a mut [(&'a EntityKey, Cow<'a, Entity>)],
406+
vids: &'a [Vid],
397407
conn: &PgConnection,
398408
layout: &'a Layout,
399409
ptr: &BlockPtr,
@@ -407,28 +417,29 @@ impl DeploymentStore {
407417
section.end();
408418

409419
let _section = stopwatch.start_section("apply_entity_modifications_update");
410-
layout.update(conn, &entity_type, data, block_number(ptr), stopwatch)
420+
layout.update(conn, &entity_type, data, vids, block_number(ptr), stopwatch)
411421
}
412422

413423
fn remove_entities(
414424
&self,
415425
entity_type: &EntityType,
416-
entity_keys: &[&str],
426+
vids: &[Vid],
417427
conn: &PgConnection,
418428
layout: &Layout,
419429
ptr: &BlockPtr,
420430
stopwatch: &StopwatchMetrics,
421431
) -> Result<usize, StoreError> {
422432
let _section = stopwatch.start_section("apply_entity_modifications_delete");
423433
layout
424-
.delete(
425-
conn,
426-
entity_type,
427-
&entity_keys,
428-
block_number(ptr),
429-
stopwatch,
430-
)
431-
.map_err(|_error| anyhow!("Failed to remove entities: {:?}", entity_keys).into())
434+
.delete(conn, entity_type, vids, block_number(ptr), stopwatch)
435+
.map_err(|_error| {
436+
anyhow!(
437+
"Failed to remove entities for type {}: {:?}",
438+
entity_type,
439+
vids
440+
)
441+
.into()
442+
})
432443
}
433444

434445
/// Execute a closure with a connection to the database.

store/postgres/src/relational.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
use graph::components::store::EntityType;
3333
use graph::data::graphql::ext::{DirectiveFinder, DocumentExt, ObjectTypeExt};
3434
use graph::data::schema::{FulltextConfig, FulltextDefinition, Schema, SCHEMA_TYPE_NAME};
35-
use graph::data::store::{EntityVersion, BYTES_SCALAR};
35+
use graph::data::store::{EntityVersion, Vid, BYTES_SCALAR};
3636
use graph::data::subgraph::schema::{POI_OBJECT, POI_TABLE};
3737
use graph::prelude::{
3838
anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityCollection,
@@ -681,17 +681,14 @@ impl Layout {
681681
conn: &PgConnection,
682682
entity_type: &'a EntityType,
683683
entities: &'a mut [(&'a EntityKey, Cow<'a, Entity>)],
684+
vids: &'a [Vid],
684685
block: BlockNumber,
685686
stopwatch: &StopwatchMetrics,
686687
) -> Result<usize, StoreError> {
687688
let table = self.table_for_entity(&entity_type)?;
688-
let entity_keys: Vec<&str> = entities
689-
.iter()
690-
.map(|(key, _)| key.entity_id.as_str())
691-
.collect();
692689

693690
let section = stopwatch.start_section("update_modification_clamp_range_query");
694-
ClampRangeQuery::new(table, &entity_type, &entity_keys, block).execute(conn)?;
691+
ClampRangeQuery::new(table, &vids, block)?.execute(conn)?;
695692
section.end();
696693

697694
let _section = stopwatch.start_section("update_modification_insert_query");
@@ -712,15 +709,15 @@ impl Layout {
712709
&self,
713710
conn: &PgConnection,
714711
entity_type: &EntityType,
715-
entity_ids: &[&str],
712+
vids: &[Vid],
716713
block: BlockNumber,
717714
stopwatch: &StopwatchMetrics,
718715
) -> Result<usize, StoreError> {
719716
let table = self.table_for_entity(&entity_type)?;
720717
let _section = stopwatch.start_section("delete_modification_clamp_range_query");
721718
let mut count = 0;
722-
for chunk in entity_ids.chunks(DELETE_OPERATION_CHUNK_SIZE) {
723-
count += ClampRangeQuery::new(table, &entity_type, chunk, block).execute(conn)?
719+
for chunk in vids.chunks(DELETE_OPERATION_CHUNK_SIZE) {
720+
count += ClampRangeQuery::new(table, chunk, block)?.execute(conn)?
724721
}
725722
Ok(count)
726723
}

store/postgres/src/relational_queries.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2598,23 +2598,36 @@ impl<'a, Conn> RunQueryDsl<Conn> for FilterQuery<'a> {}
25982598

25992599
/// Reduce the upper bound of the current entry's block range to `block` as
26002600
/// long as that does not result in an empty block range
2601-
#[derive(Debug, Clone, Constructor)]
2602-
pub struct ClampRangeQuery<'a, S> {
2601+
#[derive(Debug, Clone)]
2602+
pub struct ClampRangeQuery<'a> {
26032603
table: &'a Table,
2604-
entity_type: &'a EntityType,
2605-
entity_ids: &'a [S],
2604+
vids: Vec<i64>,
26062605
block: BlockNumber,
26072606
}
26082607

2609-
impl<'a, S> QueryFragment<Pg> for ClampRangeQuery<'a, S>
2610-
where
2611-
S: AsRef<str> + diesel::serialize::ToSql<Text, Pg>,
2612-
{
2608+
fn i64_from_vid(vid: &Vid) -> Result<i64, StoreError> {
2609+
match vid {
2610+
Some(vid) => i64::try_from(vid.get())
2611+
.map_err(|_| StoreError::ConstraintViolation(format!("vid out of bounds: {}", vid))),
2612+
None => Err(StoreError::ConstraintViolation("missing vid".to_string())),
2613+
}
2614+
}
2615+
2616+
impl<'a> ClampRangeQuery<'a> {
2617+
pub fn new(table: &'a Table, vids: &'a [Vid], block: BlockNumber) -> Result<Self, StoreError> {
2618+
let vids = vids
2619+
.into_iter()
2620+
.map(i64_from_vid)
2621+
.collect::<Result<Vec<_>, _>>()?;
2622+
Ok(Self { table, vids, block })
2623+
}
2624+
}
2625+
2626+
impl<'a> QueryFragment<Pg> for ClampRangeQuery<'a> {
26132627
fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
26142628
// update table
26152629
// set block_range = int4range(lower(block_range), $block)
2616-
// where id in (id1, id2, ..., idN)
2617-
// and block_range @> INTMAX
2630+
// where vid = any([vid1, vid2, ..., vidN])
26182631
out.unsafe_to_cache_prepared();
26192632
out.push_sql("update ");
26202633
out.push_sql(self.table.qualified_name.as_str());
@@ -2624,27 +2637,21 @@ where
26242637
out.push_identifier(BLOCK_RANGE_COLUMN)?;
26252638
out.push_sql("), ");
26262639
out.push_bind_param::<Integer, _>(&self.block)?;
2627-
out.push_sql(")\n where ");
2628-
2629-
self.table.primary_key().is_in(self.entity_ids, &mut out)?;
2630-
out.push_sql(" and (");
2631-
out.push_sql(BLOCK_RANGE_CURRENT);
2640+
out.push_sql(")\n where vid = any(");
2641+
out.push_bind_param::<Array<BigInt>, _>(&self.vids)?;
26322642
out.push_sql(")");
26332643

26342644
Ok(())
26352645
}
26362646
}
26372647

2638-
impl<'a, S> QueryId for ClampRangeQuery<'a, S>
2639-
where
2640-
S: AsRef<str> + diesel::serialize::ToSql<Text, Pg>,
2641-
{
2648+
impl<'a> QueryId for ClampRangeQuery<'a> {
26422649
type QueryId = ();
26432650

26442651
const HAS_STATIC_QUERY_ID: bool = false;
26452652
}
26462653

2647-
impl<'a, S, Conn> RunQueryDsl<Conn> for ClampRangeQuery<'a, S> {}
2654+
impl<'a, Conn> RunQueryDsl<Conn> for ClampRangeQuery<'a> {}
26482655

26492656
/// Helper struct for returning the id's touched by the RevertRemove and
26502657
/// RevertExtend queries

0 commit comments

Comments
 (0)