Skip to content

Commit 261bc4a

Browse files
gusinaciolutter
authored andcommitted
graph, store: Create database sql executor
1 parent c7d8f5b commit 261bc4a

File tree

15 files changed

+1146
-10
lines changed

15 files changed

+1146
-10
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,8 @@ serde_derive = "1.0.125"
8181
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
8282
serde_regex = "1.1.0"
8383
serde_yaml = "0.9.21"
84-
slog = { version = "2.7.0", features = [
85-
"release_max_level_trace",
86-
"max_level_trace",
87-
] }
88-
sqlparser = "0.46.0"
84+
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
85+
sqlparser = { version = "0.46.0", features = ["visitor"] }
8986
strum = { version = "0.26", features = ["derive"] }
9087
syn = { version = "2.0.106", features = ["full"] }
9188
test-store = { path = "./store/test-store" }

graph/src/components/store/traits.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::components::transaction_receipt;
1616
use crate::components::versions::ApiVersion;
1717
use crate::data::query::Trace;
1818
use crate::data::store::ethereum::call;
19-
use crate::data::store::QueryObject;
19+
use crate::data::store::{QueryObject, SqlQueryObject};
2020
use crate::data::subgraph::{status, DeploymentFeatures};
2121
use crate::data::{query::QueryTarget, subgraph::schema::*};
2222
use crate::prelude::{DeploymentState, NodeId, QueryExecutionError, SubgraphName};
@@ -652,6 +652,8 @@ pub trait QueryStore: Send + Sync {
652652
query: EntityQuery,
653653
) -> Result<(Vec<QueryObject>, Trace), QueryExecutionError>;
654654

655+
fn execute_sql(&self, sql: &str) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;
656+
655657
async fn is_deployment_synced(&self) -> Result<bool, Error>;
656658

657659
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;

graph/src/data/query/error.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pub enum QueryExecutionError {
7373
InvalidSubgraphManifest,
7474
ResultTooBig(usize, usize),
7575
DeploymentNotFound(String),
76+
SqlError(String),
7677
IdMissing,
7778
IdNotString,
7879
InternalError(String),
@@ -135,6 +136,7 @@ impl QueryExecutionError {
135136
| IdMissing
136137
| IdNotString
137138
| InternalError(_) => false,
139+
SqlError(_) => false,
138140
}
139141
}
140142
}
@@ -213,7 +215,7 @@ impl fmt::Display for QueryExecutionError {
213215
}
214216
InvalidFilterError => write!(f, "Filter must by an object"),
215217
InvalidOrFilterStructure(fields, example) => {
216-
write!(f, "Cannot mix column filters with 'or' operator at the same level. Found column filter(s) {} alongside 'or' operator.\n\n{}",
218+
write!(f, "Cannot mix column filters with 'or' operator at the same level. Found column filter(s) {} alongside 'or' operator.\n\n{}",
217219
fields.join(", "), example)
218220
}
219221
EntityFieldError(e, a) => {
@@ -281,6 +283,7 @@ impl fmt::Display for QueryExecutionError {
281283
IdMissing => write!(f, "entity is missing an `id` attribute"),
282284
IdNotString => write!(f, "entity `id` attribute is not a string"),
283285
InternalError(msg) => write!(f, "internal error: {}", msg),
286+
SqlError(e) => write!(f, "sql error: {}", e),
284287
}
285288
}
286289
}

graph/src/data/store/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,9 @@ pub struct QueryObject {
11021102
pub entity: r::Object,
11031103
}
11041104

1105+
/// An object that is returned from a SQL query. It wraps an `r::Value`
1106+
pub struct SqlQueryObject(pub r::Value);
1107+
11051108
impl CacheWeight for QueryObject {
11061109
fn indirect_weight(&self) -> usize {
11071110
self.parent.indirect_weight() + self.entity.indirect_weight()

store/postgres/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ git-testament = "0.2.6"
3232
itertools = "0.14.0"
3333
hex = "0.4.3"
3434
pretty_assertions = "1.4.1"
35+
sqlparser = { workspace = true }
36+
thiserror = { workspace = true }
3537

3638
[dev-dependencies]
3739
clap.workspace = true

store/postgres/src/deployment_store.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use graph::components::store::{
1212
PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats,
1313
};
1414
use graph::components::versions::VERSIONS;
15+
use graph::data::graphql::IntoValue;
1516
use graph::data::query::Trace;
16-
use graph::data::store::IdList;
17+
use graph::data::store::{IdList, SqlQueryObject};
1718
use graph::data::subgraph::{status, SPEC_VERSION_0_0_6};
1819
use graph::data_source::CausalityRegion;
1920
use graph::derive::CheapClone;
@@ -54,7 +55,7 @@ use crate::dynds::DataSourcesTable;
5455
use crate::primary::{DeploymentId, Primary};
5556
use crate::relational::index::{CreateIndex, IndexList, Method};
5657
use crate::relational::{self, Layout, LayoutCache, SqlName, Table};
57-
use crate::relational_queries::FromEntityData;
58+
use crate::relational_queries::{FromEntityData, JSONData};
5859
use crate::{advisory_lock, catalog, retry};
5960
use crate::{detail, ConnectionPool};
6061
use crate::{dynds, primary::Site};
@@ -290,6 +291,24 @@ impl DeploymentStore {
290291
layout.query(&logger, conn, query)
291292
}
292293

294+
pub(crate) fn execute_sql(
295+
&self,
296+
conn: &mut PgConnection,
297+
query: &str,
298+
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
299+
let query = diesel::sql_query(query);
300+
301+
// Execute the provided SQL query
302+
let results = query
303+
.load::<JSONData>(conn)
304+
.map_err(|e| QueryExecutionError::SqlError(e.to_string()))?;
305+
306+
Ok(results
307+
.into_iter()
308+
.map(|e| SqlQueryObject(e.into_value()))
309+
.collect::<Vec<_>>())
310+
}
311+
293312
fn check_intf_uniqueness(
294313
&self,
295314
conn: &mut PgConnection,

store/postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod query_store;
3030
mod relational;
3131
mod relational_queries;
3232
mod retry;
33+
mod sql;
3334
mod store;
3435
mod store_events;
3536
mod subgraph_store;

store/postgres/src/query_store.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use std::collections::HashMap;
22
use std::time::Instant;
33

44
use crate::deployment_store::{DeploymentStore, ReplicaId};
5+
use crate::sql::Parser;
56
use graph::components::store::{DeploymentId, QueryPermit, QueryStore as QueryStoreTrait};
67
use graph::data::query::Trace;
7-
use graph::data::store::QueryObject;
8+
use graph::data::store::{QueryObject, SqlQueryObject};
89
use graph::prelude::*;
910
use graph::schema::{ApiSchema, InputSchema};
1011

@@ -16,6 +17,7 @@ pub(crate) struct QueryStore {
1617
store: Arc<DeploymentStore>,
1718
chain_store: Arc<crate::ChainStore>,
1819
api_version: Arc<ApiVersion>,
20+
sql_parser: Result<Parser, StoreError>,
1921
}
2022

2123
impl QueryStore {
@@ -26,12 +28,16 @@ impl QueryStore {
2628
replica_id: ReplicaId,
2729
api_version: Arc<ApiVersion>,
2830
) -> Self {
31+
let sql_parser = store
32+
.find_layout(site.clone())
33+
.map(|layout| Parser::new(layout));
2934
QueryStore {
3035
site,
3136
replica_id,
3237
store,
3338
chain_store,
3439
api_version,
40+
sql_parser,
3541
}
3642
}
3743
}
@@ -57,6 +63,25 @@ impl QueryStoreTrait for QueryStore {
5763
})
5864
}
5965

66+
fn execute_sql(
67+
&self,
68+
sql: &str,
69+
) -> Result<Vec<SqlQueryObject>, graph::prelude::QueryExecutionError> {
70+
let mut conn = self
71+
.store
72+
.get_replica_conn(self.replica_id)
73+
.map_err(|e| QueryExecutionError::SqlError(format!("SQL error: {}", e)))?;
74+
75+
let parser = self
76+
.sql_parser
77+
.as_ref()
78+
.map_err(|e| QueryExecutionError::SqlError(format!("SQL error: {}", e)))?;
79+
80+
let sql = parser.parse_and_validate(sql)?;
81+
82+
self.store.execute_sql(&mut conn, &sql)
83+
}
84+
6085
/// Return true if the deployment with the given id is fully synced,
6186
/// and return false otherwise. Errors from the store are passed back up
6287
async fn is_deployment_synced(&self) -> Result<bool, Error> {

store/postgres/src/relational_queries.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use diesel::sql_types::{Array, BigInt, Binary, Bool, Int8, Integer, Jsonb, Text,
1414
use diesel::QuerySource as _;
1515
use graph::components::store::write::{EntityWrite, RowGroup, WriteChunk};
1616
use graph::components::store::{Child as StoreChild, DerivedEntityQuery};
17+
18+
use graph::data::graphql::IntoValue;
1719
use graph::data::store::{Id, IdType, NULL};
1820
use graph::data::store::{IdList, IdRef, QueryObject};
1921
use graph::data::value::{Object, Word};
@@ -439,6 +441,47 @@ pub fn parse_id(id_type: IdType, json: serde_json::Value) -> Result<Id, StoreErr
439441
}
440442
}
441443

444+
#[derive(QueryableByName, Debug)]
445+
pub struct JSONData {
446+
#[diesel(sql_type = Jsonb)]
447+
pub data: serde_json::Value,
448+
}
449+
450+
impl IntoValue for JSONData {
451+
fn into_value(self) -> r::Value {
452+
JSONData::to_value(self.data)
453+
}
454+
}
455+
456+
impl JSONData {
457+
pub fn to_value(data: serde_json::Value) -> r::Value {
458+
match data {
459+
serde_json::Value::Null => r::Value::Null,
460+
serde_json::Value::Bool(b) => r::Value::Boolean(b),
461+
serde_json::Value::Number(n) => {
462+
if let Some(i) = n.as_i64() {
463+
r::Value::Int(i)
464+
} else {
465+
r::Value::Float(n.as_f64().unwrap())
466+
}
467+
}
468+
serde_json::Value::String(s) => r::Value::String(s),
469+
serde_json::Value::Array(vals) => {
470+
let vals: Vec<_> = vals.into_iter().map(JSONData::to_value).collect::<Vec<_>>();
471+
r::Value::List(vals)
472+
}
473+
serde_json::Value::Object(map) => {
474+
let mut m = std::collections::BTreeMap::new();
475+
for (k, v) in map {
476+
let value = JSONData::to_value(v);
477+
m.insert(Word::from(k), value);
478+
}
479+
r::Value::object(m)
480+
}
481+
}
482+
}
483+
}
484+
442485
/// Helper struct for retrieving entities from the database. With diesel, we
443486
/// can only run queries that return columns whose number and type are known
444487
/// at compile time. Because of that, we retrieve the actual data for an

0 commit comments

Comments
 (0)