Skip to content

Commit 8e2a3ec

Browse files
authored
feat(cubesql): In subquery support (#7851)
1 parent 922699c commit 8e2a3ec

File tree

10 files changed

+113
-39
lines changed

10 files changed

+113
-39
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/cubesql/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ homepage = "https://cube.dev"
1010

1111
[dependencies]
1212
arc-swap = "1"
13-
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "28a07c390e7195dfd657c85118dee8cb73fc6bf7", default-features = false, features = ["regex_expressions", "unicode_expressions"] }
13+
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "a93bb9641d201b2b42ee92321f07e87bbd357d0e", default-features = false, features = ["regex_expressions", "unicode_expressions"] }
1414
anyhow = "1.0"
1515
thiserror = "1.0.50"
1616
cubeclient = { path = "../cubeclient" }
1717
pg-srv = { path = "../pg-srv" }
18-
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "347f769500e3305f1920d8b38832f483d8795bd3" }
18+
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "6a54d27d3b75a04b9f9cbe309a83078aa54b32fd" }
1919
lazy_static = "1.4.0"
2020
base64 = "0.13.0"
2121
tokio = { version = "^1.35", features = ["full", "rt", "tracing"] }

rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ fn filter_push_down(
498498
subqueries,
499499
input,
500500
schema,
501+
types,
501502
}) => {
502503
// TODO: Push Filter down Subquery
503504
issue_filter(
@@ -516,6 +517,7 @@ fn filter_push_down(
516517
optimizer_config,
517518
)?),
518519
schema: schema.clone(),
520+
types: types.clone(),
519521
}),
520522
)
521523
}

rust/cubesql/cubesql/src/compile/engine/df/optimizers/limit_push_down.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ fn limit_push_down(
339339
subqueries,
340340
input,
341341
schema,
342+
types,
342343
}) => {
343344
// TODO: Pushing Limit down Subquery?
344345
issue_limit(
@@ -359,6 +360,7 @@ fn limit_push_down(
359360
optimizer_config,
360361
)?),
361362
schema: schema.clone(),
363+
types: types.clone(),
362364
}),
363365
)
364366
}

rust/cubesql/cubesql/src/compile/engine/df/optimizers/sort_push_down.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ fn sort_push_down(
295295
subqueries,
296296
input,
297297
schema,
298+
types,
298299
}) => {
299300
// TODO: Pushing Sort down Subquery?
300301
issue_sort(
@@ -306,6 +307,7 @@ fn sort_push_down(
306307
.collect::<Result<_>>()?,
307308
input: Arc::new(sort_push_down(optimizer, input, None, optimizer_config)?),
308309
schema: schema.clone(),
310+
types: types.clone(),
309311
}),
310312
)
311313
}

rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ pub fn rewrite(expr: &Expr, map: &HashMap<Column, Option<Expr>>) -> Result<Optio
4848
right: Box::new(right),
4949
})
5050
}
51-
Expr::AnyExpr { left, op, right } => {
51+
Expr::AnyExpr {
52+
left,
53+
op,
54+
right,
55+
all,
56+
} => {
5257
let rewrites = match (rewrite(left, map)?, rewrite(right, map)?) {
5358
(Some(left), Some(right)) => Some((left, right)),
5459
_ => None,
@@ -57,6 +62,7 @@ pub fn rewrite(expr: &Expr, map: &HashMap<Column, Option<Expr>>) -> Result<Optio
5762
left: Box::new(left),
5863
op: op.clone(),
5964
right: Box::new(right),
65+
all: all.clone(),
6066
})
6167
}
6268
Expr::Like(Like {
@@ -310,6 +316,21 @@ pub fn rewrite(expr: &Expr, map: &HashMap<Column, Option<Expr>>) -> Result<Optio
310316
// As rewrites are used to push things down or up the plan, wildcards
311317
// might change the selection and should be marked as non-rewrittable
312318
Expr::Wildcard | Expr::QualifiedWildcard { .. } => None,
319+
Expr::InSubquery {
320+
expr,
321+
subquery,
322+
negated,
323+
} => {
324+
let rewrites = match (rewrite(expr, map)?, rewrite(subquery, map)?) {
325+
(Some(expr), Some(subquery)) => Some((expr, subquery)),
326+
_ => None,
327+
};
328+
rewrites.map(|(expr, subquery)| Expr::InSubquery {
329+
expr: Box::new(expr),
330+
subquery: Box::new(subquery),
331+
negated: negated.clone(),
332+
})
333+
}
313334
})
314335
}
315336

rust/cubesql/cubesql/src/compile/rewrite/converter.rs

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,22 @@ use crate::{
1010
rewrite::{
1111
analysis::LogicalPlanAnalysis, rewriter::Rewriter, AggregateFunctionExprDistinct,
1212
AggregateFunctionExprFun, AggregateSplit, AggregateUDFExprFun, AliasExprAlias,
13-
AnyExprOp, BetweenExprNegated, BinaryExprOp, CastExprDataType, ChangeUserMemberValue,
14-
ColumnExprColumn, CubeScanAliasToCube, CubeScanLimit, CubeScanOffset,
15-
CubeScanUngrouped, CubeScanWrapped, DimensionName, EmptyRelationProduceOneRow,
16-
FilterMemberMember, FilterMemberOp, FilterMemberValues, FilterOpOp, InListExprNegated,
17-
JoinJoinConstraint, JoinJoinType, JoinLeftOn, JoinRightOn, LikeExprEscapeChar,
18-
LikeExprLikeType, LikeExprNegated, LikeType, LimitFetch, LimitSkip, LiteralExprValue,
19-
LiteralMemberRelation, LiteralMemberValue, LogicalPlanLanguage, MeasureName,
20-
MemberErrorError, OrderAsc, OrderMember, OuterColumnExprColumn,
21-
OuterColumnExprDataType, ProjectionAlias, ProjectionSplit, QueryParamIndex,
22-
ScalarFunctionExprFun, ScalarUDFExprFun, ScalarVariableExprDataType,
23-
ScalarVariableExprVariable, SegmentMemberMember, SortExprAsc, SortExprNullsFirst,
24-
TableScanFetch, TableScanProjection, TableScanSourceTableName, TableScanTableName,
25-
TableUDFExprFun, TimeDimensionDateRange, TimeDimensionGranularity, TimeDimensionName,
26-
TryCastExprDataType, UnionAlias, WindowFunctionExprFun, WindowFunctionExprWindowFrame,
27-
WrappedSelectAlias, WrappedSelectJoinJoinType, WrappedSelectLimit, WrappedSelectOffset,
13+
AnyExprAll, AnyExprOp, BetweenExprNegated, BinaryExprOp, CastExprDataType,
14+
ChangeUserMemberValue, ColumnExprColumn, CubeScanAliasToCube, CubeScanLimit,
15+
CubeScanOffset, CubeScanUngrouped, CubeScanWrapped, DimensionName,
16+
EmptyRelationProduceOneRow, FilterMemberMember, FilterMemberOp, FilterMemberValues,
17+
FilterOpOp, InListExprNegated, InSubqueryNegated, JoinJoinConstraint, JoinJoinType,
18+
JoinLeftOn, JoinRightOn, LikeExprEscapeChar, LikeExprLikeType, LikeExprNegated,
19+
LikeType, LimitFetch, LimitSkip, LiteralExprValue, LiteralMemberRelation,
20+
LiteralMemberValue, LogicalPlanLanguage, MeasureName, MemberErrorError, OrderAsc,
21+
OrderMember, OuterColumnExprColumn, OuterColumnExprDataType, ProjectionAlias,
22+
ProjectionSplit, QueryParamIndex, ScalarFunctionExprFun, ScalarUDFExprFun,
23+
ScalarVariableExprDataType, ScalarVariableExprVariable, SegmentMemberMember,
24+
SortExprAsc, SortExprNullsFirst, SubqueryTypes, TableScanFetch, TableScanProjection,
25+
TableScanSourceTableName, TableScanTableName, TableUDFExprFun, TimeDimensionDateRange,
26+
TimeDimensionGranularity, TimeDimensionName, TryCastExprDataType, UnionAlias,
27+
WindowFunctionExprFun, WindowFunctionExprWindowFrame, WrappedSelectAlias,
28+
WrappedSelectJoinJoinType, WrappedSelectLimit, WrappedSelectOffset,
2829
WrappedSelectSelectType, WrappedSelectType, WrappedSelectUngrouped,
2930
},
3031
},
@@ -221,11 +222,18 @@ impl LogicalPlanToLanguageConverter {
221222
graph.add(LogicalPlanLanguage::LiteralExpr([value]))
222223
}
223224
}
224-
Expr::AnyExpr { left, op, right } => {
225+
Expr::AnyExpr {
226+
left,
227+
op,
228+
right,
229+
all,
230+
} => {
225231
let left = Self::add_expr_replace_params(graph, left, query_params)?;
226232
let op = add_expr_data_node!(graph, op, AnyExprOp);
227233
let right = Self::add_expr_replace_params(graph, right, query_params)?;
228-
graph.add(LogicalPlanLanguage::AnyExpr([left, op, right]))
234+
let all = add_expr_data_node!(graph, all, AnyExprAll);
235+
236+
graph.add(LogicalPlanLanguage::AnyExpr([left, op, right, all]))
229237
}
230238
Expr::BinaryExpr { left, op, right } => {
231239
let left = Self::add_expr_replace_params(graph, left, query_params)?;
@@ -414,6 +422,17 @@ impl LogicalPlanToLanguageConverter {
414422
let negated = add_expr_data_node!(graph, negated, InListExprNegated);
415423
graph.add(LogicalPlanLanguage::InListExpr([expr, list, negated]))
416424
}
425+
Expr::InSubquery {
426+
expr,
427+
subquery,
428+
negated,
429+
} => {
430+
let expr = Self::add_expr_replace_params(graph, expr, query_params)?;
431+
let subquery = Self::add_expr_replace_params(graph, subquery, query_params)?;
432+
let negated = add_expr_data_node!(graph, negated, InSubqueryNegated);
433+
434+
graph.add(LogicalPlanLanguage::InSubquery([expr, subquery, negated]))
435+
}
417436
Expr::Wildcard => graph.add(LogicalPlanLanguage::WildcardExpr([])),
418437
Expr::GetIndexedField { expr, key } => {
419438
let expr = Self::add_expr_replace_params(graph, expr, query_params)?;
@@ -548,8 +567,9 @@ impl LogicalPlanToLanguageConverter {
548567
self.add_logical_plan_replace_params(node.input.as_ref(), query_params)?;
549568
let subqueries =
550569
add_plan_list_node!(self, node.subqueries, query_params, SubquerySubqueries);
570+
let types = add_data_node!(self, node.types, SubqueryTypes);
551571
self.graph
552-
.add(LogicalPlanLanguage::Subquery([input, subqueries]))
572+
.add(LogicalPlanLanguage::Subquery([input, subqueries, types]))
553573
}
554574
LogicalPlan::TableUDFs(node) => {
555575
let expr =
@@ -818,7 +838,13 @@ pub fn node_to_expr(
818838
let left = Box::new(to_expr(params[0].clone())?);
819839
let op = match_data_node!(node_by_id, params[1], AnyExprOp);
820840
let right = Box::new(to_expr(params[2].clone())?);
821-
Expr::AnyExpr { left, op, right }
841+
let all = match_data_node!(node_by_id, params[3], AnyExprAll);
842+
Expr::AnyExpr {
843+
left,
844+
op,
845+
right,
846+
all,
847+
}
822848
}
823849
LogicalPlanLanguage::BinaryExpr(params) => {
824850
let left = Box::new(to_expr(params[0].clone())?);
@@ -1006,6 +1032,16 @@ pub fn node_to_expr(
10061032
"QueryParam can't be evaluated as an Expr node".to_string(),
10071033
));
10081034
}
1035+
LogicalPlanLanguage::InSubquery(params) => {
1036+
let expr = Box::new(to_expr(params[0].clone())?);
1037+
let subquery = Box::new(to_expr(params[1].clone())?);
1038+
let negated = match_data_node!(node_by_id, params[2], InSubqueryNegated);
1039+
Expr::InSubquery {
1040+
expr,
1041+
subquery,
1042+
negated,
1043+
}
1044+
}
10091045
x => panic!("Unexpected expression node: {:?}", x),
10101046
})
10111047
}
@@ -1179,8 +1215,9 @@ impl LanguageToLogicalPlanConverter {
11791215
.into_iter()
11801216
.map(|n| self.to_logical_plan(n))
11811217
.collect::<Result<Vec<_>, _>>()?;
1218+
let types = match_data_node!(node_by_id, params[2], SubqueryTypes);
11821219
LogicalPlanBuilder::from(input)
1183-
.subquery(subqueries)?
1220+
.subquery(subqueries, types)?
11841221
.build()?
11851222
}
11861223
LogicalPlanLanguage::TableUDFs(params) => {

0 commit comments

Comments
 (0)