Skip to content

Commit a2fd18f

Browse files
committed
graphql, store: add parse and validation for custom sql
Signed-off-by: Gustavo Inacio <[email protected]>
create store/postgres/src/sql validator and formatter: full refactor
Signed-off-by: Gustavo Inacio <[email protected]>
refactor: move sql to store/postgres/src/sql
Signed-off-by: Gustavo Inacio <[email protected]>
parser: test for array of byte fixed
Signed-off-by: Tümay Tuzcu <[email protected]>
parser: block_range and block$ added as available columns for tables, sequence functions moved to black list
Signed-off-by: Tümay Tuzcu <[email protected]>
parser: use latest block as cte filter
parser: fix escape columns and tables
Fixes STU-217
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 8089833 commit a2fd18f

File tree

11 files changed

+1347
-3
lines changed

11 files changed

+1347
-3
lines changed

Cargo.lock

Lines changed: 24 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ members = [
1010
"store/*",
1111
"substreams/*",
1212
"graph",
13+
"graph/derive",
1314
"tests",
1415
"graph/derive",
1516
]
@@ -24,7 +25,13 @@ repository = "https://github.com/graphprotocol/graph-node"
2425
license = "MIT OR Apache-2.0"
2526

2627
[workspace.dependencies]
27-
diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono"] }
28+
diesel = { version = "2.1.3", features = [
29+
"postgres",
30+
"serde_json",
31+
"numeric",
32+
"r2d2",
33+
"chrono",
34+
] }
2835
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
2936
diesel_derives = "2.1.3"
3037
diesel-dynamic-schema = "0.2.1"

graphql/src/store/resolver.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use graph::schema::{
1616
};
1717
use graph::schema::{ErrorPolicy, BLOCK_FIELD_TYPE};
1818

19+
1920
use crate::execution::{ast as a, Query};
2021
use crate::metrics::GraphQLMetrics;
2122
use crate::prelude::{ExecutionContext, Resolver};

store/postgres/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ git-testament = "0.2.5"
3030
itertools = "0.12.0"
3131
hex = "0.4.3"
3232
pretty_assertions = "1.4.0"
33+
sqlparser = { version = "0.40.0", features = ["visitor"] }
34+
thiserror = "1.0.25"
3335

3436
[dev-dependencies]
3537
clap = "3.2.25"

store/postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod query_store;
3333
mod relational;
3434
mod relational_queries;
3535
mod retry;
36+
mod sql;
3637
mod store;
3738
mod store_events;
3839
mod subgraph_store;

store/postgres/src/query_store.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22
use std::time::Instant;
33

44
use crate::deployment_store::{DeploymentStore, ReplicaId};
5+
use crate::sql::Parser;
56
use graph::components::store::{DeploymentId, QueryPermit, QueryStore as QueryStoreTrait};
67
use graph::data::query::Trace;
78
use graph::data::store::{QueryObject, SqlQueryObject};
@@ -16,6 +17,7 @@ pub(crate) struct QueryStore {
1617
store: Arc<DeploymentStore>,
1718
chain_store: Arc<crate::ChainStore>,
1819
api_version: Arc<ApiVersion>,
20+
sql_parser: Result<Parser, StoreError>,
1921
}
2022

2123
impl QueryStore {
@@ -26,12 +28,16 @@ impl QueryStore {
2628
replica_id: ReplicaId,
2729
api_version: Arc<ApiVersion>,
2830
) -> Self {
31+
let sql_parser = store
32+
.find_layout(site.clone())
33+
.map(|layout| Parser::new(layout));
2934
QueryStore {
3035
site,
3136
replica_id,
3237
store,
3338
chain_store,
3439
api_version,
40+
sql_parser,
3541
}
3642
}
3743
}
@@ -65,7 +71,15 @@ impl QueryStoreTrait for QueryStore {
6571
.store
6672
.get_replica_conn(self.replica_id)
6773
.map_err(|e| QueryExecutionError::SqlError(format!("SQL error: {}", e)))?;
68-
self.store.execute_sql(&mut conn, sql)
74+
75+
let parser = self
76+
.sql_parser
77+
.as_ref()
78+
.map_err(|e| QueryExecutionError::SqlError(format!("SQL error: {}", e)))?;
79+
80+
let sql = parser.parse_and_validate(sql)?;
81+
82+
self.store.execute_sql(&mut conn, &sql)
6983
}
7084

7185
/// Return true if the deployment with the given id is fully synced,

store/postgres/src/sql/constants.rs

Lines changed: 727 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use sqlparser::ast::{ObjectName, Statement, TableFactor, VisitMut, VisitorMut};
2+
use std::ops::ControlFlow;
3+
4+
/// Turns a parsed SQL statement into the final query text.
///
/// Table references inside the statement are reduced to their bare table
/// name, and the statement is wrapped in a `to_jsonb` sub-select that is
/// preceded by `prelude`.
pub struct Formatter<'a> {
    /// Text emitted verbatim in front of the wrapped statement.
    prelude: &'a str,
}
7+
8+
impl<'a> Formatter<'a> {
9+
pub fn new(prelude: &'a str) -> Self {
10+
Self { prelude }
11+
}
12+
13+
fn prepend_prefix_to_object_name_mut(&self, name: &mut ObjectName) {
14+
let table_identifier: &mut Vec<_> = &mut name.0;
15+
// remove all but the last identifier
16+
table_identifier.drain(0..table_identifier.len() - 1);
17+
}
18+
19+
pub fn format(&mut self, statement: &mut Statement) -> String {
20+
statement.visit(self);
21+
22+
format!(
23+
"{} SELECT to_jsonb(sub.*) AS data FROM ( {} ) AS sub",
24+
self.prelude, statement
25+
)
26+
}
27+
}
28+
29+
impl VisitorMut for Formatter<'_> {
30+
type Break = ();
31+
32+
fn pre_visit_table_factor(
33+
&mut self,
34+
table_factor: &mut TableFactor,
35+
) -> ControlFlow<Self::Break> {
36+
if let TableFactor::Table { name, .. } = table_factor {
37+
self.prepend_prefix_to_object_name_mut(name);
38+
}
39+
ControlFlow::Continue(())
40+
}
41+
}
42+
43+
#[cfg(test)]
mod test {
    use super::*;
    use crate::sql::constants::SQL_DIALECT;

    // Prelude used by the tests. The formatter copies it verbatim, so it is
    // never parsed here, but it is kept as well-formed SQL to avoid misleading
    // readers.
    const CTE_PREFIX: &str = "WITH swap AS (
            SELECT
            id,
            amount_in,
            amount_out,
            concat('0x',encode(token_in,'hex')) AS token_in,
            concat('0x',encode(token_out,'hex')) AS token_out
            FROM
            sdg1.swap
        )";

    /// Parses `sql` as a single statement and runs it through a formatter
    /// that uses `CTE_PREFIX` as its prelude.
    fn format_single(sql: &str) -> String {
        let mut statements = sqlparser::parser::Parser::parse_sql(&SQL_DIALECT, sql).unwrap();
        let statement = statements
            .first_mut()
            .expect("test SQL must contain one statement");

        Formatter::new(CTE_PREFIX).format(statement)
    }

    /// A schema-qualified table name is reduced to the bare table name.
    #[test]
    fn format_sql() {
        let result = format_single(
            "SELECT token_in, SUM(amount_in) AS amount FROM unknown.swap GROUP BY token_in",
        );

        assert_eq!(
            result,
            format!(
                "{} SELECT to_jsonb(sub.*) AS data FROM ( {} ) AS sub",
                CTE_PREFIX, "SELECT token_in, SUM(amount_in) AS amount FROM swap GROUP BY token_in"
            )
        );
    }

    /// An unqualified table name has nothing to strip and is kept as written.
    #[test]
    fn format_sql_keeps_unqualified_table_name() {
        let result = format_single("SELECT id FROM swap");

        assert_eq!(
            result,
            format!(
                "{} SELECT to_jsonb(sub.*) AS data FROM ( {} ) AS sub",
                CTE_PREFIX, "SELECT id FROM swap"
            )
        );
    }
}

store/postgres/src/sql/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
mod constants;
2+
mod formatter;
3+
mod parser;
4+
mod validation;
5+
6+
use std::collections::{HashMap, HashSet};
7+
8+
/// Maps a table name to the set of column names of that table.
type Schema = HashMap<String, HashSet<String>>;
9+
10+
pub use parser::Parser;

0 commit comments

Comments
 (0)