Skip to content

Commit eaa3549

Browse files
committed
all: Perform entity validation in EntityCache.set
That makes it possible to keep the SubgraphStore out of the runtime host altogether and helps clarify that it doesn't need direct store access except for ENS lookups
1 parent 51e52a9 commit eaa3549

File tree

6 files changed

+75
-43
lines changed

6 files changed

+75
-43
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ where
404404
let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new(
405405
chain.runtime_adapter(),
406406
self.link_resolver.cheap_clone(),
407-
subgraph_store,
407+
subgraph_store.ens_lookup(),
408408
);
409409

410410
let features = manifest.features.clone();
@@ -1198,7 +1198,7 @@ async fn update_proof_of_indexing(
11981198
digest: updated_proof_of_indexing,
11991199
};
12001200

1201-
entity_cache.set(entity_key, new_poi_entity);
1201+
entity_cache.set(entity_key, new_poi_entity)?;
12021202
}
12031203

12041204
Ok(())

graph/src/components/store.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,8 +1485,33 @@ impl EntityCache {
14851485
self.entity_op(key, EntityOp::Remove);
14861486
}
14871487

1488-
pub fn set(&mut self, key: EntityKey, entity: Entity) {
1489-
self.entity_op(key, EntityOp::Update(entity))
1488+
/// Store the `entity` under the given `key`. The `entity` may be only a
1489+
/// partial entity; the cache will ensure partial updates get merged
1490+
/// with existing data. The entity will be validated against the
1491+
/// subgraph schema, and any errors will result in an `Err` being
1492+
/// returned.
1493+
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
1494+
let is_valid = entity
1495+
.validate(&self.store.input_schema().document, &key)
1496+
.is_ok();
1497+
1498+
self.entity_op(key.clone(), EntityOp::Update(entity));
1499+
1500+
// The updates we were given are not valid by themselves; force a
1501+
// lookup in the database and check again with an entity that merges
1502+
// the existing entity with the changes
1503+
if !is_valid {
1504+
let entity = self.get(&key)?.ok_or_else(|| {
1505+
anyhow!(
1506+
"Failed to read entity {}[{}] back from cache",
1507+
key.entity_type,
1508+
key.entity_id
1509+
)
1510+
})?;
1511+
entity.validate(&self.store.input_schema().document, &key)?;
1512+
}
1513+
1514+
Ok(())
14901515
}
14911516

14921517
pub fn append(&mut self, operations: Vec<EntityOperation>) {

graph/tests/entity_cache.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,27 @@ use std::sync::Arc;
1010
use graph::components::store::{EntityType, StoredDynamicDataSource, WritableStore};
1111
use graph::{
1212
components::store::{DeploymentId, DeploymentLocator},
13-
prelude::{DeploymentHash, Entity, EntityCache, EntityKey, EntityModification, Value},
13+
prelude::{anyhow, DeploymentHash, Entity, EntityCache, EntityKey, EntityModification, Value},
1414
};
1515

1616
lazy_static! {
1717
static ref SUBGRAPH_ID: DeploymentHash = DeploymentHash::new("entity_cache").unwrap();
1818
static ref DEPLOYMENT: DeploymentLocator =
1919
DeploymentLocator::new(DeploymentId::new(-12), SUBGRAPH_ID.clone());
20+
static ref SCHEMA: Arc<Schema> = Arc::new(
21+
Schema::parse(
22+
"
23+
type Band @entity {
24+
id: ID!
25+
name: String!
26+
founded: Int
27+
label: String
28+
}
29+
",
30+
SUBGRAPH_ID.clone(),
31+
)
32+
.expect("Test schema invalid")
33+
);
2034
}
2135

2236
struct MockStore {
@@ -64,8 +78,17 @@ impl WritableStore for MockStore {
6478
unimplemented!()
6579
}
6680

67-
fn get(&self, _: &EntityKey) -> Result<Option<Entity>, StoreError> {
68-
unimplemented!()
81+
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError> {
82+
match self.get_many_res.get(&key.entity_type) {
83+
Some(entities) => Ok(entities
84+
.iter()
85+
.find(|entity| entity.id().ok().as_ref() == Some(&key.entity_id))
86+
.cloned()),
87+
None => Err(StoreError::Unknown(anyhow!(
88+
"nothing for type {}",
89+
key.entity_type
90+
))),
91+
}
6992
}
7093

7194
fn transact_block_operations(
@@ -112,7 +135,7 @@ impl WritableStore for MockStore {
112135
}
113136

114137
fn input_schema(&self) -> Arc<Schema> {
115-
unimplemented!()
138+
SCHEMA.clone()
116139
}
117140
}
118141

@@ -149,13 +172,15 @@ fn insert_modifications() {
149172
"mogwai",
150173
vec![("id", "mogwai".into()), ("name", "Mogwai".into())],
151174
);
152-
cache.set(mogwai_key.clone(), mogwai_data.clone());
175+
cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap();
153176

154177
let (sigurros_key, sigurros_data) = make_band(
155178
"sigurros",
156179
vec![("id", "sigurros".into()), ("name", "Sigur Ros".into())],
157180
);
158-
cache.set(sigurros_key.clone(), sigurros_data.clone());
181+
cache
182+
.set(sigurros_key.clone(), sigurros_data.clone())
183+
.unwrap();
159184

160185
let result = cache.as_modifications();
161186
assert_eq!(
@@ -213,7 +238,7 @@ fn overwrite_modifications() {
213238
("founded", 1995.into()),
214239
],
215240
);
216-
cache.set(mogwai_key.clone(), mogwai_data.clone());
241+
cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap();
217242

218243
let (sigurros_key, sigurros_data) = make_band(
219244
"sigurros",
@@ -223,7 +248,9 @@ fn overwrite_modifications() {
223248
("founded", 1994.into()),
224249
],
225250
);
226-
cache.set(sigurros_key.clone(), sigurros_data.clone());
251+
cache
252+
.set(sigurros_key.clone(), sigurros_data.clone())
253+
.unwrap();
227254

228255
let result = cache.as_modifications();
229256
assert_eq!(
@@ -273,14 +300,14 @@ fn consecutive_modifications() {
273300
("label", "Rock Action Records".into()),
274301
],
275302
);
276-
cache.set(update_key.clone(), update_data.clone());
303+
cache.set(update_key.clone(), update_data.clone()).unwrap();
277304

278305
// Then, just reset the "label".
279306
let (update_key, update_data) = make_band(
280307
"mogwai",
281308
vec![("id", "mogwai".into()), ("label", Value::Null)],
282309
);
283-
cache.set(update_key.clone(), update_data.clone());
310+
cache.set(update_key.clone(), update_data.clone()).unwrap();
284311

285312
// We expect a single overwrite modification for the above that leaves "id"
286313
// and "name" untouched, sets "founded" and removes the "label" field.

runtime/test/src/common.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ fn mock_host_exports(
5757
network,
5858
Arc::new(templates),
5959
Arc::new(graph_core::LinkResolver::from(IpfsClient::localhost())),
60-
store,
6160
ens_lookup,
6261
)
6362
}

runtime/wasm/src/host.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures03::channel::oneshot::channel;
88
use graph::blockchain::RuntimeAdapter;
99
use graph::blockchain::{Blockchain, DataSource};
1010
use graph::blockchain::{HostFn, TriggerWithHandler};
11-
use graph::components::store::{EnsLookup, SubgraphStore};
11+
use graph::components::store::EnsLookup;
1212
use graph::components::subgraph::{MappingError, SharedProofOfIndexing};
1313
use graph::prelude::{
1414
RuntimeHost as RuntimeHostTrait, RuntimeHostBuilder as RuntimeHostBuilderTrait, *,
@@ -30,15 +30,15 @@ lazy_static! {
3030
pub struct RuntimeHostBuilder<C: Blockchain> {
3131
runtime_adapter: Arc<C::RuntimeAdapter>,
3232
link_resolver: Arc<dyn LinkResolver>,
33-
store: Arc<dyn SubgraphStore>,
33+
ens_lookup: Arc<dyn EnsLookup>,
3434
}
3535

3636
impl<C: Blockchain> Clone for RuntimeHostBuilder<C> {
3737
fn clone(&self) -> Self {
3838
RuntimeHostBuilder {
3939
runtime_adapter: self.runtime_adapter.cheap_clone(),
4040
link_resolver: self.link_resolver.cheap_clone(),
41-
store: self.store.cheap_clone(),
41+
ens_lookup: self.ens_lookup.cheap_clone(),
4242
}
4343
}
4444
}
@@ -47,12 +47,12 @@ impl<C: Blockchain> RuntimeHostBuilder<C> {
4747
pub fn new(
4848
runtime_adapter: Arc<C::RuntimeAdapter>,
4949
link_resolver: Arc<dyn LinkResolver>,
50-
store: Arc<dyn SubgraphStore>,
50+
ens_lookup: Arc<dyn EnsLookup>,
5151
) -> Self {
5252
RuntimeHostBuilder {
5353
runtime_adapter,
5454
link_resolver,
55-
store,
55+
ens_lookup,
5656
}
5757
}
5858
}
@@ -93,14 +93,13 @@ impl<C: Blockchain> RuntimeHostBuilderTrait<C> for RuntimeHostBuilder<C> {
9393
RuntimeHost::new(
9494
self.runtime_adapter.cheap_clone(),
9595
self.link_resolver.clone(),
96-
self.store.clone(),
9796
network_name,
9897
subgraph_id,
9998
data_source,
10099
templates,
101100
mapping_request_sender,
102101
metrics,
103-
self.store.ens_lookup(),
102+
self.ens_lookup.cheap_clone(),
104103
)
105104
}
106105
}
@@ -120,7 +119,6 @@ where
120119
fn new(
121120
runtime_adapter: Arc<C::RuntimeAdapter>,
122121
link_resolver: Arc<dyn LinkResolver>,
123-
store: Arc<dyn SubgraphStore>,
124122
network_name: String,
125123
subgraph_id: DeploymentHash,
126124
data_source: C::DataSource,
@@ -137,7 +135,6 @@ where
137135
network_name,
138136
templates,
139137
link_resolver,
140-
store,
141138
ens_lookup,
142139
));
143140

runtime/wasm/src/host_exports.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ pub struct HostExports<C: Blockchain> {
6767
causality_region: String,
6868
templates: Arc<Vec<C::DataSourceTemplate>>,
6969
pub(crate) link_resolver: Arc<dyn LinkResolver>,
70-
store: Arc<dyn SubgraphStore>,
7170
ens_lookup: Arc<dyn EnsLookup>,
7271
}
7372

@@ -78,7 +77,6 @@ impl<C: Blockchain> HostExports<C> {
7877
data_source_network: String,
7978
templates: Arc<Vec<C::DataSourceTemplate>>,
8079
link_resolver: Arc<dyn LinkResolver>,
81-
store: Arc<dyn SubgraphStore>,
8280
ens_lookup: Arc<dyn EnsLookup>,
8381
) -> Self {
8482
Self {
@@ -91,7 +89,6 @@ impl<C: Blockchain> HostExports<C> {
9189
data_source_network,
9290
templates,
9391
link_resolver,
94-
store,
9592
ens_lookup,
9693
}
9794
}
@@ -168,7 +165,7 @@ impl<C: Blockchain> HostExports<C> {
168165
}
169166

170167
id_insert_section.end();
171-
let validation_section = stopwatch.start_section("host_export_store_set__validation");
168+
let validation_section = stopwatch.start_section("host_export_store_set");
172169
let key = EntityKey {
173170
subgraph_id: self.subgraph_id.clone(),
174171
entity_type: EntityType::new(entity_type),
@@ -178,22 +175,9 @@ impl<C: Blockchain> HostExports<C> {
178175
gas.consume_host_fn(gas::STORE_SET.with_args(complexity::Linear, (&key, &data)))?;
179176

180177
let entity = Entity::from(data);
181-
let schema = self.store.input_schema(&self.subgraph_id)?;
182-
let is_valid = entity.validate(&schema.document, &key).is_ok();
183-
state.entity_cache.set(key.clone(), entity);
184-
178+
state.entity_cache.set(key.clone(), entity)?;
185179
validation_section.end();
186-
// Validate the changes against the subgraph schema.
187-
// If the set of fields we have is already valid, avoid hitting the DB.
188-
if !is_valid {
189-
stopwatch.start_section("host_export_store_set__post_validation");
190-
let entity = state
191-
.entity_cache
192-
.get(&key)
193-
.map_err(|e| HostExportError::Unknown(e.into()))?
194-
.expect("we just stored this entity");
195-
entity.validate(&schema.document, &key)?;
196-
}
180+
197181
Ok(())
198182
}
199183

0 commit comments

Comments
 (0)