From bff6aa51cb9950dad6bfd52ea44d0197e84b6afa Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Wed, 5 Nov 2025 09:39:46 -0800 Subject: [PATCH 1/6] Support join with dynamic fields Signed-off-by: Tomoyuki Morita --- .../sql/calcite/CalciteRelNodeVisitor.java | 33 +- .../calcite/plan/PPLAggregateConvertRule.java | 5 +- .../calcite/rel/QualifiedNameResolver.java | 37 +- .../sql/calcite/rel/RelFieldBuilder.java | 135 ++++- .../sql/calcite/utils/JoinAndLookupUtils.java | 48 +- .../calcite/PPLAggregateConvertRuleTest.java | 2 +- .../CalciteDynamicFieldsCommandIT.java | 57 --- .../CalciteDynamicFieldsJoinIT.java | 484 ++++++++++++++++++ .../standalone/CalcitePPLIntegTestCase.java | 4 +- 9 files changed, 723 insertions(+), 82 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 013ca261ad7..48ec3c62268 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1224,6 +1224,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { context.relBuilder.peek(), context.relBuilder.literal(context.sysLimit.joinSubsearchLimit()))); } + List leftAllFields = context.fieldBuilder.getAllFieldNames(1); + List rightAllFields = context.fieldBuilder.getAllFieldNames(0); if (node.getJoinCondition().isEmpty()) { // join-with-field-list grammar List leftColumns = context.fieldBuilder.getStaticFieldNames(1); @@ -1255,12 +1257,12 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { || (node.getArgumentMap().get("overwrite").equals(Literal.TRUE))) { toBeRemovedFields = duplicatedFieldNames.stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context)) + .map(field -> 
JoinAndLookupUtils.analyzeFieldsInRight(field, context)) .toList(); } else { toBeRemovedFields = duplicatedFieldNames.stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context)) .toList(); } Literal max = node.getArgumentMap().get("max"); @@ -1285,6 +1287,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { if (!toBeRemovedFields.isEmpty()) { context.relBuilder.projectExcept(toBeRemovedFields); } + context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields); + return context.relBuilder.peek(); } // The join-with-criteria grammar doesn't allow empty join condition @@ -1292,6 +1296,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { node.getJoinCondition() .map(c -> rexVisitor.analyzeJoinCondition(c, context)) .orElse(context.relBuilder.literal(true)); + JoinAndLookupUtils.verifyJoinConditionNotUseAnyType(joinCondition, context); if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) { // semi and anti join only return left table outputs context.relBuilder.join( @@ -1302,7 +1307,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { // when a new project add to stack. 
To avoid `id0`, we will rename the `id0` to `alias.id` // or `tableIdentifier.id`: List leftColumns = context.fieldBuilder.getStaticFieldNames(1); - List rightColumns = context.fieldBuilder.getStaticFieldNames(); + List rightColumns = context.fieldBuilder.getStaticFieldNames(0); List rightTableName = PlanUtils.findTable(context.relBuilder.peek()).getQualifiedName(); // Using `table.column` instead of `catalog.database.table.column` as column prefix because @@ -1337,6 +1342,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { } context.relBuilder.join( JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition); + + context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields); JoinAndLookupUtils.renameToExpectedFields( rightColumnsWithAliasIfConflict, leftColumns.size(), context); } @@ -1369,8 +1376,13 @@ public Void visitInputRef(RexInputRef inputRef) { private static RexNode buildJoinConditionByFieldName( CalcitePlanContext context, String fieldName) { - RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, false, context); - RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, true, context); + RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsInRight(fieldName, context); + RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsInLeft(fieldName, context); + if (context.fieldBuilder.isAnyType(sourceKey)) { + throw new IllegalArgumentException( + String.format( + "Source key `%s` needs to be specific type. Please cast explicitly.", fieldName)); + } return context.rexBuilder.equals(sourceKey, lookupKey); } @@ -1397,6 +1409,10 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { // Get lookupColumns from top of stack (after above potential projection). 
List lookupTableFieldNames = context.fieldBuilder.getStaticFieldNames(); + // For merging with dynamic fields later + List leftAllFields = context.fieldBuilder.getAllFieldNames(1); + List rightAllFields = context.fieldBuilder.getAllFieldNames(0); + // 3. Find fields which should be removed in lookup-table. // For lookup table, the mapping fields should be dropped after join // unless they are explicitly put in the output fields @@ -1410,6 +1426,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { .toList(); List toBeRemovedLookupFields = toBeRemovedLookupFieldNames.stream() + .filter(d -> lookupTableFieldNames.contains(d)) .map(d -> (RexNode) context.fieldBuilder.staticField(2, 1, d)) .toList(); List toBeRemovedFields = new ArrayList<>(toBeRemovedLookupFields); @@ -1421,7 +1438,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { List duplicatedSourceFields = duplicatedFieldNamesMap.keySet().stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context)) .toList(); // Duplicated fields in source-field should always be removed. 
toBeRemovedFields.addAll(duplicatedSourceFields); @@ -1433,7 +1450,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { if (!duplicatedFieldNamesMap.isEmpty() && node.getOutputStrategy() == OutputStrategy.APPEND) { List duplicatedProvidedFields = duplicatedFieldNamesMap.values().stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context)) .toList(); for (int i = 0; i < duplicatedProvidedFields.size(); ++i) { newCoalesceList.add( @@ -1470,7 +1487,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) { context.relBuilder.projectExcept(toBeRemovedFields); } - // TODO: dedupe dynamic fields + context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields); // 7. Rename the fields to the expected names. JoinAndLookupUtils.renameToExpectedFields( diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java index 7c559f1906c..72189185bfb 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java @@ -21,7 +21,6 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; -import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; @@ -34,6 +33,7 @@ import org.apache.calcite.util.mapping.Mappings; import org.apache.commons.lang3.tuple.Pair; import org.immutables.value.Value; +import org.opensearch.sql.calcite.ExtendedRexBuilder; import org.opensearch.sql.calcite.rel.RelBuilderWrapper; import org.opensearch.sql.calcite.rel.RelFieldBuilder; @@ -78,7 +78,8 @@ public void apply(RelOptRuleCall 
call, LogicalAggregate aggregate, LogicalProjec final RelBuilder rawRelBuilder = call.builder(); final RelBuilderWrapper relBuilder = new RelBuilderWrapper(rawRelBuilder); - final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder(); + final ExtendedRexBuilder rexBuilder = + (ExtendedRexBuilder) aggregate.getCluster().getRexBuilder(); final RelFieldBuilder fieldBuilder = new RelFieldBuilder(rawRelBuilder, rexBuilder); relBuilder.push(project.getInput()); diff --git a/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java b/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java index f4570d08856..99bebf9f293 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java +++ b/core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java @@ -38,11 +38,17 @@ public static Optional resolveField( if (inputFieldNames.contains(fieldName)) { return Optional.of(context.fieldBuilder.staticField(inputCount, inputOrdinal, fieldName)); } else if (context.fieldBuilder.isDynamicFieldsExist()) { - return Optional.of(context.fieldBuilder.dynamicField(fieldName)); + return Optional.of(context.fieldBuilder.dynamicField(inputCount, inputOrdinal, fieldName)); } return Optional.empty(); } + /** Resolve field in the top of the stack */ + public static Optional resolveField(String fieldName, CalcitePlanContext context) { + return resolveField(1, 0, fieldName, context); + } + + /** Resolve field in the specified input. Throw exception if not found. */ public static RexNode resolveFieldOrThrow( int inputCount, int inputOrdinal, String fieldName, CalcitePlanContext context) { return resolveField(inputCount, inputOrdinal, fieldName, context) @@ -50,6 +56,11 @@ public static RexNode resolveFieldOrThrow( () -> new IllegalArgumentException(String.format("Field [%s] not found.", fieldName))); } + /** Resolve field in the top of the stack. Throw exception if not found. 
*/ + public static RexNode resolveFieldOrThrow(String fieldName, CalcitePlanContext context) { + return resolveFieldOrThrow(1, 0, fieldName, context); + } + /** * Resolves a qualified name to a RexNode based on the current context. * @@ -78,6 +89,8 @@ private static RexNode resolveInJoinCondition( return resolveFieldWithAlias(nameNode, context, 2) .or(() -> resolveFieldWithoutAlias(nameNode, context, 2)) + .or(() -> resolveDynamicFieldsWithAlias(nameNode, context, 2)) + .or(() -> resolveDynamicFields(nameNode, context, 2)) .orElseThrow(() -> getNotFoundException(nameNode)); } @@ -139,6 +152,28 @@ private static Optional resolveFieldWithAlias( return Optional.empty(); } + private static Optional resolveDynamicFieldsWithAlias( + QualifiedName nameNode, CalcitePlanContext context, int inputCount) { + List parts = nameNode.getParts(); + log.debug( + "resolveDynamicFieldsWithAlias() called with nameNode={}, parts={}, inputCount={}", + nameNode, + parts, + inputCount); + + if (parts.size() >= 2) { + // Consider first part as table alias + String alias = parts.get(0); + + String fieldName = String.join(".", parts.subList(1, parts.size())); + Optional dynamicField = + tryToResolveField(alias, DYNAMIC_FIELDS_MAP, context, inputCount); + return dynamicField.map(field -> createItemAccess(field, fieldName, context)); + } + + return Optional.empty(); + } + private static Optional resolveDynamicFields( QualifiedName nameNode, CalcitePlanContext context, int inputCount) { List parts = nameNode.getParts(); diff --git a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java index bf4e1591a72..86d99955891 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java @@ -8,14 +8,20 @@ import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; import 
com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; +import lombok.Value; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilder; +import org.opensearch.sql.calcite.ExtendedRexBuilder; import org.opensearch.sql.calcite.plan.OpenSearchConstants; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.PPLFuncImpTable; @@ -28,7 +34,7 @@ @RequiredArgsConstructor public class RelFieldBuilder { private final RelBuilder relBuilder; - private final RexBuilder rexBuilder; + private final ExtendedRexBuilder rexBuilder; public List getStaticFieldNames() { return getStaticFieldNames(0); @@ -36,7 +42,7 @@ public List getStaticFieldNames() { public List getStaticFieldNames(int n) { return getAllFieldNames(n).stream() - .filter(name -> !DYNAMIC_FIELDS_MAP.equals(name)) + .filter(RelFieldBuilder::isStaticField) .collect(Collectors.toList()); } @@ -56,6 +62,10 @@ public List getAllFieldNames(int n) { return relBuilder.peek(n).getRowType().getFieldNames(); } + public List allFields() { + return relBuilder.fields(); + } + public RexInputRef staticField(String fieldName) { return relBuilder.field(fieldName); } @@ -73,8 +83,10 @@ public RexInputRef staticField(int fieldIndex) { } public List staticFields() { - return relBuilder.fields().stream() - .filter(node -> !DYNAMIC_FIELDS_MAP.equals(node)) + List originalFields = relBuilder.peek().getRowType().getFieldNames(); + return IntStream.range(0, originalFields.size()) + .filter(i -> isStaticField(originalFields.get(i))) + .mapToObj(i -> relBuilder.field(i)) 
.collect(Collectors.toList()); } @@ -92,6 +104,10 @@ public ImmutableList staticFields(List ordinals) { return relBuilder.fields(ordinals); } + public boolean isAnyType(RexNode node) { + return node.getType().getSqlTypeName().equals(SqlTypeName.ANY); + } + public boolean isDynamicFieldsExist() { return isDynamicFieldsExist(1, 0); } @@ -129,4 +145,113 @@ private RexNode createItemAccess(RexNode field, String itemName) { return PPLFuncImpTable.INSTANCE.resolve( rexBuilder, BuiltinFunctionName.INTERNAL_ITEM, field, rexBuilder.makeLiteral(itemName)); } + + private static boolean isStaticField(String fieldName) { + // use startsWith as `_MAP` can be renamed to `_MAP0`, etc. by RelBuilder + return !fieldName.startsWith(DYNAMIC_FIELDS_MAP); + } + + @Value + @AllArgsConstructor + private static class JoinedField { + boolean isInLeft; + boolean isInRight; + boolean isDynamicFieldMap; + String fieldName; + RexInputRef fieldNode; + } + + /** + * Reorganize dynamic fields map (_MAP) after join. It will concat dynamic fields map from both + * side, and also override static field by the value from dynamic fields as needed. This should be + * called after join, but with fields information before join as parameters + */ + public void reorganizeDynamicFields(List leftAllFields, List rightAllFields) { + boolean leftHasDynamicFields = leftAllFields.contains(DYNAMIC_FIELDS_MAP); + boolean rightHasDynamicFields = rightAllFields.contains(DYNAMIC_FIELDS_MAP); + + List fields = collectStaticFieldsInfo(leftAllFields, rightAllFields); + Optional leftMap = + leftHasDynamicFields ? Optional.of(getLeftDynamicFieldMapInfo()) : Optional.empty(); + Optional rightMap = + rightHasDynamicFields ? 
Optional.of(getRightDynamicFieldMapInfo()) : Optional.empty(); + + List projected = new ArrayList<>(); + List names = new ArrayList<>(); + for (JoinedField field : fields) { + if (field.isInLeft && !field.isInRight && rightHasDynamicFields) { + // need to prioritize value from right dynamic map if only left side has static field with + // the name + RexNode valueFromRightMap = createItemAccess(rightMap.get().fieldNode, field.fieldName); + RexNode merged = rexBuilder.coalesce(valueFromRightMap, field.fieldNode); + projected.add(merged); + names.add(field.fieldName); + } else { + projected.add(field.fieldNode); + names.add(field.fieldName); + } + } + if (leftHasDynamicFields && rightHasDynamicFields) { + // need to concat map from both sides + projected.add(mapConcatCall(leftMap.get().fieldNode, rightMap.get().fieldNode)); + } else if (leftHasDynamicFields) { + projected.add(leftMap.get().fieldNode); + } else if (rightHasDynamicFields) { + projected.add(rightMap.get().fieldNode); + } + names.add(DYNAMIC_FIELDS_MAP); + + relBuilder.projectNamed(projected, names, true); + } + + private JoinedField getLeftDynamicFieldMapInfo() { + List allFields = getAllFieldNames(); + for (int i = 0; i < allFields.size(); i++) { + String fieldName = allFields.get(i); + if (fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + return new JoinedField(true, false, true, fieldName, relBuilder.field(i)); + } + } + throw new IllegalStateException("Dynamic field not found for left input."); + } + + private JoinedField getRightDynamicFieldMapInfo() { + List allFields = getAllFieldNames(); + for (int i = allFields.size() - 1; 0 <= i; i--) { // right input's fields come last + String fieldName = allFields.get(i); + if (fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + return new JoinedField(false, true, true, fieldName, relBuilder.field(i)); + } + } + throw new IllegalStateException("Dynamic field not found for right input."); + } + + private List collectStaticFieldsInfo( + List leftAllFields, List rightAllFields) { + List fields = new
ArrayList<>(); + List allFields = getAllFieldNames(); + // iterate by index to avoid referring field by name (it causes issue when field name is + // overlapping) + for (int i = 0; i < allFields.size(); i++) { + String fieldName = allFields.get(i); + if (!fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + boolean isInLeft = leftAllFields.contains(fieldName); + boolean isInRight = + rightAllFields.contains(fieldName) + || (isInLeft && rightAllFields.contains(excludeLastZero(fieldName))); + fields.add(new JoinedField(isInLeft, isInRight, false, fieldName, relBuilder.field(i))); + } + } + return fields; + } + + /** exclude zero to consider the rename (adding `0` at the end) due to duplicate name */ + private String excludeLastZero(String fieldName) { + return fieldName.endsWith("0") ? fieldName.substring(0, fieldName.length() - 1) : fieldName; + } + + private RexNode mapConcatCall(RexInputRef left, RexInputRef right) { + return PPLFuncImpTable.INSTANCE.resolve( + rexBuilder, BuiltinFunctionName.MAP_CONCAT, left, right); + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java index 6680c98d259..cb65e77e749 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java @@ -11,7 +11,10 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.Pair; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -66,20 +69,47 @@ static void addProjectionIfNecessary(Lookup node, CalcitePlanContext context) { if (lookupMappingFields.size() != context.fieldBuilder.staticFields().size()) { List 
projectList = lookupMappingFields.stream() - .map(fieldName -> (RexNode) context.fieldBuilder.staticField(fieldName)) + .map( + fieldName -> + (RexNode) QualifiedNameResolver.resolveFieldOrThrow(fieldName, context)) .toList(); context.relBuilder.project(projectList); } } } + /** Throws if any equality in the join condition has an ANY-typed operand (needs a cast). */ + static void verifyJoinConditionNotUseAnyType(RexNode rexNode, CalcitePlanContext context) { + rexNode.accept( + new RexVisitorImpl(true) { + @Override + public Void visitCall(RexCall call) { + if (call.getKind() == SqlKind.EQUALS) { + RexNode left = call.operands.get(0); + RexNode right = call.operands.get(1); + if (context.fieldBuilder.isAnyType(left) || context.fieldBuilder.isAnyType(right)) { + throw new IllegalArgumentException( + "Join condition needs to use specific type. Please cast explicitly."); + } + } + return super.visitCall(call); + } + }); + } + static void addJoinForLookUp(Lookup node, CalcitePlanContext context) { RexNode joinCondition = node.getMappingAliasMap().entrySet().stream() .map( entry -> { - RexNode lookupKey = analyzeFieldsForLookUp(entry.getKey(), false, context); - RexNode sourceKey = analyzeFieldsForLookUp(entry.getValue(), true, context); + RexNode lookupKey = analyzeFieldsInRight(entry.getKey(), context); + RexNode sourceKey = analyzeFieldsInLeft(entry.getValue(), context); + if (context.fieldBuilder.isAnyType(sourceKey)) { + throw new IllegalArgumentException( + String.format( + "Source key `%s` needs to be specific type. 
Please cast explicitly.", + entry.getValue())); + } return context.rexBuilder.equals(sourceKey, lookupKey); }) .reduce(context.rexBuilder::and) @@ -87,9 +117,13 @@ static void addJoinForLookUp(Lookup node, CalcitePlanContext context) { context.relBuilder.join(JoinRelType.LEFT, joinCondition); } - static RexNode analyzeFieldsForLookUp( - String fieldName, boolean isSourceTable, CalcitePlanContext context) { - return QualifiedNameResolver.resolveField(2, isSourceTable ? 0 : 1, fieldName, context) + static RexNode analyzeFieldsInLeft(String fieldName, CalcitePlanContext context) { + return QualifiedNameResolver.resolveField(2, 0, fieldName, context) + .orElseThrow(() -> new IllegalArgumentException("field not found: " + fieldName)); + } + + static RexNode analyzeFieldsInRight(String fieldName, CalcitePlanContext context) { + return QualifiedNameResolver.resolveField(2, 1, fieldName, context) .orElseThrow(() -> new IllegalArgumentException("field not found: " + fieldName)); } @@ -97,7 +131,7 @@ static void renameToExpectedFields( List expectedProvidedFieldNames, int sourceFieldsCountLeft, CalcitePlanContext context) { - List oldFields = context.relBuilder.peek().getRowType().getFieldNames(); + List oldFields = context.fieldBuilder.getStaticFieldNames(); assert sourceFieldsCountLeft + expectedProvidedFieldNames.size() == oldFields.size() : "The source fields count left plus new provided fields count must equal to the output" + " fields count of current plan(i.e project-join)."; diff --git a/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java b/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java index 652ee108aca..5332b97f691 100644 --- a/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java +++ b/core/src/test/java/org/opensearch/sql/calcite/PPLAggregateConvertRuleTest.java @@ -50,7 +50,7 @@ public class PPLAggregateConvertRuleTest { @Mock RelOptPlanner planner; RelDataType type = 
TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT); RelDataType rowType = TYPE_FACTORY.createStructType(List.of(type, type), List.of("a", "b")); - RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY); + RexBuilder rexBuilder = new ExtendedRexBuilder(new RexBuilder(TYPE_FACTORY)); RelBuilder relBuilder; @BeforeEach diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java index 4a2c1750ca9..1e64e72b20d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsCommandIT.java @@ -321,63 +321,6 @@ public void testFlatten() throws IOException { verifyDataRows(result, rows(4, "Engineering"), rows(5, "")); } - @Test - @Ignore("pending join adaptation") - public void testExpand() throws IOException { - String query = source(TEST_INDEX_DYNAMIC, "expand arr as expanded | where isnotnull(expanded)"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("department", "string"), schema("count", "bigint")); - verifyDataRows(result, rows("Engineering", 2)); - } - - @Test - @Ignore("pending join adaptation") - public void testJoinWithStaticField() throws IOException { - String query = - source( - TEST_INDEX_DYNAMIC, - "join left=l right=r on l.account_number = r.account_number | fields l.firstname," - + " r.department | head 1"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("l.firstname", "string"), schema("r.department", "string")); - verifyDataRows(result, rows("John", "Engineering")); - } - - @Test - @Ignore("pending join adaptation") - public void testJoinWithDynamicField() throws IOException { - String query = - source( - TEST_INDEX_DYNAMIC, - "join left=l right=r on l.department = r.department | fields l.firstname, r.department" - + " 
| head 1"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("l.firstname", "string"), schema("r.department", "string")); - verifyDataRows(result, rows("John", "Engineering")); - } - - @Test - @Ignore("pending join adaptation") - public void testLookupWithStaticField() throws IOException { - String query = - source( - TEST_INDEX_DYNAMIC, - "lookup " - + TEST_INDEX_DYNAMIC - + " account_number as acc_num | fields firstname, department"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("firstname", "string"), schema("department", "string")); - verifyDataRows(result, rows("John", "Engineering")); - } - @Test @Ignore("pending aggregation adaptation") public void testTop() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java new file mode 100644 index 00000000000..993525f2e0c --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java @@ -0,0 +1,484 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.standalone; + +import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Ignore; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; + +public class CalciteDynamicFieldsJoinIT extends CalcitePPLPermissiveIntegTestCase { + + private static final String TEST_DYNAMIC_LEFT = "test_dynamic_left"; + 
private static final String TEST_DYNAMIC_RIGHT = "test_dynamic_right"; + + @Override + public void init() throws IOException { + super.init(); + createLeftIndex(); + createRightIndex(); + enableCalcite(); + } + + @Test + @Ignore("pending join adaptation") + public void testExpand() throws IOException { + String query = source(TEST_DYNAMIC_LEFT, "expand arr as expanded | where isnotnull(expanded)"); + + JSONObject result = executeQuery(query); + + verifySchema(result, schema("department", "string"), schema("count", "bigint")); + verifyDataRows(result, rows("Engineering", 2)); + } + + @Test + public void testJoinWithStaticField() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a2 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=($1, $3)], joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t1], a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($1, $3)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$1], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], 
_MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}]," + + " _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + + System.out.println(result.toString()); + + verifyJoinResult(result); + } + + @Test + public void testJoinDynamicWithStaticWithoutCast1() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a3 = r.a3 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + } + + @Test + public void testJoinDynamicWithStaticWithoutCast2() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on r.a3 = l.a3 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + } + + @Test + public void testJoinDynamicWithStaticWithoutCast3() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a3 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Source key `a3` needs to be specific type. 
Please cast explicitly.", th.getMessage()); + } + + @Test + public void testJoinDynamicWithStatic() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on cast(l.a3 as string) = r.a3 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($2, 'a3')), $4)]," + + " joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t1], a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($3, $4)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a3'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], a3=[$t1], _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])"); + + JSONObject result = 
executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testJoinStaticWithDynamic() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a1 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=($0, ITEM($5, 'a1'))], joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t4, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t4)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t1], a3=[$t3], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($0, $5)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," + + " $t9)], a3=[$t1], _MAP=[$t8], $f2=[$t10])\n" + + " EnumerableLimit(fetch=[50000])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void 
testJoinDynamicWithDynamicWithoutCast1() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a4 = r.a4 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + } + + @Test + public void testJoinDynamicWithDynamicWithoutCast2() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on r.a4 = l.a4 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + } + + @Test + public void testJoinDynamicWithDynamicWithoutCast3() throws IOException { + String query = + source(TEST_DYNAMIC_LEFT, "join a4 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Source key `a4` needs to be specific type. 
Please cast explicitly.", th.getMessage()); + } + + @Test + public void testJoinDynamicWithDynamic() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on cast(l.a4 as int) = cast(r.a4 as int) " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($2, 'a4')), SAFE_CAST(ITEM($5," + + " 'a4')))], joinType=[inner])\n" + + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalProject(a2=[$0], a3=[$1], _MAP=[$8])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..6=[{inputs}], expr#7=['a1'], expr#8=[ITEM($t5, $t7)]," + + " expr#9=[COALESCE($t8, $t0)], expr#10=[MAP_CONCAT($t2, $t5)], expr#11=['a4']," + + " expr#12=[ITEM($t10, $t11)], expr#13=['a5'], expr#14=[ITEM($t10, $t13)], a1=[$t9]," + + " a2=[$t1], a3=[$t4], a4=[$t12], a5=[$t14])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableMergeJoin(condition=[=($3, $6)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a4'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a4'], expr#10=[ITEM($t8," + + " $t9)], expr#11=[SAFE_CAST($t10)], a3=[$t1], _MAP=[$t8], $f2=[$t11])\n" + + " 
EnumerableLimit(fetch=[50000])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch," + + " test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupStaticWithDynamic() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a1 | fields a1, a2, a3, a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($17, 'a1'), $0)], a2=[$9], a3=[$10]," + + " a4=[ITEM(MAP_CONCAT($8, $17), 'a4')], a5=[ITEM(MAP_CONCAT($8, $17), 'a5')])\n" + + " LogicalJoin(condition=[=($0, ITEM($17, 'a1'))], joinType=[left])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t4, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t1, $t4)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t2], a3=[$t3], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableHashJoin(condition=[=($0, $5)], joinType=[left])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], a1=[$t0], _MAP=[$t8])\n" + + " EnumerableLimit(fetch=[200])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," + + " $t9)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t10])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupStaticWithStatic() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a2 | fields a1, a2, a3, 
a4, a5"); + + assertExplainYaml( + query, + "calcite:\n" + + " logical: |\n" + + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + + " LogicalProject(a1=[COALESCE(ITEM($17, 'a1'), $0)], a2=[$1], a3=[$10]," + + " a4=[ITEM(MAP_CONCAT($8, $17), 'a4')], a5=[ITEM(MAP_CONCAT($8, $17), 'a5')])\n" + + " LogicalJoin(condition=[=($1, $9)], joinType=[left])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + + " physical: |\n" + + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," + + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," + + " a2=[$t1], a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " EnumerableLimit(fetch=[200])\n" + + " EnumerableHashJoin(condition=[=($1, $3)], joinType=[left])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" + + " EnumerableLimit(fetch=[200])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" + + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_right]])\n"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupDynamicWithStaticWithoutCast() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a3 | fields a1, a2, a3, a4, a5"); + + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Source key `a3` needs to be specific type. 
Please cast explicitly.", th.getMessage()); + } + + @Test + public void testLookupDynamicWithStaticWithCast() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "eval a3=cast(a3 as string)" + + "| lookup " + + TEST_DYNAMIC_RIGHT + + " a3 | fields a1, a2, a3, a4, a5"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + @Test + public void testLookupDynamicWithDynamicWithoutCast() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a4 | fields a1, a2, a3, a4, a5"); + Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); + assertEquals( + "Source key `a4` needs to be specific type. Please cast explicitly.", th.getMessage()); + } + + @Test + public void testLookupDynamicWithDynamicWithCast() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "eval a4=cast(a4 as int)" + + "|lookup " + + TEST_DYNAMIC_RIGHT + + " a4 | fields a1, a2, a3, a4, a5"); + + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + + private void createLeftIndex() throws IOException { + if (isIndexExist(client(), TEST_DYNAMIC_LEFT)) { + return; + } + + String mapping = + "{" + + "\"mappings\": {" + // Disable dynamic mapping - extra fields won't be indexed but will be stored + + " \"dynamic\": false," + + " \"properties\": {" + + " \"a1\": {\"type\": \"text\"}," + + " \"a2\": {\"type\": \"long\"}" + + " }" + + "}" + + "}"; + + createIndexByRestClient(client(), TEST_DYNAMIC_LEFT, mapping); + + String bulkData = + "{\"index\":{\"_id\":\"1\"}}\n" + + "{\"a1\":\"1-a1\",\"a2\":12,\"a3\":\"1-a3\",\"a4\":14,\"a5\":\"1-a5\"}\n" + + "{\"index\":{\"_id\":\"2\"}}\n" + + "{\"a1\":\"2-a1\",\"a2\":22,\"a3\":\"2-a3\",\"a4\":24,\"a5\":\"2-a5\"}\n" + + "{\"index\":{\"_id\":\"3\"}}\n" + + "{\"a1\":\"3-a1\",\"a2\":32,\"a3\":\"3-a3\",\"a4\":34}\n" + + "{\"index\":{\"_id\":\"4\"}}\n" + + "{\"a1\":\"4-a1\",\"a2\":42,\"a3\":\"4-a3\",\"a4\":44}\n"; + 
+ Request request = new Request("POST", "/" + TEST_DYNAMIC_LEFT + "/_bulk?refresh=true"); + request.setJsonEntity(bulkData); + client().performRequest(request); + } + + private void createRightIndex() throws IOException { + if (isIndexExist(client(), TEST_DYNAMIC_RIGHT)) { + return; + } + + String mapping = + "{" + + "\"mappings\": {" + // Disable dynamic mapping - extra fields won't be indexed but will be stored + + " \"dynamic\": false," + + " \"properties\": {" + + " \"a2\": {\"type\": \"long\"}," + + " \"a3\": {\"type\": \"text\"}" + + " }" + + "}" + + "}"; + + createIndexByRestClient(client(), TEST_DYNAMIC_RIGHT, mapping); + + String bulkData = + "{\"index\":{\"_id\":\"1\"}}\n" + + "{\"a1\":\"1-a1\",\"a2\":12,\"a3\":\"1-a3\",\"a4\":14,\"a5\":\"1-a5_2\"}\n" + + "{\"index\":{\"_id\":\"2\"}}\n" + + "{\"a1\":\"2-a1\",\"a2\":22,\"a3\":\"2-a3\",\"a4\":24}\n" + + "{\"index\":{\"_id\":\"3\"}}\n" + + "{\"a1\":\"3-a1\",\"a2\":32,\"a3\":\"3-a3\",\"a4\":34,\"a5\":\"3-a5_2\"}\n" + + "{\"index\":{\"_id\":\"4\"}}\n" + + "{\"a1\":\"4-a1\",\"a2\":42,\"a3\":\"4-a3\",\"a4\":44}\n"; + + Request request = new Request("POST", "/" + TEST_DYNAMIC_RIGHT + "/_bulk?refresh=true"); + request.setJsonEntity(bulkData); + client().performRequest(request); + } + + private void verifyJoinResult(JSONObject result) { + verifySchema( + result, + schema("a1", "string"), + schema("a2", "bigint"), + schema("a3", "string"), + schema("a4", "int"), + schema("a5", "string")); + verifyDataRows( + result, + rows("1-a1", 12, "1-a3", 14, "1-a5_2"), + rows("2-a1", 22, "2-a3", 24, "2-a5"), + rows("3-a1", 32, "3-a3", 34, "3-a5_2"), + rows("4-a1", 42, "4-a3", 44, null)); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java index 94bc6d8dbcd..7396eb8fcbe 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java +++ 
b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java @@ -128,7 +128,9 @@ protected ImmutableMap.Builder getDefaultSettingsBuilder() { .put(Key.PATTERN_MODE, "LABEL") .put(Key.PATTERN_MAX_SAMPLE_COUNT, 10) .put(Key.PATTERN_BUFFER_LIMIT, 100000) - .put(Key.PPL_REX_MAX_MATCH_LIMIT, 10); + .put(Key.PPL_REX_MAX_MATCH_LIMIT, 10) + .put(Key.PPL_JOIN_SUBSEARCH_MAXOUT, 50000) + .put(Key.PPL_SUBSEARCH_MAXOUT, 10); } protected Settings defaultSettings(Map settings) { From 7227cf4fa146d5a6bfac94d4bcb00fe781e4a303 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Wed, 5 Nov 2025 10:42:49 -0800 Subject: [PATCH 2/6] Fix test failure Signed-off-by: Tomoyuki Morita --- .../sql/calcite/CalciteRelNodeVisitor.java | 4 ++-- .../standalone/CalciteDynamicFieldsJoinIT.java | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 48ec3c62268..e6eec970134 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1257,12 +1257,12 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { || (node.getArgumentMap().get("overwrite").equals(Literal.TRUE))) { toBeRemovedFields = duplicatedFieldNames.stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context)) .toList(); } else { toBeRemovedFields = duplicatedFieldNames.stream() - .map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context)) + .map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context)) .toList(); } Literal max = node.getArgumentMap().get("max"); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java 
b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java index 993525f2e0c..84136e2c131 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java @@ -52,7 +52,7 @@ public void testJoinWithStaticField() throws IOException { "calcite:\n" + " logical: |\n" + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" - + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + " LogicalJoin(condition=[=($1, $3)], joinType=[inner])\n" + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" @@ -64,7 +64,7 @@ public void testJoinWithStaticField() throws IOException { + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," - + " a2=[$t1], a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " a2=[$t3], a3=[$t4], a4=[$t11], a5=[$t13])\n" + " EnumerableLimit(fetch=[200])\n" + " EnumerableMergeJoin(condition=[=($1, $3)], joinType=[inner])\n" + " EnumerableSort(sort0=[$1], dir0=[ASC])\n" @@ -157,7 +157,7 @@ public void testJoinDynamicWithStatic() throws IOException { + " EnumerableLimit(fetch=[50000])\n" + " EnumerableCalc(expr#0..8=[{inputs}], a3=[$t1], _MAP=[$t8])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch," - + " test_dynamic_right]])"); + + " test_dynamic_right]])\n"); JSONObject result = executeQuery(query); verifyJoinResult(result); @@ -173,7 +173,7 @@ public void testJoinStaticWithDynamic() throws IOException { "calcite:\n" + " logical: |\n" + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" - + " 
LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + " LogicalJoin(condition=[=($0, ITEM($5, 'a1'))], joinType=[inner])\n" + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" @@ -183,17 +183,17 @@ public void testJoinStaticWithDynamic() throws IOException { + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + " physical: |\n" + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t4, $t6)]," - + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t4)], expr#10=['a4']," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t1, $t4)], expr#10=['a4']," + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," - + " a2=[$t1], a3=[$t3], a4=[$t11], a5=[$t13])\n" + + " a2=[$t2], a3=[$t3], a4=[$t11], a5=[$t13])\n" + " EnumerableLimit(fetch=[200])\n" + " EnumerableMergeJoin(condition=[=($0, $5)], joinType=[inner])\n" + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" - + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], a1=[$t0], _MAP=[$t8])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" - + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," - + " $t9)], a3=[$t1], _MAP=[$t8], $f2=[$t10])\n" + + " $t9)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t10])\n" + " EnumerableLimit(fetch=[50000])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch," + " test_dynamic_right]])\n"); From 898f84af0fb4d18cc1ed74948a5ea8c19378cc05 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Wed, 5 Nov 2025 14:22:49 -0800 Subject: [PATCH 3/6] Fix rexBuilder issue Signed-off-by: Tomoyuki Morita --- 
.../opensearch/sql/calcite/plan/PPLAggregateConvertRule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java index 72189185bfb..790896aa5ed 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java @@ -79,7 +79,7 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec final RelBuilder rawRelBuilder = call.builder(); final RelBuilderWrapper relBuilder = new RelBuilderWrapper(rawRelBuilder); final ExtendedRexBuilder rexBuilder = - (ExtendedRexBuilder) aggregate.getCluster().getRexBuilder(); + new ExtendedRexBuilder(aggregate.getCluster().getRexBuilder()); final RelFieldBuilder fieldBuilder = new RelFieldBuilder(rawRelBuilder, rexBuilder); relBuilder.push(project.getInput()); From 9ed76e5372808041195e1c86e631e4c2233c9472 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Wed, 5 Nov 2025 15:48:34 -0800 Subject: [PATCH 4/6] Fix join issue Signed-off-by: Tomoyuki Morita --- .../java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java index 86d99955891..f80c04cf385 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java @@ -170,6 +170,10 @@ public void reorganizeDynamicFields(List leftAllFields, List rig boolean leftHasDynamicFields = leftAllFields.contains(DYNAMIC_FIELDS_MAP); boolean rightHasDynamicFields = rightAllFields.contains(DYNAMIC_FIELDS_MAP); + if (!leftHasDynamicFields && !rightHasDynamicFields) { + return; + } + List fields = 
collectStaticFieldsInfo(leftAllFields, rightAllFields); Optional leftMap = leftHasDynamicFields ? Optional.of(getLeftDynamicFieldMapInfo()) : Optional.empty(); From e2587bd0d1f1293e7502429fca518bb69435dda4 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Tue, 11 Nov 2025 13:38:31 -0800 Subject: [PATCH 5/6] Cast join condition automatically Signed-off-by: Tomoyuki Morita --- .../sql/calcite/CalciteRelNodeVisitor.java | 11 +- .../sql/calcite/ExtendedRexBuilder.java | 49 +++++ .../sql/calcite/rel/RelFieldBuilder.java | 5 - .../sql/calcite/utils/JoinAndLookupUtils.java | 30 +-- .../sql/calcite/utils/RexConverter.java | 136 ++++++++++++++ .../sql/calcite/utils/RexConverterTest.java | 177 ++++++++++++++++++ .../CalciteDynamicFieldsJoinIT.java | 97 ++-------- 7 files changed, 385 insertions(+), 120 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java create mode 100644 core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index e6eec970134..351527d218b 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1296,7 +1296,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { node.getJoinCondition() .map(c -> rexVisitor.analyzeJoinCondition(c, context)) .orElse(context.relBuilder.literal(true)); - JoinAndLookupUtils.verifyJoinConditionNotUseAnyType(joinCondition, context); + joinCondition = context.rexBuilder.castAnyToAlignTypes(joinCondition, context); if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) { // semi and anti join only return left table outputs context.relBuilder.join( @@ -1376,14 +1376,9 @@ public Void visitInputRef(RexInputRef inputRef) { private static RexNode buildJoinConditionByFieldName( 
CalcitePlanContext context, String fieldName) { - RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsInRight(fieldName, context); RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsInLeft(fieldName, context); - if (context.fieldBuilder.isAnyType(sourceKey)) { - throw new IllegalArgumentException( - String.format( - "Source key `%s` needs to be specific type. Please cast explicitly.", fieldName)); - } - return context.rexBuilder.equals(sourceKey, lookupKey); + RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsInRight(fieldName, context); + return context.rexBuilder.equalsWithCastAsNeeded(sourceKey, lookupKey); } @Override diff --git a/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java index 4d86614895b..daa62eba9be 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java @@ -12,9 +12,11 @@ import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; @@ -22,6 +24,7 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.calcite.type.AbstractExprRelDataType; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.calcite.utils.RexConverter; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; @@ -41,6 +44,20 @@ public RexNode equals(RexNode n1, RexNode n2) { 
return this.makeCall(SqlStdOperatorTable.EQUALS, n1, n2); } + /** Make equals call with adding cast in case the node type is ANY. */ + public RexNode equalsWithCastAsNeeded(RexNode n1, RexNode n2) { + if (isAnyType(n1) && isAnyType(n2)) { + n1 = castToString(n1); + n2 = castToString(n2); + } else if (isAnyType(n1)) { + n1 = castToTargetType(n1, n2); + } else if (isAnyType(n2)) { + n2 = castToTargetType(n2, n1); + } + + return equals(n1, n2); + } + public RexNode and(RexNode left, RexNode right) { final RelDataType booleanType = this.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN); return this.makeCall(booleanType, SqlStdOperatorTable.AND, List.of(left, right)); @@ -163,4 +180,36 @@ else if ((SqlTypeUtil.isApproximateNumeric(sourceType) || SqlTypeUtil.isDecimal( } return super.makeCast(pos, type, exp, matchNullability, safe, format); } + + public boolean isAnyType(RexNode node) { + return node.getType().getSqlTypeName().equals(SqlTypeName.ANY); + } + + public RexNode castToString(RexNode node) { + RelDataType stringType = getTypeFactory().createSqlType(SqlTypeName.VARCHAR); + RelDataType nullableStringType = getTypeFactory().createTypeWithNullability(stringType, true); + return makeCast(nullableStringType, node, true, true); + } + + /** cast node to the same type as target */ + public RexNode castToTargetType(RexNode node, RexNode target) { + return makeCast(target.getType(), node, true, true); + } + + /** Utility to cast ANY to specific types to avoid compare issue */ + RexNode castAnyToAlignTypes(RexNode rexNode, CalcitePlanContext context) { + return rexNode.accept( + new RexConverter() { + @Override + public RexNode visitCall(RexCall call) { + if (call.getKind() == SqlKind.EQUALS) { + RexNode n0 = call.operands.get(0); + RexNode n1 = call.operands.get(1); + return super.visitCall((RexCall) equalsWithCastAsNeeded(n0, n1)); + } else { + return super.visitCall(call); + } + } + }); + } } diff --git 
a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java index f80c04cf385..ea72f09f4fe 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/rel/RelFieldBuilder.java @@ -19,7 +19,6 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilder; import org.opensearch.sql.calcite.ExtendedRexBuilder; import org.opensearch.sql.calcite.plan.OpenSearchConstants; @@ -104,10 +103,6 @@ public ImmutableList staticFields(List ordinals) { return relBuilder.fields(ordinals); } - public boolean isAnyType(RexNode node) { - return node.getType().getSqlTypeName().equals(SqlTypeName.ANY); - } - public boolean isDynamicFieldsExist() { return isDynamicFieldsExist(1, 0); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java index cb65e77e749..74910a9a521 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java @@ -11,10 +11,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.Pair; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -78,25 +75,6 @@ static void addProjectionIfNecessary(Lookup node, CalcitePlanContext context) { } } - /** Utility to verify join condition does not use ANY typed field to avoid */ - static void 
verifyJoinConditionNotUseAnyType(RexNode rexNode, CalcitePlanContext context) { - rexNode.accept( - new RexVisitorImpl(true) { - @Override - public Void visitCall(RexCall call) { - if (call.getKind() == SqlKind.EQUALS) { - RexNode left = call.operands.get(0); - RexNode right = call.operands.get(1); - if (context.fieldBuilder.isAnyType(left) || context.fieldBuilder.isAnyType(right)) { - throw new IllegalArgumentException( - "Join condition needs to use specific type. Please cast explicitly."); - } - } - return super.visitCall(call); - } - }); - } - static void addJoinForLookUp(Lookup node, CalcitePlanContext context) { RexNode joinCondition = node.getMappingAliasMap().entrySet().stream() @@ -104,13 +82,7 @@ static void addJoinForLookUp(Lookup node, CalcitePlanContext context) { entry -> { RexNode lookupKey = analyzeFieldsInRight(entry.getKey(), context); RexNode sourceKey = analyzeFieldsInLeft(entry.getValue(), context); - if (context.fieldBuilder.isAnyType(sourceKey)) { - throw new IllegalArgumentException( - String.format( - "Source key `%s` needs to be specific type. 
Please cast explicitly.", - entry.getValue())); - } - return context.rexBuilder.equals(sourceKey, lookupKey); + return context.rexBuilder.equalsWithCastAsNeeded(sourceKey, lookupKey); }) .reduce(context.rexBuilder::and) .orElse(context.relBuilder.literal(true)); diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java b/core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java new file mode 100644 index 00000000000..f4dfec7d538 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/RexConverter.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLambda; +import org.apache.calcite.rex.RexLambdaRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitor; + +/** + * Base class for converting specific portions of a RexNode tree by overriding the node types of + * interest. This class implements the visitor pattern for RexNode traversal, providing default + * implementations that return nodes unchanged. Subclasses can override specific visit methods to + * transform particular node types while leaving others untouched. 
+ */ +public class RexConverter implements RexVisitor { + + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return inputRef; + } + + @Override + public RexNode visitLocalRef(RexLocalRef localRef) { + return localRef; + } + + @Override + public RexNode visitLiteral(RexLiteral literal) { + return literal; + } + + @Override + public RexNode visitCall(RexCall call) { + List operands = + call.getOperands().stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (operands.equals(call.getOperands())) { + return call; + } + return call.clone(call.getType(), operands); + } + + @Override + public RexNode visitOver(RexOver over) { + List operands = + over.getOperands().stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (operands.equals(over.getOperands())) { + return over; + } + return over.clone(over.getType(), operands); + } + + @Override + public RexNode visitCorrelVariable(RexCorrelVariable correlVariable) { + return correlVariable; + } + + @Override + public RexNode visitDynamicParam(RexDynamicParam dynamicParam) { + return dynamicParam; + } + + @Override + public RexNode visitRangeRef(RexRangeRef rangeRef) { + return rangeRef; + } + + @Override + public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { + RexNode expr = fieldAccess.getReferenceExpr().accept(this); + if (expr == fieldAccess.getReferenceExpr()) { + return fieldAccess; + } + throw new UnsupportedOperationException( + "RexFieldAccess transformation not supported. 
Override visitFieldAccess() to handle this" + + " case."); + } + + @Override + public RexNode visitSubQuery(RexSubQuery subQuery) { + List operands = + subQuery.getOperands().stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (operands.equals(subQuery.getOperands())) { + return subQuery; + } + return subQuery.clone(subQuery.getType(), operands); + } + + @Override + public RexNode visitTableInputRef(RexTableInputRef fieldRef) { + return fieldRef; + } + + @Override + public RexNode visitPatternFieldRef(RexPatternFieldRef fieldRef) { + return fieldRef; + } + + @Override + public RexNode visitLambda(RexLambda lambda) { + RexNode expr = lambda.getExpression().accept(this); + if (expr == lambda.getExpression()) { + return lambda; + } + throw new UnsupportedOperationException( + "RexLambda transformation not supported. Override visitLambda() to handle this case."); + } + + @Override + public RexNode visitLambdaRef(RexLambdaRef lambdaRef) { + return lambdaRef; + } +} diff --git a/core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java b/core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java new file mode 100644 index 00000000000..9e84715435f --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/utils/RexConverterTest.java @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.math.BigDecimal; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLambda; +import 
org.apache.calcite.rex.RexLambdaRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class RexConverterTest { + + private RexBuilder rexBuilder; + private RelDataType intType; + + @BeforeEach + void setUp() { + rexBuilder = new RexBuilder(OpenSearchTypeFactory.TYPE_FACTORY); + intType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.INTEGER); + } + + @Test + void testVisitInputRef_returnsUnchanged() { + RexConverter converter = new RexConverter(); + RexInputRef inputRef = rexBuilder.makeInputRef(intType, 0); + + RexNode result = inputRef.accept(converter); + + assertSame(inputRef, result); + } + + @Test + void testVisitLiteral_returnsUnchanged() { + RexConverter converter = new RexConverter(); + RexLiteral literal = rexBuilder.makeExactLiteral(BigDecimal.valueOf(42)); + + RexNode result = literal.accept(converter); + + assertSame(literal, result); + } + + @Test + void testVisitCall_withNoTransformation_returnsOriginal() { + RexConverter converter = new RexConverter(); + RexNode left = rexBuilder.makeInputRef(intType, 0); + RexNode right = rexBuilder.makeInputRef(intType, 1); + RexCall call = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, left, right); + + RexNode result = call.accept(converter); + + assertSame(call, result); + } + + @Test + void testVisitCall_withTransformation_returnsNewNode() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return rexBuilder.makeInputRef(inputRef.getType(), inputRef.getIndex() + 10); + } + }; + + RexNode left = rexBuilder.makeInputRef(intType, 0); + RexNode right = rexBuilder.makeInputRef(intType, 1); + RexCall call = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, left, right); + + RexNode result = 
call.accept(converter); + + assertNotSame(call, result); + RexCall resultCall = (RexCall) result; + assertEquals(2, resultCall.getOperands().size()); + assertEquals(10, ((RexInputRef) resultCall.getOperands().get(0)).getIndex()); + assertEquals(11, ((RexInputRef) resultCall.getOperands().get(1)).getIndex()); + } + + @Test + void testVisitCall_nestedCalls_transformsRecursively() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return rexBuilder.makeInputRef(inputRef.getType(), inputRef.getIndex() + 1); + } + }; + + RexNode a = rexBuilder.makeInputRef(intType, 0); + RexNode b = rexBuilder.makeInputRef(intType, 1); + RexNode c = rexBuilder.makeInputRef(intType, 2); + RexCall innerCall = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, a, b); + RexCall outerCall = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, innerCall, c); + + RexNode result = outerCall.accept(converter); + + assertNotSame(outerCall, result); + RexCall resultCall = (RexCall) result; + RexCall innerResult = (RexCall) resultCall.getOperands().get(0); + assertEquals(1, ((RexInputRef) innerResult.getOperands().get(0)).getIndex()); + assertEquals(2, ((RexInputRef) innerResult.getOperands().get(1)).getIndex()); + assertEquals(3, ((RexInputRef) resultCall.getOperands().get(1)).getIndex()); + } + + @Test + void testVisitLambda_withNoTransformation_returnsOriginal() { + RexConverter converter = new RexConverter(); + RexLambdaRef lambdaRef = new RexLambdaRef(0, "x", intType); + RexLambda lambda = + (RexLambda) rexBuilder.makeLambdaCall(lambdaRef, java.util.List.of(lambdaRef)); + + RexNode result = lambda.accept(converter); + + assertSame(lambda, result); + } + + @Test + void testVisitLambda_withTransformation_throwsException() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitLambdaRef(RexLambdaRef lambdaRef) { + return new RexLambdaRef( + lambdaRef.getIndex() + 1, 
lambdaRef.getName(), lambdaRef.getType()); + } + }; + + RexLambdaRef lambdaRef = new RexLambdaRef(0, "x", intType); + RexLambda lambda = + (RexLambda) rexBuilder.makeLambdaCall(lambdaRef, java.util.List.of(lambdaRef)); + + UnsupportedOperationException exception = + assertThrows(UnsupportedOperationException.class, () -> lambda.accept(converter)); + assertEquals( + "RexLambda transformation not supported. Override visitLambda() to handle this case.", + exception.getMessage()); + } + + @Test + void testCustomConverter_selectiveTransformation() { + RexConverter converter = + new RexConverter() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + if (inputRef.getIndex() == 0) { + return rexBuilder.makeInputRef(inputRef.getType(), 99); + } + return inputRef; + } + }; + + RexNode input0 = rexBuilder.makeInputRef(intType, 0); + RexNode input1 = rexBuilder.makeInputRef(intType, 1); + RexCall call = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.PLUS, input0, input1); + + RexNode result = call.accept(converter); + + assertNotSame(call, result); + RexCall resultCall = (RexCall) result; + assertEquals(99, ((RexInputRef) resultCall.getOperands().get(0)).getIndex()); + assertEquals(1, ((RexInputRef) resultCall.getOperands().get(1)).getIndex()); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java index 84136e2c131..71f23421985 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java @@ -54,7 +54,7 @@ public void testJoinWithStaticField() throws IOException { + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], 
a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" - + " LogicalJoin(condition=[=($1, $3)], joinType=[inner])\n" + + " LogicalJoin(condition=[=($3, $1)], joinType=[inner])\n" + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" @@ -92,9 +92,8 @@ public void testJoinDynamicWithStaticWithoutCast1() throws IOException { "join left = l right = r on l.a3 = r.a3 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + JSONObject result = executeQuery(query); + verifyJoinResult(result); } @Test @@ -105,18 +104,16 @@ public void testJoinDynamicWithStaticWithoutCast2() throws IOException { "join left = l right = r on r.a3 = l.a3 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + JSONObject result = executeQuery(query); + verifyJoinResult(result); } @Test public void testJoinDynamicWithStaticWithoutCast3() throws IOException { String query = source(TEST_DYNAMIC_LEFT, "join a3 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Source key `a3` needs to be specific type. 
Please cast explicitly.", th.getMessage()); + JSONObject result = executeQuery(query); + verifyJoinResult(result); } @Test @@ -175,7 +172,8 @@ public void testJoinStaticWithDynamic() throws IOException { + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" - + " LogicalJoin(condition=[=($0, ITEM($5, 'a1'))], joinType=[inner])\n" + + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($5, 'a1')), $0)]," + + " joinType=[inner])\n" + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" @@ -193,7 +191,7 @@ public void testJoinStaticWithDynamic() throws IOException { + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," - + " $t9)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t10])\n" + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + " EnumerableLimit(fetch=[50000])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch," + " test_dynamic_right]])\n"); @@ -210,9 +208,8 @@ public void testJoinDynamicWithDynamicWithoutCast1() throws IOException { "join left = l right = r on l.a4 = r.a4 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Join condition needs to use specific type. 
Please cast explicitly.", th.getMessage()); + JSONObject result = executeQuery(query); + verifyJoinResult(result); } @Test @@ -223,18 +220,16 @@ public void testJoinDynamicWithDynamicWithoutCast2() throws IOException { "join left = l right = r on r.a4 = l.a4 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Join condition needs to use specific type. Please cast explicitly.", th.getMessage()); + JSONObject result = executeQuery(query); + verifyJoinResult(result); } @Test public void testJoinDynamicWithDynamicWithoutCast3() throws IOException { String query = source(TEST_DYNAMIC_LEFT, "join a4 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Source key `a4` needs to be specific type. Please cast explicitly.", th.getMessage()); + JSONObject result = executeQuery(query); + verifyJoinResult(result); } @Test @@ -295,7 +290,8 @@ public void testLookupStaticWithDynamic() throws IOException { + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + " LogicalProject(a1=[COALESCE(ITEM($17, 'a1'), $0)], a2=[$9], a3=[$10]," + " a4=[ITEM(MAP_CONCAT($8, $17), 'a4')], a5=[ITEM(MAP_CONCAT($8, $17), 'a5')])\n" - + " LogicalJoin(condition=[=($0, ITEM($17, 'a1'))], joinType=[left])\n" + + " LogicalJoin(condition=[=($0, SAFE_CAST(ITEM($17, 'a1')))]," + + " joinType=[left])\n" + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + " physical: |\n" @@ -309,7 +305,7 @@ public void testLookupStaticWithDynamic() throws IOException { + " EnumerableLimit(fetch=[200])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a1'], expr#10=[ITEM($t8," - + " $t9)], proj#0..1=[{exprs}], _MAP=[$t8], 
$f3=[$t10])\n" + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_right]])\n"); JSONObject result = executeQuery(query); @@ -321,30 +317,6 @@ public void testLookupStaticWithStatic() throws IOException { String query = source( TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a2 | fields a1, a2, a3, a4, a5"); - - assertExplainYaml( - query, - "calcite:\n" - + " logical: |\n" - + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" - + " LogicalProject(a1=[COALESCE(ITEM($17, 'a1'), $0)], a2=[$1], a3=[$10]," - + " a4=[ITEM(MAP_CONCAT($8, $17), 'a4')], a5=[ITEM(MAP_CONCAT($8, $17), 'a5')])\n" - + " LogicalJoin(condition=[=($1, $9)], joinType=[left])\n" - + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" - + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" - + " physical: |\n" - + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," - + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," - + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," - + " a2=[$t1], a3=[$t4], a4=[$t11], a5=[$t13])\n" - + " EnumerableLimit(fetch=[200])\n" - + " EnumerableHashJoin(condition=[=($1, $3)], joinType=[left])\n" - + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" - + " EnumerableLimit(fetch=[200])\n" - + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" - + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}], _MAP=[$t8])\n" - + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_right]])\n"); - JSONObject result = executeQuery(query); verifyJoinResult(result); } @@ -354,22 +326,6 @@ public void testLookupDynamicWithStaticWithoutCast() throws IOException { String query = source( TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a3 | fields a1, a2, a3, a4, a5"); - - Throwable th 
= assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Source key `a3` needs to be specific type. Please cast explicitly.", th.getMessage()); - } - - @Test - public void testLookupDynamicWithStaticWithCast() throws IOException { - String query = - source( - TEST_DYNAMIC_LEFT, - "eval a3=cast(a3 as string)" - + "| lookup " - + TEST_DYNAMIC_RIGHT - + " a3 | fields a1, a2, a3, a4, a5"); - JSONObject result = executeQuery(query); verifyJoinResult(result); } @@ -379,21 +335,6 @@ public void testLookupDynamicWithDynamicWithoutCast() throws IOException { String query = source( TEST_DYNAMIC_LEFT, "lookup " + TEST_DYNAMIC_RIGHT + " a4 | fields a1, a2, a3, a4, a5"); - Throwable th = assertThrows(IllegalArgumentException.class, () -> executeQuery(query)); - assertEquals( - "Source key `a4` needs to be specific type. Please cast explicitly.", th.getMessage()); - } - - @Test - public void testLookupDynamicWithDynamicWithCast() throws IOException { - String query = - source( - TEST_DYNAMIC_LEFT, - "eval a4=cast(a4 as int)" - + "|lookup " - + TEST_DYNAMIC_RIGHT - + " a4 | fields a1, a2, a3, a4, a5"); - JSONObject result = executeQuery(query); verifyJoinResult(result); } From 1bdfe5362dbe35edc8949d25d9fa457c1861e965 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Tue, 11 Nov 2025 15:24:47 -0800 Subject: [PATCH 6/6] Fix IT Signed-off-by: Tomoyuki Morita --- .../CalciteDynamicFieldsJoinIT.java | 140 ++++++------------ 1 file changed, 49 insertions(+), 91 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java index 71f23421985..40a5da7f0a8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDynamicFieldsJoinIT.java @@ -14,7 +14,6 @@ import 
java.io.IOException; import org.json.JSONObject; -import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; @@ -32,18 +31,7 @@ public void init() throws IOException { } @Test - @Ignore("pending join adaptation") - public void testExpand() throws IOException { - String query = source(TEST_DYNAMIC_LEFT, "expand arr as expanded | where isnotnull(expanded)"); - - JSONObject result = executeQuery(query); - - verifySchema(result, schema("department", "string"), schema("count", "bigint")); - verifyDataRows(result, rows("Engineering", 2)); - } - - @Test - public void testJoinWithStaticField() throws IOException { + public void testJoinStaticWithStaticByFieldName() throws IOException { String query = source(TEST_DYNAMIC_LEFT, "join a2 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); @@ -54,7 +42,7 @@ public void testJoinWithStaticField() throws IOException { + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" - + " LogicalJoin(condition=[=($3, $1)], joinType=[inner])\n" + + " LogicalJoin(condition=[=($1, $3)], joinType=[inner])\n" + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" + " LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])\n" @@ -78,18 +66,15 @@ public void testJoinWithStaticField() throws IOException { + " test_dynamic_right]])\n"); JSONObject result = executeQuery(query); - - System.out.println(result.toString()); - verifyJoinResult(result); } @Test - public void testJoinDynamicWithStaticWithoutCast1() throws IOException { + public void testJoinStaticWithStaticByCriteria() throws IOException { String query = source( TEST_DYNAMIC_LEFT, - "join left = l right = r on l.a3 = r.a3 " + "join left=l right=r on l.a2 = r.a2 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, 
a5"); JSONObject result = executeQuery(query); @@ -97,40 +82,15 @@ public void testJoinDynamicWithStaticWithoutCast1() throws IOException { } @Test - public void testJoinDynamicWithStaticWithoutCast2() throws IOException { - String query = - source( - TEST_DYNAMIC_LEFT, - "join left = l right = r on r.a3 = l.a3 " - + TEST_DYNAMIC_RIGHT - + " | fields a1, a2, a3, a4, a5"); - JSONObject result = executeQuery(query); - verifyJoinResult(result); - } - - @Test - public void testJoinDynamicWithStaticWithoutCast3() throws IOException { + public void testJoinDynamicWithStaticByFieldName() throws IOException { String query = source(TEST_DYNAMIC_LEFT, "join a3 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - JSONObject result = executeQuery(query); - verifyJoinResult(result); - } - - @Test - public void testJoinDynamicWithStatic() throws IOException { - String query = - source( - TEST_DYNAMIC_LEFT, - "join left = l right = r on cast(l.a3 as string) = r.a3 " - + TEST_DYNAMIC_RIGHT - + " | fields a1, a2, a3, a4, a5"); - assertExplainYaml( query, "calcite:\n" + " logical: |\n" + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" - + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($2, 'a3')), $4)]," + " joinType=[inner])\n" @@ -141,27 +101,39 @@ public void testJoinDynamicWithStatic() throws IOException { + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + " physical: |\n" + " EnumerableCalc(expr#0..5=[{inputs}], expr#6=['a1'], expr#7=[ITEM($t5, $t6)]," - + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t2, $t5)], expr#10=['a4']," + + " expr#8=[COALESCE($t7, $t0)], expr#9=[MAP_CONCAT($t1, $t5)], expr#10=['a4']," + " expr#11=[ITEM($t9, $t10)], expr#12=['a5'], expr#13=[ITEM($t9, $t12)], a1=[$t8]," - + " a2=[$t1], 
a3=[$t4], a4=[$t11], a5=[$t13])\n" + + " a2=[$t3], a3=[$t4], a4=[$t11], a5=[$t13])\n" + " EnumerableLimit(fetch=[200])\n" - + " EnumerableMergeJoin(condition=[=($3, $4)], joinType=[inner])\n" - + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + + " EnumerableMergeJoin(condition=[=($2, $4)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a3'], expr#10=[ITEM($t8," - + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " $t9)], expr#11=[SAFE_CAST($t10)], a1=[$t0], _MAP=[$t8], $f2=[$t11])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" - + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " EnumerableSort(sort0=[$1], dir0=[ASC])\n" + " EnumerableLimit(fetch=[50000])\n" - + " EnumerableCalc(expr#0..8=[{inputs}], a3=[$t1], _MAP=[$t8])\n" + + " EnumerableCalc(expr#0..8=[{inputs}], proj#0..1=[{exprs}]," + + " _MAP=[$t8])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch," + " test_dynamic_right]])\n"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + @Test + public void testJoinDynamicWithStaticByCriteria() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a3 = r.a3 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); JSONObject result = executeQuery(query); verifyJoinResult(result); } @Test - public void testJoinStaticWithDynamic() throws IOException { + public void testJoinStaticWithDynamicByFieldName() throws IOException { String query = source(TEST_DYNAMIC_LEFT, "join a1 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); @@ -172,7 +144,7 @@ public void testJoinStaticWithDynamic() throws IOException { + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" - + " 
LogicalJoin(condition=[=(SAFE_CAST(ITEM($5, 'a1')), $0)]," + + " LogicalJoin(condition=[=($0, SAFE_CAST(ITEM($5, 'a1')))]," + " joinType=[inner])\n" + " LogicalProject(a1=[$0], a2=[$1], _MAP=[$8])\n" + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" @@ -201,23 +173,11 @@ public void testJoinStaticWithDynamic() throws IOException { } @Test - public void testJoinDynamicWithDynamicWithoutCast1() throws IOException { - String query = - source( - TEST_DYNAMIC_LEFT, - "join left = l right = r on l.a4 = r.a4 " - + TEST_DYNAMIC_RIGHT - + " | fields a1, a2, a3, a4, a5"); - JSONObject result = executeQuery(query); - verifyJoinResult(result); - } - - @Test - public void testJoinDynamicWithDynamicWithoutCast2() throws IOException { + public void testJoinStaticWithDynamicByCriteria() throws IOException { String query = source( TEST_DYNAMIC_LEFT, - "join left = l right = r on r.a4 = l.a4 " + "join left = l right = r on l.a1 = r.a1 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); JSONObject result = executeQuery(query); @@ -225,28 +185,15 @@ public void testJoinDynamicWithDynamicWithoutCast2() throws IOException { } @Test - public void testJoinDynamicWithDynamicWithoutCast3() throws IOException { + public void testJoinDynamicWithDynamicByFieldName() throws IOException { String query = source(TEST_DYNAMIC_LEFT, "join a4 " + TEST_DYNAMIC_RIGHT + " | fields a1, a2, a3, a4, a5"); - JSONObject result = executeQuery(query); - verifyJoinResult(result); - } - - @Test - public void testJoinDynamicWithDynamic() throws IOException { - String query = - source( - TEST_DYNAMIC_LEFT, - "join left = l right = r on cast(l.a4 as int) = cast(r.a4 as int) " - + TEST_DYNAMIC_RIGHT - + " | fields a1, a2, a3, a4, a5"); - assertExplainYaml( query, "calcite:\n" + " logical: |\n" + " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n" - + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], a2=[$1], a3=[$4]," + + " LogicalProject(a1=[COALESCE(ITEM($5, 'a1'), $0)], 
a2=[$3], a3=[$4]," + " a4=[ITEM(MAP_CONCAT($2, $5), 'a4')], a5=[ITEM(MAP_CONCAT($2, $5), 'a5')])\n" + " LogicalJoin(condition=[=(SAFE_CAST(ITEM($2, 'a4')), SAFE_CAST(ITEM($5," + " 'a4')))], joinType=[inner])\n" @@ -257,22 +204,33 @@ public void testJoinDynamicWithDynamic() throws IOException { + " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_right]])\n" + " physical: |\n" + " EnumerableCalc(expr#0..6=[{inputs}], expr#7=['a1'], expr#8=[ITEM($t5, $t7)]," - + " expr#9=[COALESCE($t8, $t0)], expr#10=[MAP_CONCAT($t2, $t5)], expr#11=['a4']," + + " expr#9=[COALESCE($t8, $t0)], expr#10=[MAP_CONCAT($t1, $t5)], expr#11=['a4']," + " expr#12=[ITEM($t10, $t11)], expr#13=['a5'], expr#14=[ITEM($t10, $t13)], a1=[$t9]," - + " a2=[$t1], a3=[$t4], a4=[$t12], a5=[$t14])\n" + + " a2=[$t3], a3=[$t4], a4=[$t12], a5=[$t14])\n" + " EnumerableLimit(fetch=[200])\n" - + " EnumerableMergeJoin(condition=[=($3, $6)], joinType=[inner])\n" - + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + + " EnumerableMergeJoin(condition=[=($2, $6)], joinType=[inner])\n" + + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a4'], expr#10=[ITEM($t8," - + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + + " $t9)], expr#11=[SAFE_CAST($t10)], a1=[$t0], _MAP=[$t8], $f2=[$t11])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_left]])\n" - + " EnumerableSort(sort0=[$2], dir0=[ASC])\n" + + " EnumerableSort(sort0=[$3], dir0=[ASC])\n" + " EnumerableCalc(expr#0..8=[{inputs}], expr#9=['a4'], expr#10=[ITEM($t8," - + " $t9)], expr#11=[SAFE_CAST($t10)], a3=[$t1], _MAP=[$t8], $f2=[$t11])\n" + + " $t9)], expr#11=[SAFE_CAST($t10)], proj#0..1=[{exprs}], _MAP=[$t8], $f3=[$t11])\n" + " EnumerableLimit(fetch=[50000])\n" + " CalciteEnumerableIndexScan(table=[[OpenSearch," + " test_dynamic_right]])\n"); + JSONObject result = executeQuery(query); + verifyJoinResult(result); + } + @Test + public void 
testJoinDynamicWithDynamicByCriteria() throws IOException { + String query = + source( + TEST_DYNAMIC_LEFT, + "join left = l right = r on l.a4 = r.a4 " + + TEST_DYNAMIC_RIGHT + + " | fields a1, a2, a3, a4, a5"); JSONObject result = executeQuery(query); verifyJoinResult(result); }