Skip to content

Commit 8262890

Browse files
committed
Merge origin/main into refactor-udf
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
2 parents 5991c99 + b784b87 commit 8262890

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1998
-260
lines changed

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.opensearch.sql.ast.tree.TableFunction;
7171
import org.opensearch.sql.ast.tree.Trendline;
7272
import org.opensearch.sql.ast.tree.Values;
73+
import org.opensearch.sql.ast.tree.Window;
7374

7475
/** AST nodes visitor Defines the traverse path. */
7576
public abstract class AbstractNodeVisitor<T, C> {
@@ -347,6 +348,10 @@ public T visitPatterns(Patterns patterns, C context) {
347348
return visitChildren(patterns, context);
348349
}
349350

351+
public T visitWindow(Window window, C context) {
352+
return visitChildren(window, context);
353+
}
354+
350355
public T visitJoin(Join node, C context) {
351356
return visitChildren(node, context);
352357
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.expression;
7+
8+
import lombok.Getter;
9+
10+
public abstract class WindowBound {
11+
private WindowBound() {}
12+
13+
@Getter
14+
public static class OffSetWindowBound extends WindowBound {
15+
private final long offset;
16+
private final boolean isPreceding;
17+
18+
OffSetWindowBound(long offset, boolean isPreceding) {
19+
this.offset = offset;
20+
this.isPreceding = isPreceding;
21+
}
22+
23+
public boolean isPreceding() {
24+
return isPreceding;
25+
}
26+
}
27+
28+
public static class CurrentRowWindowBound extends WindowBound {
29+
CurrentRowWindowBound() {}
30+
31+
@Override
32+
public String toString() {
33+
return "CURRENT ROW";
34+
}
35+
}
36+
37+
public static class UnboundedWindowBound extends WindowBound {
38+
private final boolean isPreceding;
39+
40+
UnboundedWindowBound(boolean isPreceding) {
41+
this.isPreceding = isPreceding;
42+
}
43+
44+
public boolean isPreceding() {
45+
return isPreceding;
46+
}
47+
48+
@Override
49+
public boolean equals(Object o) {
50+
return this == o
51+
|| o instanceof UnboundedWindowBound
52+
&& isPreceding == ((UnboundedWindowBound) o).isPreceding;
53+
}
54+
55+
@Override
56+
public String toString() {
57+
return isPreceding ? "UNBOUNDED PRECEDING" : "UNBOUNDED FOLLOWING";
58+
}
59+
}
60+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.expression;
7+
8+
import java.util.Locale;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.Getter;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.ToString;
13+
14+
@EqualsAndHashCode(callSuper = false)
15+
@Getter
16+
@RequiredArgsConstructor
17+
@ToString
18+
public class WindowFrame extends UnresolvedExpression {
19+
private final FrameType type;
20+
private final WindowBound lower;
21+
private final WindowBound upper;
22+
23+
public enum FrameType {
24+
RANGE,
25+
ROWS
26+
}
27+
28+
public static WindowFrame defaultFrame() {
29+
return new WindowFrame(
30+
FrameType.ROWS, createBound("UNBOUNDED PRECEDING"), createBound("UNBOUNDED FOLLOWING"));
31+
}
32+
33+
public static WindowFrame create(FrameType type, Literal lower, Literal upper) {
34+
WindowBound lowerBound = null;
35+
WindowBound upperBound = null;
36+
if (lower != null) {
37+
if (lower.getType() == DataType.STRING) {
38+
lowerBound = createBound(lower.getValue().toString());
39+
} else {
40+
throw new IllegalArgumentException(
41+
String.format("Unsupported bound type: %s", lower.getType()));
42+
}
43+
}
44+
if (upper != null) {
45+
if (upper.getType() == DataType.STRING) {
46+
upperBound = createBound(upper.getValue().toString());
47+
} else {
48+
throw new IllegalArgumentException(
49+
String.format("Unsupported bound type: %s", upper.getType()));
50+
}
51+
}
52+
return new WindowFrame(type, lowerBound, upperBound);
53+
}
54+
55+
private static WindowBound createBound(String boundType) {
56+
boundType = boundType.trim().toUpperCase(Locale.ROOT);
57+
if ("CURRENT ROW".equals(boundType)) {
58+
return new WindowBound.CurrentRowWindowBound();
59+
} else if ("UNBOUNDED PRECEDING".equals(boundType)) {
60+
return new WindowBound.UnboundedWindowBound(true);
61+
} else if ("UNBOUNDED FOLLOWING".equals(boundType)) {
62+
return new WindowBound.UnboundedWindowBound(false);
63+
} else if (boundType.endsWith(" PRECEDING")) {
64+
long number = Long.parseLong(boundType.split(" PRECEDING")[0]);
65+
return new WindowBound.OffSetWindowBound(number, true);
66+
} else if (boundType.endsWith(" FOLLOWING")) {
67+
long number = Long.parseLong(boundType.split(" FOLLOWING")[0]);
68+
return new WindowBound.OffSetWindowBound(number, false);
69+
} else {
70+
throw new IllegalArgumentException(String.format("Unsupported bound type: %s", boundType));
71+
}
72+
}
73+
}

core/src/main/java/org/opensearch/sql/ast/expression/WindowFunction.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,41 @@
55

66
package org.opensearch.sql.ast.expression;
77

8-
import com.google.common.collect.ImmutableList;
8+
import java.util.ArrayList;
99
import java.util.List;
10-
import lombok.AllArgsConstructor;
1110
import lombok.EqualsAndHashCode;
1211
import lombok.Getter;
1312
import lombok.RequiredArgsConstructor;
13+
import lombok.Setter;
1414
import lombok.ToString;
1515
import org.apache.commons.lang3.tuple.Pair;
1616
import org.opensearch.sql.ast.AbstractNodeVisitor;
1717
import org.opensearch.sql.ast.Node;
1818
import org.opensearch.sql.ast.tree.Sort.SortOption;
1919

20-
@AllArgsConstructor
20+
@RequiredArgsConstructor
2121
@EqualsAndHashCode(callSuper = false)
2222
@Getter
23-
@RequiredArgsConstructor
2423
@ToString
2524
public class WindowFunction extends UnresolvedExpression {
2625

2726
private final UnresolvedExpression function;
28-
private List<UnresolvedExpression> partitionByList;
29-
private List<Pair<SortOption, UnresolvedExpression>> sortList;
27+
@Setter private List<UnresolvedExpression> partitionByList = new ArrayList<>();
28+
@Setter private List<Pair<SortOption, UnresolvedExpression>> sortList = new ArrayList<>();
29+
@Setter private WindowFrame windowFrame = WindowFrame.defaultFrame();
30+
31+
public WindowFunction(
32+
UnresolvedExpression function,
33+
List<UnresolvedExpression> partitionByList,
34+
List<Pair<SortOption, UnresolvedExpression>> sortList) {
35+
this.function = function;
36+
this.partitionByList = partitionByList;
37+
this.sortList = sortList;
38+
}
3039

3140
@Override
3241
public List<? extends Node> getChild() {
33-
ImmutableList.Builder<UnresolvedExpression> children = ImmutableList.builder();
34-
children.add(function);
35-
children.addAll(partitionByList);
36-
sortList.forEach(pair -> children.add(pair.getRight()));
37-
return children.build();
42+
return List.of(function);
3843
}
3944

4045
@Override

core/src/main/java/org/opensearch/sql/ast/tree/Patterns.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
public class Patterns extends UnresolvedPlan {
3030

3131
private final UnresolvedExpression windowFunction;
32-
3332
private UnresolvedPlan child;
3433

3534
@Override
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.ToString;
14+
import org.opensearch.sql.ast.AbstractNodeVisitor;
15+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
16+
17+
@Getter
18+
@ToString
19+
@EqualsAndHashCode(callSuper = false)
20+
@RequiredArgsConstructor
21+
public class Window extends UnresolvedPlan {
22+
23+
private final List<UnresolvedExpression> windowFunctionList;
24+
private UnresolvedPlan child;
25+
26+
@Override
27+
public Window attach(UnresolvedPlan child) {
28+
this.child = child;
29+
return this;
30+
}
31+
32+
@Override
33+
public List<UnresolvedPlan> getChild() {
34+
return ImmutableList.of(this.child);
35+
}
36+
37+
@Override
38+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
39+
return nodeVisitor.visitWindow(this, context);
40+
}
41+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteAggCallVisitor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
import org.opensearch.sql.ast.expression.AggregateFunction;
1515
import org.opensearch.sql.ast.expression.Alias;
1616
import org.opensearch.sql.ast.expression.UnresolvedExpression;
17-
import org.opensearch.sql.calcite.utils.AggregateUtils;
17+
import org.opensearch.sql.calcite.utils.PlanUtils;
18+
import org.opensearch.sql.expression.function.BuiltinFunctionName;
1819

1920
public class CalciteAggCallVisitor extends AbstractNodeVisitor<AggCall, CalcitePlanContext> {
2021
private final CalciteRexNodeVisitor rexNodeVisitor;
@@ -41,6 +42,14 @@ public AggCall visitAggregateFunction(AggregateFunction node, CalcitePlanContext
4142
for (UnresolvedExpression arg : node.getArgList()) {
4243
argList.add(rexNodeVisitor.analyze(arg, context));
4344
}
44-
return AggregateUtils.translate(node, field, context, argList);
45+
return BuiltinFunctionName.ofAggregation(node.getFuncName())
46+
.map(
47+
functionName -> {
48+
return PlanUtils.makeAggCall(
49+
context, functionName, node.getDistinct(), field, argList);
50+
})
51+
.orElseThrow(
52+
() ->
53+
new UnsupportedOperationException("Unexpected aggregation: " + node.getFuncName()));
4554
}
4655
}

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
99

1010
import java.sql.Connection;
11+
import java.util.List;
1112
import java.util.Optional;
1213
import java.util.Stack;
1314
import java.util.function.BiFunction;
@@ -43,6 +44,7 @@ public class CalcitePlanContext {
4344
@Getter @Setter private boolean isProjectVisited = false;
4445

4546
private final Stack<RexCorrelVariable> correlVar = new Stack<>();
47+
private final Stack<List<RexNode>> windowPartitions = new Stack<>();
4648

4749
private CalcitePlanContext(FrameworkConfig config, QueryType queryType) {
4850
this.config = config;

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.opensearch.sql.ast.tree.TableFunction;
7676
import org.opensearch.sql.ast.tree.Trendline;
7777
import org.opensearch.sql.ast.tree.UnresolvedPlan;
78+
import org.opensearch.sql.ast.tree.Window;
7879
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
7980
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
8081
import org.opensearch.sql.exception.CalciteUnsupportedException;
@@ -629,6 +630,15 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
629630
return context.relBuilder.peek();
630631
}
631632

633+
@Override
634+
public RelNode visitWindow(Window node, CalcitePlanContext context) {
635+
visitChildren(node, context);
636+
List<RexNode> overExpressions =
637+
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
638+
context.relBuilder.projectPlus(overExpressions);
639+
return context.relBuilder.peek();
640+
}
641+
632642
/*
633643
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
634644
*/

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,21 @@
55

66
package org.opensearch.sql.calcite;
77

8+
import static java.util.Objects.requireNonNull;
9+
import static org.apache.calcite.sql.SqlKind.AS;
810
import static org.opensearch.sql.ast.expression.SpanUnit.NONE;
911
import static org.opensearch.sql.ast.expression.SpanUnit.UNKNOWN;
1012

1113
import java.math.BigDecimal;
1214
import java.util.ArrayList;
15+
import java.util.Collections;
1316
import java.util.List;
1417
import lombok.RequiredArgsConstructor;
1518
import org.apache.calcite.rel.RelNode;
1619
import org.apache.calcite.rel.type.RelDataType;
1720
import org.apache.calcite.rel.type.RelDataTypeFactory;
1821
import org.apache.calcite.rex.RexBuilder;
22+
import org.apache.calcite.rex.RexCall;
1923
import org.apache.calcite.rex.RexNode;
2024
import org.apache.calcite.sql.SqlIntervalQualifier;
2125
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -45,6 +49,7 @@
4549
import org.opensearch.sql.ast.expression.SpanUnit;
4650
import org.opensearch.sql.ast.expression.UnresolvedExpression;
4751
import org.opensearch.sql.ast.expression.When;
52+
import org.opensearch.sql.ast.expression.WindowFunction;
4853
import org.opensearch.sql.ast.expression.Xor;
4954
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
5055
import org.opensearch.sql.ast.expression.subquery.InSubquery;
@@ -341,6 +346,43 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
341346
throw new IllegalArgumentException("Unsupported operator: " + node.getFuncName());
342347
}
343348

349+
@Override
350+
public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext context) {
351+
Function windowFunction = (Function) node.getFunction();
352+
List<RexNode> arguments =
353+
windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
354+
List<RexNode> partitions =
355+
node.getPartitionByList().stream()
356+
.map(arg -> analyze(arg, context))
357+
.map(this::extractRexNodeFromAlias)
358+
.toList();
359+
return BuiltinFunctionName.ofWindowFunction(windowFunction.getFuncName())
360+
.map(
361+
functionName -> {
362+
RexNode field = arguments.isEmpty() ? null : arguments.getFirst();
363+
List<RexNode> args =
364+
(arguments.isEmpty() || arguments.size() == 1)
365+
? Collections.emptyList()
366+
: arguments.subList(1, arguments.size());
367+
return PlanUtils.makeOver(
368+
context, functionName, field, args, partitions, node.getWindowFrame());
369+
})
370+
.orElseThrow(
371+
() ->
372+
new UnsupportedOperationException(
373+
"Unexpected window function: " + windowFunction.getFuncName()));
374+
}
375+
376+
/** extract the expression of Alias from a node */
377+
private RexNode extractRexNodeFromAlias(RexNode node) {
378+
requireNonNull(node);
379+
if (node.getKind() == AS) {
380+
return ((RexCall) node).getOperands().get(0);
381+
} else {
382+
return node;
383+
}
384+
}
385+
344386
@Override
345387
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
346388
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();

0 commit comments

Comments
 (0)