Skip to content

Commit b8e9ef2

Browse files
committed
Fix timechart and trendline
Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent 6b2e491 commit b8e9ef2

File tree

6 files changed

+94
-80
lines changed

6 files changed

+94
-80
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta;
8181
import org.opensearch.sql.ast.expression.Argument;
8282
import org.opensearch.sql.ast.expression.Argument.ArgumentMap;
83+
import org.opensearch.sql.ast.expression.Cast;
8384
import org.opensearch.sql.ast.expression.Field;
8485
import org.opensearch.sql.ast.expression.Function;
8586
import org.opensearch.sql.ast.expression.Let;
@@ -1898,9 +1899,10 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
18981899

18991900
// 1. group the group-by list + field list and add a count() aggregation
19001901
List<UnresolvedExpression> groupExprList = new ArrayList<>();
1901-
node.getGroupExprList().forEach(exp -> groupExprList.add(addAliasToFieldAccess(exp)));
1902+
node.getGroupExprList().forEach(exp -> groupExprList.add(exp));
19021903
// need alias for dynamic fields
1903-
node.getFields().forEach(field -> groupExprList.add(addAliasToFieldAccess(field)));
1904+
node.getFields().forEach(field -> groupExprList.add(field));
1905+
groupExprList.forEach(expr -> projectDynamicField(expr, context));
19041906
List<UnresolvedExpression> aggExprList =
19051907
List.of(AstDSL.alias(countFieldName, AstDSL.aggregate("count", null)));
19061908
aggregateWithTrimming(groupExprList, aggExprList, context);
@@ -2048,10 +2050,9 @@ public RelNode visitTimechart(
20482050
org.opensearch.sql.ast.tree.Timechart node, CalcitePlanContext context) {
20492051
visitChildren(node, context);
20502052

2051-
if (!context.fieldBuilder.isFieldSpecificType("@timestamp")) {
2052-
throw new IllegalArgumentException(
2053-
"`@timestamp` field needs to be specific type. Please cast explicitly.");
2054-
}
2053+
projectDynamicFieldAsString(
2054+
node.getBinExpression(), context); // spanExpr would become static field.
2055+
projectDynamicFieldAsString(node.getByField(), context); // byField would become static field.
20552056

20562057
// Extract parameters
20572058
UnresolvedExpression spanExpr = node.getBinExpression();
@@ -2096,11 +2097,6 @@ public RelNode visitTimechart(
20962097
UnresolvedExpression byField = node.getByField();
20972098

20982099
String byFieldName = ((Field) node.getByField()).getField().toString();
2099-
if (!context.fieldBuilder.isFieldSpecificType(byFieldName)) {
2100-
throw new IllegalArgumentException(
2101-
String.format(
2102-
"By field `%s` needs to be specific type. Please cast explicitly.", byFieldName));
2103-
}
21042100
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
21052101

21062102
int limit = Optional.ofNullable(node.getLimit()).orElse(10);
@@ -2154,6 +2150,55 @@ public RelNode visitTimechart(
21542150
}
21552151
}
21562152

2153+
private void projectDynamicFieldAsString(UnresolvedExpression exp, CalcitePlanContext context) {
2154+
projectDynamicField(exp, true, context);
2155+
}
2156+
2157+
private void projectDynamicField(UnresolvedExpression exp, CalcitePlanContext context) {
2158+
projectDynamicField(exp, false, context);
2159+
}
2160+
2161+
private void projectDynamicField(
2162+
UnresolvedExpression exp, boolean castNeeded, CalcitePlanContext context) {
2163+
if (exp != null) {
2164+
exp.accept(
2165+
new AbstractNodeVisitor<Void, CalcitePlanContext>() {
2166+
@Override
2167+
public Void visitField(Field field, CalcitePlanContext context) {
2168+
RexNode node = rexVisitor.analyze(field, context);
2169+
if (node.isA(SqlKind.ITEM)) {
2170+
RexNode casted = castNeeded ? castToString(node, context) : node;
2171+
RexNode alias = context.relBuilder.alias(casted, field.getField().toString());
2172+
context.relBuilder.projectPlus(alias);
2173+
}
2174+
return null;
2175+
}
2176+
},
2177+
context);
2178+
}
2179+
}
2180+
2181+
private RexNode castToString(RexNode node, CalcitePlanContext context) {
2182+
RelDataType stringType = context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
2183+
RelDataType nullableStringType =
2184+
context.rexBuilder.getTypeFactory().createTypeWithNullability(stringType, true);
2185+
return context.rexBuilder.makeCast(nullableStringType, node, true, true);
2186+
}
2187+
2188+
/** Add cast and alias to Field. Needed for when the field is resolved as dynamic field access. */
2189+
private UnresolvedExpression castAndAliasToFieldAccess(UnresolvedExpression exp) {
2190+
if (exp instanceof Field) {
2191+
Field f = (Field) exp;
2192+
return AstDSL.alias(f.getField().toString(), castToString(f));
2193+
} else {
2194+
return castToString(exp);
2195+
}
2196+
}
2197+
2198+
private UnresolvedExpression castToString(UnresolvedExpression exp) {
2199+
return new Cast(exp, AstDSL.stringLiteral("STRING"));
2200+
}
2201+
21572202
/** Add alias to Field. Needed for when the field is resolved as dynamic field access. */
21582203
private UnresolvedExpression addAliasToFieldAccess(UnresolvedExpression exp) {
21592204
if (exp instanceof Field f) {
@@ -2367,6 +2412,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
23672412
.ifPresent(
23682413
sortField -> {
23692414
SortOption sortOption = analyzeSortOption(sortField.getFieldArgs());
2415+
projectDynamicFieldAsString(sortField, context);
23702416
RexNode field = rexVisitor.analyze(sortField, context);
23712417
if (sortOption == DEFAULT_DESC) {
23722418
context.relBuilder.sort(context.relBuilder.desc(field));
@@ -2380,13 +2426,8 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
23802426
node.getComputations()
23812427
.forEach(
23822428
trendlineComputation -> {
2429+
projectDynamicField(trendlineComputation.getDataField(), context);
23832430
RexNode field = rexVisitor.analyze(trendlineComputation.getDataField(), context);
2384-
String dataFieldName = trendlineComputation.getDataField().getField().toString();
2385-
if (!context.fieldBuilder.isFieldSpecificType(dataFieldName)) {
2386-
throw new IllegalArgumentException(
2387-
String.format(
2388-
"`%s` needs to be specific type. Please cast explicitly.", dataFieldName));
2389-
}
23902431

23912432
context.relBuilder.filter(context.relBuilder.isNotNull(field));
23922433

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,15 @@ static RexNode makeOver(
182182
return variance(context, field, partitions, rows, lowerBound, upperBound, false, false);
183183
case ROW_NUMBER:
184184
return withOver(
185-
context.relBuilder.aggregateCall(SqlStdOperatorTable.ROW_NUMBER),
185+
makeAggCall(context, functionName, false, null, List.of()),
186186
partitions,
187187
orderKeys,
188188
true,
189189
lowerBound,
190190
upperBound);
191191
case NTH_VALUE:
192192
return withOver(
193-
context.relBuilder.aggregateCall(SqlStdOperatorTable.NTH_VALUE, field, argList.get(0)),
193+
makeAggCall(context, functionName, false, field, argList.subList(0, 1)),
194194
partitions,
195195
orderKeys,
196196
true,
@@ -215,7 +215,12 @@ private static RexNode sumOver(
215215
RexWindowBound lowerBound,
216216
RexWindowBound upperBound) {
217217
return withOver(
218-
ctx.relBuilder.sum(operation), partitions, List.of(), rows, lowerBound, upperBound);
218+
makeAggCall(ctx, BuiltinFunctionName.SUM, false, operation, List.of()),
219+
partitions,
220+
List.of(),
221+
rows,
222+
lowerBound,
223+
upperBound);
219224
}
220225

221226
private static RexNode countOver(

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
154154
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
155155
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW;
156+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NTH_VALUE;
156157
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NULLIF;
157158
import static org.opensearch.sql.expression.function.BuiltinFunctionName.OR;
158159
import static org.opensearch.sql.expression.function.BuiltinFunctionName.PERCENTILE_APPROX;
@@ -177,6 +178,7 @@
177178
import static org.opensearch.sql.expression.function.BuiltinFunctionName.RIGHT;
178179
import static org.opensearch.sql.expression.function.BuiltinFunctionName.RINT;
179180
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ROUND;
181+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ROW_NUMBER;
180182
import static org.opensearch.sql.expression.function.BuiltinFunctionName.RTRIM;
181183
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SECOND;
182184
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SECOND_OF_MINUTE;
@@ -1227,10 +1229,11 @@ void registerOperator(BuiltinFunctionName functionName, SqlAggFunction aggFuncti
12271229
wrapSqlOperandTypeChecker(innerTypeChecker, functionName.name(), true);
12281230
AggHandler handler =
12291231
(distinct, field, argList, ctx) -> {
1232+
List<RexNode> fields = field != null ? List.of(field) : List.of();
12301233
List<RexNode> newArgList =
12311234
argList.stream().map(PlanUtils::derefMapCall).collect(Collectors.toList());
12321235
return UserDefinedFunctionUtils.makeAggregateCall(
1233-
aggFunction, List.of(field), newArgList, ctx.relBuilder);
1236+
aggFunction, fields, newArgList, ctx.relBuilder);
12341237
};
12351238
register(functionName, handler, typeChecker);
12361239
}
@@ -1247,6 +1250,8 @@ void populate() {
12471250
registerOperator(INTERNAL_PATTERN, PPLBuiltinOperators.INTERNAL_PATTERN);
12481251
registerOperator(LIST, PPLBuiltinOperators.LIST);
12491252
registerOperator(VALUES, PPLBuiltinOperators.VALUES);
1253+
registerOperator(NTH_VALUE, SqlStdOperatorTable.NTH_VALUE);
1254+
registerOperator(ROW_NUMBER, SqlStdOperatorTable.ROW_NUMBER);
12501255

12511256
register(
12521257
AVG,

core/src/main/java/org/opensearch/sql/expression/function/udf/SpanFunction.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ public UDFOperandMetadata getOperandMetadata() {
5252
return UDFOperandMetadata.wrap(
5353
(CompositeOperandTypeChecker)
5454
OperandTypes.family(
55-
SqlTypeFamily.CHARACTER, SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER)
55+
SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)
5656
.or(
5757
OperandTypes.family(
58-
SqlTypeFamily.DATETIME, SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))
58+
SqlTypeFamily.DATETIME, SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER))
5959
// TODO: numeric span should support decimal as its interval
6060
.or(
6161
OperandTypes.family(
@@ -98,6 +98,17 @@ public Expression implement(
9898
Types.lookupMethod(
9999
SpanFunction.class, methodName, String.class, int.class, String.class));
100100
return function.getImplementor().implement(translator, call, RexImpTable.NullAs.NULL);
101+
} else if (SqlTypeUtil.isCharacter(fieldType)) {
102+
ScalarFunctionImpl function =
103+
(ScalarFunctionImpl)
104+
ScalarFunctionImpl.create(
105+
Types.lookupMethod(
106+
SpanFunction.class,
107+
"evalTimestamp",
108+
String.class,
109+
int.class,
110+
String.class));
111+
return function.getImplementor().implement(translator, call, RexImpTable.NullAs.NULL);
101112
}
102113
throw new IllegalArgumentException(
103114
String.format(

core/src/main/java/org/opensearch/sql/expression/function/udf/binning/SpanBucketFunction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.expression.function.udf.binning;
77

88
import java.util.List;
9+
import java.util.Locale;
910
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
1011
import org.apache.calcite.adapter.enumerable.NullPolicy;
1112
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
@@ -85,7 +86,7 @@ public static String calculateSpanBucket(Number fieldValue, Number spanParam) {
8586
/** Format range string with appropriate precision. */
8687
private static String formatRange(double binStart, double binEnd, double span) {
8788
if (isIntegerSpan(span) && isIntegerValue(binStart) && isIntegerValue(binEnd)) {
88-
return String.format("%d-%d", (long) binStart, (long) binEnd);
89+
return String.format(Locale.ROOT, "%d-%d", (long) binStart, (long) binEnd);
8990
} else {
9091
return formatFloatingPointRange(binStart, binEnd, span);
9192
}
@@ -104,8 +105,8 @@ private static boolean isIntegerValue(double value) {
104105
/** Formats floating-point ranges with appropriate precision. */
105106
private static String formatFloatingPointRange(double binStart, double binEnd, double span) {
106107
int decimalPlaces = getAppropriateDecimalPlaces(span);
107-
String format = String.format("%%.%df-%%.%df", decimalPlaces, decimalPlaces);
108-
return String.format(format, binStart, binEnd);
108+
String format = String.format(Locale.ROOT, "%%.%df-%%.%df", decimalPlaces, decimalPlaces);
109+
return String.format(Locale.ROOT, format, binStart, binEnd);
109110
}
110111

111112
/** Determines appropriate decimal places for formatting based on span size. */

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsTimechartIT.java

Lines changed: 5 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -29,39 +29,9 @@ public void init() throws IOException {
2929
enableCalcite();
3030
}
3131

32-
@Test
33-
public void testTimechartByDynamicFieldWithoutCast() throws IOException {
34-
String query =
35-
source(
36-
TEST_INDEX_DYNAMIC,
37-
"eval @timestamp=cast(@timestamp as timestamp)"
38-
+ "| timechart span=1d count() by event");
39-
40-
Throwable e = assertThrows(IllegalArgumentException.class, () -> executeQuery(query));
41-
assertEquals(
42-
"By field `event` needs to be specific type. Please cast explicitly.", e.getMessage());
43-
}
44-
45-
@Test
46-
public void testTimechartWithDynamicTimestampField() throws IOException {
47-
String query =
48-
source(
49-
TEST_INDEX_DYNAMIC,
50-
"eval event=cast(event as string)" + "| timechart span=1d count() by event");
51-
52-
Throwable e = assertThrows(IllegalArgumentException.class, () -> executeQuery(query));
53-
assertEquals(
54-
"`@timestamp` field needs to be specific type. Please cast explicitly.", e.getMessage());
55-
}
56-
5732
@Test
5833
public void testTimechartByDynamicField() throws IOException {
59-
// Dynamic fields requires explicit cast before used in timechart
60-
String query =
61-
source(
62-
TEST_INDEX_DYNAMIC,
63-
"eval @timestamp=cast(@timestamp as timestamp), event=cast(event as string)"
64-
+ "| timechart span=1d count() by event");
34+
String query = source(TEST_INDEX_DYNAMIC, "timechart span=1d count() by event");
6535

6636
JSONObject result = executeQuery(query);
6737

@@ -79,38 +49,20 @@ public void testTimechartByDynamicField() throws IOException {
7949

8050
@Test
8151
public void testTimechartWithDynamicField() throws IOException {
82-
// Dynamic fields requires explicit cast before used in timechart
83-
String query =
84-
source(
85-
TEST_INDEX_DYNAMIC,
86-
"eval @timestamp=cast(@timestamp as timestamp), latency=cast(latency as int)"
87-
+ "| timechart span=1d avg(latency)");
52+
String query = source(TEST_INDEX_DYNAMIC, "timechart span=1d avg(latency)");
8853

8954
JSONObject result = executeQuery(query);
9055

91-
verifySchema(result, schema("@timestamp", "timestamp"), schema("avg(latency)", "double"));
56+
verifySchema(result, schema("@timestamp", "string"), schema("avg(latency)", "double"));
9257
verifyDataRows(result, rows("2025-10-25 00:00:00", 62));
9358
}
9459

95-
@Test
96-
public void testTrendlineWithDynamicFieldWithoutCast() throws IOException {
97-
String query =
98-
source(
99-
TEST_INDEX_DYNAMIC,
100-
"trendline sma(2, latency) as latency_trend | fields id, latency, latency_trend");
101-
102-
Throwable e = assertThrows(IllegalArgumentException.class, () -> executeQuery(query));
103-
assertEquals("`latency` needs to be specific type. Please cast explicitly.", e.getMessage());
104-
}
105-
10660
@Test
10761
public void testTrendlineWithDynamicFieldWithCast() throws IOException {
10862
String query =
10963
source(
11064
TEST_INDEX_DYNAMIC,
111-
"eval latency = cast(latency as int)"
112-
+ "| trendline sma(2, latency) as latency_trend"
113-
+ "| fields id, latency, latency_trend");
65+
"trendline sma(2, latency) as latency_trend" + "| fields id, latency, latency_trend");
11466
JSONObject result = executeQuery(query);
11567

11668
verifySchema(
@@ -132,8 +84,7 @@ public void testTrendlineSortByDynamicField() throws IOException {
13284
String query =
13385
source(
13486
TEST_INDEX_DYNAMIC,
135-
"eval latency = cast(latency as int)"
136-
+ "| trendline sort event sma(2, latency) as latency_trend "
87+
"trendline sort event sma(2, latency) as latency_trend "
13788
+ "| fields id, latency, latency_trend");
13889
JSONObject result = executeQuery(query);
13990

0 commit comments

Comments
 (0)