diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 2caf6803a2..2834dc8809 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -78,6 +78,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -701,6 +702,11 @@ public LogicalPlan visitExpand(Expand expand, AnalysisContext context) { throw getOnlyForCalciteException("Expand"); } + @Override + public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) { + throw getOnlyForCalciteException("MvExpand"); + } + /** Build {@link LogicalTrendline} for Trendline command. */ @Override public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index a8bbfc3a82..a83dc6b282 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -66,6 +66,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -451,4 +452,8 @@ public T visitAppend(Append node, C context) { public T visitMultisearch(Multisearch node, C context) { return visitChildren(node, context); } + + public T visitMvExpand(MvExpand node, C context) { + return visitChildren(node, context); + } } diff --git 
a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index 93ad06011c..c97a078c42 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -62,6 +62,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Limit; import org.opensearch.sql.ast.tree.MinSpanBin; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -136,6 +137,11 @@ public Expand expand(UnresolvedPlan input, Field field, String alias) { return new Expand(field, alias).attach(input); } + public static UnresolvedPlan mvexpand(UnresolvedPlan input, Field field, Integer limit) { + // attach the incoming child plan so the AST contains the pipeline link + return new MvExpand(field, limit).attach(input); + } + public static UnresolvedPlan projectWithArg( UnresolvedPlan input, List argList, UnresolvedExpression... projectList) { return new Project(Arrays.asList(projectList), argList).attach(input); diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java b/core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java new file mode 100644 index 0000000000..540e53fd6e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; + +/** AST node representing an {@code mvexpand [limit N]} operation. 
*/ +@ToString +@EqualsAndHashCode(callSuper = false) +public class MvExpand extends UnresolvedPlan { + + private UnresolvedPlan child; + @Getter private final Field field; + @Getter @Nullable private final Integer limit; + + public MvExpand(Field field, @Nullable Integer limit) { + this.field = field; + this.limit = limit; + } + + @Override + public MvExpand attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitMvExpand(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 4848415c36..185cacf9d0 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -122,6 +122,7 @@ import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -839,7 +840,12 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) { .toList(); context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggCall); buildExpandRelNode( - context.relBuilder.field(node.getAlias()), node.getAlias(), node.getAlias(), context); + context.relBuilder.field(node.getAlias()), + node.getAlias(), + node.getAlias(), + null, + context); + flattenParsedPattern( node.getAlias(), context.relBuilder.field(node.getAlias()), @@ -2807,7 +2813,21 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { RexInputRef arrayFieldRex = 
(RexInputRef) rexVisitor.analyze(arrayField, context); String alias = expand.getAlias(); - buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, context); + buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, null, context); + + return context.relBuilder.peek(); + } + + @Override + public RelNode visitMvExpand(MvExpand node, CalcitePlanContext context) { + visitChildren(node, context); + Field arrayField = node.getField(); + RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context); + + // pass the per-document limit into the builder so it can be applied inside the UNNEST inner + // query + buildMvExpandRelNode( + arrayFieldRex, arrayField.getField().toString(), null, node.getLimit(), context); return context.relBuilder.peek(); } @@ -3056,45 +3076,114 @@ private void flattenParsedPattern( } private void buildExpandRelNode( - RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) { - // 3. Capture the outer row in a CorrelationId + RexNode arrayFieldRexNode, + String arrayFieldName, + String alias, + Integer mvExpandLimit, + CalcitePlanContext context) { + + // Convert incoming RexNode to RexInputRef when possible; otherwise resolve by field name. + RexInputRef arrayFieldRex; + if (arrayFieldRexNode instanceof RexInputRef) { + arrayFieldRex = (RexInputRef) arrayFieldRexNode; + + // If caller gave an input ref, try to sanity-check that the referenced field in the + // current row type is actually an array. If not, surface a clear semantic error. + RelDataType currentRowTypeCheck = context.relBuilder.peek().getRowType(); + int idx = arrayFieldRex.getIndex(); + RelDataTypeField checkField = null; + if (idx >= 0 && idx < currentRowTypeCheck.getFieldList().size()) { + checkField = currentRowTypeCheck.getFieldList().get(idx); + } + // Allow both ArraySqlType and MapSqlType here to avoid failing + // early when mappings are represented as MAP at runtime. 
+ if (checkField != null + && !(checkField.getType() instanceof ArraySqlType) + && !(checkField.getType() instanceof MapSqlType)) { + throw new SemanticCheckException( + String.format( + "Cannot expand field '%s': expected ARRAY type but found %s", + checkField.getName(), checkField.getType().getSqlTypeName())); + } + } else { + // Try resolve by name and provide user-friendly errors when resolution fails or the type + // is not an array (user-visible message). + RelDataType currentRowType = context.relBuilder.peek().getRowType(); + RelDataTypeField fld = currentRowType.getField(arrayFieldName, false, false); + if (fld == null) { + throw new SemanticCheckException( + String.format("Cannot expand field '%s': field not found in input", arrayFieldName)); + } + // Accept ArraySqlType or MapSqlType here + if (!(fld.getType() instanceof ArraySqlType) && !(fld.getType() instanceof MapSqlType)) { + throw new SemanticCheckException( + String.format( + "Cannot expand field '%s': expected ARRAY type but found %s", + arrayFieldName, fld.getType().getSqlTypeName())); + } + arrayFieldRex = context.rexBuilder.makeInputRef(currentRowType, fld.getIndex()); + } + + // Capture left node and its schema BEFORE calling build() + RelNode leftNode = context.relBuilder.peek(); + RelDataType leftRowType = leftNode.getRowType(); + + // Resolve the array field index in left schema by name (robust); fallback to original index + RelDataTypeField leftField = leftRowType.getField(arrayFieldName, false, false); + int arrayFieldIndexInLeft = + (leftField != null) ? leftField.getIndex() : arrayFieldRex.getIndex(); + + // If left schema has the field but it's not an array, produce a helpful message. + // Accept MapSqlType as well to avoid premature failure when mapping is object-like. 
+ if (leftField != null + && !(leftField.getType() instanceof ArraySqlType) + && !(leftField.getType() instanceof MapSqlType)) { + throw new SemanticCheckException( + String.format( + "Cannot expand field '%s': expected ARRAY type in input but found %s", + arrayFieldName, leftField.getType().getSqlTypeName())); + } + + // Create correlation variable while left is still on the builder stack Holder correlVariable = Holder.empty(); context.relBuilder.variable(correlVariable::set); - // 4. Create RexFieldAccess to access left node's array field with correlationId and build join - // left node + // Create correlated field access while left is still on the builder stack RexNode correlArrayFieldAccess = context.relBuilder.field( - context.rexBuilder.makeCorrel( - context.relBuilder.peek().getRowType(), correlVariable.get().id), - arrayFieldRex.getIndex()); - RelNode leftNode = context.relBuilder.build(); + context.rexBuilder.makeCorrel(leftRowType, correlVariable.get().id), + arrayFieldIndexInLeft); + + // Materialize leftBuilt + RelNode leftBuilt = context.relBuilder.build(); - // 5. Build join right node and expand the array field using uncollect + // Build the right (uncollect) using the small helper RelNode rightNode = - context - .relBuilder - // fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter - .push(LogicalValues.createOneRow(context.relBuilder.getCluster())) - .project(List.of(correlArrayFieldAccess), List.of(arrayFieldName)) - .uncollect(List.of(), false) - .build(); - - // 6. Perform a nested-loop join (correlate) between the original table and the expanded - // array field. - // The last parameter has to refer to the array to be expanded on the left side. It will - // be used by the right side to correlate with the left side. 
+ buildRightUncollect(correlArrayFieldAccess, arrayFieldName, mvExpandLimit, context); + + // Compute required column ref against leftBuilt's row type (robust) + RexNode requiredColumnRef = + context.rexBuilder.makeInputRef(leftBuilt.getRowType(), arrayFieldIndexInLeft); + + // Correlate leftBuilt and rightNode using the proper required column ref context .relBuilder - .push(leftNode) + .push(leftBuilt) .push(rightNode) - .correlate(JoinRelType.INNER, correlVariable.get().id, List.of(arrayFieldRex)) - // 7. Remove the original array field from the output. - // TODO: RFC: should we keep the original array field when alias is present? - .projectExcept(arrayFieldRex); + .correlate(JoinRelType.INNER, correlVariable.get().id, List.of(requiredColumnRef)); + + // Remove the original array field from the output by name if possible + RexNode toRemove; + try { + toRemove = context.relBuilder.field(arrayFieldName); + } catch (Exception e) { + // Fallback in case name lookup fails + toRemove = requiredColumnRef; + } + context.relBuilder.projectExcept(toRemove); + // Optional rename into alias (preserve the original logic) if (alias != null) { - // Sub-nested fields cannot be removed after renaming the nested field. tryToRemoveNestedFields(context); RexInputRef expandedField = context.relBuilder.field(arrayFieldName); List names = new ArrayList<>(context.relBuilder.peek().getRowType().getFieldNames()); @@ -3103,6 +3192,45 @@ private void buildExpandRelNode( } } + /** + * Build the inner uncollect (UNNEST) right node given a correlated field access. + * + *

This helper intentionally keeps a very small surface: it accepts the correlated access + * (which must be created while the left is still on the builder stack) and the other local + * options, constructs the "one-row -> project(correlatedField) -> (limit?) -> uncollect" sequence + * and returns the built right RelNode. + * + *

Keeping the correlate + projectExcept logic in buildExpandRelNode simplifies reasoning about + * the required correlate-variable lifecycle (it must be created while left is on the builder + * stack). + */ + private RelNode buildRightUncollect( + RexNode correlArrayFieldAccess, + String arrayFieldName, + Integer mvExpandLimit, + CalcitePlanContext context) { + + RelBuilder rb = context.relBuilder; + rb.push(LogicalValues.createOneRow(rb.getCluster())) + .project(List.of(correlArrayFieldAccess), List.of(arrayFieldName)); + // apply per-document limit into the inner SELECT if provided + if (mvExpandLimit != null && mvExpandLimit > 0) { + rb.limit(0, mvExpandLimit); + } + return rb.uncollect(List.of(), false).build(); + } + + private void buildMvExpandRelNode( + RexInputRef arrayFieldRex, + String arrayFieldName, + String alias, + Integer mvExpandLimit, + CalcitePlanContext context) { + + // Delegate to the canonical expand implementation (pass the per-document limit through). + buildExpandRelNode(arrayFieldRex, arrayFieldName, alias, mvExpandLimit, context); + } + /** Creates an optimized sed call using native Calcite functions */ private RexNode createOptimizedSedCall( RexNode fieldRex, String sedExpression, CalcitePlanContext context) { diff --git a/core/src/test/java/org/opensearch/sql/calcite/CalciteRelNodeVisitorExpandTest.java b/core/src/test/java/org/opensearch/sql/calcite/CalciteRelNodeVisitorExpandTest.java new file mode 100644 index 0000000000..87a9f13118 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/CalciteRelNodeVisitorExpandTest.java @@ -0,0 +1,153 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static 
org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.ArraySqlType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.RelBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.calcite.utils.CalciteToolsHelper; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.executor.QueryType; + +/** Negative tests for expand branch validations. 
*/ +@ExtendWith(MockitoExtension.class) +public class CalciteRelNodeVisitorExpandTest { + + private MockedStatic mockedCalciteToolsHelper; + + @SuppressWarnings("unused") + private FrameworkConfig frameworkConfig = mock(FrameworkConfig.class); + + private final RelBuilder relBuilder = mock(RelBuilder.class); + private final RelNode leftRelNode = mock(RelNode.class); + private final RelDataType leftRowType = mock(RelDataType.class); + private final RelDataTypeField arrayField = mock(RelDataTypeField.class); + private final RelDataTypeField nonArrayField = mock(RelDataTypeField.class); + private final ArraySqlType arraySqlType = mock(ArraySqlType.class); + private final RelDataType nonArrayType = mock(RelDataType.class); + private final DataSourceService dataSourceService = mock(DataSourceService.class); + private final ExtendedRexBuilder rexBuilder = mock(ExtendedRexBuilder.class); + + private CalciteRelNodeVisitor visitor; + private CalcitePlanContext context; + + @BeforeEach + public void setUp() { + // Intercept CalciteToolsHelper.create(...) so CalcitePlanContext.create(...) ends up using our + // relBuilder. + mockedCalciteToolsHelper = Mockito.mockStatic(CalciteToolsHelper.class); + mockedCalciteToolsHelper + .when(() -> CalciteToolsHelper.create(any(), any(), any())) + .thenReturn(relBuilder); + + // Minimal relBuilder / row-type wiring used by the validation branches. + lenient().when(relBuilder.peek()).thenReturn(leftRelNode); + lenient().when(leftRelNode.getRowType()).thenReturn(leftRowType); + + // Some versions of Calcite require relBuilder.getRexBuilder()/getTypeFactory during context + // creation. + lenient().when(relBuilder.getRexBuilder()).thenReturn(rexBuilder); + lenient().when(rexBuilder.getTypeFactory()).thenReturn(TYPE_FACTORY); + + // Create the plan context. Pass null for SysLimit (tests do not depend on it). 
+ context = CalcitePlanContext.create(frameworkConfig, null, QueryType.PPL); + + visitor = new CalciteRelNodeVisitor(dataSourceService); + } + + @AfterEach + public void tearDown() { + mockedCalciteToolsHelper.close(); + } + + /** + * Negative: requested field does not exist in current row type -> SemanticCheckException + * + *

This exercises the resolve-by-name branch early validation that throws when the named field + * is not found in the current row type. + */ + @Test + public void expand_on_nonexistent_field_should_throw_user_friendly_error() throws Exception { + lenient().when(leftRowType.getField("missing_field", false, false)).thenReturn(null); + RexNode nonInputRexNode = mock(RexNode.class); + + Method m = + CalciteRelNodeVisitor.class.getDeclaredMethod( + "buildExpandRelNode", + RexNode.class, + String.class, + String.class, + Integer.class, + CalcitePlanContext.class); + m.setAccessible(true); + + InvocationTargetException ite = + assertThrows( + InvocationTargetException.class, + () -> m.invoke(visitor, nonInputRexNode, "missing_field", null, null, context)); + Throwable cause = ite.getCause(); + assertTrue(cause instanceof SemanticCheckException); + assertEquals( + "Cannot expand field 'missing_field': field not found in input", cause.getMessage()); + } + + /** + * Negative: requested field exists but is not an ARRAY -> SemanticCheckException + * + *

This exercises the resolve-by-name branch early validation that throws when the named field + * exists but its type is not ArraySqlType. + */ + @Test + public void expand_on_non_array_field_should_throw_expected_array_message() throws Exception { + // leftRowType.getField("not_array", false, false) -> nonArrayField and its type is non-array + lenient().when(leftRowType.getField("not_array", false, false)).thenReturn(nonArrayField); + lenient().when(nonArrayField.getType()).thenReturn(nonArrayType); + lenient().when(nonArrayType.getSqlTypeName()).thenReturn(SqlTypeName.VARCHAR); + + RexNode nonInputRexNode = mock(RexNode.class); + + Method m = + CalciteRelNodeVisitor.class.getDeclaredMethod( + "buildExpandRelNode", + RexNode.class, + String.class, + String.class, + Integer.class, + CalcitePlanContext.class); + m.setAccessible(true); + + InvocationTargetException ite = + assertThrows( + InvocationTargetException.class, + () -> m.invoke(visitor, nonInputRexNode, "not_array", null, null, context)); + Throwable cause = ite.getCause(); + assertTrue(cause instanceof SemanticCheckException); + assertEquals( + "Cannot expand field 'not_array': expected ARRAY type but found VARCHAR", + cause.getMessage()); + } +} diff --git a/docs/category.json b/docs/category.json index f126904da6..a0ee68d045 100644 --- a/docs/category.json +++ b/docs/category.json @@ -52,7 +52,6 @@ "user/ppl/cmd/syntax.rst", "user/ppl/cmd/chart.rst", "user/ppl/cmd/timechart.rst", - "user/ppl/cmd/search.rst", "user/ppl/functions/statistical.rst", "user/ppl/cmd/top.rst", "user/ppl/cmd/trendline.rst", @@ -68,7 +67,8 @@ "user/ppl/functions/string.rst", "user/ppl/functions/conversion.rst", "user/ppl/general/datatypes.rst", - "user/ppl/general/identifiers.rst" + "user/ppl/general/identifiers.rst", + "user/ppl/cmd/mvexpand.rst" ], "bash_settings": [ "user/ppl/admin/settings.rst" diff --git a/docs/user/ppl/cmd/mvexpand.rst b/docs/user/ppl/cmd/mvexpand.rst new file mode 100644 index 0000000000..1dbeaaf22c --- 
/dev/null +++ b/docs/user/ppl/cmd/mvexpand.rst @@ -0,0 +1,201 @@ +============= +mvexpand +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| The ``mvexpand`` command expands each value in a multivalue (array) field into a separate row, similar to Splunk's `mvexpand` command. +| For each document, every value in the specified field is returned as a new row. This is especially useful for log analytics and data exploration involving array fields. + +| Key features of ``mvexpand``: +- Expands array fields into multiple rows, one per value. +- Supports an optional ``limit`` parameter to restrict the number of expanded values per document. +- Handles empty, null, and non-array fields gracefully. +- Works as a streaming/distributable command for performance and scalability. + +Syntax +====== +mvexpand [limit=] + +* **field**: The multivalue (array) field to expand. (Required) +* **limit**: Maximum number of values per document to expand. (Optional) + +Usage +===== +Basic expansion:: + + source=logs | mvexpand tags + +Expansion with limit:: + + source=docs | mvexpand ids limit=3 + +Limitations +=========== +- Only one field can be expanded per mvexpand command. +- For non-array fields, the value is returned as-is. +- For empty or null arrays, no rows are returned. +- Large arrays may be subject to resource/memory limits; exceeding them results in an error or warning. + +Output ordering and default limit +-------------------------------- +If no `limit` is specified, mvexpand expands all elements in the array (there is no implicit per-document cap). Elements are emitted in the same order they appear in the array (array iteration order). If the underlying field does not provide a defined order, the output order is undefined. Use `limit` to bound the number of expanded rows per document and to avoid resource issues on very large arrays. 
+ +Examples and Edge Cases +======================= + +Example 1: Basic Expansion +-------------------------- +Expand all values from an array field. + +Input document:: + + { "tags": ["error", "warning", "info"] } + +PPL query:: + + source=logs | mvexpand tags + +Output (example):: + + fetched rows / total rows = 3/3 + +--------+ + | tags | + +--------+ + | error | + | warning| + | info | + +--------+ + +Example 2: Expansion with Limit +------------------------------- +Limit the number of expanded values per document. + +Input document:: + + { "ids": [1, 2, 3, 4, 5] } + +PPL query:: + + source=docs | mvexpand ids limit=3 + +Output (example):: + + fetched rows / total rows = 3/3 + +-----+ + | ids | + +-----+ + | 1 | + | 2 | + | 3 | + +-----+ + +Example 3: Empty or Null Arrays +------------------------------ +Handles documents with empty or null array fields. + +Input document:: + + { "tags": [] } + +PPL query:: + + source=logs | mvexpand tags + +Output (example):: + + fetched rows / total rows = 0/0 + +------+ + | tags | + +------+ + +------+ + +Input document:: + + { "tags": null } + +PPL query:: + + source=logs | mvexpand tags + +Output (example):: + + fetched rows / total rows = 0/0 + +------+ + | tags | + +------+ + +------+ + +Example 4: Non-array Field +-------------------------- +If the field is a single value (not an array), mvexpand returns the value as-is. + +Input document:: + + { "tags": "error" } + +PPL query:: + + source=logs | mvexpand tags + +Output (example):: + + fetched rows / total rows = 1/1 + +-------+ + | tags | + +-------+ + | error | + +-------+ + +Example 5: Large Arrays and Memory / resource limits +---------------------------------------------------- +If an array is very large it can trigger engine or cluster resource limits and the query can fail with an error. There is no mvexpand-specific configuration. Instead, limits that can cause a query to be terminated are enforced at the node / engine level and by SQL/PPL query controls. 
+ +- OpenSearch node protections (for example, heap / query memory limits such as plugins.query.memory_limit) can terminate queries that exceed configured memory budgets. +- SQL/PPL execution limits (timeouts, request/response size limits, and engine memory budgets) also apply to queries that use mvexpand. +- Note: in the current Calcite-based engine, circuit-breaking protections are applied primarily to the index scan operator; protections for other operators (including some operators used internally to implement mvexpand) are under research. Do not assume operator-level circuit breaking will fully protect mvexpand in all cases. + +To avoid failures when expanding large arrays: +- Use mvexpand's limit parameter to bound the number of expanded values per document (for example: mvexpand field limit=1000). +- Reduce the input size before expanding (filter with where, project only needed fields). +- Tune cluster and SQL/PPL execution settings (circuit breakers, request/response size, timeouts, memory limits) appropriate for your deployment. + +For node and SQL/PPL settings see: +https://docs.opensearch.org/1.0/search-plugins/ppl/settings/ + +Example 6: Multiple Fields (Limitation) +--------------------------------------- +mvexpand only supports expanding one field per command. To expand multiple fields, use multiple mvexpand commands or document the limitation. + +PPL query:: + + source=docs | mvexpand a | mvexpand b + +Example 7: Edge Case - Field Missing +------------------------------------ +If the field does not exist in a document, no row is produced for that document. 
+ +Input document:: + + { "other": [1,2] } + +PPL query:: + + source=docs | mvexpand tags + +Output (example):: + + fetched rows / total rows = 0/0 + +------+ + | tags | + +------+ + +------+ + +--- \ No newline at end of file diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index 04a3182757..5845712c65 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -68,6 +68,8 @@ The query start with search command and then flowing a set of command delimited - `expand command `_ + - `mvexpand command `_ + - `explain command `_ - `fields command `_ diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java index 15051417db..6d79414301 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java @@ -32,6 +32,7 @@ CalciteDedupCommandIT.class, CalciteDescribeCommandIT.class, CalciteExpandCommandIT.class, + CalciteMvExpandCommandIT.class, CalciteFieldsCommandIT.class, CalciteFillNullCommandIT.class, CalciteFlattenCommandIT.class, diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 52fb467f5b..4b1c783349 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -41,6 +41,7 @@ public void init() throws Exception { loadIndex(Index.LOGS); loadIndex(Index.WORKER); loadIndex(Index.WORK_INFORMATION); + loadIndex(Index.MVEXPAND_EDGE_CASES); loadIndex(Index.WEBLOG); } @@ -311,6 +312,15 @@ public void testExplainMultisearchTimestampInterleaving() throws IOException { assertYamlEqualsIgnoreId(expected, result); } + // Only for Calcite + @Test + public void testMvexpandExplain() throws 
IOException { + // script pushdown + String expected = loadExpectedPlan("explain_mvexpand.yaml"); + assertYamlEqualsIgnoreId( + expected, explainQueryYaml("source=mvexpand_edge_cases | mvexpand skills")); + } + // Only for Calcite @Test public void testExplainIsBlank() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvExpandCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvExpandCommandIT.java new file mode 100644 index 0000000000..268f2b0f84 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvExpandCommandIT.java @@ -0,0 +1,146 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.sql.legacy.SQLIntegTestCase.Index; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteMvExpandCommandIT extends PPLIntegTestCase { + + private static final String INDEX = Index.MVEXPAND_EDGE_CASES.getName(); + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + deleteIndexIfExists(INDEX); + createIndex( + INDEX, + "{ \"mappings\": { \"properties\": { " + + "\"username\": { \"type\": \"keyword\" }," + + "\"skills\": { \"type\": \"nested\" }" + + "} } }"); + + // Pass plain JSON documents; bulkInsert will auto-assign incremental ids. 
+ bulkInsert( + INDEX, + "{\"username\":\"happy\",\"skills\":[{\"name\":\"python\"},{\"name\":\"java\"},{\"name\":\"sql\"}]}", + "{\"username\":\"single\",\"skills\":[{\"name\":\"go\"}]}", + "{\"username\":\"empty\",\"skills\":[]}", + "{\"username\":\"nullskills\",\"skills\":null}", + "{\"username\":\"noskills\"}", + "{\"username\":\"duplicate\",\"skills\":[{\"name\":\"dup\"},{\"name\":\"dup\"}]}", + "{\"username\":\"large\",\"skills\":[{\"name\":\"s1\"},{\"name\":\"s2\"},{\"name\":\"s3\"},{\"name\":\"s4\"},{\"name\":\"s5\"},{\"name\":\"s6\"},{\"name\":\"s7\"},{\"name\":\"s8\"},{\"name\":\"s9\"},{\"name\":\"s10\"}]}"); + refreshIndex(INDEX); + } + + @Test + public void testMvexpandSingleElement() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='single' | fields username, skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result, rows("single", "go")); + } + + @Test + public void testMvexpandEmptyArray() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='empty' | fields username, skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result); // Should be empty + } + + @Test + public void testMvexpandNullArray() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='nullskills' | fields username," + + " skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result); // Should be empty + } + + @Test + public void testMvexpandNoArrayField() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='noskills' | fields username," + + " skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result); // Should be empty + } + + @Test + public void testMvexpandDuplicate() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where 
username='duplicate' | fields username," + + " skills.name | sort skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result, rows("duplicate", "dup"), rows("duplicate", "dup")); + } + + // Helper methods for index setup/teardown + private static void deleteIndexIfExists(String index) throws IOException { + try { + Request request = new Request("DELETE", "/" + index); + PPLIntegTestCase.adminClient().performRequest(request); + } catch (IOException e) { + // Index does not exist or already deleted + } + } + + private static void createIndex(String index, String mappingJson) throws IOException { + Request request = new Request("PUT", "/" + index); + request.setJsonEntity(mappingJson); + PPLIntegTestCase.adminClient().performRequest(request); + } + + /** + * Bulk insert helper: - Accepts plain JSON strings (no id): assigns incremental numeric ids + * starting at 1. - Also accepts legacy "id|" strings if a test prefers explicit ids. + */ + private static void bulkInsert(String index, String... 
docs) throws IOException { + StringBuilder bulk = new StringBuilder(); + int nextAutoId = 1; + for (String doc : docs) { + String id; + String json; + if (doc.contains("|")) { + String[] parts = doc.split("\\|", 2); + id = parts[0]; + json = parts[1]; + } else { + id = String.valueOf(nextAutoId++); + json = doc; + } + bulk.append("{\"index\":{\"_id\":").append(id).append("}}\n"); + bulk.append(json).append("\n"); + } + Request request = new Request("POST", "/" + index + "/_bulk?refresh=true"); + request.setJsonEntity(bulk.toString()); + PPLIntegTestCase.adminClient().performRequest(request); + } + + private static void refreshIndex(String index) throws IOException { + Request request = new Request("POST", "/" + index + "/_refresh"); + PPLIntegTestCase.adminClient().performRequest(request); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 47632dbc94..785a58d208 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -686,6 +686,11 @@ public enum Index { "_doc", getNestedSimpleIndexMapping(), "src/test/resources/nested_simple.json"), + MVEXPAND_EDGE_CASES( + "mvexpand_edge_cases", + "mvexpand_edge_cases", + getMappingFile("mvexpand_edge_cases_mapping.json"), + "src/test/resources/mvexpand_edge_cases.json"), DEEP_NESTED( TestsConstants.TEST_INDEX_DEEP_NESTED, "_doc", diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_mvexpand.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvexpand.yaml new file mode 100644 index 0000000000..3823139767 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvexpand.yaml @@ -0,0 +1,17 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(skills.level=[$1], skills.name=[$2], 
username=[$3], KEY=[$10], VALUE=[$11]) + LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + CalciteLogicalIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) + Uncollect + LogicalProject(skills=[$cor0.skills]) + LogicalValues(tuples=[[{ 0 }]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..5=[{inputs}], skills.level=[$t1], skills.name=[$t2], username=[$t3], KEY=[$t4], VALUE=[$t5]) + EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, mvexpand_edge_cases]], PushDownContext=[[PROJECT->[skills, skills.level, skills.name, username]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["skills","skills.level","skills.name","username"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableUncollect + EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.skills], skills=[$t2]) + EnumerableValues(tuples=[[{ 0 }]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvexpand.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvexpand.yaml new file mode 100644 index 0000000000..3da8d77fb1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvexpand.yaml @@ -0,0 +1,18 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(skills.level=[$1], skills.name=[$2], username=[$3], KEY=[$10], VALUE=[$11]) + LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + CalciteLogicalIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) + Uncollect + LogicalProject(skills=[$cor0.skills]) + LogicalValues(tuples=[[{ 0 }]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..5=[{inputs}], skills.level=[$t1], skills.name=[$t2], username=[$t3], KEY=[$t4], 
VALUE=[$t5]) + EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + EnumerableCalc(expr#0..9=[{inputs}], proj#0..3=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) + EnumerableUncollect + EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.skills], skills=[$t2]) + EnumerableValues(tuples=[[{ 0 }]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/mvexpand_edge_cases.json b/integ-test/src/test/resources/mvexpand_edge_cases.json new file mode 100644 index 0000000000..662769d89b --- /dev/null +++ b/integ-test/src/test/resources/mvexpand_edge_cases.json @@ -0,0 +1,18 @@ +{"index":{}} +{"username":"happy","skills":[{"name":"python"},{"name":"java"},{"name":"sql"}]} +{"index":{}} +{"username":"single","skills":[{"name":"go"}]} +{"index":{}} +{"username":"empty","skills":[]} +{"index":{}} +{"username":"nullskills","skills":null} +{"index":{}} +{"username":"noskills"} +{"index":{}} +{"username":"missingattr","skills":[{"name":"c"},{"level":"advanced"}]} +{"index":{}} +{"username":"complex","skills":[{"name":"ml","level":"expert"},{"name":"ai"},{"level":"novice"}]} +{"index":{}} +{"username":"duplicate","skills":[{"name":"dup"},{"name":"dup"}]} +{"index":{}} +{"username":"large","skills":[{"name":"s1"},{"name":"s2"},{"name":"s3"},{"name":"s4"},{"name":"s5"},{"name":"s6"},{"name":"s7"},{"name":"s8"},{"name":"s9"},{"name":"s10"}]} diff --git a/integ-test/src/test/resources/mvexpand_edge_cases_mapping.json b/integ-test/src/test/resources/mvexpand_edge_cases_mapping.json new file mode 100644 index 0000000000..164adb77f6 --- /dev/null +++ b/integ-test/src/test/resources/mvexpand_edge_cases_mapping.json @@ -0,0 +1,8 @@ +{ + "mappings": { + "properties": { + "username": { "type": "keyword" }, + "skills": { "type": "nested" } + } + } +} \ No newline at end of file diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 
2e0643fa28..b44b182e70 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -50,6 +50,7 @@ CHART: 'CHART'; TIMECHART: 'TIMECHART'; APPENDCOL: 'APPENDCOL'; EXPAND: 'EXPAND'; +MVEXPAND: 'MVEXPAND'; SIMPLE_PATTERN: 'SIMPLE_PATTERN'; BRAIN: 'BRAIN'; VARIABLE_COUNT_THRESHOLD: 'VARIABLE_COUNT_THRESHOLD'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 494adb1571..30c4388848 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -78,6 +78,7 @@ commands | appendcolCommand | appendCommand | expandCommand + | mvexpandCommand | flattenCommand | reverseCommand | regexCommand @@ -116,6 +117,7 @@ commandName | ML | FILLNULL | EXPAND + | MVEXPAND | FLATTEN | TRENDLINE | TIMECHART @@ -525,6 +527,10 @@ expandCommand : EXPAND fieldExpression (AS alias = qualifiedName)? ; +mvexpandCommand + : MVEXPAND fieldExpression (LIMIT EQUAL INTEGER_LITERAL)? + ; + flattenCommand : FLATTEN fieldExpression (AS aliases = identifierSeq)? 
; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index ed66682a98..5ef2cdf6ab 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -91,6 +91,7 @@ import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.MinSpanBin; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -870,6 +871,14 @@ public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContex return new Expand(fieldExpression, alias); } + @Override + public UnresolvedPlan visitMvexpandCommand(OpenSearchPPLParser.MvexpandCommandContext ctx) { + Field field = (Field) expressionBuilder.visit(ctx.fieldExpression()); + Integer limit = + ctx.INTEGER_LITERAL() != null ? 
Integer.parseInt(ctx.INTEGER_LITERAL().getText()) : null; + return new MvExpand(field, limit); + } + @Override public UnresolvedPlan visitGrokCommand(OpenSearchPPLParser.GrokCommandContext ctx) { UnresolvedExpression sourceField = internalVisitExpression(ctx.source_field); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 7e8dc16f4d..49ed8dc569 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -72,6 +72,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.MinSpanBin; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -640,6 +641,17 @@ public String visitAppend(Append node, String context) { return StringUtils.format("%s | append [%s ]", child, subsearch); } + @Override + public String visitMvExpand(MvExpand node, String context) { + String child = node.getChild().get(0).accept(this, context); + String field = MASK_COLUMN; // Always anonymize field names + // Optionally handle limit if needed (e.g., | mvexpand identifier limit=***) + if (node.getLimit() != null) { + return StringUtils.format("%s | mvexpand %s limit=%s", child, field, MASK_LITERAL); + } + return StringUtils.format("%s | mvexpand %s", child, field); + } + @Override public String visitMultisearch(Multisearch node, String context) { List anonymizedSubsearches = new ArrayList<>(); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index 9dd01b30df..8da3a43c31 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java 
+++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.rel2sql.RelToSqlConverter; import org.apache.calcite.rel.rel2sql.SqlImplementor; +import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParser; @@ -64,6 +65,21 @@ public CalcitePPLAbstractTest(CalciteAssert.SchemaSpec... schemaSpecs) { this.settings = mock(Settings.class); } + public CalcitePPLAbstractTest(Schema customSchema) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + rootSchema.add("CUSTOM", customSchema); + this.config = + Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(rootSchema.getSubSchema("CUSTOM")) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + this.dataSourceService = mock(DataSourceService.class); + this.planTransformer = new CalciteRelNodeVisitor(dataSourceService); + this.converter = new RelToSqlConverter(OpenSearchSparkSqlDialect.DEFAULT); + this.settings = mock(Settings.class); + } + @Before public void init() { doReturn(true).when(settings).getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvExpandTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvExpandTest.java new file mode 100644 index 0000000000..16594fe0ae --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvExpandTest.java @@ -0,0 +1,335 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import 
org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +/** + * Calcite tests for the mvexpand command. + * + *

Planner tests for mvexpand; kept minimal and consistent with other Calcite planner tests. + */ +public class CalcitePPLMvExpandTest extends CalcitePPLAbstractTest { + + public CalcitePPLMvExpandTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + + // Keep dataset empty: tests only need schema/type information. + ImmutableList users = ImmutableList.of(); + + schema.add("USERS", new UsersTable(users)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testMvExpandBasic() { + String ppl = "source=USERS | mvexpand skills"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(USERNAME=[$0], name=[$2], level=[$3])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}])\n" + + " LogicalTableScan(table=[[scott, USERS]])\n" + + " Uncollect\n" + + " LogicalProject(skills=[$cor0.skills])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testMvExpandWithLimit() { + String ppl = "source=USERS | mvexpand skills | head 1"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(fetch=[1])\n" + + " LogicalProject(USERNAME=[$0], name=[$2], level=[$3])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}])\n" + + " LogicalTableScan(table=[[scott, USERS]])\n" + + " Uncollect\n" + + " LogicalProject(skills=[$cor0.skills])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testMvExpandProjectNested() { + String 
ppl = "source=USERS | mvexpand skills | fields USERNAME, name, level"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(USERNAME=[$0], name=[$2], level=[$3])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}])\n" + + " LogicalTableScan(table=[[scott, USERS]])\n" + + " Uncollect\n" + + " LogicalProject(skills=[$cor0.skills])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testMvExpandEmptyOrNullArray() { + String ppl = "source=USERS | where USERNAME in ('empty','nullskills') | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + + assertNotNull(root); + + } catch (Exception e) { + fail("mvexpand on empty/null array should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandNoArrayField() { + String ppl = "source=USERS | where USERNAME = 'noskills' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on missing array field should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandWithDuplicates() { + String ppl = "source=USERS | where USERNAME = 'duplicate' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand with duplicates should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandLargeArray() { + String ppl = "source=USERS | where USERNAME = 'large' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on large array should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandProjectMissingAttribute() { + String ppl = "source=USERS | mvexpand skills | fields USERNAME, level"; + try { + RelNode root 
= getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand projection of missing attribute should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandPrimitiveArray() { + String ppl = "source=USERS | where USERNAME = 'primitive' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array of primitives should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandAllNullsArray() { + String ppl = "source=USERS | where USERNAME = 'allnulls' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array of all nulls should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandEmptyObjectArray() { + String ppl = "source=USERS | where USERNAME = 'emptyobj' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array with empty struct should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandDeeplyNestedArray() { + String ppl = "source=USERS | where USERNAME = 'deeplyNested' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on deeply nested arrays should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandMixedTypesArray() { + String ppl = "source=USERS | where USERNAME = 'mixedTypes' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array with mixed types should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandNestedObjectArray() { + String ppl = "source=USERS | where USERNAME = 'nestedObject' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + 
assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array of nested objects should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandAllEmptyObjectsArray() { + String ppl = "source=USERS | where USERNAME = 'allEmptyObjects' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array of all empty objects should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandAllEmptyArraysArray() { + String ppl = "source=USERS | where USERNAME = 'allEmptyArrays' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array of all empty arrays should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandArrayOfArraysOfPrimitives() { + String ppl = "source=USERS | where USERNAME = 'arrayOfArraysOfPrimitives' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail( + "mvexpand on array of arrays of primitives should not throw, but got: " + e.getMessage()); + } + } + + @Test + public void testMvExpandSpecialValuesArray() { + String ppl = "source=USERS | where USERNAME = 'specialValues' | mvexpand skills"; + try { + RelNode root = getRelNode(ppl); + assertNotNull(root); + } catch (Exception e) { + fail("mvexpand on array with special values should not throw, but got: " + e.getMessage()); + } + } + + @RequiredArgsConstructor + static class UsersTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("USERNAME", SqlTypeName.VARCHAR) + .add( + "skills", + factory.createArrayType( + factory + .builder() + .add("name", SqlTypeName.VARCHAR) + .add("level", SqlTypeName.VARCHAR) + .build(), + -1)) + .build(); + + @Override + public Enumerable scan(DataContext 
root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index f205b9fe0c..17fc1b75db 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -822,6 +822,18 @@ public void testMvindex() { anonymize("source=t | eval result=mvindex(array(1, 2, 3, 4, 5), 1, 3) | fields result")); } + @Test + public void testMvexpandCommand() { + assertEquals("source=table | mvexpand identifier", anonymize("source=t | mvexpand skills")); + } + + @Test + public void testMvexpandCommandWithLimit() { + assertEquals( + "source=table | mvexpand identifier limit=***", + anonymize("source=t | mvexpand skills limit=5")); + } + @Test public void testRexWithOffsetField() { when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10);