Skip to content

Commit 2289b27

Browse files
committed
core, graph: Track EntityVersion not Entity in the EntityCache
1 parent adaab9f commit 2289b27

File tree

9 files changed

+86
-72
lines changed

9 files changed

+86
-72
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use atomic_refcell::AtomicRefCell;
44
use fail::fail_point;
55
use graph::blockchain::{BlockchainKind, DataSource};
66
use graph::data::store::scalar::Bytes;
7+
use graph::data::store::EntityVersion;
78
use graph::data::subgraph::{UnifiedMappingApiVersion, MAX_SPEC_VERSION};
89
use graph::prelude::TryStreamExt;
910
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
@@ -62,7 +63,7 @@ struct IndexingState<T: RuntimeHostBuilder<C>, C: Blockchain> {
6263
instance: SubgraphInstance<C, T>,
6364
instances: SharedInstanceKeepAliveMap,
6465
filter: C::TriggerFilter,
65-
entity_lfu_cache: LfuCache<EntityKey, Option<Entity>>,
66+
entity_lfu_cache: LfuCache<EntityKey, Option<EntityVersion>>,
6667
}
6768

6869
struct IndexingContext<T: RuntimeHostBuilder<C>, C: Blockchain> {

graph/src/components/store.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ pub trait WritableStore: Send + Sync + 'static {
10551055
fn get_many(
10561056
&self,
10571057
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
1058-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError>;
1058+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError>;
10591059

10601060
/// The deployment `id` finished syncing, mark it as synced in the database
10611061
/// and promote it to the current version in the subgraphs where it was the
@@ -1099,7 +1099,7 @@ mock! {
10991099
fn get_many_mock<'a>(
11001100
&self,
11011101
_ids_for_type: BTreeMap<&'a EntityType, Vec<&'a str>>,
1102-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError>;
1102+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError>;
11031103
}
11041104
}
11051105

@@ -1241,7 +1241,7 @@ impl WritableStore for MockStore {
12411241
fn get_many(
12421242
&self,
12431243
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
1244-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
1244+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError> {
12451245
self.get_many_mock(ids_for_type)
12461246
}
12471247

@@ -1523,7 +1523,7 @@ impl EntityOp {
15231523
pub struct EntityCache {
15241524
/// The state of entities in the store. An entry of `None`
15251525
/// means that the entity is not present in the store
1526-
current: LfuCache<EntityKey, Option<Entity>>,
1526+
current: LfuCache<EntityKey, Option<EntityVersion>>,
15271527

15281528
/// The accumulated changes to an entity.
15291529
updates: HashMap<EntityKey, EntityOp>,
@@ -1552,7 +1552,7 @@ impl Debug for EntityCache {
15521552
pub struct ModificationsAndCache {
15531553
pub modifications: Vec<EntityModification>,
15541554
pub data_sources: Vec<StoredDynamicDataSource>,
1555-
pub entity_lfu_cache: LfuCache<EntityKey, Option<Entity>>,
1555+
pub entity_lfu_cache: LfuCache<EntityKey, Option<EntityVersion>>,
15561556
}
15571557

15581558
impl EntityCache {
@@ -1569,7 +1569,7 @@ impl EntityCache {
15691569

15701570
pub fn with_current(
15711571
store: Arc<dyn WritableStore>,
1572-
current: LfuCache<EntityKey, Option<Entity>>,
1572+
current: LfuCache<EntityKey, Option<EntityVersion>>,
15731573
) -> EntityCache {
15741574
EntityCache {
15751575
current,
@@ -1605,7 +1605,10 @@ impl EntityCache {
16051605

16061606
pub fn get(&mut self, key: &EntityKey) -> Result<Option<Entity>, QueryExecutionError> {
16071607
// Get the current entity, apply any updates from `updates`, then from `handler_updates`.
1608-
let mut entity = self.current.get_entity(&*self.store, key)?;
1608+
let mut entity = self
1609+
.current
1610+
.get_entity(&*self.store, key)?
1611+
.map(|ev| ev.data);
16091612
if let Some(op) = self.updates.get(key).cloned() {
16101613
entity = op.apply_to(entity)
16111614
}
@@ -1695,14 +1698,14 @@ impl EntityCache {
16951698
}
16961699

16971700
for (subgraph_id, keys) in missing_by_subgraph {
1698-
for (entity_type, entities) in self.store.get_many(keys)? {
1699-
for entity in entities {
1701+
for (entity_type, evs) in self.store.get_many(keys)? {
1702+
for ev in evs {
17001703
let key = EntityKey {
17011704
subgraph_id: subgraph_id.clone(),
17021705
entity_type: entity_type.clone(),
1703-
entity_id: entity.id().unwrap(),
1706+
entity_id: ev.data.id().unwrap(),
17041707
};
1705-
self.current.insert(key, Some(entity));
1708+
self.current.insert(key, Some(ev));
17061709
}
17071710
}
17081711
}
@@ -1717,24 +1720,27 @@ impl EntityCache {
17171720
// Merging with an empty entity removes null fields.
17181721
let mut data = Entity::new();
17191722
data.merge_remove_null_fields(updates);
1720-
self.current.insert(key.clone(), Some(data.clone()));
1723+
let ev = EntityVersion::new(data.clone(), None);
1724+
self.current.insert(key.clone(), Some(ev));
17211725
Some(Insert { key, data })
17221726
}
17231727
// Entity may have been changed
17241728
(Some(current), EntityOp::Update(updates)) => {
1725-
let mut data = current.clone();
1729+
let mut data = current.data.clone();
17261730
data.merge_remove_null_fields(updates);
1727-
self.current.insert(key.clone(), Some(data.clone()));
1728-
if current != data {
1731+
let ev = EntityVersion::new(data.clone(), current.vid);
1732+
self.current.insert(key.clone(), Some(ev));
1733+
if current.data != data {
17291734
Some(Overwrite { key, data })
17301735
} else {
17311736
None
17321737
}
17331738
}
17341739
// Entity was removed and then updated, so it will be overwritten
17351740
(Some(current), EntityOp::Overwrite(data)) => {
1736-
self.current.insert(key.clone(), Some(data.clone()));
1737-
if current != data {
1741+
let ev = EntityVersion::new(data.clone(), current.vid);
1742+
self.current.insert(key.clone(), Some(ev));
1743+
if current.data != data {
17381744
Some(Overwrite { key, data })
17391745
} else {
17401746
None
@@ -1760,19 +1766,19 @@ impl EntityCache {
17601766
}
17611767
}
17621768

1763-
impl LfuCache<EntityKey, Option<Entity>> {
1769+
impl LfuCache<EntityKey, Option<EntityVersion>> {
17641770
// Helper for cached lookup of an entity.
17651771
fn get_entity(
17661772
&mut self,
17671773
store: &(impl WritableStore + ?Sized),
17681774
key: &EntityKey,
1769-
) -> Result<Option<Entity>, QueryExecutionError> {
1775+
) -> Result<Option<EntityVersion>, QueryExecutionError> {
17701776
match self.get(key) {
17711777
None => {
1772-
let mut entity = store.get(key)?.map(|ev| ev.data);
1778+
let mut entity = store.get(key)?;
17731779
if let Some(entity) = &mut entity {
17741780
// `__typename` is for queries not for mappings.
1775-
entity.remove("__typename");
1781+
entity.data.remove("__typename");
17761782
}
17771783
self.insert(key.clone(), entity.clone());
17781784
Ok(entity)

graph/src/components/subgraph/instance.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::blockchain::Blockchain;
2+
use crate::data::store::EntityVersion;
23
use crate::prelude::*;
34
use crate::util::lfu_cache::LfuCache;
45
use crate::{components::store::WritableStore, data::subgraph::schema::SubgraphError};
@@ -27,7 +28,7 @@ pub struct BlockState<C: Blockchain> {
2728
impl<C: Blockchain> BlockState<C> {
2829
pub fn new(
2930
store: Arc<dyn WritableStore>,
30-
lfu_cache: LfuCache<EntityKey, Option<Entity>>,
31+
lfu_cache: LfuCache<EntityKey, Option<EntityVersion>>,
3132
) -> Self {
3233
BlockState {
3334
entity_cache: EntityCache::with_current(store, lfu_cache),

graph/src/data/store/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ impl CacheWeight for Entity {
624624

625625
pub type Vid = Option<NonZeroU64>;
626626

627-
#[derive(Default, Debug)]
627+
#[derive(Clone, Default, Debug)]
628628
pub struct EntityVersion {
629629
pub data: Entity,
630630
/// The `vid` of the entity if it exists in the store
@@ -643,9 +643,9 @@ impl From<EntityVersion> for Entity {
643643
}
644644
}
645645

646-
impl From<Entity> for EntityVersion {
647-
fn from(data: Entity) -> Self {
648-
Self { data, vid: None }
646+
impl CacheWeight for EntityVersion {
647+
fn indirect_weight(&self) -> usize {
648+
self.data.weight()
649649
}
650650
}
651651

graph/tests/entity_cache.rs

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use graph::data::store::EntityVersion;
12
use graph::prelude::SubgraphStore;
23
use lazy_static::lazy_static;
34
use slog::{o, Logger};
45
use std::collections::BTreeMap;
6+
use std::num::NonZeroU64;
57
use std::sync::Arc;
68

79
use graph::{components::store::EntityType, mock::MockStore};
@@ -81,32 +83,41 @@ fn insert_modifications() {
8183
);
8284
}
8385

86+
fn entity_version_map(
87+
entity_type: &str,
88+
entities: Vec<Entity>,
89+
) -> BTreeMap<EntityType, Vec<EntityVersion>> {
90+
let evs = entities
91+
.into_iter()
92+
.enumerate()
93+
.map(|(i, entity)| EntityVersion::new(entity, NonZeroU64::new((i + 1) as u64)))
94+
.collect();
95+
96+
let mut map = BTreeMap::new();
97+
map.insert(EntityType::from(entity_type), evs);
98+
map
99+
}
100+
84101
#[test]
85102
fn overwrite_modifications() {
86103
let mut store = MockStore::new();
87104

88105
// Pre-populate the store with entities so that the cache treats
89106
// every set operation as an overwrite.
90107
store.expect_get_many_mock().returning(|_| {
91-
let mut map = BTreeMap::new();
92-
93-
map.insert(
94-
EntityType::from("Band"),
95-
vec![
96-
make_band(
97-
"mogwai",
98-
vec![("id", "mogwai".into()), ("name", "Mogwai".into())],
99-
)
100-
.1,
101-
make_band(
102-
"sigurros",
103-
vec![("id", "sigurros".into()), ("name", "Sigur Ros".into())],
104-
)
105-
.1,
106-
],
107-
);
108-
109-
Ok(map)
108+
let entities = vec![
109+
make_band(
110+
"mogwai",
111+
vec![("id", "mogwai".into()), ("name", "Mogwai".into())],
112+
)
113+
.1,
114+
make_band(
115+
"sigurros",
116+
vec![("id", "sigurros".into()), ("name", "Sigur Ros".into())],
117+
)
118+
.1,
119+
];
120+
Ok(entity_version_map("Band", entities))
110121
});
111122

112123
let store = Arc::new(store);
@@ -155,24 +166,19 @@ fn consecutive_modifications() {
155166
// Pre-populate the store with data so that we can test setting a field to
156167
// `Value::Null`.
157168
store.expect_get_many_mock().returning(|_| {
158-
let mut map = BTreeMap::new();
159-
160-
map.insert(
161-
EntityType::from("Band"),
162-
vec![
163-
make_band(
164-
"mogwai",
165-
vec![
166-
("id", "mogwai".into()),
167-
("name", "Mogwai".into()),
168-
("label", "Chemikal Underground".into()),
169-
],
170-
)
171-
.1,
172-
],
173-
);
174-
175-
Ok(map)
169+
let entities = vec![
170+
make_band(
171+
"mogwai",
172+
vec![
173+
("id", "mogwai".into()),
174+
("name", "Mogwai".into()),
175+
("label", "Chemikal Underground".into()),
176+
],
177+
)
178+
.1,
179+
];
180+
181+
Ok(entity_version_map("Band", entities))
176182
});
177183

178184
let store = Arc::new(store);

store/postgres/src/deployment_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ impl DeploymentStore {
818818
&self,
819819
site: Arc<Site>,
820820
ids_for_type: &BTreeMap<&EntityType, Vec<&str>>,
821-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
821+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError> {
822822
if ids_for_type.is_empty() {
823823
return Ok(BTreeMap::new());
824824
}

store/postgres/src/relational.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ impl Layout {
536536
conn: &PgConnection,
537537
ids_for_type: &BTreeMap<&EntityType, Vec<&str>>,
538538
block: BlockNumber,
539-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
539+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError> {
540540
if ids_for_type.is_empty() {
541541
return Ok(BTreeMap::new());
542542
}
@@ -551,7 +551,7 @@ impl Layout {
551551
tables,
552552
block,
553553
};
554-
let mut entities_for_type: BTreeMap<EntityType, Vec<Entity>> = BTreeMap::new();
554+
let mut entities_for_type: BTreeMap<EntityType, Vec<EntityVersion>> = BTreeMap::new();
555555
for data in query.load::<EntityData>(conn)? {
556556
entities_for_type
557557
.entry(data.entity_type())

store/postgres/src/subgraph_store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ use graph::{
2222
prelude::SubgraphDeploymentEntity,
2323
prelude::{
2424
anyhow, futures03::future::join_all, lazy_static, o, web3::types::Address, ApiSchema,
25-
BlockPtr, DeploymentHash, Entity, EntityKey, EntityModification, Error, Logger, NodeId,
26-
Schema, StopwatchMetrics, StoreError, SubgraphName, SubgraphStore as SubgraphStoreTrait,
25+
BlockPtr, DeploymentHash, EntityKey, EntityModification, Error, Logger, NodeId, Schema,
26+
StopwatchMetrics, StoreError, SubgraphName, SubgraphStore as SubgraphStoreTrait,
2727
SubgraphVersionSwitchingMode,
2828
},
2929
slog::{error, warn},
@@ -889,7 +889,7 @@ impl SubgraphStoreInner {
889889
pub fn find(
890890
&self,
891891
query: graph::prelude::EntityQuery,
892-
) -> Result<Vec<Entity>, graph::prelude::QueryExecutionError> {
892+
) -> Result<Vec<graph::prelude::Entity>, graph::prelude::QueryExecutionError> {
893893
let (store, site) = self.store(&query.subgraph_id)?;
894894
store.find(site, query)
895895
}
@@ -1264,7 +1264,7 @@ impl WritableStoreTrait for WritableStore {
12641264
fn get_many(
12651265
&self,
12661266
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
1267-
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
1267+
) -> Result<BTreeMap<EntityType, Vec<EntityVersion>>, StoreError> {
12681268
self.retry("get_many", || {
12691269
self.writable
12701270
.get_many(self.site.cheap_clone(), &ids_for_type)

store/postgres/tests/relational_bytes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ fn find_many() {
266266
.get(&*THING)
267267
.expect("We got some things")
268268
.iter()
269-
.map(|thing| thing.id().unwrap())
269+
.map(|thing| thing.data.id().unwrap())
270270
.collect::<Vec<_>>();
271271

272272
assert_eq!(2, ids.len());

0 commit comments

Comments
 (0)