diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 013ca261ad7..351527d218b 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1224,6 +1224,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { context.relBuilder.peek(), context.relBuilder.literal(context.sysLimit.joinSubsearchLimit()))); } + List leftAllFields = context.fieldBuilder.getAllFieldNames(1); + List rightAllFields = context.fieldBuilder.getAllFieldNames(0); if (node.getJoinCondition().isEmpty()) { // join-with-field-list grammar List leftColumns = context.fieldBuilder.getStaticFieldNames(1); @@ -1255,12 +1257,12 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { || (node.getArgumentMap().get("overwrite").equals(Literal.TRUE))) { toBeRemovedFields = duplicatedFieldNames.stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context)) .toList(); } else { toBeRemovedFields = duplicatedFieldNames.stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context)) .toList(); } Literal max = node.getArgumentMap().get("max"); @@ -1285,6 +1287,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { if (!toBeRemovedFields.isEmpty()) { context.relBuilder.projectExcept(toBeRemovedFields); } + context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields); + return context.relBuilder.peek(); } // The join-with-criteria grammar doesn't allow empty join condition @@ -1292,6 +1296,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { node.getJoinCondition() .map(c -> rexVisitor.analyzeJoinCondition(c, context)) 
.orElse(context.relBuilder.literal(true)); + joinCondition = context.rexBuilder.castAnyToAlignTypes(joinCondition, context); if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) { // semi and anti join only return left table outputs context.relBuilder.join( @@ -1302,7 +1307,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { // when a new project add to stack. To avoid `id0`, we will rename the `id0` to `alias.id` // or `tableIdentifier.id`: List leftColumns = context.fieldBuilder.getStaticFieldNames(1); - List rightColumns = context.fieldBuilder.getStaticFieldNames(); + List rightColumns = context.fieldBuilder.getStaticFieldNames(0); List rightTableName = PlanUtils.findTable(context.relBuilder.peek()).getQualifiedName(); // Using `table.column` instead of `catalog.database.table.column` as column prefix because @@ -1337,6 +1342,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { } context.relBuilder.join( JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition); + + context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields); JoinAndLookupUtils.renameToExpectedFields( rightColumnsWithAliasIfConflict, leftColumns.size(), context); } @@ -1369,9 +1376,9 @@ public Void visitInputRef(RexInputRef inputRef) { private static RexNode buildJoinConditionByFieldName( CalcitePlanContext context, String fieldName) { - RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, false, context); - RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, true, context); - return context.rexBuilder.equals(sourceKey, lookupKey); + RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsInLeft(fieldName, context); + RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsInRight(fieldName, context); + return context.rexBuilder.equalsWithCastAsNeeded(sourceKey, lookupKey); } @Override @@ -1397,6 +1404,10 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { // Get 
lookupColumns from top of stack (after above potential projection). List lookupTableFieldNames = context.fieldBuilder.getStaticFieldNames(); + // For merging with dynamic fields later + List leftAllFields = context.fieldBuilder.getAllFieldNames(1); + List rightAllFields = context.fieldBuilder.getAllFieldNames(0); + // 3. Find fields which should be removed in lookup-table. // For lookup table, the mapping fields should be dropped after join // unless they are explicitly put in the output fields @@ -1410,6 +1421,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { .toList(); List toBeRemovedLookupFields = toBeRemovedLookupFieldNames.stream() + .filter(d -> lookupTableFieldNames.contains(d)) .map(d -> (RexNode) context.fieldBuilder.staticField(2, 1, d)) .toList(); List toBeRemovedFields = new ArrayList<>(toBeRemovedLookupFields); @@ -1421,7 +1433,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { List duplicatedSourceFields = duplicatedFieldNamesMap.keySet().stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context)) .toList(); // Duplicated fields in source-field should always be removed. 
toBeRemovedFields.addAll(duplicatedSourceFields); @@ -1433,7 +1445,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { if (!duplicatedFieldNamesMap.isEmpty() && node.getOutputStrategy() == OutputStrategy.APPEND) { List duplicatedProvidedFields = duplicatedFieldNamesMap.values().stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context)) .toList(); for (int i = 0; i < duplicatedProvidedFields.size(); ++i) { newCoalesceList.add( @@ -1470,7 +1482,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { context.relBuilder.projectExcept(toBeRemovedFields); } - // TODO: dedupe dynamic fields + context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields); // 7. Rename the fields to the expected names. JoinAndLookupUtils.renameToExpectedFields( diff --git a/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java index 4d86614895b..daa62eba9be 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java @@ -12,9 +12,11 @@ import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; @@ -22,6 +24,7 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.calcite.type.AbstractExprRelDataType; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import 
org.opensearch.sql.calcite.utils.RexConverter; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; @@ -41,6 +44,20 @@ public RexNode equals(RexNode n1, RexNode n2) { return this.makeCall(SqlStdOperatorTable.EQUALS, n1, n2); } + /** Make equals call with adding cast in case the node type is ANY. */ + public RexNode equalsWithCastAsNeeded(RexNode n1, RexNode n2) { + if (isAnyType(n1) && isAnyType(n2)) { + n1 = castToString(n1); + n2 = castToString(n2); + } else if (isAnyType(n1)) { + n1 = castToTargetType(n1, n2); + } else if (isAnyType(n2)) { + n2 = castToTargetType(n2, n1); + } + + return equals(n1, n2); + } + public RexNode and(RexNode left, RexNode right) { final RelDataType booleanType = this.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN); return this.makeCall(booleanType, SqlStdOperatorTable.AND, List.of(left, right)); @@ -163,4 +180,36 @@ else if ((SqlTypeUtil.isApproximateNumeric(sourceType) || SqlTypeUtil.isDecimal( } return super.makeCast(pos, type, exp, matchNullability, safe, format); } + + public boolean isAnyType(RexNode node) { + return node.getType().getSqlTypeName().equals(SqlTypeName.ANY); + } + + public RexNode castToString(RexNode node) { + RelDataType stringType = getTypeFactory().createSqlType(SqlTypeName.VARCHAR); + RelDataType nullableStringType = getTypeFactory().createTypeWithNullability(stringType, true); + return makeCast(nullableStringType, node, true, true); + } + + /** cast node to the same type as target */ + public RexNode castToTargetType(RexNode node, RexNode target) { + return makeCast(target.getType(), node, true, true); + } + + /** Utility to cast ANY to specific types to avoid compare issue */ + RexNode castAnyToAlignTypes(RexNode rexNode, CalcitePlanContext context) { + return rexNode.accept( + new RexConverter() { + @Override + public RexNode visitCall(RexCall call) { + if (call.getKind() == 
SqlKind.EQUALS) { + RexNode n0 = call.operands.get(0); + RexNode n1 = call.operands.get(1); + return super.visitCall((RexCall) equalsWithCastAsNeeded(n0, n1)); + } else { + return super.visitCall(call); + } + } + }); + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java index 7c559f1906c..790896aa5ed 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java @@ -21,7 +21,6 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; -import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; @@ -34,6 +33,7 @@ import org.apache.calcite.util.mapping.Mappings; import org.apache.commons.lang3.tuple.Pair; import org.immutables.value.Value; +import org.opensearch.sql.calcite.ExtendedRexBuilder; import org.opensearch.sql.calcite.rel.RelBuilderWrapper; import org.opensearch.sql.calcite.rel.RelFieldBuilder; @@ -78,7 +78,8 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec final RelBuilder rawRelBuilder = call.builder(); final RelBuilderWrapper relBuilder = new RelBuilderWrapper(rawRelBuilder); - final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder(); + final ExtendedRexBuilder rexBuilder = + new ExtendedRexBuilder(aggregate.getCluster().getRexBuilder()); final RelFieldBuilder fieldBuilder = new RelFieldBuilder(rawRelBuilder, rexBuilder); relBuilder.push(project.getInput()); diff --git a/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java b/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java index f4570d08856..99bebf9f293 100644 --- 
a/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java +++ b/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java @@ -38,11 +38,17 @@ public static Optional resolveField( if (inputFieldNames.contains(fieldName)) { return Optional.of(context.fieldBuilder.staticField(inputCount, inputOrdinal, fieldName)); } else if (context.fieldBuilder.isDynamicFieldsExist()) { - return Optional.of(context.fieldBuilder.dynamicField(fieldName)); + return Optional.of(context.fieldBuilder.dynamicField(inputCount, inputOrdinal, fieldName)); } return Optional.empty(); } + /** Resolve field in the top of the stack */ + public static Optional resolveField(String fieldName, CalcitePlanContext context) { + return resolveField(1, 0, fieldName, context); + } + + /** Resolve field in the specified input. Throw exception if not found. */ public static RexNode resolveFieldOrThrow( int inputCount, int inputOrdinal, String fieldName, CalcitePlanContext context) { return resolveField(inputCount, inputOrdinal, fieldName, context) @@ -50,6 +56,11 @@ public static RexNode resolveFieldOrThrow( () -> new IllegalArgumentException(String.format("Field [%s] not found.", fieldName))); } + /** Resolve field in the top of the stack. Throw exception if not found. */ + public static RexNode resolveFieldOrThrow(String fieldName, CalcitePlanContext context) { + return resolveFieldOrThrow(1, 0, fieldName, context); + } + /** * Resolves a qualified name to a RexNode based on the current context. 
* @@ -78,6 +89,8 @@ private static RexNode resolveInJoinCondition( return resolveFieldWithAlias(nameNode, context, 2) .or(() -> resolveFieldWithoutAlias(nameNode, context, 2)) + .or(() -> resolveDynamicFieldsWithAlias(nameNode, context, 2)) + .or(() -> resolveDynamicFields(nameNode, context, 2)) .orElseThrow(() -> getNotFoundException(nameNode)); } @@ -139,6 +152,28 @@ private static Optional resolveFieldWithAlias( return Optional.empty(); } + private static Optional resolveDynamicFieldsWithAlias( + QualifiedName nameNode, CalcitePlanContext context, int inputCount) { + List parts = nameNode.getParts(); + log.debug( + "resolveDynamicFieldsWithAlias() called with nameNode={}, parts={}, inputCount={}", + nameNode, + parts, + inputCount); + + if (parts.size() >= 2) { + // Consider first part as table alias + String alias = parts.get(0); + + String fieldName = String.join(".", parts.subList(1, parts.size())); + Optional dynamicField = + tryToResolveField(alias, DYNAMIC_FIELDS_MAP, context, inputCount); + return dynamicField.map(field -> createItemAccess(field, fieldName, context)); + } + + return Optional.empty(); + } + private static Optional resolveDynamicFields( QualifiedName nameNode, CalcitePlanContext context, int inputCount) { List parts = nameNode.getParts(); diff --git a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java index bf4e1591a72..ea72f09f4fe 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java @@ -8,14 +8,19 @@ import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.AllArgsConstructor; import 
lombok.RequiredArgsConstructor; +import lombok.Value; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilder; +import org.opensearch.sql.calcite.ExtendedRexBuilder; import org.opensearch.sql.calcite.plan.OpenSearchConstants; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.PPLFuncImpTable; @@ -28,7 +33,7 @@ @RequiredArgsConstructor public class RelFieldBuilder { private final RelBuilder relBuilder; - private final RexBuilder rexBuilder; + private final ExtendedRexBuilder rexBuilder; public List getStaticFieldNames() { return getStaticFieldNames(0); @@ -36,7 +41,7 @@ public List getStaticFieldNames() { public List getStaticFieldNames(int n) { return getAllFieldNames(n).stream() - .filter(name -> !DYNAMIC_FIELDS_MAP.equals(name)) + .filter(RelFieldBuilder::isStaticField) .collect(Collectors.toList()); } @@ -56,6 +61,10 @@ public List getAllFieldNames(int n) { return relBuilder.peek(n).getRowType().getFieldNames(); } + public List allFields() { + return relBuilder.fields(); + } + public RexInputRef staticField(String fieldName) { return relBuilder.field(fieldName); } @@ -73,8 +82,10 @@ public RexInputRef staticField(int fieldIndex) { } public List staticFields() { - return relBuilder.fields().stream() - .filter(node -> !DYNAMIC_FIELDS_MAP.equals(node)) + List originalFields = relBuilder.peek().getRowType().getFieldNames(); + return IntStream.range(0, originalFields.size()) + .filter(i -> isStaticField(originalFields.get(i))) + .mapToObj(i -> relBuilder.field(i)) .collect(Collectors.toList()); } @@ -129,4 +140,117 @@ private RexNode createItemAccess(RexNode field, String itemName) { return PPLFuncImpTable.INSTANCE.resolve( rexBuilder, BuiltinFunctionName.INTERNAL_ITEM, field, rexBuilder.makeLiteral(itemName)); } + + private static boolean 
isStaticField(String fieldName) { + // use startsWith as `_MAP` can be renamed to `_MAP0`, etc. by RelBuilder + return !fieldName.startsWith(DYNAMIC_FIELDS_MAP); + } + + @Value + @AllArgsConstructor + private static class JoinedField { + boolean isInLeft; + boolean isInRight; + boolean isDynamicFieldMap; + String fieldName; + RexInputRef fieldNode; + } + + /** + * Reorganize dynamic fields map (_MAP) after join. It will concat dynamic fields map from both + * side, and also override static field by the value from dynamic fields as needed. This should be + * called after join, but with fields information before join as parameters + */ + public void reorganizeDynamicFields(List leftAllFields, List rightAllFields) { + boolean leftHasDynamicFields = leftAllFields.contains(DYNAMIC_FIELDS_MAP); + boolean rightHasDynamicFields = rightAllFields.contains(DYNAMIC_FIELDS_MAP); + + if (!leftHasDynamicFields && !rightHasDynamicFields) { + return; + } + + List fields = collectStaticFieldsInfo(leftAllFields, rightAllFields); + Optional leftMap = + leftHasDynamicFields ? Optional.of(getLeftDynamicFieldMapInfo()) : Optional.empty(); + Optional rightMap = + rightHasDynamicFields ? 
Optional.of(getRightDynamicFieldMapInfo()) : Optional.empty(); + + List projected = new ArrayList<>(); + List names = new ArrayList<>(); + for (JoinedField field : fields) { + if (field.isInLeft && !field.isInRight && rightHasDynamicFields) { + // need to prioritize value from right dynamic map if only left side has static field with + // the name + RexNode valueFromRightMap = createItemAccess(rightMap.get().fieldNode, field.fieldName); + RexNode merged = rexBuilder.coalesce(valueFromRightMap, field.fieldNode); + projected.add(merged); + names.add(field.fieldName); + } else { + projected.add(field.fieldNode); + names.add(field.fieldName); + } + } + if (leftHasDynamicFields && rightHasDynamicFields) { + // need to concat map from both side + projected.add(mapConcatCall(leftMap.get().fieldNode, rightMap.get().fieldNode)); + } else if (leftHasDynamicFields) { + projected.add(leftMap.get().fieldNode); + } else if (rightHasDynamicFields) { + projected.add(rightMap.get().fieldNode); + } + names.add(DYNAMIC_FIELDS_MAP); + + relBuilder.projectNamed(projected, names, true); + } + + private JoinedField getLeftDynamicFieldMapInfo() { // returns first _MAP* column + List allFields = getAllFieldNames(); + for (int i = 0; i < allFields.size(); i++) { + String fieldName = allFields.get(i); + if (fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + return new JoinedField(true, false, true, fieldName, relBuilder.field(i)); + } + } + throw new IllegalStateException("Dynamic field not found for left input."); + } + + private JoinedField getRightDynamicFieldMapInfo() { // returns last _MAP* column + List allFields = getAllFieldNames(); + for (int i = allFields.size() - 1; 0 <= i; i--) { + String fieldName = allFields.get(i); + if (fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + return new JoinedField(false, true, true, fieldName, relBuilder.field(i)); + } + } + throw new IllegalStateException("Dynamic field not found for right input."); + } + + private List collectStaticFieldsInfo( + List leftAllFields, List rightAllFields) { + List fields = new 
ArrayList<>(); + List allFields = getAllFieldNames(); + // iterate by index to avoid referring field by name (it causes issue when field name is + // overlapping) + for (int i = 0; i < allFields.size(); i++) { + String fieldName = allFields.get(i); + if (!fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + boolean isInLeft = leftAllFields.contains(fieldName); + boolean isInRight = + rightAllFields.contains(fieldName) + || (isInLeft && rightAllFields.contains(excludeLastZero(fieldName))); + fields.add(new JoinedField(isInLeft, isInRight, false, fieldName, relBuilder.field(i))); + } + } + return fields; + } + + /** exclude zero to consider the rename (adding `0` at the end) due to duplicate name */ + private String excludeLastZero(String fieldName) { + return fieldName.endsWith("0") ? fieldName.substring(0, fieldName.length() - 1) : fieldName; + } + + private RexNode mapConcatCall(RexInputRef left, RexInputRef right) { + return PPLFuncImpTable.INSTANCE.resolve( + rexBuilder, BuiltinFunctionName.MAP_CONCAT, left, right); + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java index 6680c98d259..74910a9a521 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java @@ -66,7 +66,9 @@ static void addProjectionIfNecessary(Lookup node, CalcitePlanContext context) { if (lookupMappingFields.size() != context.fieldBuilder.staticFields().size()) { List projectList = lookupMappingFields.stream() - .map(fieldName -> (RexNode) context.fieldBuilder.staticField(fieldName)) + .map( + fieldName -> + (RexNode) QualifiedNameResolver.resolveFieldOrThrow(fieldName, context)) .toList(); context.relBuilder.project(projectList); } @@ -78,18 +80,22 @@ static void addJoinForLookUp(Lookup node, CalcitePlanContext context) { node.getMappingAliasMap().entrySet().stream() .map( 
entry -> { - RexNode lookupKey = analyzeFieldsForLookUp(entry.getKey(), false, context); - RexNode sourceKey = analyzeFieldsForLookUp(entry.getValue(), true, context); - return context.rexBuilder.equals(sourceKey, lookupKey); + RexNode lookupKey = analyzeFieldsInRight(entry.getKey(), context); + RexNode sourceKey = analyzeFieldsInLeft(entry.getValue(), context); + return context.rexBuilder.equalsWithCastAsNeeded(sourceKey, lookupKey); }) .reduce(context.rexBuilder::and) .orElse(context.relBuilder.literal(true)); context.relBuilder.join(JoinRelType.LEFT, joinCondition); } - static RexNode analyzeFieldsForLookUp( - String fieldName, boolean isSourceTable, CalcitePlanContext context) { - return QualifiedNameResolver.resolveField(2, isSourceTable ? 0 : 1, fieldName, context) + static RexNode analyzeFieldsInLeft(String fieldName, CalcitePlanContext context) { + return QualifiedNameResolver.resolveField(2, 0, fieldName, context) + .orElseThrow(() -> new IllegalArgumentException("field not found: " + fieldName)); + } + + static RexNode analyzeFieldsInRight(String fieldName, CalcitePlanContext context) { + return QualifiedNameResolver.resolveField(2, 1, fieldName, context) .orElseThrow(() -> new IllegalArgumentException("field not found: " + fieldName)); } @@ -97,7 +103,7 @@ static void renameToExpectedFields( List expectedProvidedFieldNames, int sourceFieldsCountLeft, CalcitePlanContext context) { - List oldFields = context.relBuilder.peek().getRowType().getFieldNames(); + List oldFields = context.fieldBuilder.getStaticFieldNames(); assert sourceFieldsCountLeft + expectedProvidedFieldNames.size() == oldFields.size() : "The source fields count left plus new provided fields count must equal to the output" + " fields count of current plan(i.e project-join)."; diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java b/core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java new file mode 100644 index 00000000000..f4dfec7d538 --- 
/dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLambda; +import org.apache.calcite.rex.RexLambdaRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitor; + +/** + * Base class for converting specific portions of a RexNode tree by overriding the node types of + * interest. This class implements the visitor pattern for RexNode traversal, providing default + * implementations that return nodes unchanged. Subclasses can override specific visit methods to + * transform particular node types while leaving others untouched. 
+ */ +public class RexConverter implements RexVisitor { + + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return inputRef; + } + + @Override + public RexNode visitLocalRef(RexLocalRef localRef) { + return localRef; + } + + @Override + public RexNode visitLiteral(RexLiteral literal) { + return literal; + } + + @Override + public RexNode visitCall(RexCall call) { + List operands = + call.getOperands().stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (operands.equals(call.getOperands())) { + return call; + } + return call.clone(call.getType(), operands); + } + + @Override + public RexNode visitOver(RexOver over) { + List operands = + over.getOperands().stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (operands.equals(over.getOperands())) { + return over; + } + return over.clone(over.getType(), operands); + } + + @Override + public RexNode visitCorrelVariable(RexCorrelVariable correlVariable) { + return correlVariable; + } + + @Override + public RexNode visitDynamicParam(RexDynamicParam dynamicParam) { + return dynamicParam; + } + + @Override + public RexNode visitRangeRef(RexRangeRef rangeRef) { + return rangeRef; + } + + @Override + public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { + RexNode expr = fieldAccess.getReferenceExpr().accept(this); + if (expr == fieldAccess.getReferenceExpr()) { + return fieldAccess; + } + throw new UnsupportedOperationException( + "RexFieldAccess transformation not supported. 
Override visitFieldAccess() to handle this" + + " case."); + } + + @Override + public RexNode visitSubQuery(RexSubQuery subQuery) { + List operands = + subQuery.getOperands().stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (operands.equals(subQuery.getOperands())) { + return subQuery; + } + return subQuery.clone(subQuery.getType(), operands); + } + + @Override + public RexNode visitTableInputRef(RexTableInputRef fieldRef) { + return fieldRef; + } + + @Override + public RexNode visitPatternFieldRef(RexPatternFieldRef fieldRef) { + return fieldRef; + } + + @Override + public RexNode visitLambda(RexLambda lambda) { + RexNode expr = lambda.getExpression().accept(this); + if (expr == lambda.getExpression()) { + return lambda; + } + throw new UnsupportedOperationException( + "RexLambda transformation not supported. Override visitLambda() to handle this case."); + } + + @Override + public RexNode visitLambdaRef(RexLambdaRef lambdaRef) { + return lambdaRef; + } +} diff --git a/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java b/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java index 652ee108aca..5332b97f691 100644 --- a/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java +++ b/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java @@ -50,7 +50,7 @@ public class PPLAggregateConvertRuleTest { @Mock RelOptPlanner planner; RelDataType type = TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT); RelDataType rowType = TYPE_FACTORY.createStructType(List.of(type, type), List.of("a", "b")); - RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY); + RexBuilder rexBuilder = new ExtendedRexBuilder(new RexBuilder(TYPE_FACTORY)); RelBuilder relBuilder; @BeforeEach diff --git a/core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java b/core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java new file mode 100644 
index 00000000000..9e84715435f --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.math.BigDecimal; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLambda; +import org.apache.calcite.rex.RexLambdaRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class RexConverterTest { + + private RexBuilder rexBuilder; + private RelDataType intType; + + @BeforeEach + void setUp() { + rexBuilder = new RexBuilder(OpenSearchTypeFactory.TYPE_FACTORY); + intType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.INTEGER); + } + + @Test + void testVisitInputRef_returnsUnchanged() { + RexConverter converter = new RexConverter(); + RexInputRef inputRef = rexBuilder.makeInputRef(intType, 0); + + RexNode result = inputRef.accept(converter); + + assertSame(inputRef, result); + } + + @Test + void testVisitLiteral_returnsUnchanged() { + RexConverter converter = new RexConverter(); + RexLiteral literal = rexBuilder.makeExactLiteral(BigDecimal.valueOf(42)); + + RexNode result = literal.accept(converter); + + assertSame(literal, result); + } + + @Test + void testVisitCall_withNoTransformation_returnsOriginal() { + RexConverter converter = new RexConverter(); + 
RexNode left = rexBuilder.makeInputRef(intType, 0); + RexNode right = rexBuilder.makeInputRef(intType, 1); + RexCall call = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, left, right); + + RexNode result = call.accept(converter); + + assertSame(call, result); + } + + @Test + void testVisitCall_withTransformation_returnsNewNode() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return rexBuilder.makeInputRef(inputRef.getType(), inputRef.getIndex() + 10); + } + }; + + RexNode left = rexBuilder.makeInputRef(intType, 0); + RexNode right = rexBuilder.makeInputRef(intType, 1); + RexCall call = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, left, right); + + RexNode result = call.accept(converter); + + assertNotSame(call, result); + RexCall resultCall = (RexCall) result; + assertEquals(2, resultCall.getOperands().size()); + assertEquals(10, ((RexInputRef) resultCall.getOperands().get(0)).getIndex()); + assertEquals(11, ((RexInputRef) resultCall.getOperands().get(1)).getIndex()); + } + + @Test + void testVisitCall_nestedCalls_transformsRecursively() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return rexBuilder.makeInputRef(inputRef.getType(), inputRef.getIndex() + 1); + } + }; + + RexNode a = rexBuilder.makeInputRef(intType, 0); + RexNode b = rexBuilder.makeInputRef(intType, 1); + RexNode c = rexBuilder.makeInputRef(intType, 2); + RexCall innerCall = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, a, b); + RexCall outerCall = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, innerCall, c); + + RexNode result = outerCall.accept(converter); + + assertNotSame(outerCall, result); + RexCall resultCall = (RexCall) result; + RexCall innerResult = (RexCall) resultCall.getOperands().get(0); + assertEquals(1, ((RexInputRef) innerResult.getOperands().get(0)).getIndex()); + assertEquals(2, 
((RexInputRef) innerResult.getOperands().get(1)).getIndex()); + assertEquals(3, ((RexInputRef) resultCall.getOperands().get(1)).getIndex()); + } + + @Test + void testVisitLambda_withNoTransformation_returnsOriginal() { + RexConverter converter = new RexConverter(); + RexLambdaRef lambdaRef = new RexLambdaRef(0, "x", intType); + RexLambda lambda = + (RexLambda) rexBuilder.makeLambdaCall(lambdaRef, java.util.List.of(lambdaRef)); + + RexNode result = lambda.accept(converter); + + assertSame(lambda, result); + } + + @Test + void testVisitLambda_withTransformation_throwsException() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitLambdaRef(RexLambdaRef lambdaRef) { + return new RexLambdaRef( + lambdaRef.getIndex() + 1, lambdaRef.getName(), lambdaRef.getType()); + } + }; + + RexLambdaRef lambdaRef = new RexLambdaRef(0, "x", intType); + RexLambda lambda = + (RexLambda) rexBuilder.makeLambdaCall(lambdaRef, java.util.List.of(lambdaRef)); + + UnsupportedOperationException exception = + assertThrows(UnsupportedOperationException.class, () -> lambda.accept(converter)); + assertEquals( + "RexLambda transformation not supported. 
Override visitLambda() to handle this case.", + exception.getMessage()); + } + + @Test + void testCustomConverter_selectiveTransformation() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + if (inputRef.getIndex() == 0) { + return rexBuilder.makeInputRef(inputRef.getType(), 99); + } + return inputRef; + } + }; + + RexNode input0 = rexBuilder.makeInputRef(intType, 0); + RexNode input1 = rexBuilder.makeInputRef(intType, 1); + RexCall call = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, input0, input1); + + RexNode result = call.accept(converter); + + assertNotSame(call, result); + RexCall resultCall = (RexCall) result; + assertEquals(99, ((RexInputRef) resultCall.getOperands().get(0)).getIndex()); + assertEquals(1, ((RexInputRef) resultCall.getOperands().get(1)).getIndex()); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java index 4a2c1750ca9..1e64e72b20d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java @@ -321,63 +321,6 @@ public void testFlatten() throws IOException { verifyDataRows(result, rows(4, "Engineering"), rows(5, "")); } - @Test - @Ignore("pending join adaptation") - public void testExpand() throws IOException { - String query = source(TEST_INDEX_DYNAMIC, "expand arr as expanded | where isnotnull(expanded)"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("department", "string"), schema("count", "bigint")); - verifyDataRows(result, rows("Engineering", 2)); - } - - @Test - @Ignore("pending join adaptation") - public void testJoinWithStaticField() throws IOException { - String query = - source( - TEST_INDEX_DYNAMIC, - 
"join left=l right=r on l.account_number = r.account_number | fields l.firstname," - + " r.department | head 1"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("l.firstname", "string"), schema("r.department", "string")); - verifyDataRows(result, rows("John", "Engineering")); - } - - @Test - @Ignore("pending join adaptation") - public void testJoinWithDynamicField() throws IOException { - String query = - source( - TEST_INDEX_DYNAMIC, - "join left=l right=r on l.department = r.department | fields l.firstname, r.department" - + " | head 1"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("l.firstname", "string"), schema("r.department", "string")); - verifyDataRows(result, rows("John", "Engineering")); - } - - @Test - @Ignore("pending join adaptation") - public void testLookupWithStaticField() throws IOException { - String query = - source( - TEST_INDEX_DYNAMIC, - "lookup " - + TEST_INDEX_DYNAMIC - + " account_number as acc_num | fields firstname, department"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("firstname", "string"), schema("department", "string")); - verifyDataRows(result, rows("John", "Engineering")); - } - @Test @Ignore("pending aggregation adaptation") public void testTop() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java new file mode 100644 index 00000000000..40a5da7f0a8 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java @@ -0,0 +1,383 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.standalone; + +import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; +import 
static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; + +public class CalciteDynamicFieldsJoinIT extends CalcitePPLPermissiveIntegTestCase { + + private static final String TEST_DYNAMIC_LEFT = "test_dynamic_left"; + private static final String TEST_DYNAMIC_RIGHT = "test_dynamic_right"; + + @Override + public void init() throws IOException { + super.init(); + createLeftIndex(); + createRightIndex(); + enableCalcite(); + } + + @Test + public void testJoinStaticWithStaticByFieldName() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a2 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=($1, $3)], joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t3], a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " 
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$1], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}]," + + " _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinStaticWithStaticByCriteria() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left=l right=r on l.a2 = r.a2 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinDynamicWithStaticByFieldName() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a3 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($2, 'a3')), $4)]," + + " joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t1, $t5)], 
expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t3], a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($2, $4)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a3'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], a1=[$t0], _MAP=[$t8], $f2=[$t11])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$1], dir0=[ASC])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}]," + + " _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinDynamicWithStaticByCriteria() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a3 = r.a3 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinStaticWithDynamicByFieldName() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a1 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=($0, SAFE_CAST(ITEM($5, 'a1')))]," + + " joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], 
a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t4, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t1, $t4)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t2], a3=[$t3], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($0, $5)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], a1=[$t0], _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinStaticWithDynamicByCriteria() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a1 = r.a1 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinDynamicWithDynamicByFieldName() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a4 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($2, 'a4')), 
SAFE_CAST(ITEM($5," + + " 'a4')))], joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..6=[{inputs}], expr#7=['a1'], expr#8=[ITEM($t5, $t7)]," + + " expr#9=[COALESCE($t8, $t0)], expr#10=[MAP_CONCAT($t1, $t5)], expr#11=['a4']," + + " expr#12=[ITEM($t10, $t11)], expr#13=['a5'], expr#14=[ITEM($t10, $t13)], a1=[$t9]," + + " a2=[$t3], a3=[$t4], a4=[$t12], a5=[$t14])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($2, $6)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a4'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], a1=[$t0], _MAP=[$t8], $f2=[$t11])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a4'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinDynamicWithDynamicByCriteria() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a4 = r.a4 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupStaticWithDynamic() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a1 | fields 
a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($17, 'a1'), $0)], a2=[$9], a3=[$10]," + + " a4=[ITEM(MAP_CONCAT($8, $17), 'a4')], a5=[ITEM(MAP_CONCAT($8, $17), 'a5')])\n" + + " LogicalJoin(condition=[=($0, SAFE_CAST(ITEM($17, 'a1')))]," + + " joinType=[left])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t4, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t1, $t4)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t2], a3=[$t3], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableHashJoin(condition=[=($0, $5)], joinType=[left])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], a1=[$t0], _MAP=[$t8])\n" + + " EnumerableLimit(fetch=[200])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupStaticWithStatic() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a2 | fields a1, a2, a3, a4, a5"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupDynamicWithStaticWithoutCast() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a3 | fields a1, a2, a3, a4, a5"); + JSONObject result = 
executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupDynamicWithDynamicWithoutCast() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a4 | fields a1, a2, a3, a4, a5"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + private void createLeftIndex() throws IOException { + if (isIndexExist(client(), TEST_DYNAMIC_LEFT)) { + return; + } + + String mapping = + "{" + + "\"mappings\": {" + // Disable dynamic mapping - extra fields won't be indexed but will be stored + + " \"dynamic\": false," + + " \"properties\": {" + + " \"a1\": {\"type\": \"text\"}," + + " \"a2\": {\"type\": \"long\"}" + + " }" + + "}" + + "}"; + + createIndexByRestClient(client(), TEST_DYNAMIC_LEFT, mapping); + + String bulkData = + "{\"index\":{\"_id\":\"1\"}}\n" + + "{\"a1\":\"1-a1\",\"a2\":12,\"a3\":\"1-a3\",\"a4\":14,\"a5\":\"1-a5\"}\n" + + "{\"index\":{\"_id\":\"2\"}}\n" + + "{\"a1\":\"2-a1\",\"a2\":22,\"a3\":\"2-a3\",\"a4\":24,\"a5\":\"2-a5\"}\n" + + "{\"index\":{\"_id\":\"3\"}}\n" + + "{\"a1\":\"3-a1\",\"a2\":32,\"a3\":\"3-a3\",\"a4\":34}\n" + + "{\"index\":{\"_id\":\"4\"}}\n" + + "{\"a1\":\"4-a1\",\"a2\":42,\"a3\":\"4-a3\",\"a4\":44}\n"; + + Request request = new Request("POST", "/" + TEST_DYNAMIC_LEFT + "/_bulk?refresh=true"); + request.setJsonEntity(bulkData); + client().performRequest(request); + } + + private void createRightIndex() throws IOException { + if (isIndexExist(client(), TEST_DYNAMIC_RIGHT)) { + return; + } + + String mapping = + "{" + + "\"mappings\": {" + // Disable dynamic mapping - extra fields won't be indexed but will be stored + + " \"dynamic\": false," + + " \"properties\": {" + + " \"a2\": {\"type\": \"long\"}," + + " \"a3\": {\"type\": \"text\"}" + + " }" + + "}" + + "}"; + + createIndexByRestClient(client(), TEST_DYNAMIC_RIGHT, mapping); + + String bulkData = + "{\"index\":{\"_id\":\"1\"}}\n" + + 
"{\"a1\":\"1-a1\",\"a2\":12,\"a3\":\"1-a3\",\"a4\":14,\"a5\":\"1-a5_2\"}\n" + + "{\"index\":{\"_id\":\"2\"}}\n" + + "{\"a1\":\"2-a1\",\"a2\":22,\"a3\":\"2-a3\",\"a4\":24}\n" + + "{\"index\":{\"_id\":\"3\"}}\n" + + "{\"a1\":\"3-a1\",\"a2\":32,\"a3\":\"3-a3\",\"a4\":34,\"a5\":\"3-a5_2\"}\n" + + "{\"index\":{\"_id\":\"4\"}}\n" + + "{\"a1\":\"4-a1\",\"a2\":42,\"a3\":\"4-a3\",\"a4\":44}\n"; + + Request request = new Request("POST", "/" + TEST_DYNAMIC_RIGHT + "/_bulk?refresh=true"); + request.setJsonEntity(bulkData); + client().performRequest(request); + } + + private void verifyJoinResult(JSONObject result) { + verifySchema( + result, + schema("a1", "string"), + schema("a2", "bigint"), + schema("a3", "string"), + schema("a4", "int"), + schema("a5", "string")); + verifyDataRows( + result, + rows("1-a1", 12, "1-a3", 14, "1-a5_2"), + rows("2-a1", 22, "2-a3", 24, "2-a5"), + rows("3-a1", 32, "3-a3", 34, "3-a5_2"), + rows("4-a1", 42, "4-a3", 44, null)); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java index 94bc6d8dbcd..7396eb8fcbe 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java @@ -128,7 +128,9 @@ protected ImmutableMap.Builder getDefaultSettingsBuilder() { .put(Key.PATTERN_MODE, "LABEL") .put(Key.PATTERN_MAX_SAMPLE_COUNT, 10) .put(Key.PATTERN_BUFFER_LIMIT, 100000) - .put(Key.PPL_REX_MAX_MATCH_LIMIT, 10); + .put(Key.PPL_REX_MAX_MATCH_LIMIT, 10) + .put(Key.PPL_JOIN_SUBSEARCH_MAXOUT, 50000) + .put(Key.PPL_SUBSEARCH_MAXOUT, 10); } protected Settings defaultSettings(Map settings) {