Skip to content

Commit 68eaac1

Browse files
authored
Revert vid optimization (#3046)
* Revert "core, graph, store: Adjust vids of cached entities to reflect db changes" This reverts commit 29cc7aa. * Revert "store: Don't implement FromEntityData for Entity" This reverts commit d8a8e36. * Revert "graph, store: Include vid in entity modifications use in ClampRangeQuery" This reverts commit 4396f8a. * Revert "store: Simplify update_entity in relational test" This reverts commit d8a89d0. * Revert "core, graph: Track EntityVersion not Entity in the EntityCache" This reverts commit 2289b27. * Revert "all: Add an EntityVersion struct that contains an Entity and its vid" This reverts commit adaab9f. * store: Add 'allow(dead_code)' because of new Rust version
1 parent ab553e2 commit 68eaac1

File tree

15 files changed

+271
-499
lines changed

15 files changed

+271
-499
lines changed

chain/ethereum/src/network_indexer/network_indexer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ fn load_parent_block_from_store(context: &Context, block_ptr: BlockPtr) -> Block
311311
.and_then(move |block| {
312312
future::result(
313313
block
314-
.data
315314
.get("parent")
316315
.ok_or_else(move || {
317316
anyhow!("block {} has no parent", block_ptr_for_missing_parent,)

core/src/subgraph/instance_manager.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use atomic_refcell::AtomicRefCell;
44
use fail::fail_point;
55
use graph::blockchain::{BlockchainKind, DataSource};
66
use graph::data::store::scalar::Bytes;
7-
use graph::data::store::EntityVersion;
87
use graph::data::subgraph::{UnifiedMappingApiVersion, MAX_SPEC_VERSION};
98
use graph::prelude::TryStreamExt;
109
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
@@ -73,7 +72,7 @@ struct IndexingState<T: RuntimeHostBuilder<C>, C: Blockchain> {
7372
instance: SubgraphInstance<C, T>,
7473
instances: SharedInstanceKeepAliveMap,
7574
filter: C::TriggerFilter,
76-
entity_lfu_cache: LfuCache<EntityKey, Option<EntityVersion>>,
75+
entity_lfu_cache: LfuCache<EntityKey, Option<Entity>>,
7776
}
7877

7978
struct IndexingContext<T: RuntimeHostBuilder<C>, C: Blockchain> {
@@ -1067,7 +1066,7 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
10671066
data_sources,
10681067
deterministic_errors,
10691068
) {
1070-
Ok(vid_map) => {
1069+
Ok(_) => {
10711070
// For subgraphs with `nonFatalErrors` feature disabled, we consider
10721071
// any error as fatal.
10731072
//
@@ -1095,10 +1094,6 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
10951094
return Err(BlockProcessingError::Canceled);
10961095
}
10971096

1098-
// Adjust the vids of cached entities because inserts and
1099-
// updates will have changed them
1100-
ctx.state.entity_lfu_cache.update_vids(vid_map);
1101-
11021097
Ok(needs_restart)
11031098
}
11041099

graph/src/components/store.rs

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ impl<'a> FromIterator<&'a EntityModification> for StoreEvent {
553553
.map(|op| {
554554
use self::EntityModification::*;
555555
match op {
556-
Insert { key, .. } | Overwrite { key, .. } | Remove { key, .. } => {
556+
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => {
557557
EntityChange::for_data(key.clone())
558558
}
559559
}
@@ -1034,7 +1034,7 @@ pub trait WritableStore: Send + Sync + 'static {
10341034
async fn supports_proof_of_indexing(&self) -> Result<bool, StoreError>;
10351035

10361036
/// Looks up an entity using the given store key at the latest block.
1037-
fn get(&self, key: &EntityKey) -> Result<Option<EntityVersion>, StoreError>;
1037+
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError>;
10381038

10391039
/// Transact the entity changes from a single block atomically into the store, and update the
10401040
/// subgraph block pointer to `block_ptr_to`, and update the firehose cursor to `firehose_cursor`
@@ -1048,14 +1048,14 @@ pub trait WritableStore: Send + Sync + 'static {
10481048
stopwatch: StopwatchMetrics,
10491049
data_sources: Vec<StoredDynamicDataSource>,
10501050
deterministic_errors: Vec<SubgraphError>,
1051-
) -> Result<Vec<(EntityKey, Vid)>, StoreError>;
1051+
) -> Result<(), StoreError>;
10521052

10531053
/// Look up multiple entities as of the latest block. Returns a map of
10541054
/// entities by type.
10551055
fn get_many(
10561056
&self,
10571057
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
1058-
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError>;
1058+
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError>;
10591059

10601060
/// The deployment `id` finished syncing, mark it as synced in the database
10611061
/// and promote it to the current version in the subgraphs where it was the
@@ -1101,7 +1101,7 @@ mock! {
11011101
fn get_many_mock<'a>(
11021102
&self,
11031103
_ids_for_type: BTreeMap<&'a EntityType, Vec<&'a str>>,
1104-
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError>;
1104+
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError>;
11051105
}
11061106
}
11071107

@@ -1224,7 +1224,7 @@ impl WritableStore for MockStore {
12241224
unimplemented!()
12251225
}
12261226

1227-
fn get(&self, _: &EntityKey) -> Result<Option<EntityVersion>, StoreError> {
1227+
fn get(&self, _: &EntityKey) -> Result<Option<Entity>, StoreError> {
12281228
unimplemented!()
12291229
}
12301230

@@ -1236,14 +1236,14 @@ impl WritableStore for MockStore {
12361236
_: StopwatchMetrics,
12371237
_: Vec<StoredDynamicDataSource>,
12381238
_: Vec<SubgraphError>,
1239-
) -> Result<Vec<(EntityKey, Vid)>, StoreError> {
1239+
) -> Result<(), StoreError> {
12401240
unimplemented!()
12411241
}
12421242

12431243
fn get_many(
12441244
&self,
12451245
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
1246-
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError> {
1246+
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
12471247
self.get_many_mock(ids_for_type)
12481248
}
12491249

@@ -1457,20 +1457,16 @@ pub enum EntityModification {
14571457
/// Insert the entity
14581458
Insert { key: EntityKey, data: Entity },
14591459
/// Update the entity by overwriting it
1460-
Overwrite {
1461-
key: EntityKey,
1462-
data: Entity,
1463-
prev_vid: Vid,
1464-
},
1460+
Overwrite { key: EntityKey, data: Entity },
14651461
/// Remove the entity
1466-
Remove { key: EntityKey, prev_vid: Vid },
1462+
Remove { key: EntityKey },
14671463
}
14681464

14691465
impl EntityModification {
14701466
pub fn entity_key(&self) -> &EntityKey {
14711467
use EntityModification::*;
14721468
match self {
1473-
Insert { key, .. } | Overwrite { key, .. } | Remove { key, .. } => key,
1469+
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => key,
14741470
}
14751471
}
14761472

@@ -1533,7 +1529,7 @@ impl EntityOp {
15331529
pub struct EntityCache {
15341530
/// The state of entities in the store. An entry of `None`
15351531
/// means that the entity is not present in the store
1536-
current: LfuCache<EntityKey, Option<EntityVersion>>,
1532+
current: LfuCache<EntityKey, Option<Entity>>,
15371533

15381534
/// The accumulated changes to an entity.
15391535
updates: HashMap<EntityKey, EntityOp>,
@@ -1562,7 +1558,7 @@ impl Debug for EntityCache {
15621558
pub struct ModificationsAndCache {
15631559
pub modifications: Vec<EntityModification>,
15641560
pub data_sources: Vec<StoredDynamicDataSource>,
1565-
pub entity_lfu_cache: LfuCache<EntityKey, Option<EntityVersion>>,
1561+
pub entity_lfu_cache: LfuCache<EntityKey, Option<Entity>>,
15661562
}
15671563

15681564
impl EntityCache {
@@ -1579,7 +1575,7 @@ impl EntityCache {
15791575

15801576
pub fn with_current(
15811577
store: Arc<dyn WritableStore>,
1582-
current: LfuCache<EntityKey, Option<EntityVersion>>,
1578+
current: LfuCache<EntityKey, Option<Entity>>,
15831579
) -> EntityCache {
15841580
EntityCache {
15851581
current,
@@ -1615,10 +1611,7 @@ impl EntityCache {
16151611

16161612
pub fn get(&mut self, key: &EntityKey) -> Result<Option<Entity>, QueryExecutionError> {
16171613
// Get the current entity, apply any updates from `updates`, then from `handler_updates`.
1618-
let mut entity = self
1619-
.current
1620-
.get_entity(&*self.store, key)?
1621-
.map(|ev| ev.data);
1614+
let mut entity = self.current.get_entity(&*self.store, key)?;
16221615
if let Some(op) = self.updates.get(key).cloned() {
16231616
entity = op.apply_to(entity)
16241617
}
@@ -1708,14 +1701,14 @@ impl EntityCache {
17081701
}
17091702

17101703
for (subgraph_id, keys) in missing_by_subgraph {
1711-
for (entity_type, evs) in self.store.get_many(keys)? {
1712-
for ev in evs {
1704+
for (entity_type, entities) in self.store.get_many(keys)? {
1705+
for entity in entities {
17131706
let key = EntityKey {
17141707
subgraph_id: subgraph_id.clone(),
17151708
entity_type: entity_type.clone(),
1716-
entity_id: ev.data.id().unwrap(),
1709+
entity_id: entity.id().unwrap(),
17171710
};
1718-
self.current.insert(key, Some(ev));
1711+
self.current.insert(key, Some(entity));
17191712
}
17201713
}
17211714
}
@@ -1730,47 +1723,33 @@ impl EntityCache {
17301723
// Merging with an empty entity removes null fields.
17311724
let mut data = Entity::new();
17321725
data.merge_remove_null_fields(updates);
1733-
let ev = EntityVersion::new(data.clone(), None);
1734-
self.current.insert(key.clone(), Some(ev));
1726+
self.current.insert(key.clone(), Some(data.clone()));
17351727
Some(Insert { key, data })
17361728
}
17371729
// Entity may have been changed
17381730
(Some(current), EntityOp::Update(updates)) => {
1739-
let mut data = current.data.clone();
1731+
let mut data = current.clone();
17401732
data.merge_remove_null_fields(updates);
1741-
let ev = EntityVersion::new(data.clone(), current.vid);
1742-
self.current.insert(key.clone(), Some(ev));
1743-
if current.data != data {
1744-
Some(Overwrite {
1745-
key,
1746-
data,
1747-
prev_vid: current.vid,
1748-
})
1733+
self.current.insert(key.clone(), Some(data.clone()));
1734+
if current != data {
1735+
Some(Overwrite { key, data })
17491736
} else {
17501737
None
17511738
}
17521739
}
17531740
// Entity was removed and then updated, so it will be overwritten
17541741
(Some(current), EntityOp::Overwrite(data)) => {
1755-
let ev = EntityVersion::new(data.clone(), current.vid);
1756-
self.current.insert(key.clone(), Some(ev));
1757-
if current.data != data {
1758-
Some(Overwrite {
1759-
key,
1760-
data,
1761-
prev_vid: current.vid,
1762-
})
1742+
self.current.insert(key.clone(), Some(data.clone()));
1743+
if current != data {
1744+
Some(Overwrite { key, data })
17631745
} else {
17641746
None
17651747
}
17661748
}
17671749
// Existing entity was deleted
1768-
(Some(current), EntityOp::Remove) => {
1750+
(Some(_), EntityOp::Remove) => {
17691751
self.current.insert(key.clone(), None);
1770-
Some(Remove {
1771-
key,
1772-
prev_vid: current.vid,
1773-
})
1752+
Some(Remove { key })
17741753
}
17751754
// Entity was deleted, but it doesn't exist in the store
17761755
(None, EntityOp::Remove) => None,
@@ -1787,39 +1766,26 @@ impl EntityCache {
17871766
}
17881767
}
17891768

1790-
impl LfuCache<EntityKey, Option<EntityVersion>> {
1769+
impl LfuCache<EntityKey, Option<Entity>> {
17911770
// Helper for cached lookup of an entity.
17921771
fn get_entity(
17931772
&mut self,
17941773
store: &(impl WritableStore + ?Sized),
17951774
key: &EntityKey,
1796-
) -> Result<Option<EntityVersion>, QueryExecutionError> {
1775+
) -> Result<Option<Entity>, QueryExecutionError> {
17971776
match self.get(key) {
17981777
None => {
17991778
let mut entity = store.get(key)?;
18001779
if let Some(entity) = &mut entity {
18011780
// `__typename` is for queries not for mappings.
1802-
entity.data.remove("__typename");
1781+
entity.remove("__typename");
18031782
}
18041783
self.insert(key.clone(), entity.clone());
18051784
Ok(entity)
18061785
}
18071786
Some(data) => Ok(data.to_owned()),
18081787
}
18091788
}
1810-
1811-
/// Update the `vid` of cached entities to reflect changes made in the
1812-
/// database. When entities stay cached across insert/update operations,
1813-
/// their vid in the database changes as a result of these operations
1814-
/// and needs to be updated
1815-
pub fn update_vids(&mut self, vid_map: Vec<(EntityKey, Vid)>) {
1816-
for (key, vid) in vid_map {
1817-
assert!(vid.is_some());
1818-
if let Some(Some(x)) = self.get_mut(key) {
1819-
x.vid = vid;
1820-
}
1821-
}
1822-
}
18231789
}
18241790

18251791
/// Determines which columns should be selected in a table.

graph/src/components/subgraph/instance.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::blockchain::Blockchain;
2-
use crate::data::store::EntityVersion;
32
use crate::prelude::*;
43
use crate::util::lfu_cache::LfuCache;
54
use crate::{components::store::WritableStore, data::subgraph::schema::SubgraphError};
@@ -28,7 +27,7 @@ pub struct BlockState<C: Blockchain> {
2827
impl<C: Blockchain> BlockState<C> {
2928
pub fn new(
3029
store: Arc<dyn WritableStore>,
31-
lfu_cache: LfuCache<EntityKey, Option<EntityVersion>>,
30+
lfu_cache: LfuCache<EntityKey, Option<Entity>>,
3231
) -> Self {
3332
BlockState {
3433
entity_cache: EntityCache::with_current(store, lfu_cache),

graph/src/data/store/mod.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use itertools::Itertools;
88
use serde::de;
99
use serde::{Deserialize, Serialize};
1010
use stable_hash::prelude::*;
11+
use std::convert::TryFrom;
1112
use std::fmt;
1213
use std::iter::FromIterator;
1314
use std::str::FromStr;
1415
use std::{
1516
borrow::Cow,
1617
collections::{BTreeMap, HashMap},
1718
};
18-
use std::{convert::TryFrom, num::NonZeroU64};
1919
use strum::AsStaticRef as _;
2020
use strum_macros::AsStaticStr;
2121

@@ -622,33 +622,6 @@ impl CacheWeight for Entity {
622622
}
623623
}
624624

625-
pub type Vid = Option<NonZeroU64>;
626-
627-
#[derive(Clone, Default, Debug)]
628-
pub struct EntityVersion {
629-
pub data: Entity,
630-
/// The `vid` of the entity if it exists in the store
631-
pub vid: Vid,
632-
}
633-
634-
impl EntityVersion {
635-
pub fn new(data: Entity, vid: Vid) -> Self {
636-
EntityVersion { data, vid }
637-
}
638-
}
639-
640-
impl From<EntityVersion> for Entity {
641-
fn from(ev: EntityVersion) -> Self {
642-
ev.data
643-
}
644-
}
645-
646-
impl CacheWeight for EntityVersion {
647-
fn indirect_weight(&self) -> usize {
648-
self.data.weight()
649-
}
650-
}
651-
652625
/// A value that can (maybe) be converted to an `Entity`.
653626
pub trait TryIntoEntity {
654627
fn try_into_entity(self) -> Result<Entity, Error>;

graph/src/util/lfu_cache.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
8787
/// Updates and bumps freceny if already present.
8888
pub fn insert(&mut self, key: K, value: V) {
8989
let weight = CacheEntry::weight(&key, &value);
90-
match self.get_mut_entry(key.clone()) {
90+
match self.get_mut(key.clone()) {
9191
None => {
9292
self.total_weight += weight;
9393
self.queue.push(
@@ -119,7 +119,7 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
119119
.unwrap_or(0)
120120
}
121121

122-
fn get_mut_entry(&mut self, key: K) -> Option<&mut CacheEntry<K, V>> {
122+
fn get_mut(&mut self, key: K) -> Option<&mut CacheEntry<K, V>> {
123123
// Increment the frequency by 1
124124
let key_entry = CacheEntry::cache_key(key);
125125
self.queue
@@ -130,12 +130,8 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
130130
})
131131
}
132132

133-
pub fn get_mut(&mut self, key: K) -> Option<&mut V> {
134-
self.get_mut_entry(key).map(|x| &mut x.value)
135-
}
136-
137133
pub fn get(&mut self, key: &K) -> Option<&V> {
138-
self.get_mut_entry(key.clone()).map(|x| &x.value)
134+
self.get_mut(key.clone()).map(|x| &x.value)
139135
}
140136

141137
pub fn remove(&mut self, key: &K) -> Option<V> {

0 commit comments

Comments
 (0)