Skip to content

Commit 29cc7aa

Browse files
committed
core, graph, store: Adjust vids of cached entities to reflect db changes
When entities are inserted or updated, their vid changes. That change needs to be propagated to the entities that stay in the EntityCache across blocks so that a future update to them modifies the right version (vid)
1 parent d8a8e36 commit 29cc7aa

File tree

9 files changed

+94
-48
lines changed

9 files changed

+94
-48
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,7 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
992992
data_sources,
993993
deterministic_errors,
994994
) {
995-
Ok(_) => {
995+
Ok(vid_map) => {
996996
// For subgraphs with `nonFatalErrors` feature disabled, we consider
997997
// any error as fatal.
998998
//
@@ -1020,6 +1020,10 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
10201020
return Err(BlockProcessingError::Canceled);
10211021
}
10221022

1023+
// Adjust the vids of cached entities because inserts and
1024+
// updates will have changed them
1025+
ctx.state.entity_lfu_cache.update_vids(vid_map);
1026+
10231027
Ok(needs_restart)
10241028
}
10251029

graph/src/components/store.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ pub trait WritableStore: Send + Sync + 'static {
10481048
stopwatch: StopwatchMetrics,
10491049
data_sources: Vec<StoredDynamicDataSource>,
10501050
deterministic_errors: Vec<SubgraphError>,
1051-
) -> Result<(), StoreError>;
1051+
) -> Result<Vec<(EntityKey, Vid)>, StoreError>;
10521052

10531053
/// Look up multiple entities as of the latest block. Returns a map of
10541054
/// entities by type.
@@ -1234,7 +1234,7 @@ impl WritableStore for MockStore {
12341234
_: StopwatchMetrics,
12351235
_: Vec<StoredDynamicDataSource>,
12361236
_: Vec<SubgraphError>,
1237-
) -> Result<(), StoreError> {
1237+
) -> Result<Vec<(EntityKey, Vid)>, StoreError> {
12381238
unimplemented!()
12391239
}
12401240

@@ -1801,6 +1801,19 @@ impl LfuCache<EntityKey, Option<EntityVersion>> {
18011801
Some(data) => Ok(data.to_owned()),
18021802
}
18031803
}
1804+
1805+
/// Update the `vid` of cached entities to reflect changes made in the
1806+
/// database. When entities stay cached across insert/update operations,
1807+
/// their vid in the database changes as a result of these operations
1808+
/// and needs to be updated
1809+
pub fn update_vids(&mut self, vid_map: Vec<(EntityKey, Vid)>) {
1810+
for (key, vid) in vid_map {
1811+
assert!(vid.is_some());
1812+
if let Some(Some(x)) = self.get_mut(key) {
1813+
x.vid = vid;
1814+
}
1815+
}
1816+
}
18041817
}
18051818

18061819
/// Determines which columns should be selected in a table.

graph/src/util/lfu_cache.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
8787
/// Updates and bumps freceny if already present.
8888
pub fn insert(&mut self, key: K, value: V) {
8989
let weight = CacheEntry::weight(&key, &value);
90-
match self.get_mut(key.clone()) {
90+
match self.get_mut_entry(key.clone()) {
9191
None => {
9292
self.total_weight += weight;
9393
self.queue.push(
@@ -119,7 +119,7 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
119119
.unwrap_or(0)
120120
}
121121

122-
fn get_mut(&mut self, key: K) -> Option<&mut CacheEntry<K, V>> {
122+
fn get_mut_entry(&mut self, key: K) -> Option<&mut CacheEntry<K, V>> {
123123
// Increment the frequency by 1
124124
let key_entry = CacheEntry::cache_key(key);
125125
self.queue
@@ -130,8 +130,12 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
130130
})
131131
}
132132

133+
pub fn get_mut(&mut self, key: K) -> Option<&mut V> {
134+
self.get_mut_entry(key).map(|x| &mut x.value)
135+
}
136+
133137
pub fn get(&mut self, key: &K) -> Option<&V> {
134-
self.get_mut(key.clone()).map(|x| &x.value)
138+
self.get_mut_entry(key.clone()).map(|x| &x.value)
135139
}
136140

137141
pub fn remove(&mut self, key: &K) -> Option<V> {

store/postgres/src/deployment_store.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ impl DeploymentStore {
311311
mods: &[EntityModification],
312312
ptr: &BlockPtr,
313313
stopwatch: StopwatchMetrics,
314-
) -> Result<i32, StoreError> {
314+
) -> Result<(i32, Vec<(EntityKey, Vid)>), StoreError> {
315315
use EntityModification::*;
316316
let mut count = 0;
317317

@@ -349,25 +349,38 @@ impl DeploymentStore {
349349
}
350350

351351
// Apply modification groups.
352+
let mut vid_map = Vec::new();
353+
352354
// Inserts:
353355
for (entity_type, mut entities) in inserts.into_iter() {
354-
count +=
356+
for (id, vid) in
355357
self.insert_entities(&entity_type, &mut entities, conn, layout, ptr, &stopwatch)?
356-
as i32
358+
{
359+
count += 1;
360+
vid_map.push((
361+
EntityKey::data(layout.site.deployment.clone(), entity_type.to_string(), id),
362+
vid,
363+
))
364+
}
357365
}
358366

359367
// Overwrites:
360368
for (entity_type, (mut entities, vids)) in overwrites.into_iter() {
361369
// we do not update the count since the number of entities remains the same
362-
self.overwrite_entities(
370+
for (id, vid) in self.overwrite_entities(
363371
&entity_type,
364372
&mut entities,
365373
vids.as_slice(),
366374
conn,
367375
layout,
368376
ptr,
369377
&stopwatch,
370-
)?;
378+
)? {
379+
vid_map.push((
380+
EntityKey::data(layout.site.deployment.clone(), entity_type.to_string(), id),
381+
vid,
382+
))
383+
}
371384
}
372385

373386
// Removals
@@ -376,7 +389,7 @@ impl DeploymentStore {
376389
self.remove_entities(&entity_type, vids.as_slice(), conn, layout, ptr, &stopwatch)?
377390
as i32;
378391
}
379-
Ok(count)
392+
Ok((count, vid_map))
380393
}
381394

382395
fn insert_entities<'a>(
@@ -387,7 +400,7 @@ impl DeploymentStore {
387400
layout: &'a Layout,
388401
ptr: &BlockPtr,
389402
stopwatch: &StopwatchMetrics,
390-
) -> Result<usize, StoreError> {
403+
) -> Result<Vec<(String, Vid)>, StoreError> {
391404
let section = stopwatch.start_section("check_interface_entity_uniqueness");
392405
for (key, _) in data.iter() {
393406
// WARNING: This will potentially execute 2 queries for each entity key.
@@ -408,7 +421,7 @@ impl DeploymentStore {
408421
layout: &'a Layout,
409422
ptr: &BlockPtr,
410423
stopwatch: &StopwatchMetrics,
411-
) -> Result<usize, StoreError> {
424+
) -> Result<Vec<(String, Vid)>, StoreError> {
412425
let section = stopwatch.start_section("check_interface_entity_uniqueness");
413426
for (key, _) in data.iter() {
414427
// WARNING: This will potentially execute 2 queries for each entity key.
@@ -863,7 +876,7 @@ impl DeploymentStore {
863876
stopwatch: StopwatchMetrics,
864877
data_sources: &[StoredDynamicDataSource],
865878
deterministic_errors: &[SubgraphError],
866-
) -> Result<StoreEvent, StoreError> {
879+
) -> Result<(StoreEvent, Vec<(EntityKey, Vid)>), StoreError> {
867880
// All operations should apply only to data or metadata for this subgraph
868881
if mods
869882
.iter()
@@ -881,7 +894,7 @@ impl DeploymentStore {
881894
self.get_conn()?
882895
};
883896

884-
let event = conn.transaction(|| -> Result<_, StoreError> {
897+
conn.transaction(|| -> Result<_, StoreError> {
885898
// Emit a store event for the changes we are about to make. We
886899
// wait with sending it until we have done all our other work
887900
// so that we do not hold a lock on the notification queue
@@ -891,7 +904,7 @@ impl DeploymentStore {
891904
// Make the changes
892905
let layout = self.layout(&conn, site.clone())?;
893906
let section = stopwatch.start_section("apply_entity_modifications");
894-
let count = self.apply_entity_modifications(
907+
let (count, vid_map) = self.apply_entity_modifications(
895908
&conn,
896909
layout.as_ref(),
897910
mods,
@@ -925,10 +938,8 @@ impl DeploymentStore {
925938
}
926939
}
927940

928-
Ok(event)
929-
})?;
930-
931-
Ok(event)
941+
Ok((event, vid_map))
942+
})
932943
}
933944

934945
fn rewind_with_conn(

store/postgres/src/relational.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
1818
use std::convert::{From, TryFrom};
1919
use std::env;
2020
use std::fmt::{self, Write};
21+
use std::num::NonZeroU64;
2122
use std::str::FromStr;
2223
use std::sync::{Arc, Mutex};
2324
use std::time::{Duration, Instant};
@@ -568,21 +569,21 @@ impl Layout {
568569
entities: &'a mut [(&'a EntityKey, Cow<'a, Entity>)],
569570
block: BlockNumber,
570571
stopwatch: &StopwatchMetrics,
571-
) -> Result<usize, StoreError> {
572+
) -> Result<Vec<(String, Vid)>, StoreError> {
572573
let table = self.table_for_entity(entity_type)?;
573574
let _section = stopwatch.start_section("insert_modification_insert_query");
574-
let mut count = 0;
575+
let mut vid_map = Vec::with_capacity(entities.len());
575576
// Each operation must respect the maximum number of bindings allowed in PostgreSQL queries,
576577
// so we need to act in chunks whose size is defined by the number of entities times the
577578
// number of attributes each entity type has.
578579
// We add 1 to account for the `block_range` bind parameter
579580
let chunk_size = POSTGRES_MAX_PARAMETERS / (table.columns.len() + 1);
580581
for chunk in entities.chunks_mut(chunk_size) {
581-
count += InsertQuery::new(table, chunk, block)?
582-
.get_results(conn)
583-
.map(|ids| ids.len())?
582+
for red in InsertQuery::new(table, chunk, block)?.get_results(conn)? {
583+
vid_map.push((red.id, NonZeroU64::new(red.vid as u64)))
584+
}
584585
}
585-
Ok(count)
586+
Ok(vid_map)
586587
}
587588

588589
pub fn conflicting_entity(
@@ -684,25 +685,27 @@ impl Layout {
684685
vids: &'a [Vid],
685686
block: BlockNumber,
686687
stopwatch: &StopwatchMetrics,
687-
) -> Result<usize, StoreError> {
688+
) -> Result<Vec<(String, Vid)>, StoreError> {
688689
let table = self.table_for_entity(&entity_type)?;
689690

690691
let section = stopwatch.start_section("update_modification_clamp_range_query");
691692
ClampRangeQuery::new(table, &vids, block)?.execute(conn)?;
692693
section.end();
693694

694695
let _section = stopwatch.start_section("update_modification_insert_query");
695-
let mut count = 0;
696696

697697
// Each operation must respect the maximum number of bindings allowed in PostgreSQL queries,
698698
// so we need to act in chunks whose size is defined by the number of entities times the
699699
// number of attributes each entity type has.
700700
// We add 1 to account for the `block_range` bind parameter
701701
let chunk_size = POSTGRES_MAX_PARAMETERS / (table.columns.len() + 1);
702+
let mut vid_map = Vec::with_capacity(entities.len());
702703
for chunk in entities.chunks_mut(chunk_size) {
703-
count += InsertQuery::new(table, chunk, block)?.execute(conn)?;
704+
for red in InsertQuery::new(table, chunk, block)?.get_results(conn)? {
705+
vid_map.push((red.id, NonZeroU64::new(red.vid as u64)))
706+
}
704707
}
705-
Ok(count)
708+
Ok(vid_map)
706709
}
707710

708711
pub fn delete(

store/postgres/src/relational_queries.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,7 +1384,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
13841384
}
13851385
out.push_sql("\nreturning ");
13861386
out.push_sql(PRIMARY_KEY_COLUMN);
1387-
out.push_sql("::text");
1387+
out.push_sql("::text, vid");
13881388

13891389
Ok(())
13901390
}
@@ -2647,6 +2647,8 @@ impl<'a, Conn> RunQueryDsl<Conn> for ClampRangeQuery<'a> {}
26472647
pub struct ReturnedEntityData {
26482648
#[sql_type = "Text"]
26492649
pub id: String,
2650+
#[sql_type = "BigInt"]
2651+
pub vid: i64,
26502652
}
26512653

26522654
impl ReturnedEntityData {
@@ -2689,7 +2691,7 @@ impl<'a> QueryFragment<Pg> for RevertRemoveQuery<'a> {
26892691
out.push_bind_param::<Integer, _>(&self.block)?;
26902692
out.push_sql("\nreturning ");
26912693
out.push_sql(PRIMARY_KEY_COLUMN);
2692-
out.push_sql("::text");
2694+
out.push_sql("::text, vid");
26932695
Ok(())
26942696
}
26952697
}
@@ -2760,7 +2762,7 @@ impl<'a> QueryFragment<Pg> for RevertClampQuery<'a> {
27602762
out.push_sql("), 2147483647) < 2147483647");
27612763
out.push_sql("\nreturning ");
27622764
out.push_sql(PRIMARY_KEY_COLUMN);
2763-
out.push_sql("::text");
2765+
out.push_sql("::text, vid");
27642766
Ok(())
27652767
}
27662768
}

store/postgres/src/subgraph_store.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use graph::{
1717
constraint_violation,
1818
data::query::QueryTarget,
1919
data::subgraph::schema::SubgraphError,
20-
data::{store::EntityVersion, subgraph::status},
20+
data::{
21+
store::{EntityVersion, Vid},
22+
subgraph::status,
23+
},
2124
prelude::StoreEvent,
2225
prelude::SubgraphDeploymentEntity,
2326
prelude::{
@@ -1240,13 +1243,13 @@ impl WritableStoreTrait for WritableStore {
12401243
stopwatch: StopwatchMetrics,
12411244
data_sources: Vec<StoredDynamicDataSource>,
12421245
deterministic_errors: Vec<SubgraphError>,
1243-
) -> Result<(), StoreError> {
1246+
) -> Result<Vec<(EntityKey, Vid)>, StoreError> {
12441247
assert!(
12451248
same_subgraph(&mods, &self.site.deployment),
12461249
"can only transact operations within one shard"
12471250
);
12481251
self.retry("transact_block_operations", move || {
1249-
let event = self.writable.transact_block_operations(
1252+
let (event, vid_map) = self.writable.transact_block_operations(
12501253
self.site.clone(),
12511254
&block_ptr_to,
12521255
firehose_cursor.as_deref(),
@@ -1257,7 +1260,8 @@ impl WritableStoreTrait for WritableStore {
12571260
)?;
12581261

12591262
let _section = stopwatch.start_section("send_store_event");
1260-
self.try_send_store_event(event)
1263+
self.try_send_store_event(event)?;
1264+
Ok(vid_map)
12611265
})
12621266
}
12631267

store/postgres/tests/relational.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ fn insert_entity(
235235
0,
236236
&MOCK_STOPWATCH,
237237
)
238-
.expect(&errmsg);
238+
.expect(&errmsg)
239+
.len();
239240
assert_eq!(inserted, entities_with_keys_owned.len());
240241
let ids_for_type = {
241242
let ids: Vec<_> = entities_with_keys_owned
@@ -288,7 +289,8 @@ fn update_entity(
288289
0,
289290
&MOCK_STOPWATCH,
290291
)
291-
.expect(&errmsg);
292+
.expect(&errmsg)
293+
.len();
292294
assert_eq!(updated, 1);
293295
}
294296

0 commit comments

Comments
 (0)