Skip to content

Commit 22375df

Browse files
authored
Implement ppl exists subquery command with Calcite (#3388)
1 parent 92cb8ab commit 22375df

23 files changed

+1161
-91
lines changed

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.opensearch.sql.ast.expression.When;
3737
import org.opensearch.sql.ast.expression.WindowFunction;
3838
import org.opensearch.sql.ast.expression.Xor;
39+
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
3940
import org.opensearch.sql.ast.expression.subquery.InSubquery;
4041
import org.opensearch.sql.ast.statement.Explain;
4142
import org.opensearch.sql.ast.statement.Query;
@@ -346,4 +347,8 @@ public T visitLookup(Lookup node, C context) {
346347
public T visitSubqueryAlias(SubqueryAlias node, C context) {
347348
return visitChildren(node, context);
348349
}
350+
351+
public T visitExistsSubquery(ExistsSubquery node, C context) {
352+
return visitChildren(node, context);
353+
}
349354
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.expression.subquery;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
13+
import org.opensearch.sql.ast.AbstractNodeVisitor;
14+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
15+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
16+
import org.opensearch.sql.common.utils.StringUtils;
17+
18+
@Getter
19+
@EqualsAndHashCode(callSuper = false)
20+
@RequiredArgsConstructor
21+
public class ExistsSubquery extends UnresolvedExpression {
22+
private final UnresolvedPlan query;
23+
24+
@Override
25+
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
26+
return nodeVisitor.visitExistsSubquery(this, context);
27+
}
28+
29+
@Override
30+
public List<UnresolvedExpression> getChild() {
31+
return ImmutableList.of();
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return StringUtils.format("exists ( %s )", query);
37+
}
38+
}

core/src/main/java/org/opensearch/sql/ast/expression/subquery/InSubquery.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
import lombok.EqualsAndHashCode;
1010
import lombok.Getter;
1111
import lombok.RequiredArgsConstructor;
12-
import lombok.ToString;
1312
import org.opensearch.sql.ast.AbstractNodeVisitor;
1413
import org.opensearch.sql.ast.expression.UnresolvedExpression;
1514
import org.opensearch.sql.ast.tree.UnresolvedPlan;
15+
import org.opensearch.sql.common.utils.StringUtils;
1616

1717
@Getter
18-
@ToString
1918
@EqualsAndHashCode(callSuper = false)
2019
@RequiredArgsConstructor
2120
public class InSubquery extends UnresolvedExpression {
@@ -31,4 +30,9 @@ public List<UnresolvedExpression> getChild() {
3130
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
3231
return nodeVisitor.visitInSubquery(this, context);
3332
}
33+
34+
@Override
35+
public String toString() {
36+
return StringUtils.format("%s in ( %s )", value, query);
37+
}
3438
}

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
99

1010
import java.sql.Connection;
11+
import java.util.Optional;
12+
import java.util.Stack;
1113
import java.util.function.BiFunction;
1214
import lombok.Getter;
1315
import lombok.Setter;
16+
import org.apache.calcite.rex.RexCorrelVariable;
1417
import org.apache.calcite.rex.RexNode;
1518
import org.apache.calcite.tools.FrameworkConfig;
1619
import org.apache.calcite.tools.RelBuilder;
@@ -25,6 +28,8 @@ public class CalcitePlanContext {
2528
public final ExtendedRexBuilder rexBuilder;
2629

2730
@Getter @Setter private boolean isResolvingJoinCondition = false;
31+
@Getter @Setter private boolean isResolvingExistsSubquery = false;
32+
private final Stack<RexCorrelVariable> correlVar = new Stack<>();
2833

2934
private CalcitePlanContext(FrameworkConfig config) {
3035
this.config = config;
@@ -42,6 +47,26 @@ public RexNode resolveJoinCondition(
4247
return result;
4348
}
4449

50+
public Optional<RexCorrelVariable> popCorrelVar() {
51+
if (!correlVar.empty()) {
52+
return Optional.of(correlVar.pop());
53+
} else {
54+
return Optional.empty();
55+
}
56+
}
57+
58+
public void pushCorrelVar(RexCorrelVariable v) {
59+
correlVar.push(v);
60+
}
61+
62+
public Optional<RexCorrelVariable> peekCorrelVar() {
63+
if (!correlVar.empty()) {
64+
return Optional.of(correlVar.peek());
65+
} else {
66+
return Optional.empty();
67+
}
68+
}
69+
4570
public static CalcitePlanContext create(FrameworkConfig config) {
4671
return new CalcitePlanContext(config);
4772
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
1313
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
1414

15+
import com.google.common.collect.ImmutableList;
1516
import java.util.ArrayList;
1617
import java.util.List;
1718
import java.util.Objects;
@@ -22,16 +23,22 @@
2223
import org.apache.calcite.rel.RelNode;
2324
import org.apache.calcite.rel.core.JoinRelType;
2425
import org.apache.calcite.rex.RexCall;
26+
import org.apache.calcite.rex.RexCorrelVariable;
2527
import org.apache.calcite.rex.RexLiteral;
2628
import org.apache.calcite.rex.RexNode;
2729
import org.apache.calcite.tools.RelBuilder;
2830
import org.apache.calcite.tools.RelBuilder.AggCall;
31+
import org.apache.calcite.util.Holder;
32+
import org.checkerframework.checker.nullness.qual.Nullable;
2933
import org.opensearch.sql.ast.AbstractNodeVisitor;
3034
import org.opensearch.sql.ast.expression.AllFields;
3135
import org.opensearch.sql.ast.expression.Argument;
36+
import org.opensearch.sql.ast.expression.Compare;
3237
import org.opensearch.sql.ast.expression.Field;
38+
import org.opensearch.sql.ast.expression.Not;
3339
import org.opensearch.sql.ast.expression.QualifiedName;
3440
import org.opensearch.sql.ast.expression.UnresolvedExpression;
41+
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
3542
import org.opensearch.sql.ast.tree.Aggregation;
3643
import org.opensearch.sql.ast.tree.Eval;
3744
import org.opensearch.sql.ast.tree.Filter;
@@ -84,11 +91,35 @@ private RelBuilder scan(RelOptTable tableSchema, CalcitePlanContext context) {
8491
@Override
8592
public RelNode visitFilter(Filter node, CalcitePlanContext context) {
8693
visitChildren(node, context);
94+
boolean containsExistsSubquery = containsExistsSubquery(node.getCondition());
95+
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
96+
if (containsExistsSubquery) {
97+
context.relBuilder.variable(v::set);
98+
context.pushCorrelVar(v.get());
99+
}
87100
RexNode condition = rexVisitor.analyze(node.getCondition(), context);
88-
context.relBuilder.filter(condition);
101+
if (containsExistsSubquery) {
102+
context.relBuilder.filter(ImmutableList.of(v.get().id), condition);
103+
context.popCorrelVar();
104+
} else {
105+
context.relBuilder.filter(condition);
106+
}
89107
return context.relBuilder.peek();
90108
}
91109

110+
private boolean containsExistsSubquery(Object condition) {
111+
if (condition instanceof ExistsSubquery) {
112+
return true;
113+
}
114+
if (condition instanceof Not n) {
115+
return containsExistsSubquery(n.getExpression());
116+
}
117+
if (condition instanceof Compare c) {
118+
return containsExistsSubquery(c.getLeft()) || containsExistsSubquery(c.getRight());
119+
}
120+
return false;
121+
}
122+
92123
@Override
93124
public RelNode visitProject(Project node, CalcitePlanContext context) {
94125
visitChildren(node, context);
@@ -174,6 +205,23 @@ public RelNode visitEval(Eval node, CalcitePlanContext context) {
174205
if (!overriding.isEmpty()) {
175206
List<RexNode> toDrop = context.relBuilder.fields(overriding);
176207
context.relBuilder.projectExcept(toDrop);
208+
209+
// the overriding field in Calcite will add a numeric suffix, for example:
210+
// `| eval SAL = SAL + 1` creates a field SAL0 to replace SAL, so we rename it back to SAL,
211+
// or query `| eval SAL=SAL + 1 | where exists [ source=DEPT | where emp.SAL=HISAL ]` fails.
212+
List<String> newNames =
213+
context.relBuilder.peek().getRowType().getFieldNames().stream()
214+
.map(
215+
cur -> {
216+
String noNumericSuffix = cur.replaceAll("\\d", "");
217+
if (overriding.contains(noNumericSuffix)) {
218+
return noNumericSuffix;
219+
} else {
220+
return cur;
221+
}
222+
})
223+
.toList();
224+
context.relBuilder.rename(newNames);
177225
}
178226
return context.relBuilder.peek();
179227
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
import org.apache.calcite.rel.type.RelDataType;
1919
import org.apache.calcite.rel.type.RelDataTypeFactory;
2020
import org.apache.calcite.rex.RexBuilder;
21+
import org.apache.calcite.rex.RexCorrelVariable;
2122
import org.apache.calcite.rex.RexNode;
2223
import org.apache.calcite.sql.SqlIntervalQualifier;
2324
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
2425
import org.apache.calcite.sql.parser.SqlParserUtil;
2526
import org.apache.calcite.sql.type.SqlTypeName;
2627
import org.apache.calcite.util.DateString;
28+
import org.apache.calcite.util.Holder;
2729
import org.apache.calcite.util.TimeString;
2830
import org.apache.calcite.util.TimestampString;
31+
import org.checkerframework.checker.nullness.qual.Nullable;
2932
import org.opensearch.sql.ast.AbstractNodeVisitor;
3033
import org.opensearch.sql.ast.expression.Alias;
3134
import org.opensearch.sql.ast.expression.And;
@@ -41,6 +44,7 @@
4144
import org.opensearch.sql.ast.expression.SpanUnit;
4245
import org.opensearch.sql.ast.expression.UnresolvedExpression;
4346
import org.opensearch.sql.ast.expression.Xor;
47+
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
4448
import org.opensearch.sql.ast.expression.subquery.InSubquery;
4549
import org.opensearch.sql.ast.tree.UnresolvedPlan;
4650
import org.opensearch.sql.calcite.utils.BuiltinFunctionUtils;
@@ -156,40 +160,62 @@ public RexNode visitEqualTo(EqualTo node, CalcitePlanContext context) {
156160

157161
@Override
158162
public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context) {
163+
// 1. resolve QualifiedName in join condition
159164
if (context.isResolvingJoinCondition()) {
160165
List<String> parts = node.getParts();
161166
if (parts.size() == 1) {
162-
// Handle the case of `id = cid`
167+
// 1.1 Handle the case of `id = cid`
163168
try {
164169
return context.relBuilder.field(2, 0, parts.getFirst());
165170
} catch (IllegalArgumentException ee) {
166171
return context.relBuilder.field(2, 1, parts.getFirst());
167172
}
168173
} else if (parts.size() == 2) {
169-
// Handle the case of `t1.id = t2.id` or `alias1.id = alias2.id`
174+
// 1.2 Handle the case of `t1.id = t2.id` or `alias1.id = alias2.id`
170175
return context.relBuilder.field(2, parts.get(0), parts.get(1));
171176
} else if (parts.size() == 3) {
172177
throw new UnsupportedOperationException("Unsupported qualified name: " + node);
173178
}
174179
}
180+
181+
// 2. resolve QualifiedName in non-join condition
175182
String qualifiedName = node.toString();
176183
List<String> currentFields = context.relBuilder.peek().getRowType().getFieldNames();
177184
if (currentFields.contains(qualifiedName)) {
185+
// 2.1 resolve QualifiedName from stack top
178186
return context.relBuilder.field(qualifiedName);
179187
} else if (node.getParts().size() == 2) {
188+
// 2.2 resolve QualifiedName with an alias or table name
180189
List<String> parts = node.getParts();
181-
return context.relBuilder.field(parts.get(0), parts.get(1));
190+
try {
191+
return context.relBuilder.field(1, parts.get(0), parts.get(1));
192+
} catch (IllegalArgumentException e) {
193+
// 2.3 resolve QualifiedName with outer alias
194+
return context
195+
.peekCorrelVar()
196+
.map(correlVar -> context.relBuilder.field(correlVar, parts.get(1)))
197+
.orElseThrow(() -> e); // Re-throw the exception if no correlated variable exists
198+
}
182199
} else if (currentFields.stream().noneMatch(f -> f.startsWith(qualifiedName))) {
183-
return context.relBuilder.field(qualifiedName);
200+
// 2.4 try resolving combination of 2.1 and 2.3 to resolve rest cases
201+
return context
202+
.peekCorrelVar()
203+
.map(correlVar -> context.relBuilder.field(correlVar, qualifiedName))
204+
.orElseGet(() -> context.relBuilder.field(qualifiedName));
184205
}
185-
// Handle the overriding fields, for example, `eval SAL = SAL + 1` will delete the original SAL
186-
// and add a SAL0
206+
// 3. resolve overriding fields, for example, `eval SAL = SAL + 1` will delete the original SAL
207+
// and add a SAL0. SAL0 in currentFields, but qualifiedName is SAL.
208+
// TODO now we cannot handle the case using a overriding fields in subquery, for example
209+
// source = EMP | eval DEPTNO = DEPTNO + 1 | where exists [ source = DEPT | where emp.DEPTNO =
210+
// DEPTNO ]
187211
Map<String, String> fieldMap =
188212
currentFields.stream().collect(Collectors.toMap(s -> s.replaceAll("\\d", ""), s -> s));
189213
if (fieldMap.containsKey(qualifiedName)) {
190214
return context.relBuilder.field(fieldMap.get(qualifiedName));
191215
} else {
192-
return null;
216+
throw new IllegalArgumentException(
217+
String.format(
218+
"field [%s] not found; input fields are: %s", qualifiedName, currentFields));
193219
}
194220
}
195221

@@ -256,20 +282,8 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
256282
@Override
257283
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
258284
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
259-
// clear and store the outer state
260-
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
261-
if (isResolvingJoinConditionOuter) {
262-
context.setResolvingJoinCondition(false);
263-
}
264285
UnresolvedPlan subquery = node.getQuery();
265-
266-
RelNode subqueryRel = subquery.accept(planVisitor, context);
267-
// pop the inner plan
268-
context.relBuilder.build();
269-
// restore to the previous state
270-
if (isResolvingJoinConditionOuter) {
271-
context.setResolvingJoinCondition(true);
272-
}
286+
RelNode subqueryRel = resolveSubqueryPlan(subquery, false, context);
273287
try {
274288
return context.relBuilder.in(subqueryRel, nodes);
275289
// TODO
@@ -288,4 +302,32 @@ public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
288302
+ " of columns in the output of subquery");
289303
}
290304
}
305+
306+
@Override
307+
public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext context) {
308+
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
309+
return context.relBuilder.exists(
310+
b -> {
311+
UnresolvedPlan subquery = node.getQuery();
312+
return resolveSubqueryPlan(subquery, true, context);
313+
});
314+
}
315+
316+
private RelNode resolveSubqueryPlan(
317+
UnresolvedPlan subquery, boolean isExists, CalcitePlanContext context) {
318+
// clear and store the outer state
319+
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
320+
if (isResolvingJoinConditionOuter) {
321+
context.setResolvingJoinCondition(false);
322+
}
323+
RelNode subqueryRel = subquery.accept(planVisitor, context);
324+
// pop the inner plan
325+
context.relBuilder.build();
326+
// clear the exists subquery resolving state
327+
// restore to the previous state
328+
if (isResolvingJoinConditionOuter) {
329+
context.setResolvingJoinCondition(true);
330+
}
331+
return subqueryRel;
332+
}
291333
}

core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public static ExprType convertRelDataTypeToExprType(RelDataType type) {
144144
return FLOAT;
145145
case DOUBLE:
146146
return DOUBLE;
147+
case CHAR:
147148
case VARCHAR:
148149
return STRING;
149150
case BOOLEAN:

0 commit comments

Comments
 (0)