Commits (31)
445799f  Initial checkpoint - following calcite way and commented legacy way (Oct 10, 2025)
427af0f  Removed the build.gradle dependency opensearch-common (Oct 22, 2025)
83d7786  Ready to submit this PR (Oct 22, 2025)
7f6e127  Ready to submit this PR (Oct 22, 2025)
dcbf56b  Ready to submit this PR (Oct 22, 2025)
a3c1384  Add mvexpand.rst (Oct 22, 2025)
148ccc5  Add Tests (Oct 22, 2025)
f5a9e82  Add the mvexpand.rst to the index.rst (Oct 23, 2025)
2cc60ad  Merge branch 'opensearch-project:main' into main (srikanthpadakanti, Oct 27, 2025)
18cbba4  Remove the unwanted code (Oct 27, 2025)
60fa2ad  Fix the failing test (Oct 27, 2025)
d248cb0  Merge branch 'opensearch-project:main' into main (srikanthpadakanti, Oct 30, 2025)
a28894a  Address the PR comments and fix the tests accordingly (Oct 30, 2025)
825c52e  Address the PR comments and fix the tests accordingly (Oct 30, 2025)
dc76a55  Address the PR comments and fix the tests accordingly (Oct 30, 2025)
8319583  Add comment lines for buildUnnestForLeft (Oct 30, 2025)
6d87133  Fix the mvexpand.rst (Oct 31, 2025)
b83ab21  Merge branch 'main' into main (srikanthpadakanti, Nov 3, 2025)
c84703d  Fix the failing test (Nov 3, 2025)
de82b65  Fix the failing test (Nov 3, 2025)
6c6e0ec  Fix the failing test (Nov 3, 2025)
069d52e  Fix the failing test (Nov 3, 2025)
2f3aeb6  Merge branch 'opensearch-project:main' into main (srikanthpadakanti, Nov 5, 2025)
e584368  Merge branch 'opensearch-project:main' into main (srikanthpadakanti, Nov 6, 2025)
a41b081  Address the PR comments (Nov 6, 2025)
bf018b7  Address the PR comments (Nov 7, 2025)
c215243  Merge branch 'opensearch-project:main' into main (srikanthpadakanti, Nov 10, 2025)
a6ccb5a  Merge upstream changes to my main (Nov 13, 2025)
7743ad0  Address the PR comments (Nov 13, 2025)
1e03131  Address the PR comments (Nov 13, 2025)
dbea3a2  Address the PR comments (Nov 14, 2025)
@@ -78,6 +78,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
@@ -701,6 +702,11 @@ public LogicalPlan visitExpand(Expand expand, AnalysisContext context) {
throw getOnlyForCalciteException("Expand");
}

@Override
public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) {
throw getOnlyForCalciteException("MvExpand");
}

/** Build {@link LogicalTrendline} for Trendline command. */
@Override
public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
@@ -66,6 +66,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
@@ -451,4 +452,8 @@ public T visitAppend(Append node, C context) {
public T visitMultisearch(Multisearch node, C context) {
return visitChildren(node, context);
}

public T visitMvExpand(MvExpand node, C context) {
return visitChildren(node, context);
}
}
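The new visitMvExpand hook above mirrors the other visit methods: by default it simply delegates to visitChildren. A minimal sketch, not part of this change, of a concrete visitor overriding it; the visitor class name and its Integer/Void type arguments are hypothetical:

import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.tree.MvExpand;

// Hypothetical visitor that reads the optional per-document limit off the node.
class MvExpandLimitVisitor extends AbstractNodeVisitor<Integer, Void> {
  @Override
  public Integer visitMvExpand(MvExpand node, Void context) {
    // getLimit() is null when the optional "limit N" clause was omitted.
    return node.getLimit() == null ? 0 : node.getLimit();
  }
}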
6 changes: 6 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
@@ -62,6 +62,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.MinSpanBin;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
import org.opensearch.sql.ast.tree.Project;
@@ -136,6 +137,11 @@ public Expand expand(UnresolvedPlan input, Field field, String alias) {
return new Expand(field, alias).attach(input);
}

public static UnresolvedPlan mvexpand(UnresolvedPlan input, Field field, Integer limit) {
// attach the incoming child plan so the AST contains the pipeline link
return new MvExpand(field, limit).attach(input);
}

public static UnresolvedPlan projectWithArg(
UnresolvedPlan input, List<Argument> argList, UnresolvedExpression... projectList) {
return new Project(Arrays.asList(projectList), argList).attach(input);
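A minimal sketch, not in this diff, of how the new AstDSL.mvexpand helper could be used to build an expected AST in a test; it assumes AstDSL's existing relation(...) and field(...) factories, and the index name "logs" and field name "tags" are illustrative only:

import static org.opensearch.sql.ast.dsl.AstDSL.field;
import static org.opensearch.sql.ast.dsl.AstDSL.mvexpand;
import static org.opensearch.sql.ast.dsl.AstDSL.relation;

import org.opensearch.sql.ast.tree.UnresolvedPlan;

class MvExpandAstSketch {
  // AST equivalent of the PPL pipeline: source=logs | mvexpand tags limit 5
  UnresolvedPlan expected() {
    return mvexpand(relation("logs"), field("tags"), 5);
  }
}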
46 changes: 46 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Field;

/** AST node representing an {@code mvexpand <field> [limit N]} operation. */
@ToString
@EqualsAndHashCode(callSuper = false)
public class MvExpand extends UnresolvedPlan {

private UnresolvedPlan child;
@Getter private final Field field;
@Getter @Nullable private final Integer limit;

public MvExpand(Field field, @Nullable Integer limit) {
this.field = field;
this.limit = limit;
}

@Override
public MvExpand attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitMvExpand(this, context);
}
}
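A short sketch, hypothetical and not part of the new file, of the attach/getChild contract defined above; it assumes the existing AstDSL field(...) and relation(...) helpers:

import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.tree.MvExpand;

class MvExpandNodeSketch {
  static void demo() {
    // No "limit N" clause supplied, so the limit stays null.
    MvExpand node = new MvExpand(AstDSL.field("tags"), null);
    assert node.getChild().isEmpty();      // nothing attached yet
    node.attach(AstDSL.relation("logs"));  // wire in the pipeline input
    assert node.getChild().size() == 1;
    assert node.getLimit() == null;
  }
}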
@@ -122,6 +122,7 @@
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
@@ -839,7 +840,12 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) {
.toList();
context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggCall);
buildExpandRelNode(
context.relBuilder.field(node.getAlias()), node.getAlias(), node.getAlias(), context);
context.relBuilder.field(node.getAlias()),
node.getAlias(),
node.getAlias(),
null,
context);

flattenParsedPattern(
node.getAlias(),
context.relBuilder.field(node.getAlias()),
@@ -2807,7 +2813,21 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);
String alias = expand.getAlias();

buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, context);
buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, null, context);

return context.relBuilder.peek();
}

@Override
public RelNode visitMvExpand(MvExpand node, CalcitePlanContext context) {
visitChildren(node, context);
Field arrayField = node.getField();
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);

// pass the per-document limit into the builder so it can be applied inside the UNNEST inner
// query
buildMvExpandRelNode(
arrayFieldRex, arrayField.getField().toString(), null, node.getLimit(), context);

return context.relBuilder.peek();
}
@@ -3056,45 +3076,114 @@ private void flattenParsedPattern(
}

private void buildExpandRelNode(
RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) {
// 3. Capture the outer row in a CorrelationId
RexNode arrayFieldRexNode,
String arrayFieldName,
String alias,
Integer mvExpandLimit,
CalcitePlanContext context) {

// Convert incoming RexNode to RexInputRef when possible; otherwise resolve by field name.
RexInputRef arrayFieldRex;
if (arrayFieldRexNode instanceof RexInputRef) {
arrayFieldRex = (RexInputRef) arrayFieldRexNode;

// If caller gave an input ref, try to sanity-check that the referenced field in the
// current row type is actually an array. If not, surface a clear semantic error.
RelDataType currentRowTypeCheck = context.relBuilder.peek().getRowType();
int idx = arrayFieldRex.getIndex();
RelDataTypeField checkField = null;
if (idx >= 0 && idx < currentRowTypeCheck.getFieldList().size()) {
checkField = currentRowTypeCheck.getFieldList().get(idx);
}
// Allow both ArraySqlType and MapSqlType here to avoid failing
// early when mappings are represented as MAP at runtime.
if (checkField != null
&& !(checkField.getType() instanceof ArraySqlType)
&& !(checkField.getType() instanceof MapSqlType)) {
throw new SemanticCheckException(
String.format(
"Cannot expand field '%s': expected ARRAY type but found %s",
checkField.getName(), checkField.getType().getSqlTypeName()));
}
} else {
// Try resolve by name and provide user-friendly errors when resolution fails or the type
// is not an array (user-visible message).
RelDataType currentRowType = context.relBuilder.peek().getRowType();
RelDataTypeField fld = currentRowType.getField(arrayFieldName, false, false);
if (fld == null) {
throw new SemanticCheckException(
String.format("Cannot expand field '%s': field not found in input", arrayFieldName));
}
// Accept ArraySqlType or MapSqlType here
if (!(fld.getType() instanceof ArraySqlType) && !(fld.getType() instanceof MapSqlType)) {
throw new SemanticCheckException(
String.format(
"Cannot expand field '%s': expected ARRAY type but found %s",
arrayFieldName, fld.getType().getSqlTypeName()));
}
arrayFieldRex = context.rexBuilder.makeInputRef(currentRowType, fld.getIndex());
}

// Capture left node and its schema BEFORE calling build()
RelNode leftNode = context.relBuilder.peek();
RelDataType leftRowType = leftNode.getRowType();

// Resolve the array field index in left schema by name (robust); fallback to original index
RelDataTypeField leftField = leftRowType.getField(arrayFieldName, false, false);
int arrayFieldIndexInLeft =
(leftField != null) ? leftField.getIndex() : arrayFieldRex.getIndex();

// If left schema has the field but it's not an array, produce a helpful message.
// Accept MapSqlType as well to avoid premature failure when mapping is object-like.
if (leftField != null
&& !(leftField.getType() instanceof ArraySqlType)
&& !(leftField.getType() instanceof MapSqlType)) {
throw new SemanticCheckException(
String.format(
"Cannot expand field '%s': expected ARRAY type in input but found %s",
arrayFieldName, leftField.getType().getSqlTypeName()));
}

// Create correlation variable while left is still on the builder stack
Holder<RexCorrelVariable> correlVariable = Holder.empty();
context.relBuilder.variable(correlVariable::set);

// 4. Create RexFieldAccess to access left node's array field with correlationId and build join
// left node
// Create correlated field access while left is still on the builder stack
RexNode correlArrayFieldAccess =
context.relBuilder.field(
context.rexBuilder.makeCorrel(
context.relBuilder.peek().getRowType(), correlVariable.get().id),
arrayFieldRex.getIndex());
RelNode leftNode = context.relBuilder.build();
context.rexBuilder.makeCorrel(leftRowType, correlVariable.get().id),
arrayFieldIndexInLeft);

// Materialize leftBuilt
RelNode leftBuilt = context.relBuilder.build();

// 5. Build join right node and expand the array field using uncollect
// Build the right (uncollect) using the small helper
RelNode rightNode =
context
.relBuilder
// fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter
.push(LogicalValues.createOneRow(context.relBuilder.getCluster()))
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName))
.uncollect(List.of(), false)
.build();

// 6. Perform a nested-loop join (correlate) between the original table and the expanded
// array field.
// The last parameter has to refer to the array to be expanded on the left side. It will
// be used by the right side to correlate with the left side.
buildRightUncollect(correlArrayFieldAccess, arrayFieldName, mvExpandLimit, context);

// Compute required column ref against leftBuilt's row type (robust)
RexNode requiredColumnRef =
context.rexBuilder.makeInputRef(leftBuilt.getRowType(), arrayFieldIndexInLeft);

// Correlate leftBuilt and rightNode using the proper required column ref
context
.relBuilder
.push(leftNode)
.push(leftBuilt)
.push(rightNode)
.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(arrayFieldRex))
// 7. Remove the original array field from the output.
// TODO: RFC: should we keep the original array field when alias is present?
.projectExcept(arrayFieldRex);
.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(requiredColumnRef));

// Remove the original array field from the output by name if possible
RexNode toRemove;
try {
toRemove = context.relBuilder.field(arrayFieldName);
} catch (Exception e) {
// Fallback in case name lookup fails
toRemove = requiredColumnRef;
}
context.relBuilder.projectExcept(toRemove);

// Optional rename into alias (preserve the original logic)
if (alias != null) {
// Sub-nested fields cannot be removed after renaming the nested field.
tryToRemoveNestedFields(context);
RexInputRef expandedField = context.relBuilder.field(arrayFieldName);
List<String> names = new ArrayList<>(context.relBuilder.peek().getRowType().getFieldNames());
@@ -3103,6 +3192,45 @@ private void buildExpandRelNode(
}
}

/**
* Build the inner uncollect (UNNEST) right node given a correlated field access.
*
* <p>This helper intentionally keeps a very small surface: it accepts the correlated access
* (which must be created while the left is still on the builder stack) and the other local
* options, constructs the "one-row -> project(correlatedField) -> (limit?) -> uncollect" sequence
* and returns the built right RelNode.
*
* <p>Keeping the correlate + projectExcept logic in buildExpandRelNode simplifies reasoning about
* the required correlate-variable lifecycle (it must be created while left is on the builder
* stack).
*/
private RelNode buildRightUncollect(
RexNode correlArrayFieldAccess,
String arrayFieldName,
Integer mvExpandLimit,
CalcitePlanContext context) {

RelBuilder rb = context.relBuilder;
rb.push(LogicalValues.createOneRow(rb.getCluster()))
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName));
// apply per-document limit into the inner SELECT if provided
if (mvExpandLimit != null && mvExpandLimit > 0) {
rb.limit(0, mvExpandLimit);
}
return rb.uncollect(List.of(), false).build();
}

private void buildMvExpandRelNode(
RexInputRef arrayFieldRex,
String arrayFieldName,
String alias,
Integer mvExpandLimit,
CalcitePlanContext context) {

// Delegate to the canonical expand implementation (pass the per-document limit through).
buildExpandRelNode(arrayFieldRex, arrayFieldName, alias, mvExpandLimit, context);
}

/** Creates an optimized sed call using native Calcite functions */
private RexNode createOptimizedSedCall(
RexNode fieldRex, String sedExpression, CalcitePlanContext context) {
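In buildRightUncollect above, the per-document limit is applied inside the inner one-row/project/uncollect sequence, so it caps the number of output rows per source document rather than across the whole result. A standalone sketch of that intended row-level semantics, using plain Java collections rather than Calcite; the field values are illustrative:

import java.util.List;

class MvExpandSemanticsSketch {
  // Expand one document's multi-value field into rows, honoring an optional limit.
  static List<String> mvexpand(List<String> values, Integer limit) {
    if (limit == null || limit <= 0) {
      return values; // no limit: keep every element as its own row
    }
    return values.subList(0, Math.min(limit, values.size()));
  }

  public static void main(String[] args) {
    // tags = ["a", "b", "c"] with "mvexpand tags limit 2" keeps two rows: a, b
    System.out.println(mvexpand(List.of("a", "b", "c"), 2)); // prints [a, b]
  }
}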