Skip to content

Commit b113c15

Browse files
committed
graph, graphql, server: Add rudimentary SQL query support
1 parent 5df824a commit b113c15

File tree

7 files changed

+149
-8
lines changed

7 files changed

+149
-8
lines changed

graph/src/components/graphql.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use crate::data::query::QueryResults;
21
use crate::data::query::{Query, QueryTarget};
3-
use crate::prelude::DeploymentHash;
2+
use crate::data::query::{QueryResults, SqlQueryReq};
3+
use crate::data::store::SqlQueryObject;
4+
use crate::prelude::{DeploymentHash, QueryExecutionError};
45

56
use async_trait::async_trait;
67
use std::sync::Arc;
@@ -28,6 +29,11 @@ pub trait GraphQlRunner: Send + Sync + 'static {
2829
) -> QueryResults;
2930

3031
fn metrics(&self) -> Arc<dyn GraphQLMetrics>;
32+
33+
async fn run_sql_query(
34+
self: Arc<Self>,
35+
req: SqlQueryReq,
36+
) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;
3137
}
3238

3339
pub trait GraphQLMetrics: Send + Sync + 'static {

graph/src/data/query/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ mod trace;
66

77
pub use self::cache_status::CacheStatus;
88
pub use self::error::{QueryError, QueryExecutionError};
9-
pub use self::query::{Query, QueryTarget, QueryVariables};
9+
pub use self::query::{Query, QueryTarget, QueryVariables, SqlQueryMode, SqlQueryReq};
1010
pub use self::result::{LatestBlockInfo, QueryResult, QueryResults};
1111
pub use self::trace::Trace;

graph/src/data/query/query.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use serde::de::Deserializer;
2-
use serde::Deserialize;
2+
use serde::{Deserialize, Serialize};
33
use std::collections::{BTreeMap, HashMap};
44
use std::convert::TryFrom;
5+
use std::hash::{DefaultHasher, Hash as _, Hasher as _};
56
use std::ops::{Deref, DerefMut};
67
use std::sync::Arc;
78

@@ -165,3 +166,26 @@ impl Query {
165166
}
166167
}
167168
}
169+
170+
#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
171+
#[serde(rename_all = "snake_case")]
172+
pub enum SqlQueryMode {
173+
Data,
174+
Info,
175+
}
176+
177+
#[derive(Clone, Debug, Deserialize, Serialize)]
178+
pub struct SqlQueryReq {
179+
pub deployment: DeploymentHash,
180+
pub query: String,
181+
pub mode: SqlQueryMode,
182+
}
183+
184+
impl SqlQueryReq {
185+
pub fn query_hash(&self) -> u64 {
186+
let mut hasher = DefaultHasher::new();
187+
self.deployment.hash(&mut hasher);
188+
self.query.hash(&mut hasher);
189+
hasher.finish()
190+
}
191+
}

graph/src/data/store/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,7 @@ pub struct QueryObject {
11031103
}
11041104

11051105
/// An object that is returned from a SQL query. It wraps an `r::Value`
1106+
#[derive(CacheWeight, Serialize)]
11061107
pub struct SqlQueryObject(pub r::Value);
11071108

11081109
impl CacheWeight for QueryObject {

graphql/src/runner.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ use std::time::Instant;
44
use crate::metrics::GraphQLMetrics;
55
use crate::prelude::{QueryExecutionOptions, StoreResolver};
66
use crate::query::execute_query;
7+
use graph::data::query::{CacheStatus, SqlQueryReq};
8+
use graph::data::store::SqlQueryObject;
79
use graph::futures03::future;
8-
use graph::prelude::MetricsRegistry;
910
use graph::prelude::{
1011
async_trait, o, CheapClone, DeploymentState, GraphQLMetrics as GraphQLMetricsTrait,
1112
GraphQlRunner as GraphQlRunnerTrait, Logger, Query, QueryExecutionError, ENV_VARS,
1213
};
14+
use graph::prelude::{ApiVersion, MetricsRegistry};
1315
use graph::{data::graphql::load_manager::LoadManager, prelude::QueryStoreManager};
1416
use graph::{
1517
data::query::{LatestBlockInfo, QueryResults, QueryTarget},
@@ -251,4 +253,43 @@ where
251253
fn metrics(&self) -> Arc<dyn GraphQLMetricsTrait> {
252254
self.graphql_metrics.clone()
253255
}
256+
257+
async fn run_sql_query(
258+
self: Arc<Self>,
259+
req: SqlQueryReq,
260+
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
261+
let store = self
262+
.store
263+
.query_store(QueryTarget::Deployment(
264+
req.deployment.clone(),
265+
ApiVersion::default(),
266+
))
267+
.await?;
268+
269+
let query_hash = req.query_hash();
270+
self.load_manager
271+
.decide(
272+
&store.wait_stats(),
273+
store.shard(),
274+
store.deployment_id(),
275+
query_hash,
276+
&req.query,
277+
)
278+
.to_result()?;
279+
280+
let query_start = Instant::now();
281+
let result = store
282+
.execute_sql(&req.query)
283+
.map_err(|e| QueryExecutionError::from(e));
284+
285+
self.load_manager.record_work(
286+
store.shard(),
287+
store.deployment_id(),
288+
query_hash,
289+
query_start.elapsed(),
290+
CacheStatus::Miss,
291+
);
292+
293+
result
294+
}
254295
}

server/http/src/service.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use graph::components::server::query::ServerResponse;
99
use graph::components::server::query::ServerResult;
1010
use graph::components::versions::ApiVersion;
1111
use graph::data::query::QueryResult;
12+
use graph::data::query::SqlQueryMode;
13+
use graph::data::query::SqlQueryReq;
1214
use graph::data::subgraph::DeploymentHash;
1315
use graph::data::subgraph::SubgraphName;
1416
use graph::env::ENV_VARS;
@@ -21,6 +23,8 @@ use graph::hyper::{body::Body, header::HeaderValue};
2123
use graph::hyper::{Method, Request, Response, StatusCode};
2224
use graph::prelude::serde_json;
2325
use graph::prelude::serde_json::json;
26+
use graph::prelude::CacheWeight as _;
27+
use graph::prelude::QueryError;
2428
use graph::semver::VersionReq;
2529
use graph::slog::error;
2630
use graph::slog::Logger;
@@ -195,6 +199,51 @@ where
195199
Ok(result.as_http_response())
196200
}
197201

202+
async fn handle_sql_query<T: Body>(&self, request: Request<T>) -> ServerResult {
203+
let body = request
204+
.collect()
205+
.await
206+
.map_err(|_| ServerError::InternalError("Failed to read request body".into()))?
207+
.to_bytes();
208+
let sql_req: SqlQueryReq = serde_json::from_slice(&body)
209+
.map_err(|e| ServerError::ClientError(format!("{}", e)))?;
210+
211+
let mode = sql_req.mode;
212+
let result = self
213+
.graphql_runner
214+
.cheap_clone()
215+
.run_sql_query(sql_req)
216+
.await
217+
.map_err(|e| ServerError::QueryError(QueryError::from(e)));
218+
219+
use SqlQueryMode::*;
220+
let response_obj = match (result, mode) {
221+
(Ok(result), Info) => {
222+
json!({
223+
"count": result.len(),
224+
"bytes" : result.weight(),
225+
})
226+
}
227+
(Ok(result), Data) => {
228+
json!({
229+
"data": result,
230+
})
231+
}
232+
(Err(e), _) => json!({
233+
"error": e.to_string(),
234+
}),
235+
};
236+
237+
let response_str = serde_json::to_string(&response_obj).unwrap();
238+
239+
Ok(Response::builder()
240+
.status(200)
241+
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
242+
.header(CONTENT_TYPE, "application/json")
243+
.body(Full::from(response_str))
244+
.unwrap())
245+
}
246+
198247
// Handles OPTIONS requests
199248
fn handle_graphql_options<T>(&self, _request: Request<T>) -> ServerResult {
200249
Ok(Response::builder()
@@ -327,7 +376,9 @@ where
327376
let dest = format!("/{}/graphql", filtered_path);
328377
self.handle_temp_redirect(dest)
329378
}
330-
379+
(Method::POST, &["subgraphs", "sql"] | &["subgraphs", "sql", ""]) => {
380+
self.handle_sql_query(req).await
381+
}
331382
(Method::POST, &["subgraphs", "id", subgraph_id]) => {
332383
self.handle_graphql_query_by_id(subgraph_id.to_owned(), req)
333384
.await
@@ -395,14 +446,15 @@ where
395446

396447
#[cfg(test)]
397448
mod tests {
449+
use graph::data::store::SqlQueryObject;
398450
use graph::data::value::{Object, Word};
399451
use graph::http_body_util::{BodyExt, Full};
400452
use graph::hyper::body::Bytes;
401453
use graph::hyper::header::{CONTENT_LENGTH, CONTENT_TYPE};
402454
use graph::hyper::{Method, Request, StatusCode};
403455
use graph::prelude::serde_json::json;
404456

405-
use graph::data::query::{QueryResults, QueryTarget};
457+
use graph::data::query::{QueryResults, QueryTarget, SqlQueryReq};
406458
use graph::prelude::*;
407459

408460
use crate::test_utils;
@@ -449,6 +501,13 @@ mod tests {
449501
fn metrics(&self) -> Arc<dyn GraphQLMetrics> {
450502
Arc::new(TestGraphQLMetrics)
451503
}
504+
505+
async fn run_sql_query(
506+
self: Arc<Self>,
507+
_req: SqlQueryReq,
508+
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
509+
unimplemented!()
510+
}
452511
}
453512

454513
#[tokio::test]

server/http/tests/server.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use graph::http::StatusCode;
1+
use graph::{
2+
data::{query::SqlQueryReq, store::SqlQueryObject},
3+
http::StatusCode,
4+
};
25
use std::time::Duration;
36

47
use graph::data::{
@@ -66,6 +69,13 @@ impl GraphQlRunner for TestGraphQlRunner {
6669
fn metrics(&self) -> Arc<dyn GraphQLMetrics> {
6770
Arc::new(TestGraphQLMetrics)
6871
}
72+
73+
async fn run_sql_query(
74+
self: Arc<Self>,
75+
_req: SqlQueryReq,
76+
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
77+
unimplemented!();
78+
}
6979
}
7080

7181
#[cfg(test)]

0 commit comments

Comments
 (0)