Skip to content

Commit 29970e1

Browse files
committed
feat: Add SQL queries support in /v1/sql endpoint
1 parent c15f1ca commit 29970e1

File tree

6 files changed

+717
-1
lines changed

6 files changed

+717
-1
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
QueryType as QueryTypeEnum, ResultType
3434
} from './types/enums';
3535
import {
36+
BaseRequest,
3637
RequestContext,
3738
ExtendedRequestContext,
3839
Request,
@@ -324,6 +325,17 @@ class ApiGateway {
324325
}));
325326

326327
app.get(`${this.basePath}/v1/sql`, userMiddlewares, userAsyncHandler(async (req: any, res) => {
328+
// TODO parse req.query with zod/joi/...
329+
330+
if (req.query.format === 'sql') {
331+
await this.sql4sql({
332+
query: req.query.query,
333+
context: req.context,
334+
res: this.resToResultFn(res)
335+
});
336+
return;
337+
}
338+
327339
await this.sql({
328340
query: req.query.query,
329341
context: req.context,
@@ -332,6 +344,17 @@ class ApiGateway {
332344
}));
333345

334346
app.post(`${this.basePath}/v1/sql`, jsonParser, userMiddlewares, userAsyncHandler(async (req, res) => {
347+
// TODO parse req.body with zod/joi/...
348+
349+
if (req.body.format === 'sql') {
350+
await this.sql4sql({
351+
query: req.body.query,
352+
context: req.context,
353+
res: this.resToResultFn(res)
354+
});
355+
return;
356+
}
357+
335358
await this.sql({
336359
query: req.body.query,
337360
context: req.context,
@@ -1272,6 +1295,25 @@ class ApiGateway {
12721295
return [queryType, normalizedQueries, queryNormalizationResult.map((it) => remapToQueryAdapterFormat(it.normalizedQuery))];
12731296
}
12741297

1298+
public async sql4sql({
1299+
query,
1300+
context,
1301+
res,
1302+
}: {query: string} & BaseRequest) {
1303+
try {
1304+
await this.assertApiScope('meta', context.securityContext);
1305+
const result = await this.sqlServer.sql4sql(query, context.securityContext);
1306+
res({ sql: result });
1307+
} catch (e: any) {
1308+
this.handleError({
1309+
e,
1310+
context,
1311+
query,
1312+
res,
1313+
});
1314+
}
1315+
}
1316+
12751317
public async sql({
12761318
query,
12771319
context,

packages/cubejs-api-gateway/src/sql-server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import {
33
registerInterface,
44
shutdownInterface,
55
execSql,
6+
sql4sql,
67
SqlInterfaceInstance,
78
Request as NativeRequest,
89
LoadRequestMeta,
10+
Sql4SqlResponse,
911
} from '@cubejs-backend/native';
1012
import type { ShutdownMode } from '@cubejs-backend/native';
1113
import { displayCLIWarning, getEnv } from '@cubejs-backend/shared';
@@ -62,6 +64,10 @@ export class SQLServer {
6264
await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext);
6365
}
6466

67+
public async sql4sql(sqlQuery: string, securityContext?: any): Promise<Sql4SqlResponse> {
68+
return sql4sql(this.sqlInterfaceInstance!, sqlQuery, securityContext);
69+
}
70+
6571
protected buildCheckSqlAuth(options: SQLServerOptions): CheckSQLAuthFn {
6672
return (options.checkSqlAuth && this.wrapCheckSqlAuthFn(options.checkSqlAuth))
6773
|| this.createDefaultCheckSqlAuthFn(options);

packages/cubejs-backend-native/js/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,21 @@ export type DBResponsePrimitive =
124124
number |
125125
string;
126126

127+
// TODO type this better, to make it proper disjoint union
128+
export type Sql4SqlOk = {
129+
sql: string,
130+
values: Array<string | null>,
131+
};
132+
export type Sql4SqlError = { error: string };
133+
export type Sql4SqlCommon = {
134+
query_type: {
135+
regular: boolean;
136+
post_processing: boolean;
137+
pushdown: boolean;
138+
}
139+
};
140+
export type Sql4SqlResponse = Sql4SqlCommon & (Sql4SqlOk | Sql4SqlError);
141+
127142
let loadedNative: any = null;
128143

129144
export function loadNative() {
@@ -389,6 +404,13 @@ export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string,
389404
await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null);
390405
};
391406

407+
// TODO parse result from native code
408+
export const sql4sql = async (instance: SqlInterfaceInstance, sqlQuery: string, securityContext?: any): Promise<Sql4SqlResponse> => {
409+
const native = loadNative();
410+
411+
return native.sql4sql(instance, sqlQuery, securityContext ? JSON.stringify(securityContext) : null);
412+
};
413+
392414
export const buildSqlAndParams = (cubeEvaluator: any): String => {
393415
const native = loadNative();
394416

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 236 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use cubesql::compile::datafusion::logical_plan::LogicalPlan;
2+
use cubesql::compile::engine::df::scan::CubeScanNode;
3+
use cubesql::compile::engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode};
14
use cubesql::compile::DatabaseProtocol;
25
use cubesql::compile::{convert_sql_to_cube_query, get_df_batches};
36
use cubesql::config::processing_loop::ShutdownMode;
47
use cubesql::config::ConfigObj;
58
use cubesql::sql::{Session, SessionManager};
6-
use cubesql::transport::TransportService;
9+
use cubesql::transport::{MetaContext, TransportService};
710
use futures::StreamExt;
811

912
use serde_json::Map;
@@ -342,6 +345,183 @@ async fn handle_sql_query(
342345
.await
343346
}
344347

348+
struct Sql4SqlQueryType {
349+
regular: bool,
350+
post_processing: bool,
351+
pushdown: bool,
352+
}
353+
354+
impl Sql4SqlQueryType {
355+
fn regular() -> Self {
356+
Self {
357+
regular: true,
358+
post_processing: false,
359+
pushdown: false,
360+
}
361+
}
362+
363+
fn post_processing() -> Self {
364+
Self {
365+
regular: false,
366+
post_processing: true,
367+
pushdown: false,
368+
}
369+
}
370+
371+
fn pushdown() -> Self {
372+
Self {
373+
regular: false,
374+
post_processing: false,
375+
pushdown: true,
376+
}
377+
}
378+
379+
pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsObject> {
380+
let obj = cx.empty_object();
381+
382+
let regular = cx.boolean(self.regular);
383+
obj.set(cx, "regular", regular)?;
384+
let post_processing = cx.boolean(self.post_processing);
385+
obj.set(cx, "post_processing", post_processing)?;
386+
let pushdown = cx.boolean(self.pushdown);
387+
obj.set(cx, "pushdown", pushdown)?;
388+
389+
Ok(obj)
390+
}
391+
}
392+
393+
enum Sql4SqlResponseResult {
394+
Ok {
395+
sql: String,
396+
values: Vec<Option<String>>,
397+
},
398+
Error {
399+
error: String,
400+
},
401+
}
402+
403+
struct Sql4SqlResponse {
404+
result: Sql4SqlResponseResult,
405+
query_type: Sql4SqlQueryType,
406+
}
407+
408+
impl Sql4SqlResponse {
409+
pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsObject> {
410+
let obj = cx.empty_object();
411+
412+
match &self.result {
413+
Sql4SqlResponseResult::Ok { sql, values } => {
414+
let sql = cx.string(sql);
415+
obj.set(cx, "sql", sql)?;
416+
let js_values = cx.empty_array();
417+
for (i, v) in values.iter().enumerate() {
418+
use std::convert::TryFrom;
419+
let i = u32::try_from(i).unwrap();
420+
let v: Handle<JsValue> = v
421+
.as_ref()
422+
.map(|v| cx.string(v).upcast())
423+
.unwrap_or_else(|| cx.null().upcast());
424+
js_values.set(cx, i, v)?;
425+
}
426+
obj.set(cx, "values", js_values)?;
427+
}
428+
Sql4SqlResponseResult::Error { error } => {
429+
let error = cx.string(error);
430+
obj.set(cx, "error", error)?;
431+
}
432+
}
433+
434+
let query_type = self.query_type.to_js(cx)?;
435+
obj.set(cx, "query_type", query_type)?;
436+
437+
Ok(obj)
438+
}
439+
}
440+
441+
async fn get_sql(
442+
session: &Session,
443+
meta_context: Arc<MetaContext>,
444+
plan: Arc<LogicalPlan>,
445+
) -> Result<Sql4SqlResponse, CubeError> {
446+
let auth_context = session
447+
.state
448+
.auth_context()
449+
.ok_or_else(|| CubeError::internal("Unexpected missing auth context".to_string()))?;
450+
451+
match plan.as_ref() {
452+
LogicalPlan::Extension(extension) => {
453+
let cube_scan_wrapped_sql = extension
454+
.node
455+
.as_any()
456+
.downcast_ref::<CubeScanWrappedSqlNode>();
457+
458+
if let Some(cube_scan_wrapped_sql) = cube_scan_wrapped_sql {
459+
return Ok(Sql4SqlResponse {
460+
result: Sql4SqlResponseResult::Ok {
461+
sql: cube_scan_wrapped_sql.wrapped_sql.sql.clone(),
462+
values: cube_scan_wrapped_sql.wrapped_sql.values.clone(),
463+
},
464+
query_type: Sql4SqlQueryType::pushdown(),
465+
});
466+
}
467+
468+
if extension.node.as_any().is::<CubeScanNode>() {
469+
let cube_scan_wrapper = CubeScanWrapperNode::new(
470+
plan,
471+
meta_context,
472+
auth_context,
473+
None,
474+
session.server.config_obj.clone(),
475+
);
476+
let wrapped_sql = cube_scan_wrapper
477+
.generate_sql(
478+
session.server.transport.clone(),
479+
Arc::new(session.state.get_load_request_meta("sql")),
480+
)
481+
.await?;
482+
483+
return Ok(Sql4SqlResponse {
484+
result: Sql4SqlResponseResult::Ok {
485+
sql: wrapped_sql.wrapped_sql.sql.clone(),
486+
values: wrapped_sql.wrapped_sql.values.clone(),
487+
},
488+
query_type: Sql4SqlQueryType::regular(),
489+
});
490+
}
491+
492+
Err(CubeError::internal(
493+
"Unexpected extension in logical plan root".to_string(),
494+
))
495+
}
496+
_ => Ok(Sql4SqlResponse {
497+
result: Sql4SqlResponseResult::Error {
498+
error: "Provided query can not be executed without post-processing.".to_string(),
499+
},
500+
query_type: Sql4SqlQueryType::post_processing(),
501+
}),
502+
}
503+
}
504+
505+
async fn handle_sql4sql_query(
506+
services: Arc<NodeCubeServices>,
507+
native_auth_ctx: Arc<NativeAuthContext>,
508+
sql_query: &str,
509+
) -> Result<Sql4SqlResponse, CubeError> {
510+
with_session(&services, native_auth_ctx.clone(), |session| async move {
511+
let transport = session.server.transport.clone();
512+
// todo: can we use compiler_cache?
513+
let meta_context = transport
514+
.meta(native_auth_ctx)
515+
.await
516+
.map_err(|err| CubeError::internal(format!("Failed to get meta context: {err}")))?;
517+
let query_plan =
518+
convert_sql_to_cube_query(sql_query, meta_context.clone(), session.clone()).await?;
519+
let logical_plan = query_plan.try_as_logical_plan()?;
520+
get_sql(&session, meta_context, Arc::new(logical_plan.clone())).await
521+
})
522+
.await
523+
}
524+
345525
struct WritableStreamMethods {
346526
stream: Arc<Root<JsObject>>,
347527
on: Arc<Root<JsFunction>>,
@@ -451,6 +631,60 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult<JsValue> {
451631
Ok(promise.upcast::<JsValue>())
452632
}
453633

634+
fn sql4sql(mut cx: FunctionContext) -> JsResult<JsValue> {
635+
let interface = cx.argument::<JsBox<SQLInterface>>(0)?;
636+
let sql_query = cx.argument::<JsString>(1)?.value(&mut cx);
637+
638+
let security_context: Option<serde_json::Value> = match cx.argument::<JsValue>(2) {
639+
Ok(string) => match string.downcast::<JsString, _>(&mut cx) {
640+
Ok(v) => v.value(&mut cx).parse::<serde_json::Value>().ok(),
641+
Err(_) => None,
642+
},
643+
Err(_) => None,
644+
};
645+
646+
let services = interface.services.clone();
647+
let runtime = tokio_runtime_node(&mut cx)?;
648+
649+
let channel = cx.channel();
650+
651+
let native_auth_ctx = Arc::new(NativeAuthContext {
652+
user: Some(String::from("unknown")),
653+
superuser: false,
654+
security_context,
655+
});
656+
657+
let (deferred, promise) = cx.promise();
658+
659+
// In case spawned task panics or gets aborted before settle call it will leave permanently pending Promise in JS land
660+
// We don't want to just waste whole thread (doesn't really matter main or worker or libuv thread pool)
661+
// just busy waiting that JoinHandle
662+
// TODO handle JoinError
663+
// keep JoinHandle alive in JS thread
664+
// check join handle from JS thread periodically, reject promise on JoinError
665+
// maybe register something like uv_check handle (libuv itself does not have ABI stability of N-API)
666+
// can do it relatively rare, and in a single loop for all JoinHandles
667+
// this is just a watchdog for a Very Bad case, so latency requirement can be quite relaxed
668+
runtime.spawn(async move {
669+
let result = handle_sql4sql_query(services, native_auth_ctx, &sql_query).await;
670+
671+
if let Err(err) = deferred.try_settle_with(&channel, move |mut cx| {
672+
// `neon::result::ResultExt` is implemented only for Result<Handle, Handle>, even though Ok variant is not touched
673+
let response = result.or_else(|err| cx.throw_error(err.to_string()))?;
674+
let response = response.to_js(&mut cx)?;
675+
Ok(response)
676+
}) {
677+
// There is not much we can do at this point
678+
// TODO lift this error to task => JoinHandle => JS watchdog
679+
log::error!(
680+
"Unable to settle JS promise from tokio task, try_settle_with failed, err: {err}"
681+
);
682+
}
683+
});
684+
685+
Ok(promise.upcast::<JsValue>())
686+
}
687+
454688
fn is_fallback_build(mut cx: FunctionContext) -> JsResult<JsBoolean> {
455689
#[cfg(feature = "python")]
456690
{
@@ -537,6 +771,7 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
537771
cx.export_function("registerInterface", register_interface::<C>)?;
538772
cx.export_function("shutdownInterface", shutdown_interface)?;
539773
cx.export_function("execSql", exec_sql)?;
774+
cx.export_function("sql4sql", sql4sql)?;
540775
cx.export_function("isFallbackBuild", is_fallback_build)?;
541776
cx.export_function("__js_to_clrepr_to_js", debug_js_to_clrepr_to_js)?;
542777

0 commit comments

Comments
 (0)