Commit 8089833

graph, graphql: add resolver for sql
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 813dfe9 commit 8089833

4 files changed: +126 -20 lines changed

graph/src/data/graphql/ext.rs

Lines changed: 10 additions & 1 deletion
@@ -7,12 +7,13 @@ use crate::prelude::s::{
     TypeDefinition, Value,
 };
 use crate::prelude::{ValueType, ENV_VARS};
-use crate::schema::{META_FIELD_TYPE, SCHEMA_TYPE_NAME};
+use crate::schema::{META_FIELD_TYPE, SCHEMA_TYPE_NAME, SQL_FIELD_TYPE};
 use std::collections::{BTreeMap, HashMap};
 
 pub trait ObjectTypeExt {
     fn field(&self, name: &str) -> Option<&Field>;
     fn is_meta(&self) -> bool;
+    fn is_sql(&self) -> bool;
 }
 
 impl ObjectTypeExt for ObjectType {
@@ -23,6 +24,10 @@ impl ObjectTypeExt for ObjectType {
     fn is_meta(&self) -> bool {
         self.name == META_FIELD_TYPE
     }
+
+    fn is_sql(&self) -> bool {
+        self.name == SQL_FIELD_TYPE
+    }
 }
 
 impl ObjectTypeExt for InterfaceType {
@@ -33,6 +38,10 @@ impl ObjectTypeExt for InterfaceType {
     fn is_meta(&self) -> bool {
         false
     }
+
+    fn is_sql(&self) -> bool {
+        false
+    }
 }
 
 pub trait DocumentExt {

graph/src/data/graphql/object_or_interface.rs

Lines changed: 7 additions & 0 deletions
@@ -134,4 +134,11 @@ impl<'a> ObjectOrInterface<'a> {
             ObjectOrInterface::Interface(i) => i.is_meta(),
         }
     }
+
+    pub fn is_sql(&self) -> bool {
+        match self {
+            ObjectOrInterface::Object(o) => o.is_sql(),
+            ObjectOrInterface::Interface(i) => i.is_sql(),
+        }
+    }
 }
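Note: the two schema helpers above follow the existing is_meta pattern: a name-based predicate on object types (interfaces are never the SQL type), with ObjectOrInterface delegating to whichever variant it wraps. A minimal standalone sketch of that shape follows; the simplified types and the "_sql_" string are illustrative placeholders, not graph-node's actual definitions, since the commit does not show the value of SQL_FIELD_TYPE.

// Standalone sketch of the is_sql predicate pattern; all names here are
// simplified stand-ins for graph-node's own types and constants.
const SQL_FIELD_TYPE: &str = "_sql_"; // placeholder value

struct ObjectType {
    name: String,
}

struct InterfaceType;

trait ObjectTypeExt {
    fn is_sql(&self) -> bool;
}

impl ObjectTypeExt for ObjectType {
    // An object type counts as the SQL type iff its name matches the constant.
    fn is_sql(&self) -> bool {
        self.name == SQL_FIELD_TYPE
    }
}

impl ObjectTypeExt for InterfaceType {
    // Interfaces are never the SQL type.
    fn is_sql(&self) -> bool {
        false
    }
}

enum ObjectOrInterface<'a> {
    Object(&'a ObjectType),
    Interface(&'a InterfaceType),
}

impl<'a> ObjectOrInterface<'a> {
    // Delegate to the wrapped variant, as the diff does for is_meta and is_sql.
    fn is_sql(&self) -> bool {
        match self {
            ObjectOrInterface::Object(o) => o.is_sql(),
            ObjectOrInterface::Interface(i) => i.is_sql(),
        }
    }
}

fn main() {
    let sql_type = ObjectType { name: SQL_FIELD_TYPE.to_string() };
    assert!(ObjectOrInterface::Object(&sql_type).is_sql());
    assert!(!ObjectOrInterface::Interface(&InterfaceType).is_sql());
}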

graphql/src/execution/execution.rs

Lines changed: 19 additions & 17 deletions
@@ -8,7 +8,7 @@ use graph::{
     },
     futures03::future::TryFutureExt,
     prelude::{s, CheapClone},
-    schema::{is_introspection_field, INTROSPECTION_QUERY_TYPE, META_FIELD_NAME},
+    schema::{is_introspection_field, INTROSPECTION_QUERY_TYPE, META_FIELD_NAME, SQL_FIELD_NAME},
     util::{herd_cache::HerdCache, lfu_cache::EvictStats, timed_rw_lock::TimedMutex},
 };
 use lazy_static::lazy_static;
@@ -278,31 +278,33 @@ pub(crate) async fn execute_root_selection_set_uncached(
     let mut data_set = a::SelectionSet::empty_from(selection_set);
     let mut intro_set = a::SelectionSet::empty_from(selection_set);
     let mut meta_items = Vec::new();
+    let mut sql_items = Vec::new();
 
     for field in selection_set.fields_for(root_type)? {
         // See if this is an introspection or data field. We don't worry about
         // non-existent fields; those will cause an error later when we execute
         // the data_set SelectionSet
-        if is_introspection_field(&field.name) {
-            intro_set.push(field)?
-        } else if field.name == META_FIELD_NAME || field.name == "__typename" {
-            meta_items.push(field)
-        } else {
-            data_set.push(field)?
+        match field.name.as_str() {
+            name if is_introspection_field(name) => intro_set.push(field)?,
+            META_FIELD_NAME | "__typename" => meta_items.push(field),
+            SQL_FIELD_NAME => sql_items.push(field),
+            _ => data_set.push(field)?,
         }
     }
 
     // If we are getting regular data, prefetch it from the database
-    let (mut values, trace) = if data_set.is_empty() && meta_items.is_empty() {
-        (Object::default(), Trace::None)
-    } else {
-        let (initial_data, trace) = ctx.resolver.prefetch(ctx, &data_set)?;
-        data_set.push_fields(meta_items)?;
-        (
-            execute_selection_set_to_map(ctx, &data_set, root_type, initial_data).await?,
-            trace,
-        )
-    };
+    let (mut values, trace) =
+        if data_set.is_empty() && meta_items.is_empty() && sql_items.is_empty() {
+            (Object::default(), Trace::None)
+        } else {
+            let (initial_data, trace) = ctx.resolver.prefetch(ctx, &data_set)?;
+            data_set.push_fields(meta_items)?;
+            data_set.push_fields(sql_items)?;
+            (
+                execute_selection_set_to_map(ctx, &data_set, root_type, initial_data).await?,
+                trace,
+            )
+        };
 
     // Resolve introspection fields, if there are any
     if !intro_set.is_empty() {
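For context on the execution change: root fields are now sorted into four buckets (introspection, meta, sql, regular data) via a single match on the field name rather than the old if/else chain, and the prefetch is skipped only when the data, meta, and sql buckets are all empty. Below is a simplified, self-contained sketch of that classification step; the constant values and the introspection check are assumptions for illustration, not graph-node's actual definitions.

// Standalone sketch of the field-routing match added in
// execute_root_selection_set_uncached; names and values are placeholders.
const META_FIELD_NAME: &str = "_meta"; // placeholder value
const SQL_FIELD_NAME: &str = "sql"; // placeholder; the commit does not show the constant's value

fn is_introspection_field(name: &str) -> bool {
    // simplified stand-in for graph-node's check
    name == "__schema" || name == "__type"
}

#[derive(Debug, PartialEq)]
enum Bucket {
    Introspection,
    Meta,
    Sql,
    Data,
}

// One arm per bucket, mirroring the match in the diff.
fn classify(field_name: &str) -> Bucket {
    match field_name {
        name if is_introspection_field(name) => Bucket::Introspection,
        META_FIELD_NAME | "__typename" => Bucket::Meta,
        SQL_FIELD_NAME => Bucket::Sql,
        _ => Bucket::Data,
    }
}

fn main() {
    assert_eq!(classify("__schema"), Bucket::Introspection);
    assert_eq!(classify("_meta"), Bucket::Meta);
    assert_eq!(classify("sql"), Bucket::Sql);
    assert_eq!(classify("tokens"), Bucket::Data);
}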

graphql/src/store/resolver.rs

Lines changed: 90 additions & 2 deletions
@@ -12,7 +12,7 @@ use graph::derive::CheapClone;
 use graph::prelude::*;
 use graph::schema::{
     ast as sast, ApiSchema, INTROSPECTION_SCHEMA_FIELD_NAME, INTROSPECTION_TYPE_FIELD_NAME,
-    META_FIELD_NAME, META_FIELD_TYPE,
+    META_FIELD_NAME, META_FIELD_TYPE, SQL_CSV_FIELD_TYPE, SQL_JSON_FIELD_TYPE,
 };
 use graph::schema::{ErrorPolicy, BLOCK_FIELD_TYPE};
 
@@ -256,7 +256,6 @@ impl StoreResolver {
         let parent_hash = parent_hash
             .map(|hash| r::Value::String(format!("{}", hash)))
             .unwrap_or(r::Value::Null);
-
         let mut map = BTreeMap::new();
         let block = object! {
             hash: hash,
@@ -281,6 +280,90 @@ impl StoreResolver {
         );
         return Ok(r::Value::object(map));
     }
+
+    fn handle_sql(&self, field: &a::Field) -> Result<r::Value, QueryExecutionError> {
+
+        let input = field
+            .argument_value("input")
+            .ok_or_else(|| QueryExecutionError::EmptyQuery)?;
+
+        let input = match input {
+            graph::data::value::Value::Object(s) => s,
+            _ => {
+                return Err(QueryExecutionError::SqlError(
+                    "Input is not an object".into(),
+                ))
+            }
+        };
+
+        enum Format {
+            Json,
+            Csv,
+        }
+
+        let format = match input.get("format") {
+            Some(graph::data::value::Value::Enum(s)) => match s.as_str() {
+                "JSON" => Format::Json,
+                "CSV" => Format::Csv,
+                _ => {
+                    return Err(QueryExecutionError::SqlError(
+                        "Format must be json or csv".into(),
+                    ))
+                }
+            },
+            _ => Format::Json,
+        };
+
+        let query = match input.get("query") {
+            Some(graph::data::value::Value::String(s)) => s,
+            _ => {
+                return Err(QueryExecutionError::SqlError(
+                    "Query must be a string".into(),
+                ))
+            }
+        };
+
+        let result = self.store.execute_sql(&query)?;
+        let result = result.into_iter().map(|q| q.0).collect::<Vec<_>>();
+        let row_count = result.len();
+        // columns should be available even if there's no data
+        // diesel doesn't support "dynamic query" so it doesn't return column names
+        // we are using this hacky way to get column names
+        // from the first row of the result
+        let columns = match result.first() {
+            Some(r::Value::Object(obj)) => obj
+                .iter()
+                .map(|(key, _)| r::Value::String(key.into()))
+                .collect::<Vec<_>>(),
+            _ => vec![],
+        };
+        let sql_result = match format {
+            Format::Json => object! {
+                __typename: SQL_JSON_FIELD_TYPE,
+                columns: r::Value::List(columns),
+                rows: result,
+                rowCount: r::Value::Int(row_count as i64),
+            },
+            Format::Csv => object! {
+                __typename: SQL_CSV_FIELD_TYPE,
+                columns: r::Value::List(columns),
+                result: r::Value::String(result.into_iter().filter_map(|v| {
+                    match v {
+                        r::Value::Object(obj) => Some(
+                            obj.iter()
+                                .map(|(_, v)| v.to_string())
+                                .collect::<Vec<_>>()
+                                .join(",")),
+                        _ => None,
+                    }
+                }).collect::<Vec<_>>().join("\n")),
+                rowCount: r::Value::Int(row_count as i64),
+            },
+        };
+
+        Ok(sql_result)
+    }
 }
 
 #[async_trait]
@@ -329,6 +412,11 @@ impl Resolver for StoreResolver {
         if object_type.is_meta() {
            return self.lookup_meta(field).await;
         }
+
+        if object_type.is_sql() {
+            return self.handle_sql(field);
+        }
+
        if let Some(r::Value::List(children)) = prefetched_object {
            if children.len() > 1 {
                let derived_from_field =
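handle_sql itself does four things: it pulls the input argument object off the field, reads an optional format enum (JSON by default, CSV otherwise), requires a query string, and runs it through store.execute_sql; column names are then recovered from the keys of the first result row, since the dynamic query result carries no column metadata. In the CSV branch, each row's values are joined with commas and the rows with newlines. Below is a simplified, self-contained sketch of that serialization step, using BTreeMap rows in place of graph-node's r::Value::Object; the types and function name are illustrative, not part of the commit.

use std::collections::BTreeMap;

// Simplified stand-in for one row of the SQL result.
type Row = BTreeMap<String, String>;

// Mirrors the CSV branch of handle_sql: column names come from the first row
// (a workaround for the missing column metadata), and each row becomes a
// comma-joined line.
fn rows_to_csv(rows: &[Row]) -> (Vec<String>, String) {
    let columns: Vec<String> = rows
        .first()
        .map(|row| row.keys().cloned().collect())
        .unwrap_or_default();
    let body = rows
        .iter()
        .map(|row| row.values().cloned().collect::<Vec<_>>().join(","))
        .collect::<Vec<_>>()
        .join("\n");
    (columns, body)
}

fn main() {
    let mut row = Row::new();
    row.insert("block_number".to_string(), "42".to_string());
    row.insert("id".to_string(), "0xabc".to_string());
    let (columns, csv) = rows_to_csv(&[row]);
    assert_eq!(columns, vec!["block_number", "id"]);
    assert_eq!(csv, "42,0xabc");
}

As in the diff, values are joined with bare commas and newlines; a fuller CSV writer would also need to quote values that themselves contain commas, quotes, or newlines.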
