Skip to content

Commit bc71e4f

Browse files
committed
graph, store: Load dynamic data sources directly from the store
Load the parts of dynamic data sources that can't be reconstructed from the manifest we have in memory
1 parent 41d2b2d commit bc71e4f

File tree

8 files changed

+184
-9
lines changed

8 files changed

+184
-9
lines changed

graph/src/components/store.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ use std::sync::{Arc, RwLock};
1515
use std::time::{Duration, Instant};
1616
use web3::types::{Address, H256};
1717

18-
use crate::data::store::*;
1918
use crate::data::subgraph::schema::*;
2019
use crate::data::subgraph::status;
20+
use crate::data::{store::*, subgraph::Source};
2121
use crate::prelude::*;
2222
use crate::util::lfu_cache::LfuCache;
2323

@@ -796,6 +796,12 @@ pub enum TransactionAbortError {
796796
Other(String),
797797
}
798798

799+
pub struct StoredDynamicDataSource {
800+
pub name: String,
801+
pub source: Source,
802+
pub context: Option<String>,
803+
}
804+
799805
/// Common trait for store implementations.
800806
#[async_trait]
801807
pub trait Store: Send + Sync + 'static {
@@ -1005,6 +1011,12 @@ pub trait Store: Send + Sync + 'static {
10051011
fn query_store(self: Arc<Self>, for_subscription: bool) -> Arc<dyn QueryStore + Send + Sync>;
10061012

10071013
fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError>;
1014+
1015+
/// Load the dynamic data sources for the given deployment
1016+
fn load_dynamic_data_sources(
1017+
&self,
1018+
subgraph_id: &SubgraphDeploymentId,
1019+
) -> Result<Vec<StoredDynamicDataSource>, StoreError>;
10081020
}
10091021

10101022
mock! {
@@ -1181,6 +1193,13 @@ impl Store for MockStore {
11811193
fn status(&self, _: status::Filter) -> Result<Vec<status::Info>, StoreError> {
11821194
unimplemented!()
11831195
}
1196+
1197+
fn load_dynamic_data_sources(
1198+
&self,
1199+
_subgraph_id: &SubgraphDeploymentId,
1200+
) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
1201+
unimplemented!()
1202+
}
11841203
}
11851204

11861205
#[automock]

mock/src/store.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use mockall::predicate::*;
22
use mockall::*;
33
use std::collections::BTreeMap;
44

5-
use graph::data::subgraph::schema::*;
65
use graph::data::subgraph::status;
76
use graph::prelude::*;
7+
use graph::{components::store::StoredDynamicDataSource, data::subgraph::schema::SubgraphError};
88
use graph_graphql::prelude::api_schema;
99
use web3::types::{Address, H256};
1010

@@ -218,6 +218,13 @@ impl Store for MockStore {
218218
fn status(&self, _: status::Filter) -> Result<Vec<status::Info>, StoreError> {
219219
unimplemented!()
220220
}
221+
222+
fn load_dynamic_data_sources(
223+
&self,
224+
_: &SubgraphDeploymentId,
225+
) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
226+
unimplemented!()
227+
}
221228
}
222229

223230
pub fn mock_store_with_users_subgraph() -> (Arc<MockStore>, SubgraphDeploymentId) {

store/postgres/src/dynds.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
//! SQL queries to load dynamic data sources
2+
3+
use diesel::pg::PgConnection;
4+
use diesel::prelude::{ExpressionMethods, JoinOnDsl, QueryDsl, RunQueryDsl};
5+
6+
use graph::{
7+
components::store::StoredDynamicDataSource,
8+
data::subgraph::Source,
9+
prelude::{bigdecimal::ToPrimitive, web3::types::H160, BigDecimal, StoreError},
10+
};
11+
12+
// Diesel tables for some of the metadata
13+
// See also: ed42d219c6704a4aab57ce1ea66698e7
14+
// Changes to the GraphQL schema might require changes to these tables.
15+
// The definitions of the tables can be generated with
16+
// cargo run -p graph-store-postgres --example layout -- \
17+
// -g diesel store/postgres/src/subgraphs.graphql subgraphs
18+
// BEGIN GENERATED CODE
19+
table! {
20+
subgraphs.dynamic_ethereum_contract_data_source (vid) {
21+
vid -> BigInt,
22+
id -> Text,
23+
kind -> Text,
24+
name -> Text,
25+
network -> Nullable<Text>,
26+
source -> Text,
27+
mapping -> Text,
28+
templates -> Nullable<Array<Text>>,
29+
ethereum_block_hash -> Binary,
30+
ethereum_block_number -> Numeric,
31+
deployment -> Text,
32+
context -> Nullable<Text>,
33+
block_range -> Range<Integer>,
34+
}
35+
}
36+
37+
table! {
38+
subgraphs.ethereum_contract_source (vid) {
39+
vid -> BigInt,
40+
id -> Text,
41+
address -> Nullable<Binary>,
42+
abi -> Text,
43+
start_block -> Nullable<Numeric>,
44+
block_range -> Range<Integer>,
45+
}
46+
}
47+
48+
// END GENERATED CODE
49+
50+
allow_tables_to_appear_in_same_query!(
51+
dynamic_ethereum_contract_data_source,
52+
ethereum_contract_source
53+
);
54+
55+
fn to_source(
56+
deployment: &str,
57+
ds_id: &str,
58+
(address, abi, start_block): (Option<Vec<u8>>, String, Option<BigDecimal>),
59+
) -> Result<Source, StoreError> {
60+
// Treat a missing address as an error. TODO: Is that correct?
61+
let address = match address {
62+
Some(address) => address,
63+
None => {
64+
return Err(StoreError::ConstraintViolation(format!(
65+
"Dynamic data source {} for deployment {} is missing an address",
66+
ds_id, deployment
67+
)));
68+
}
69+
};
70+
if address.len() != 20 {
71+
return Err(StoreError::ConstraintViolation(format!(
72+
"Data source address 0x`{:?}` for dynamic data source {} in deployment {} should have be 20 bytes long but is {} bytes long",
73+
address, ds_id, deployment,
74+
address.len()
75+
)));
76+
}
77+
let address = Some(H160::from_slice(address.as_slice()));
78+
79+
// Assume a missing start block is the same as 0
80+
let start_block = start_block
81+
.map(|s| {
82+
s.to_u64().ok_or_else(|| {
83+
StoreError::ConstraintViolation(format!(
84+
"Start block {:?} for dynamic data source {} in deployment {} is not a u64",
85+
s, ds_id, deployment
86+
))
87+
})
88+
})
89+
.transpose()?
90+
.unwrap_or(0);
91+
92+
Ok(Source {
93+
address,
94+
abi,
95+
start_block,
96+
})
97+
}
98+
99+
#[allow(dead_code)]
100+
pub fn load(conn: &PgConnection, id: &str) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
101+
use dynamic_ethereum_contract_data_source as decds;
102+
use ethereum_contract_source as ecs;
103+
104+
let dds: Vec<_> = decds::table
105+
.inner_join(ecs::table.on(decds::source.eq(ecs::id)))
106+
.filter(decds::deployment.eq(id))
107+
.select((
108+
decds::id,
109+
decds::name,
110+
decds::context,
111+
(ecs::address, ecs::abi, ecs::start_block),
112+
))
113+
.load::<(
114+
String,
115+
String,
116+
Option<String>,
117+
(Option<Vec<u8>>, String, Option<BigDecimal>),
118+
)>(conn)?;
119+
120+
let mut data_sources = Vec::new();
121+
for (ds_id, name, context, source) in dds.into_iter() {
122+
let source = to_source(id, &ds_id, source)?;
123+
let data_source = StoredDynamicDataSource {
124+
name,
125+
source,
126+
context,
127+
};
128+
data_sources.push(data_source);
129+
}
130+
Ok(data_sources)
131+
}

store/postgres/src/entities.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use diesel::sql_types::{Integer, Text};
2626
use diesel::Connection as _;
2727
use diesel::ExpressionMethods;
2828
use diesel::{OptionalExtension, QueryDsl, RunQueryDsl};
29-
use lazy_static::lazy_static;
3029
use maybe_owned::MaybeOwned;
3130
use std::collections::hash_map::DefaultHasher;
3231
use std::collections::{BTreeMap, HashMap};
@@ -50,7 +49,7 @@ use crate::notification_listener::JsonNotification;
5049
use crate::relational::{Catalog, Layout};
5150

5251
#[cfg(debug_assertions)]
53-
lazy_static! {
52+
lazy_static::lazy_static! {
5453
/// Tests set this to true so that `send_store_event` will store a copy
5554
/// of each event sent in `EVENT_TAP`
5655
pub static ref EVENT_TAP_ENABLED: Mutex<bool> = Mutex::new(false);

store/postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod chain_store;
2727
pub mod connection_pool;
2828
mod db_schema;
2929
mod detail;
30+
mod dynds;
3031
mod entities;
3132
mod functions;
3233
mod jsonb;

store/postgres/src/network_store.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use graph::{
4+
components::store::StoredDynamicDataSource,
45
data::subgraph::schema::SubgraphError,
56
data::subgraph::status,
67
prelude::{
@@ -250,6 +251,13 @@ impl StoreTrait for NetworkStore {
250251
fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError> {
251252
self.store.status(filter)
252253
}
254+
255+
fn load_dynamic_data_sources(
256+
&self,
257+
subgraph_id: &SubgraphDeploymentId,
258+
) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
259+
self.store.load_dynamic_data_sources(subgraph_id)
260+
}
253261
}
254262

255263
impl SubgraphDeploymentStore for NetworkStore {

store/postgres/src/store.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ use diesel::prelude::*;
44
use diesel::r2d2::{ConnectionManager, PooledConnection};
55
use diesel::{insert_into, update};
66
use futures03::FutureExt as _;
7-
use graph::prelude::{
8-
CancelGuard, CancelHandle, CancelToken, CancelableError, NodeId, PoolWaitStats,
9-
SubgraphVersionSwitchingMode,
7+
use graph::{
8+
components::store::StoredDynamicDataSource,
9+
prelude::{
10+
CancelGuard, CancelHandle, CancelToken, CancelableError, NodeId, PoolWaitStats,
11+
SubgraphVersionSwitchingMode,
12+
},
1013
};
1114
use graph::{data::subgraph::status, prelude::TryFutureExt};
1215
use lazy_static::lazy_static;
@@ -1370,6 +1373,14 @@ impl StoreTrait for Store {
13701373
fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError> {
13711374
self.status_internal(filter)
13721375
}
1376+
1377+
fn load_dynamic_data_sources(
1378+
&self,
1379+
id: &SubgraphDeploymentId,
1380+
) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
1381+
let econn = self.get_entity_conn(&*SUBGRAPHS_ID, ReplicaId::Main)?;
1382+
econn.transaction(|| crate::dynds::load(&econn.conn, id.as_str()))
1383+
}
13731384
}
13741385

13751386
impl SubgraphDeploymentStore for Store {

store/test-store/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use graph_store_postgres::{
1717
};
1818
use hex_literal::hex;
1919
use lazy_static::lazy_static;
20-
use std::collections::BTreeSet;
2120
use std::env;
2221
use std::sync::Mutex;
2322
use std::time::Instant;
@@ -154,7 +153,7 @@ fn create_subgraph(
154153
id: subgraph_id.clone(),
155154
location: String::new(),
156155
spec_version: "1".to_owned(),
157-
features: BTreeSet::new(),
156+
features: std::collections::BTreeSet::new(),
158157
description: None,
159158
repository: None,
160159
schema: schema.clone(),

0 commit comments

Comments
 (0)