Skip to content
11 changes: 10 additions & 1 deletion graph/src/data/graphql/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use crate::prelude::s::{
TypeDefinition, Value,
};
use crate::prelude::{ValueType, ENV_VARS};
use crate::schema::{META_FIELD_TYPE, SCHEMA_TYPE_NAME};
use crate::schema::{META_FIELD_TYPE, SCHEMA_TYPE_NAME, SQL_FIELD_TYPE};
use std::collections::{BTreeMap, HashMap};

/// Convenience helpers shared by GraphQL object-like types
/// (implemented below for both `ObjectType` and `InterfaceType`).
pub trait ObjectTypeExt {
    /// Looks up the field named `name` on this type, if it exists.
    fn field(&self, name: &str) -> Option<&Field>;
    /// Reports whether this type is the special meta type
    /// (its name equals `META_FIELD_TYPE`).
    fn is_meta(&self) -> bool;
    /// Reports whether this type is the special SQL result type
    /// (its name equals `SQL_FIELD_TYPE`).
    fn is_sql(&self) -> bool;
}

impl ObjectTypeExt for ObjectType {
Expand All @@ -23,6 +24,10 @@ impl ObjectTypeExt for ObjectType {
/// An object type is the meta type exactly when its name is `META_FIELD_TYPE`.
fn is_meta(&self) -> bool {
    self.name == META_FIELD_TYPE
}

/// An object type is the SQL result type exactly when its name is `SQL_FIELD_TYPE`.
fn is_sql(&self) -> bool {
    self.name == SQL_FIELD_TYPE
}
}

impl ObjectTypeExt for InterfaceType {
Expand All @@ -33,6 +38,10 @@ impl ObjectTypeExt for InterfaceType {
/// Interface types are never the meta type.
fn is_meta(&self) -> bool {
    false
}

/// Interface types are never the SQL result type.
fn is_sql(&self) -> bool {
    false
}
}

pub trait DocumentExt {
Expand Down
7 changes: 7 additions & 0 deletions graph/src/data/graphql/object_or_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,11 @@ impl<'a> ObjectOrInterface<'a> {
ObjectOrInterface::Interface(i) => i.is_meta(),
}
}

pub fn is_sql(&self) -> bool {
match self {
ObjectOrInterface::Object(o) => o.is_sql(),
ObjectOrInterface::Interface(i) => i.is_sql(),
}
}
}
36 changes: 19 additions & 17 deletions graphql/src/execution/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use graph::{
},
futures03::future::TryFutureExt,
prelude::{s, CheapClone},
schema::{is_introspection_field, INTROSPECTION_QUERY_TYPE, META_FIELD_NAME},
schema::{is_introspection_field, INTROSPECTION_QUERY_TYPE, META_FIELD_NAME, SQL_FIELD_NAME},
util::{herd_cache::HerdCache, lfu_cache::EvictStats, timed_rw_lock::TimedMutex},
};
use lazy_static::lazy_static;
Expand Down Expand Up @@ -278,31 +278,33 @@ pub(crate) async fn execute_root_selection_set_uncached(
let mut data_set = a::SelectionSet::empty_from(selection_set);
let mut intro_set = a::SelectionSet::empty_from(selection_set);
let mut meta_items = Vec::new();
let mut sql_items = Vec::new();

for field in selection_set.fields_for(root_type)? {
// See if this is an introspection or data field. We don't worry about
// non-existent fields; those will cause an error later when we execute
// the data_set SelectionSet
if is_introspection_field(&field.name) {
intro_set.push(field)?
} else if field.name == META_FIELD_NAME || field.name == "__typename" {
meta_items.push(field)
} else {
data_set.push(field)?
match field.name.as_str() {
name if is_introspection_field(name) => intro_set.push(field)?,
META_FIELD_NAME | "__typename" => meta_items.push(field),
SQL_FIELD_NAME => sql_items.push(field),
_ => data_set.push(field)?,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment above should say something why sql_items gets split out here (it's also missing an explanation why meta_items are split out). In a nutshell:

  • intro_set needs to be handled differently because we use a different resolver (IntrospectionResolver)
  • meta_items need to be handled separately because prefetch cannot handle them
  • sql_items need to be handled separately for the same reason (?) If so, maybe we can just put them into meta_items and rename that variable to something more descriptive (maybe noprefetch_items though I don't love that either)

}
}

// If we are getting regular data, prefetch it from the database
let (mut values, trace) = if data_set.is_empty() && meta_items.is_empty() {
(Object::default(), Trace::None)
} else {
let (initial_data, trace) = ctx.resolver.prefetch(ctx, &data_set)?;
data_set.push_fields(meta_items)?;
(
execute_selection_set_to_map(ctx, &data_set, root_type, initial_data).await?,
trace,
)
};
let (mut values, trace) =
if data_set.is_empty() && meta_items.is_empty() && sql_items.is_empty() {
(Object::default(), Trace::None)
} else {
let (initial_data, trace) = ctx.resolver.prefetch(ctx, &data_set)?;
data_set.push_fields(meta_items)?;
data_set.push_fields(sql_items)?;
(
execute_selection_set_to_map(ctx, &data_set, root_type, initial_data).await?,
trace,
)
};

// Resolve introspection fields, if there are any
if !intro_set.is_empty() {
Expand Down
92 changes: 90 additions & 2 deletions graphql/src/store/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use graph::derive::CheapClone;
use graph::prelude::*;
use graph::schema::{
ast as sast, ApiSchema, INTROSPECTION_SCHEMA_FIELD_NAME, INTROSPECTION_TYPE_FIELD_NAME,
META_FIELD_NAME, META_FIELD_TYPE,
META_FIELD_NAME, META_FIELD_TYPE, SQL_CSV_FIELD_TYPE, SQL_JSON_FIELD_TYPE,
};
use graph::schema::{ErrorPolicy, BLOCK_FIELD_TYPE};

Expand Down Expand Up @@ -256,7 +256,6 @@ impl StoreResolver {
let parent_hash = parent_hash
.map(|hash| r::Value::String(format!("{}", hash)))
.unwrap_or(r::Value::Null);

let mut map = BTreeMap::new();
let block = object! {
hash: hash,
Expand All @@ -281,6 +280,90 @@ impl StoreResolver {
);
return Ok(r::Value::object(map));
}

fn handle_sql(&self, field: &a::Field) -> Result<r::Value, QueryExecutionError> {

let input = field
.argument_value("input")
.ok_or_else(|| QueryExecutionError::EmptyQuery)?;

let input = match input {
graph::data::value::Value::Object(s) => s,
_ => {
return Err(QueryExecutionError::SqlError(
"Input is not an object".into(),
))
}
};

enum Format {
Json,
Csv,
}

let format = match input.get("format") {
Some(graph::data::value::Value::Enum(s)) => match s.as_str() {
"JSON" => Format::Json,
"CSV" => Format::Csv,
_ => {
return Err(QueryExecutionError::SqlError(
"Format must be json or csv".into(),
))
}
},
_ => Format::Json,
};

let query = match input.get("query") {
Some(graph::data::value::Value::String(s)) => s,
_ => {
return Err(QueryExecutionError::SqlError(
"Query must be a string".into(),
))
}
};

let result = self.store.execute_sql(&query)?;
let result = result.into_iter().map(|q| q.0).collect::<Vec<_>>();
let row_count = result.len();
// columns should be available even if there's no data
// diesel doesn't support "dynamic query" so it doesn't return column names
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's now a DynamicSelectStatement; PR #5372 uses that, but it requires quite a bit of ceremony. For now, this is fine, but would be great to avoid the whole to_jsonb dance at a later date.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good to know. I got to this crate when I was developing but at the time graph-node wasn't using diesel v2. Gonna take a look at it

// we are using this hacky way to get column names
// from the first row of the result
let columns = match result.first() {
Some(r::Value::Object(obj)) => obj
.iter()
.map(|(key, _)| r::Value::String(key.into()))
.collect::<Vec<_>>(),
_ => vec![],
};
let sql_result = match format {
Format::Json => object! {
__typename: SQL_JSON_FIELD_TYPE,
columns: r::Value::List(columns),
rows: result,
rowCount: r::Value::Int(row_count as i64),
},
Format::Csv => object! {
__typename: SQL_CSV_FIELD_TYPE,
columns: r::Value::List(columns),
result: r::Value::String(result.into_iter().filter_map(|v| {
match v {
r::Value::Object(obj) => Some(
obj
.iter()
.map(|(_, v)| v.to_string())
.collect::<Vec<_>>()
.join(",")),
_ => None,
}
}).collect::<Vec<_>>().join("\n")),
rowCount: r::Value::Int(row_count as i64),
},
};

Ok(sql_result)
}
}

#[async_trait]
Expand Down Expand Up @@ -329,6 +412,11 @@ impl Resolver for StoreResolver {
if object_type.is_meta() {
return self.lookup_meta(field).await;
}

if object_type.is_sql() {
return self.handle_sql(field);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ENV_VARS check shouldn't be necessary - when SQL is turned off, a query that uses it shouldn't even make it here as it would have been rejected during validation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we have this check here is the remote (edge) case where the schema itself contains a type called `Sql`: without the check, a graph-node with the SQL service disabled would still allow such queries through.


if let Some(r::Value::List(children)) = prefetched_object {
if children.len() > 1 {
let derived_from_field =
Expand Down