Skip to content
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
445799f
Initial checkpoint - following calcite way and commented legacy way
Oct 10, 2025
427af0f
Removed the build.gradle dependency opensearch-common
Oct 22, 2025
83d7786
Ready to submit this PR
Oct 22, 2025
7f6e127
Ready to submit this PR
Oct 22, 2025
dcbf56b
Ready to submit this PR
Oct 22, 2025
a3c1384
Add mvexpand.rst
Oct 22, 2025
148ccc5
Add Tests
Oct 22, 2025
f5a9e82
Add the mvexpand.rst to the index.rst
Oct 23, 2025
2cc60ad
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 27, 2025
18cbba4
Remove the unwanted code
Oct 27, 2025
60fa2ad
Fix the failing test
Oct 27, 2025
d248cb0
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 30, 2025
a28894a
Address the PR comments and fix the tests accordingly
Oct 30, 2025
825c52e
Address the PR comments and fix the tests accordingly
Oct 30, 2025
dc76a55
Address the PR comments and fix the tests accordingly
Oct 30, 2025
8319583
Add comment lines for buildUnnestForLeft
Oct 30, 2025
6d87133
Fix the mvexpand.rst
Oct 31, 2025
b83ab21
Merge branch 'main' into main
srikanthpadakanti Nov 3, 2025
c84703d
Fix the failing test
Nov 3, 2025
de82b65
Fix the failing test
Nov 3, 2025
6c6e0ec
Fix the failing test
Nov 3, 2025
069d52e
Fix the failing test
Nov 3, 2025
2f3aeb6
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Nov 5, 2025
e584368
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Nov 6, 2025
a41b081
Address the PR comments
Nov 6, 2025
bf018b7
Address the PR comments
Nov 7, 2025
c215243
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Nov 10, 2025
a6ccb5a
Merge upstream changes to my main
Nov 13, 2025
7743ad0
Address the PR comments
Nov 13, 2025
1e03131
Address the PR comments
Nov 13, 2025
dbea3a2
Address the PR comments
Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -701,6 +702,11 @@ public LogicalPlan visitExpand(Expand expand, AnalysisContext context) {
throw getOnlyForCalciteException("Expand");
}

@Override
public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) {
throw getOnlyForCalciteException("MvExpand");
}

/** Build {@link LogicalTrendline} for Trendline command. */
@Override
public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -451,4 +452,8 @@ public T visitAppend(Append node, C context) {
public T visitMultisearch(Multisearch node, C context) {
return visitChildren(node, context);
}

public T visitMvExpand(MvExpand node, C context) {
return visitChildren(node, context);
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.MinSpanBin;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
import org.opensearch.sql.ast.tree.Project;
Expand Down Expand Up @@ -135,6 +136,11 @@ public Expand expand(UnresolvedPlan input, Field field, String alias) {
return new Expand(field, alias).attach(input);
}

public static UnresolvedPlan mvexpand(UnresolvedPlan input, Field field, Integer limit) {
// attach the incoming child plan so the AST contains the pipeline link
return new MvExpand(field, limit).attach(input);
}

public static UnresolvedPlan projectWithArg(
UnresolvedPlan input, List<Argument> argList, UnresolvedExpression... projectList) {
return new Project(Arrays.asList(projectList), argList).attach(input);
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Field;

/** AST node representing an {@code mvexpand <field> [limit N]} operation. */
@ToString
@EqualsAndHashCode(callSuper = false)
public class MvExpand extends UnresolvedPlan {

private UnresolvedPlan child;
@Getter private final Field field;
@Getter @Nullable private final Integer limit;

public MvExpand(Field field, @Nullable Integer limit) {
this.field = field;
this.limit = limit;
}

@Override
public MvExpand attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitMvExpand(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -817,7 +818,12 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) {
.toList();
context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggCall);
buildExpandRelNode(
context.relBuilder.field(node.getAlias()), node.getAlias(), node.getAlias(), context);
context.relBuilder.field(node.getAlias()),
node.getAlias(),
node.getAlias(),
null,
context);

flattenParsedPattern(
node.getAlias(),
context.relBuilder.field(node.getAlias()),
Expand Down Expand Up @@ -1589,6 +1595,20 @@ private static void buildDedupNotNull(
context.relBuilder.projectExcept(_row_number_dedup_);
}

@Override
public RelNode visitMvExpand(MvExpand node, CalcitePlanContext context) {
visitChildren(node, context);
Field arrayField = node.getField();
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);

// pass the per-document limit into the builder so it can be applied inside the UNNEST inner
// query
buildMvExpandRelNode(
arrayFieldRex, arrayField.getField().toString(), null, node.getLimit(), context);

return context.relBuilder.peek();
}

@Override
public RelNode visitWindow(Window node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down Expand Up @@ -3067,7 +3087,7 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);
String alias = expand.getAlias();

buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, context);
buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, null, context);

return context.relBuilder.peek();
}
Expand Down Expand Up @@ -3315,46 +3335,51 @@ private void flattenParsedPattern(
projectPlusOverriding(fattenedNodes, projectNames, context);
}

private void buildExpandRelNode(
RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) {
// 3. Capture the outer row in a CorrelationId
Holder<RexCorrelVariable> correlVariable = Holder.empty();
context.relBuilder.variable(correlVariable::set);
// New generic helper: builds Uncollect + Correlate using a provided left node (so caller
// can ensure left rowType is fixed).
private void buildUnnestForLeft(
RelNode leftBuilt,
RelDataType leftRowType,
int arrayFieldIndex,
String arrayFieldName,
String alias,
Holder<RexCorrelVariable> correlVariable,
RexNode correlArrayFieldAccess,
Integer mvExpandLimit,
CalcitePlanContext context) {

// 4. Create RexFieldAccess to access left node's array field with correlationId and build join
// left node
RexNode correlArrayFieldAccess =
context.relBuilder.field(
context.rexBuilder.makeCorrel(
context.relBuilder.peek().getRowType(), correlVariable.get().id),
arrayFieldRex.getIndex());
RelNode leftNode = context.relBuilder.build();
RelBuilder rb = context.relBuilder;
rb.push(LogicalValues.createOneRow(rb.getCluster()))
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName));
// apply per-document limit into the inner SELECT if provided
if (mvExpandLimit != null && mvExpandLimit > 0) {
rb.limit(0, mvExpandLimit);
}
RelNode rightNode = rb.uncollect(List.of(), false).build();

// 5. Build join right node and expand the array field using uncollect
RelNode rightNode =
context
.relBuilder
// fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter
.push(LogicalValues.createOneRow(context.relBuilder.getCluster()))
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName))
.uncollect(List.of(), false)
.build();

// 6. Perform a nested-loop join (correlate) between the original table and the expanded
// array field.
// The last parameter has to refer to the array to be expanded on the left side. It will
// be used by the right side to correlate with the left side.
// Compute required column ref against leftBuilt's row type (robust)
RexNode requiredColumnRef =
context.rexBuilder.makeInputRef(leftBuilt.getRowType(), arrayFieldIndex);

// Correlate leftBuilt and rightNode using the proper required column ref
context
.relBuilder
.push(leftNode)
.push(leftBuilt)
.push(rightNode)
.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(arrayFieldRex))
// 7. Remove the original array field from the output.
// TODO: RFC: should we keep the original array field when alias is present?
.projectExcept(arrayFieldRex);
.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(requiredColumnRef));

// Remove the original array field from the output by name if possible
RexNode toRemove;
try {
toRemove = context.relBuilder.field(arrayFieldName);
} catch (Exception e) {
// Fallback in case name lookup fails
toRemove = requiredColumnRef;
}
context.relBuilder.projectExcept(toRemove);

// Optional rename into alias (preserve the original logic)
if (alias != null) {
// Sub-nested fields cannot be removed after renaming the nested field.
tryToRemoveNestedFields(context);
RexInputRef expandedField = context.relBuilder.field(arrayFieldName);
List<String> names = new ArrayList<>(context.relBuilder.peek().getRowType().getFieldNames());
Expand All @@ -3363,6 +3388,74 @@ private void buildExpandRelNode(
}
}

private void buildExpandRelNode(
RexNode arrayFieldRexNode,
String arrayFieldName,
String alias,
Integer mvExpandLimit,
CalcitePlanContext context) {

// Convert incoming RexNode to RexInputRef when possible; otherwise resolve by field name.
RexInputRef arrayFieldRex;
if (arrayFieldRexNode instanceof RexInputRef) {
arrayFieldRex = (RexInputRef) arrayFieldRexNode;
} else {
RelDataType currentRowType = context.relBuilder.peek().getRowType();
RelDataTypeField fld = currentRowType.getField(arrayFieldName, false, false);
if (fld != null) {
arrayFieldRex = context.rexBuilder.makeInputRef(currentRowType, fld.getIndex());
} else {
throw new IllegalArgumentException(
"buildExpandRelNode: expected RexInputRef or resolvable field name: " + arrayFieldName);
}
}

// Capture left node and its schema BEFORE calling build()
RelNode leftNode = context.relBuilder.peek();
RelDataType leftRowType = leftNode.getRowType();

// Resolve the array field index in left schema by name (robust); fallback to original index
RelDataTypeField leftField = leftRowType.getField(arrayFieldName, false, false);
int arrayFieldIndexInLeft =
(leftField != null) ? leftField.getIndex() : arrayFieldRex.getIndex();

// Create correlation variable while left is still on the builder stack
Holder<RexCorrelVariable> correlVariable = Holder.empty();
context.relBuilder.variable(correlVariable::set);

// Create correlated field access while left is still on the builder stack
RexNode correlArrayFieldAccess =
context.relBuilder.field(
context.rexBuilder.makeCorrel(leftRowType, correlVariable.get().id),
arrayFieldIndexInLeft);

// Materialize leftBuilt
RelNode leftBuilt = context.relBuilder.build();

// Use unified helper to build right/uncollect + correlate + cleanup
buildUnnestForLeft(
leftBuilt,
leftRowType,
arrayFieldIndexInLeft,
arrayFieldName,
alias,
correlVariable,
correlArrayFieldAccess,
mvExpandLimit,
context);
}

private void buildMvExpandRelNode(
RexInputRef arrayFieldRex,
String arrayFieldName,
String alias,
Integer mvExpandLimit,
CalcitePlanContext context) {

// Delegate to the canonical expand implementation (pass the per-document limit through).
buildExpandRelNode(arrayFieldRex, arrayFieldName, alias, mvExpandLimit, context);
}

/** Creates an optimized sed call using native Calcite functions */
private RexNode createOptimizedSedCall(
RexNode fieldRex, String sedExpression, CalcitePlanContext context) {
Expand Down
4 changes: 2 additions & 2 deletions docs/category.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
"user/ppl/cmd/syntax.rst",
"user/ppl/cmd/chart.rst",
"user/ppl/cmd/timechart.rst",
"user/ppl/cmd/search.rst",
"user/ppl/functions/statistical.rst",
"user/ppl/cmd/top.rst",
"user/ppl/cmd/trendline.rst",
Expand All @@ -68,7 +67,8 @@
"user/ppl/functions/string.rst",
"user/ppl/functions/conversion.rst",
"user/ppl/general/datatypes.rst",
"user/ppl/general/identifiers.rst"
"user/ppl/general/identifiers.rst",
"user/ppl/cmd/mvexpand.rst"
],
"bash_settings": [
"user/ppl/admin/settings.rst"
Expand Down
Loading
Loading