Skip to content

Commit ca5a5bd

Browse files
authored
Support multisearch command in calcite (#4332)
--------- Signed-off-by: Kai Huang <[email protected]>
1 parent e050689 commit ca5a5bd

33 files changed

+1794
-56
lines changed

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.opensearch.sql.ast.tree.Limit;
7676
import org.opensearch.sql.ast.tree.Lookup;
7777
import org.opensearch.sql.ast.tree.ML;
78+
import org.opensearch.sql.ast.tree.Multisearch;
7879
import org.opensearch.sql.ast.tree.Paginate;
7980
import org.opensearch.sql.ast.tree.Parse;
8081
import org.opensearch.sql.ast.tree.Patterns;
@@ -820,6 +821,11 @@ public LogicalPlan visitAppend(Append node, AnalysisContext context) {
820821
throw getOnlyForCalciteException("Append");
821822
}
822823

824+
@Override
825+
public LogicalPlan visitMultisearch(Multisearch node, AnalysisContext context) {
826+
throw getOnlyForCalciteException("Multisearch");
827+
}
828+
823829
private LogicalSort buildSort(
824830
LogicalPlan child, AnalysisContext context, Integer count, List<Field> sortFields) {
825831
ExpressionReferenceOptimizer optimizer =

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.opensearch.sql.ast.tree.Limit;
6464
import org.opensearch.sql.ast.tree.Lookup;
6565
import org.opensearch.sql.ast.tree.ML;
66+
import org.opensearch.sql.ast.tree.Multisearch;
6667
import org.opensearch.sql.ast.tree.Paginate;
6768
import org.opensearch.sql.ast.tree.Parse;
6869
import org.opensearch.sql.ast.tree.Patterns;
@@ -431,4 +432,8 @@ public T visitAppendCol(AppendCol node, C context) {
431432
public T visitAppend(Append node, C context) {
432433
return visitChildren(node, context);
433434
}
435+
436+
public T visitMultisearch(Multisearch node, C context) {
437+
return visitChildren(node, context);
438+
}
434439
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.ToString;
13+
import org.opensearch.sql.ast.AbstractNodeVisitor;
14+
15+
/** Logical plan node for Multisearch operation. Combines results from multiple search queries. */
16+
@Getter
17+
@ToString
18+
@EqualsAndHashCode(callSuper = false)
19+
public class Multisearch extends UnresolvedPlan {
20+
21+
private UnresolvedPlan child;
22+
private final List<UnresolvedPlan> subsearches;
23+
24+
public Multisearch(List<UnresolvedPlan> subsearches) {
25+
this.subsearches = subsearches;
26+
}
27+
28+
@Override
29+
public Multisearch attach(UnresolvedPlan child) {
30+
this.child = child;
31+
return this;
32+
}
33+
34+
@Override
35+
public List<UnresolvedPlan> getChild() {
36+
if (this.child == null) {
37+
return ImmutableList.copyOf(subsearches);
38+
} else {
39+
return ImmutableList.<UnresolvedPlan>builder().add(this.child).addAll(subsearches).build();
40+
}
41+
}
42+
43+
@Override
44+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
45+
return nodeVisitor.visitMultisearch(this, context);
46+
}
47+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.calcite.rel.hint.RelHint;
4949
import org.apache.calcite.rel.logical.LogicalAggregate;
5050
import org.apache.calcite.rel.logical.LogicalValues;
51+
import org.apache.calcite.rel.type.RelDataType;
5152
import org.apache.calcite.rel.type.RelDataTypeField;
5253
import org.apache.calcite.rex.RexCall;
5354
import org.apache.calcite.rex.RexCorrelVariable;
@@ -60,7 +61,6 @@
6061
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
6162
import org.apache.calcite.sql.type.SqlTypeFamily;
6263
import org.apache.calcite.sql.type.SqlTypeName;
63-
import org.apache.calcite.sql.validate.SqlValidatorUtil;
6464
import org.apache.calcite.tools.RelBuilder;
6565
import org.apache.calcite.tools.RelBuilder.AggCall;
6666
import org.apache.calcite.util.Holder;
@@ -111,6 +111,7 @@
111111
import org.opensearch.sql.ast.tree.Lookup;
112112
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
113113
import org.opensearch.sql.ast.tree.ML;
114+
import org.opensearch.sql.ast.tree.Multisearch;
114115
import org.opensearch.sql.ast.tree.Paginate;
115116
import org.opensearch.sql.ast.tree.Parse;
116117
import org.opensearch.sql.ast.tree.Patterns;
@@ -1649,65 +1650,73 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
16491650
node.getSubSearch().accept(new EmptySourcePropagateVisitor(), null);
16501651
prunedSubSearch.accept(this, context);
16511652

1652-
// 3. Merge two query schemas
1653+
// 3. Merge two query schemas using shared logic
16531654
RelNode subsearchNode = context.relBuilder.build();
16541655
RelNode mainNode = context.relBuilder.build();
1655-
List<RelDataTypeField> mainFields = mainNode.getRowType().getFieldList();
1656-
List<RelDataTypeField> subsearchFields = subsearchNode.getRowType().getFieldList();
1657-
Map<String, RelDataTypeField> subsearchFieldMap =
1658-
subsearchFields.stream()
1659-
.map(typeField -> Pair.of(typeField.getName(), typeField))
1660-
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
1661-
boolean[] isSelected = new boolean[subsearchFields.size()];
1662-
List<String> names = new ArrayList<>();
1663-
List<RexNode> mainUnionProjects = new ArrayList<>();
1664-
List<RexNode> subsearchUnionProjects = new ArrayList<>();
1665-
1666-
// 3.1 Start with main query's schema. If subsearch plan doesn't have matched column,
1667-
// add same type column in place with NULL literal
1668-
for (int i = 0; i < mainFields.size(); i++) {
1669-
mainUnionProjects.add(context.rexBuilder.makeInputRef(mainNode, i));
1670-
RelDataTypeField mainField = mainFields.get(i);
1671-
RelDataTypeField subsearchField = subsearchFieldMap.get(mainField.getName());
1672-
names.add(mainField.getName());
1673-
if (subsearchFieldMap.containsKey(mainField.getName())
1674-
&& subsearchField != null
1675-
&& subsearchField.getType().equals(mainField.getType())) {
1676-
subsearchUnionProjects.add(
1677-
context.rexBuilder.makeInputRef(subsearchNode, subsearchField.getIndex()));
1678-
isSelected[subsearchField.getIndex()] = true;
1679-
} else {
1680-
subsearchUnionProjects.add(context.rexBuilder.makeNullLiteral(mainField.getType()));
1681-
}
1656+
1657+
// Use shared schema merging logic that handles type conflicts via field renaming
1658+
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subsearchNode);
1659+
List<RelNode> projectedNodes =
1660+
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);
1661+
1662+
// 4. Union the projected plans
1663+
for (RelNode projectedNode : projectedNodes) {
1664+
context.relBuilder.push(projectedNode);
16821665
}
1666+
context.relBuilder.union(true);
1667+
return context.relBuilder.peek();
1668+
}
16831669

1684-
// 3.2 Add remaining subsearch columns to the merged schema
1685-
for (int j = 0; j < subsearchFields.size(); j++) {
1686-
RelDataTypeField subsearchField = subsearchFields.get(j);
1687-
if (!isSelected[j]) {
1688-
mainUnionProjects.add(context.rexBuilder.makeNullLiteral(subsearchField.getType()));
1689-
subsearchUnionProjects.add(context.rexBuilder.makeInputRef(subsearchNode, j));
1690-
names.add(subsearchField.getName());
1670+
@Override
1671+
public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
1672+
List<RelNode> subsearchNodes = new ArrayList<>();
1673+
for (UnresolvedPlan subsearch : node.getSubsearches()) {
1674+
UnresolvedPlan prunedSubSearch = subsearch.accept(new EmptySourcePropagateVisitor(), null);
1675+
prunedSubSearch.accept(this, context);
1676+
subsearchNodes.add(context.relBuilder.build());
1677+
}
1678+
1679+
// Use shared schema merging logic that handles type conflicts via field renaming
1680+
List<RelNode> alignedNodes =
1681+
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(subsearchNodes, context);
1682+
1683+
for (RelNode alignedNode : alignedNodes) {
1684+
context.relBuilder.push(alignedNode);
1685+
}
1686+
context.relBuilder.union(true, alignedNodes.size());
1687+
1688+
RelDataType rowType = context.relBuilder.peek().getRowType();
1689+
String timestampField = findTimestampField(rowType);
1690+
if (timestampField != null) {
1691+
RelDataTypeField timestampFieldRef = rowType.getField(timestampField, false, false);
1692+
if (timestampFieldRef != null) {
1693+
RexNode timestampRef =
1694+
context.rexBuilder.makeInputRef(
1695+
context.relBuilder.peek(), timestampFieldRef.getIndex());
1696+
context.relBuilder.sort(context.relBuilder.desc(timestampRef));
16911697
}
16921698
}
16931699

1694-
// 3.3 Uniquify names in case the merged names have duplicates
1695-
List<String> uniqNames =
1696-
SqlValidatorUtil.uniquify(names, SqlValidatorUtil.EXPR_SUGGESTER, true);
1697-
1698-
// 4. Apply new schema over two query plans
1699-
RelNode projectedMainNode =
1700-
context.relBuilder.push(mainNode).project(mainUnionProjects, uniqNames).build();
1701-
RelNode projectedSubsearchNode =
1702-
context.relBuilder.push(subsearchNode).project(subsearchUnionProjects, uniqNames).build();
1703-
1704-
// 5. Union all two projected plans
1705-
context.relBuilder.push(projectedMainNode);
1706-
context.relBuilder.push(projectedSubsearchNode);
1707-
context.relBuilder.union(true);
17081700
return context.relBuilder.peek();
17091701
}
17101702

1703+
/**
1704+
* Finds the timestamp field for multisearch ordering.
1705+
*
1706+
* @param rowType The row type to search for timestamp fields
1707+
* @return The name of the timestamp field, or null if not found
1708+
*/
1709+
private String findTimestampField(RelDataType rowType) {
1710+
String[] candidates = {"@timestamp", "_time", "timestamp", "time"};
1711+
for (String fieldName : candidates) {
1712+
RelDataTypeField field = rowType.getField(fieldName, false, false);
1713+
if (field != null) {
1714+
return fieldName;
1715+
}
1716+
}
1717+
return null;
1718+
}
1719+
17111720
/*
17121721
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
17131722
*/
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite;
7+
8+
import java.util.ArrayList;
9+
import java.util.HashMap;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Set;
14+
import java.util.stream.Collectors;
15+
import org.apache.calcite.rel.RelNode;
16+
import org.apache.calcite.rel.type.RelDataType;
17+
import org.apache.calcite.rel.type.RelDataTypeField;
18+
import org.apache.calcite.rex.RexNode;
19+
import org.apache.calcite.sql.validate.SqlValidatorUtil;
20+
21+
/**
22+
* Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses
23+
* the same strategy as append command - renames conflicting fields to avoid type conflicts.
24+
*/
25+
public class SchemaUnifier {
26+
27+
/**
28+
* Builds a unified schema for multiple nodes with type conflict resolution.
29+
*
30+
* @param nodes List of RelNodes to unify schemas for
31+
* @param context Calcite plan context
32+
* @return List of projected RelNodes with unified schema
33+
*/
34+
public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
35+
List<RelNode> nodes, CalcitePlanContext context) {
36+
if (nodes.isEmpty()) {
37+
return new ArrayList<>();
38+
}
39+
40+
if (nodes.size() == 1) {
41+
return nodes;
42+
}
43+
44+
// Step 1: Build the unified schema by processing all nodes
45+
List<SchemaField> unifiedSchema = buildUnifiedSchema(nodes);
46+
47+
// Step 2: Create projections for each node to align with unified schema
48+
List<RelNode> projectedNodes = new ArrayList<>();
49+
List<String> fieldNames =
50+
unifiedSchema.stream().map(SchemaField::getName).collect(Collectors.toList());
51+
52+
for (RelNode node : nodes) {
53+
List<RexNode> projection = buildProjectionForNode(node, unifiedSchema, context);
54+
RelNode projectedNode = context.relBuilder.push(node).project(projection, fieldNames).build();
55+
projectedNodes.add(projectedNode);
56+
}
57+
58+
// Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.)
59+
List<String> uniqueNames =
60+
SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true);
61+
62+
// Step 4: Re-project with unique names if needed
63+
if (!uniqueNames.equals(fieldNames)) {
64+
List<RelNode> renamedNodes = new ArrayList<>();
65+
for (RelNode node : projectedNodes) {
66+
RelNode renamedNode =
67+
context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build();
68+
renamedNodes.add(renamedNode);
69+
}
70+
return renamedNodes;
71+
}
72+
73+
return projectedNodes;
74+
}
75+
76+
/**
77+
* Builds a unified schema by merging fields from all nodes. Fields with the same name but
78+
* different types are added as separate entries (which will be renamed during uniquification).
79+
*
80+
* @param nodes List of RelNodes to merge schemas from
81+
* @return List of SchemaField representing the unified schema (may contain duplicate names)
82+
*/
83+
private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
84+
List<SchemaField> schema = new ArrayList<>();
85+
Map<String, Set<RelDataType>> seenFields = new HashMap<>();
86+
87+
for (RelNode node : nodes) {
88+
for (RelDataTypeField field : node.getRowType().getFieldList()) {
89+
String fieldName = field.getName();
90+
RelDataType fieldType = field.getType();
91+
92+
// Track which (name, type) combinations we've seen
93+
Set<RelDataType> typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>());
94+
95+
if (!typesForName.contains(fieldType)) {
96+
// New field or same name with different type - add to schema
97+
schema.add(new SchemaField(fieldName, fieldType));
98+
typesForName.add(fieldType);
99+
}
100+
// If we've seen this exact (name, type) combination, skip it
101+
}
102+
}
103+
104+
return schema;
105+
}
106+
107+
/**
108+
* Builds a projection for a node to align with the unified schema. For each field in the unified
109+
* schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL
110+
*
111+
* @param node The node to build projection for
112+
* @param unifiedSchema List of SchemaField representing the unified schema
113+
* @param context Calcite plan context
114+
* @return List of RexNode representing the projection
115+
*/
116+
private static List<RexNode> buildProjectionForNode(
117+
RelNode node, List<SchemaField> unifiedSchema, CalcitePlanContext context) {
118+
Map<String, RelDataTypeField> nodeFieldMap =
119+
node.getRowType().getFieldList().stream()
120+
.collect(Collectors.toMap(RelDataTypeField::getName, field -> field));
121+
122+
List<RexNode> projection = new ArrayList<>();
123+
for (SchemaField schemaField : unifiedSchema) {
124+
String fieldName = schemaField.getName();
125+
RelDataType expectedType = schemaField.getType();
126+
RelDataTypeField nodeField = nodeFieldMap.get(fieldName);
127+
128+
if (nodeField != null && nodeField.getType().equals(expectedType)) {
129+
// Field exists with matching type - use it
130+
projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex()));
131+
} else {
132+
// Field missing or type mismatch - project NULL
133+
projection.add(context.rexBuilder.makeNullLiteral(expectedType));
134+
}
135+
}
136+
137+
return projection;
138+
}
139+
140+
/** Represents a field in the unified schema with name and type. */
141+
private static class SchemaField {
142+
private final String name;
143+
private final RelDataType type;
144+
145+
SchemaField(String name, RelDataType type) {
146+
this.name = name;
147+
this.type = type;
148+
}
149+
150+
String getName() {
151+
return name;
152+
}
153+
154+
RelDataType getType() {
155+
return type;
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)