Skip to content

Commit a07c2cd

Browse files
authored
Implement ppl scalar subquery command with Calcite (#3392)
* Implement ppl scalar subquery command with Calcite Signed-off-by: Lantao Jin <ltjin@amazon.com> * more general subquery checker Signed-off-by: Lantao Jin <ltjin@amazon.com> * support correlated IN subquery Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 22375df commit a07c2cd

File tree

14 files changed

+823
-32
lines changed

14 files changed

+823
-32
lines changed

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.sql.ast.expression.Xor;
3939
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
4040
import org.opensearch.sql.ast.expression.subquery.InSubquery;
41+
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
4142
import org.opensearch.sql.ast.statement.Explain;
4243
import org.opensearch.sql.ast.statement.Query;
4344
import org.opensearch.sql.ast.statement.Statement;
@@ -348,6 +349,10 @@ public T visitSubqueryAlias(SubqueryAlias node, C context) {
348349
return visitChildren(node, context);
349350
}
350351

352+
public T visitScalarSubquery(ScalarSubquery node, C context) {
353+
return visitChildren(node, context);
354+
}
355+
351356
public T visitExistsSubquery(ExistsSubquery node, C context) {
352357
return visitChildren(node, context);
353358
}

core/src/main/java/org/opensearch/sql/ast/expression/subquery/ExistsSubquery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import org.opensearch.sql.common.utils.StringUtils;
1717

1818
@Getter
19-
@EqualsAndHashCode(callSuper = false)
19+
@EqualsAndHashCode(callSuper = true)
2020
@RequiredArgsConstructor
21-
public class ExistsSubquery extends UnresolvedExpression {
21+
public class ExistsSubquery extends SubqueryExpression {
2222
private final UnresolvedPlan query;
2323

2424
@Override

core/src/main/java/org/opensearch/sql/ast/expression/subquery/InSubquery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import org.opensearch.sql.common.utils.StringUtils;
1616

1717
@Getter
18-
@EqualsAndHashCode(callSuper = false)
18+
@EqualsAndHashCode(callSuper = true)
1919
@RequiredArgsConstructor
20-
public class InSubquery extends UnresolvedExpression {
20+
public class InSubquery extends SubqueryExpression {
2121
private final List<UnresolvedExpression> value;
2222
private final UnresolvedPlan query;
2323

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.expression.subquery;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.ToString;
14+
import org.opensearch.sql.ast.AbstractNodeVisitor;
15+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
16+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
17+
18+
@Getter
19+
@ToString
20+
@EqualsAndHashCode(callSuper = true)
21+
@RequiredArgsConstructor
22+
public class ScalarSubquery extends SubqueryExpression {
23+
private final UnresolvedPlan query;
24+
25+
@Override
26+
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
27+
return nodeVisitor.visitScalarSubquery(this, context);
28+
}
29+
30+
@Override
31+
public List<UnresolvedExpression> getChild() {
32+
return ImmutableList.of();
33+
}
34+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.expression.subquery;
7+
8+
import lombok.EqualsAndHashCode;
9+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
10+
11+
/** Basic class of subquery expression */
12+
@EqualsAndHashCode(callSuper = false)
13+
public abstract class SubqueryExpression extends UnresolvedExpression {}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
1414

1515
import com.google.common.collect.ImmutableList;
16+
import com.google.common.collect.Iterables;
1617
import java.util.ArrayList;
1718
import java.util.List;
1819
import java.util.Objects;
@@ -31,14 +32,14 @@
3132
import org.apache.calcite.util.Holder;
3233
import org.checkerframework.checker.nullness.qual.Nullable;
3334
import org.opensearch.sql.ast.AbstractNodeVisitor;
35+
import org.opensearch.sql.ast.Node;
3436
import org.opensearch.sql.ast.expression.AllFields;
3537
import org.opensearch.sql.ast.expression.Argument;
36-
import org.opensearch.sql.ast.expression.Compare;
3738
import org.opensearch.sql.ast.expression.Field;
38-
import org.opensearch.sql.ast.expression.Not;
39+
import org.opensearch.sql.ast.expression.Let;
3940
import org.opensearch.sql.ast.expression.QualifiedName;
4041
import org.opensearch.sql.ast.expression.UnresolvedExpression;
41-
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
42+
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
4243
import org.opensearch.sql.ast.tree.Aggregation;
4344
import org.opensearch.sql.ast.tree.Eval;
4445
import org.opensearch.sql.ast.tree.Filter;
@@ -91,14 +92,14 @@ private RelBuilder scan(RelOptTable tableSchema, CalcitePlanContext context) {
9192
@Override
9293
public RelNode visitFilter(Filter node, CalcitePlanContext context) {
9394
visitChildren(node, context);
94-
boolean containsExistsSubquery = containsExistsSubquery(node.getCondition());
95+
boolean containsSubqueryExpression = containsSubqueryExpression(node.getCondition());
9596
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
96-
if (containsExistsSubquery) {
97+
if (containsSubqueryExpression) {
9798
context.relBuilder.variable(v::set);
9899
context.pushCorrelVar(v.get());
99100
}
100101
RexNode condition = rexVisitor.analyze(node.getCondition(), context);
101-
if (containsExistsSubquery) {
102+
if (containsSubqueryExpression) {
102103
context.relBuilder.filter(ImmutableList.of(v.get().id), condition);
103104
context.popCorrelVar();
104105
} else {
@@ -107,15 +108,20 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
107108
return context.relBuilder.peek();
108109
}
109110

110-
private boolean containsExistsSubquery(Object condition) {
111-
if (condition instanceof ExistsSubquery) {
111+
private boolean containsSubqueryExpression(Node expr) {
112+
if (expr == null) {
113+
return false;
114+
}
115+
if (expr instanceof SubqueryExpression) {
112116
return true;
113117
}
114-
if (condition instanceof Not n) {
115-
return containsExistsSubquery(n.getExpression());
118+
if (expr instanceof Let l) {
119+
return containsSubqueryExpression(l.getExpression());
116120
}
117-
if (condition instanceof Compare c) {
118-
return containsExistsSubquery(c.getLeft()) || containsExistsSubquery(c.getRight());
121+
for (Node child : expr.getChild()) {
122+
if (containsSubqueryExpression(child)) {
123+
return true;
124+
}
119125
}
120126
return false;
121127
}
@@ -187,8 +193,25 @@ public RelNode visitEval(Eval node, CalcitePlanContext context) {
187193
node.getExpressionList().stream()
188194
.map(
189195
expr -> {
196+
boolean containsSubqueryExpression = containsSubqueryExpression(expr);
197+
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
198+
if (containsSubqueryExpression) {
199+
context.relBuilder.variable(v::set);
200+
context.pushCorrelVar(v.get());
201+
}
190202
RexNode eval = rexVisitor.analyze(expr, context);
191-
context.relBuilder.projectPlus(eval);
203+
if (containsSubqueryExpression) {
204+
// RelBuilder.projectPlus doesn't have a parameter with variablesSet:
205+
// projectPlus(Iterable<CorrelationId> variablesSet, RexNode... nodes)
206+
context.relBuilder.project(
207+
Iterables.concat(context.relBuilder.fields(), ImmutableList.of(eval)),
208+
ImmutableList.of(),
209+
false,
210+
ImmutableList.of(v.get().id));
211+
context.popCorrelVar();
212+
} else {
213+
context.relBuilder.projectPlus(eval);
214+
}
192215
return eval;
193216
})
194217
.collect(Collectors.toList());

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,14 @@
1818
import org.apache.calcite.rel.type.RelDataType;
1919
import org.apache.calcite.rel.type.RelDataTypeFactory;
2020
import org.apache.calcite.rex.RexBuilder;
21-
import org.apache.calcite.rex.RexCorrelVariable;
2221
import org.apache.calcite.rex.RexNode;
2322
import org.apache.calcite.sql.SqlIntervalQualifier;
2423
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
2524
import org.apache.calcite.sql.parser.SqlParserUtil;
2625
import org.apache.calcite.sql.type.SqlTypeName;
2726
import org.apache.calcite.util.DateString;
28-
import org.apache.calcite.util.Holder;
2927
import org.apache.calcite.util.TimeString;
3028
import org.apache.calcite.util.TimestampString;
31-
import org.checkerframework.checker.nullness.qual.Nullable;
3229
import org.opensearch.sql.ast.AbstractNodeVisitor;
3330
import org.opensearch.sql.ast.expression.Alias;
3431
import org.opensearch.sql.ast.expression.And;
@@ -46,6 +43,7 @@
4643
import org.opensearch.sql.ast.expression.Xor;
4744
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
4845
import org.opensearch.sql.ast.expression.subquery.InSubquery;
46+
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
4947
import org.opensearch.sql.ast.tree.UnresolvedPlan;
5048
import org.opensearch.sql.calcite.utils.BuiltinFunctionUtils;
5149
import org.opensearch.sql.exception.SemanticCheckException;
@@ -283,7 +281,7 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
283281
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
284282
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
285283
UnresolvedPlan subquery = node.getQuery();
286-
RelNode subqueryRel = resolveSubqueryPlan(subquery, false, context);
284+
RelNode subqueryRel = resolveSubqueryPlan(subquery, context);
287285
try {
288286
return context.relBuilder.in(subqueryRel, nodes);
289287
// TODO
@@ -303,18 +301,25 @@ public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
303301
}
304302
}
305303

304+
@Override
305+
public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext context) {
306+
return context.relBuilder.scalarQuery(
307+
b -> {
308+
UnresolvedPlan subquery = node.getQuery();
309+
return resolveSubqueryPlan(subquery, context);
310+
});
311+
}
312+
306313
@Override
307314
public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext context) {
308-
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
309315
return context.relBuilder.exists(
310316
b -> {
311317
UnresolvedPlan subquery = node.getQuery();
312-
return resolveSubqueryPlan(subquery, true, context);
318+
return resolveSubqueryPlan(subquery, context);
313319
});
314320
}
315321

316-
private RelNode resolveSubqueryPlan(
317-
UnresolvedPlan subquery, boolean isExists, CalcitePlanContext context) {
322+
private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) {
318323
// clear and store the outer state
319324
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
320325
if (isResolvingJoinConditionOuter) {

0 commit comments

Comments
 (0)