Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,22 @@
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.calcite.util.Holder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.Map;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
Expand All @@ -48,10 +52,12 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
import org.opensearch.sql.exception.SemanticCheckException;

public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalcitePlanContext> {

Expand Down Expand Up @@ -146,6 +152,30 @@ public RelNode visitProject(Project node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

@Override
public RelNode visitRename(Rename node, CalcitePlanContext context) {
visitChildren(node, context);
List<String> originalNames = context.relBuilder.peek().getRowType().getFieldNames();
List<String> newNames = new ArrayList<>(originalNames);
for (Map renameMap : node.getRenameList()) {
if (renameMap.getTarget() instanceof Field t) {
String newName = t.getField().toString();
RexNode check = rexVisitor.analyze(renameMap.getOrigin(), context);
if (check instanceof RexInputRef ref) {
newNames.set(ref.getIndex(), newName);
} else {
throw new SemanticCheckException(
String.format("the original field %s cannot be resolved", renameMap.getOrigin()));
}
} else {
throw new SemanticCheckException(
String.format("the target expected to be field, but is %s", renameMap.getTarget()));
}
}
context.relBuilder.rename(newNames);
return context.relBuilder.peek();
}

@Override
public RelNode visitSort(Sort node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down Expand Up @@ -256,21 +286,84 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
node.getAggExprList().stream()
.map(expr -> aggVisitor.analyze(expr, context))
.collect(Collectors.toList());
List<RexNode> groupByList =
node.getGroupExprList().stream()
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());

// The span column is always the first column in result whatever
// the order of span in query is first or last one
List<RexNode> groupByList = new ArrayList<>();
UnresolvedExpression span = node.getSpan();
if (!Objects.isNull(span)) {
RexNode spanRex = rexVisitor.analyze(span, context);
groupByList.add(spanRex);
// add span's group alias field (most recent added expression)
}
groupByList.addAll(
node.getGroupExprList().stream().map(expr -> rexVisitor.analyze(expr, context)).toList());

context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggList);

// handle normal aggregate
// TODO Should we keep alignment with V2 behaviour in new Calcite implementation?
// TODO how about add a legacy enable config to control behaviour in Calcite?
// Some behaviours between PPL and Databases are different.
// As an example, in command `stats count() by colA, colB`:
// 1. the sequence of output schema is different:
// In PPL v2, the sequence of output schema is "count, colA, colB".
// But in most databases, the sequence of output schema is "colA, colB, count".
// 2. the output order is different:
// In PPL v2, the order of output results is ordered by "colA + colB".
// But in most databases, the output order is random.
// User must add ORDER BY clause after GROUP BY clause to keep the results aligning.
// Following logic is to align with the PPL legacy behaviour.

// alignment for 1.sequence of output schema: adding order-by
// we use the groupByList instead of node.getSortExprList as input because
// the groupByList may include span column.
node.getGroupExprList()
.forEach(
g -> {
// node.getGroupExprList() should all be instance of Alias
// which defined in AstBuilder.
assert g instanceof Alias;
});
List<String> aliasesFromGroupByList =
groupByList.stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.toList();
List<RexNode> aliasedGroupByList =
aliasesFromGroupByList.stream()
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.toList();
context.relBuilder.sort(aliasedGroupByList);

// alignment for 2.the output order: schema reordering
List<RexNode> outputFields = context.relBuilder.fields();
int numOfOutputFields = outputFields.size();
int numOfAggList = aggList.size();
List<RexNode> reordered = new ArrayList<>(numOfOutputFields);
// Add aggregation results first
List<RexNode> aggRexList =
outputFields.subList(numOfOutputFields - numOfAggList, numOfOutputFields);
reordered.addAll(aggRexList);
// Add group by columns
reordered.addAll(aliasedGroupByList);
context.relBuilder.project(reordered);

return context.relBuilder.peek();
}

/** extract the RexLiteral of Alias from a node */
private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
if (node == null) {
return Optional.empty();
} else if (node.getKind() == SqlKind.AS) {
return Optional.of((RexLiteral) ((RexCall) node).getOperands().get(1));
} else {
return Optional.empty();
}
}

@Override
public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<UnresolvedPlan> children = node.getChildren();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,17 @@ public RexNode visitSpan(Span node, CalcitePlanContext context) {
return context.rexBuilder.makeIntervalLiteral(new BigDecimal(millis), intervalQualifier);
} else {
// if the unit is not time base - create a math expression to bucket the span partitions
SqlTypeName type = field.getType().getSqlTypeName();
return context.rexBuilder.makeCall(
typeFactory.createSqlType(SqlTypeName.DOUBLE),
typeFactory.createSqlType(type),
SqlStdOperatorTable.MULTIPLY,
List.of(
context.rexBuilder.makeCall(
typeFactory.createSqlType(SqlTypeName.DOUBLE),
typeFactory.createSqlType(type),
SqlStdOperatorTable.FLOOR,
List.of(
context.rexBuilder.makeCall(
typeFactory.createSqlType(SqlTypeName.DOUBLE),
typeFactory.createSqlType(type),
SqlStdOperatorTable.DIVIDE,
List.of(field, value)))),
value));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Calcite project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opensearch.sql.calcite.udf.udaf;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlAvgAggFunction;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeTransforms;
import org.apache.calcite.util.Optionality;

public class NullableSqlAvgAggFunction extends SqlAggFunction {

// ~ Constructors -----------------------------------------------------------

/** Creates a NullableSqlAvgAggFunction. */
public NullableSqlAvgAggFunction(SqlKind kind) {
this(kind.name(), kind);
}

NullableSqlAvgAggFunction(String name, SqlKind kind) {
super(
name,
null,
kind,
ReturnTypes.AVG_AGG_FUNCTION.andThen(SqlTypeTransforms.FORCE_NULLABLE), // modified here
null,
OperandTypes.NUMERIC,
SqlFunctionCategory.NUMERIC,
false,
false,
Optionality.FORBIDDEN);
checkArgument(SqlKind.AVG_AGG_FUNCTIONS.contains(kind), "unsupported sql kind");
}

// ~ Methods ----------------------------------------------------------------

/**
* Returns the specific function, e.g. AVG or STDDEV_POP.
*
* @return Subtype
*/
@Deprecated // to be removed before 2.0
public SqlAvgAggFunction.Subtype getSubtype() {
return SqlAvgAggFunction.Subtype.valueOf(kind.name());
}

/** Sub-type of aggregate function. */
@Deprecated // to be removed before 2.0
public enum Subtype {
AVG,
STDDEV_POP,
STDDEV_SAMP,
VAR_POP,
VAR_SAMP
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Calcite project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opensearch.sql.calcite.udf.udaf;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlSplittableAggFunction;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeTransforms;
import org.apache.calcite.util.Optionality;
import org.checkerframework.checker.nullness.qual.Nullable;

public class NullableSqlSumAggFunction extends SqlAggFunction {

// ~ Instance fields --------------------------------------------------------

@Deprecated // to be removed before 2.0
private final RelDataType type;

// ~ Constructors -----------------------------------------------------------

public NullableSqlSumAggFunction(RelDataType type) {
super(
"SUM",
null,
SqlKind.SUM,
ReturnTypes.AGG_SUM.andThen(SqlTypeTransforms.FORCE_NULLABLE), // modified here
null,
OperandTypes.NUMERIC,
SqlFunctionCategory.NUMERIC,
false,
false,
Optionality.FORBIDDEN);
this.type = type;
}

// ~ Methods ----------------------------------------------------------------

@SuppressWarnings("deprecation")
@Override
public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
return ImmutableList.of(type);
}

@Deprecated // to be removed before 2.0
public RelDataType getType() {
return type;
}

@SuppressWarnings("deprecation")
@Override
public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
return type;
}

@Override
public <T extends Object> @Nullable T unwrap(Class<T> clazz) {
if (clazz.isInstance(SqlSplittableAggFunction.SumSplitter.INSTANCE)) {
return clazz.cast(SqlSplittableAggFunction.SumSplitter.INSTANCE);
}
return super.unwrap(clazz);
}

@Override
public SqlAggFunction getRollup() {
return this;
}
}
Loading
Loading