Skip to content

Commit 3a3c8c8

Browse files
authored
Support appendpipecommand in PPL (#4602)
* add demos Signed-off-by: xinyual <[email protected]> * add missing column Signed-off-by: xinyual <[email protected]> * add appendpipe poc Signed-off-by: xinyual <[email protected]> * slighty change syntax Signed-off-by: xinyual <[email protected]> * add unresolved plan Signed-off-by: xinyual <[email protected]> * add IT Signed-off-by: xinyual <[email protected]> * add tests Signed-off-by: xinyual <[email protected]> * remove useless ut Signed-off-by: xinyual <[email protected]> * fix conflict Signed-off-by: xinyual <[email protected]> * remove useless code Signed-off-by: xinyual <[email protected]> * remove useless code Signed-off-by: xinyual <[email protected]> * remove useless code Signed-off-by: xinyual <[email protected]> * apply spotless Signed-off-by: xinyual <[email protected]> * remove useless chaneg Signed-off-by: xinyual <[email protected]> * add explain IT Signed-off-by: xinyual <[email protected]> * fix IT Signed-off-by: xinyual <[email protected]> * apply spotless Signed-off-by: xinyual <[email protected]> * add doc Signed-off-by: xinyual <[email protected]> * optimize doc Signed-off-by: xinyual <[email protected]> * add UT Signed-off-by: xinyual <[email protected]> * fix IT due to performance change Signed-off-by: xinyual <[email protected]> * add multiply children check Signed-off-by: xinyual <[email protected]> --------- Signed-off-by: xinyual <[email protected]>
1 parent 20f2234 commit 3a3c8c8

File tree

17 files changed

+408
-1
lines changed

17 files changed

+408
-1
lines changed

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.opensearch.sql.ast.tree.Aggregation;
6161
import org.opensearch.sql.ast.tree.Append;
6262
import org.opensearch.sql.ast.tree.AppendCol;
63+
import org.opensearch.sql.ast.tree.AppendPipe;
6364
import org.opensearch.sql.ast.tree.Bin;
6465
import org.opensearch.sql.ast.tree.Chart;
6566
import org.opensearch.sql.ast.tree.CloseCursor;
@@ -833,6 +834,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) {
833834
throw getOnlyForCalciteException("Appendcol");
834835
}
835836

837+
/**
 * Handles an {@link AppendPipe} node in this analyzer by always throwing the exception returned
 * from {@code getOnlyForCalciteException("AppendPipe")}; no logical plan is produced. This
 * follows the same pattern as the neighbouring {@code visitAppendCol} and {@code visitAppend}
 * handlers.
 */
@Override
838+
public LogicalPlan visitAppendPipe(AppendPipe node, AnalysisContext context) {
839+
throw getOnlyForCalciteException("AppendPipe");
840+
}
841+
836842
@Override
837843
public LogicalPlan visitAppend(Append node, AnalysisContext context) {
838844
throw getOnlyForCalciteException("Append");

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.opensearch.sql.ast.tree.Aggregation;
4949
import org.opensearch.sql.ast.tree.Append;
5050
import org.opensearch.sql.ast.tree.AppendCol;
51+
import org.opensearch.sql.ast.tree.AppendPipe;
5152
import org.opensearch.sql.ast.tree.Bin;
5253
import org.opensearch.sql.ast.tree.Chart;
5354
import org.opensearch.sql.ast.tree.CloseCursor;
@@ -140,6 +141,10 @@ public T visitSearch(Search node, C context) {
140141
return visitChildren(node, context);
141142
}
142143

144+
/**
 * Visits an {@link AppendPipe} node. The default implementation delegates to
 * {@code visitChildren}; visitors override this method to give the node specific handling.
 */
public T visitAppendPipe(AppendPipe node, C context) {
145+
return visitChildren(node, context);
146+
}
147+
143148
/**
 * Visits a {@link Filter} node. The default implementation delegates to
 * {@code visitChildren}.
 */
public T visitFilter(Filter node, C context) {
144149
return visitChildren(node, context);
145150
}

core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.sql.ast.expression.WindowFunction;
5050
import org.opensearch.sql.ast.expression.Xor;
5151
import org.opensearch.sql.ast.tree.Aggregation;
52+
import org.opensearch.sql.ast.tree.AppendPipe;
5253
import org.opensearch.sql.ast.tree.Bin;
5354
import org.opensearch.sql.ast.tree.CountBin;
5455
import org.opensearch.sql.ast.tree.Dedupe;
@@ -563,6 +564,11 @@ public static Trendline trendline(
563564
return new Trendline(sortField, Arrays.asList(computations)).attach(input);
564565
}
565566

567+
public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquery) {
568+
569+
return new AppendPipe(subquery).attach(input);
570+
}
571+
566572
public static Trendline.TrendlineComputation computation(
567573
Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) {
568574
return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.Setter;
13+
import lombok.ToString;
14+
import org.opensearch.sql.ast.AbstractNodeVisitor;
15+
16+
@Getter
17+
@Setter
18+
@ToString
19+
@EqualsAndHashCode(callSuper = false)
20+
public class AppendPipe extends UnresolvedPlan {
21+
22+
private UnresolvedPlan subQuery;
23+
24+
private UnresolvedPlan child;
25+
26+
public AppendPipe(UnresolvedPlan subQuery) {
27+
this.subQuery = subQuery;
28+
}
29+
30+
@Override
31+
public AppendPipe attach(UnresolvedPlan child) {
32+
this.child = child;
33+
return this;
34+
}
35+
36+
@Override
37+
public List<UnresolvedPlan> getChild() {
38+
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
39+
}
40+
41+
@Override
42+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
43+
return nodeVisitor.visitAppendPipe(this, context);
44+
}
45+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import org.opensearch.sql.ast.tree.Aggregation;
106106
import org.opensearch.sql.ast.tree.Append;
107107
import org.opensearch.sql.ast.tree.AppendCol;
108+
import org.opensearch.sql.ast.tree.AppendPipe;
108109
import org.opensearch.sql.ast.tree.Bin;
109110
import org.opensearch.sql.ast.tree.Chart;
110111
import org.opensearch.sql.ast.tree.CloseCursor;
@@ -246,6 +247,28 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
246247
return context.relBuilder.peek();
247248
}
248249

250+
@Override
251+
public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) {
252+
visitChildren(node, context);
253+
UnresolvedPlan subqueryPlan = node.getSubQuery();
254+
UnresolvedPlan childNode = subqueryPlan;
255+
while (childNode.getChild() != null
256+
&& !childNode.getChild().isEmpty()
257+
&& !(childNode.getChild().getFirst() instanceof Values)) {
258+
if (childNode.getChild().size() > 1) {
259+
throw new RuntimeException("AppendPipe doesn't support multiply children subquery.");
260+
}
261+
childNode = (UnresolvedPlan) childNode.getChild().getFirst();
262+
}
263+
childNode.attach(node.getChild().getFirst());
264+
265+
subqueryPlan.accept(this, context);
266+
267+
RelNode subPipelineNode = context.relBuilder.build();
268+
RelNode mainNode = context.relBuilder.build();
269+
return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context);
270+
}
271+
249272
@Override
250273
public RelNode visitRegex(Regex node, CalcitePlanContext context) {
251274
visitChildren(node, context);
@@ -2121,9 +2144,13 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
21212144
// 3. Merge two query schemas using shared logic
21222145
RelNode subsearchNode = context.relBuilder.build();
21232146
RelNode mainNode = context.relBuilder.build();
2147+
return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context);
2148+
}
21242149

2150+
private RelNode mergeTableAndResolveColumnConflict(
2151+
RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) {
21252152
// Use shared schema merging logic that handles type conflicts via field renaming
2126-
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subsearchNode);
2153+
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subqueryNode);
21272154
List<RelNode> projectedNodes =
21282155
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);
21292156

docs/user/ppl/cmd/appendpipe.rst

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
==========
2+
appendpipe
3+
==========
4+
5+
.. rubric:: Table of contents
6+
7+
.. contents::
8+
:local:
9+
:depth: 2
10+
11+
12+
Description
13+
============
14+
| The ``appendpipe`` command appends the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first. The subpipeline is run when the search reaches the ``appendpipe`` command.
15+
The command aligns columns with the same field names and types. For different column fields between the main search and sub-search, NULL values are filled in the respective rows.
16+
17+
Version
18+
=======
19+
3.3.0
20+
21+
Syntax
22+
============
23+
appendpipe [<subpipeline>]
24+
25+
* subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command.
26+
27+
Example 1: Append rows from a total count to existing search result
28+
====================================================================================
29+
30+
This example appends rows from "total by gender" to "sum by gender, state", merging the columns that share the same field name and type.
31+
32+
PPL query::
33+
34+
os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ];
35+
fetched rows / total rows = 6/6
36+
+------+--------+-------+-------+
37+
| part | gender | state | total |
38+
|------+--------+-------+-------|
39+
| 36 | M | TN | null |
40+
| 33 | M | MD | null |
41+
| 32 | M | IL | null |
42+
| 28 | F | VA | null |
43+
| null | F | null | 28 |
44+
| null | M | null | 101 |
45+
+------+--------+-------+-------+
46+
47+
48+
49+
Example 2: Append rows with merged column names
50+
===============================================================
51+
52+
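This example appends rows from "sum by gender" to "sum by gender, state". Because the subpipeline reuses the field name ``total`` with the same type, its values are merged into the existing ``total`` column.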
53+
54+
PPL query::
55+
56+
os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ];
57+
fetched rows / total rows = 6/6
58+
+----------+--------+-------+
59+
| total | gender | state |
60+
|----------+--------+-------|
61+
| 36 | M | TN |
62+
| 33 | M | MD |
63+
| 32 | M | IL |
64+
| 28 | F | VA |
65+
| 28 | F | null |
66+
| 101 | M | null |
67+
+----------+--------+-------+
68+
69+
Limitations
70+
===========
71+
72+
* **Schema Compatibility**: As with the ``append`` command, when fields with the same name exist in both the main search and the subpipeline but have incompatible types, the query fails with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns).

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -894,6 +894,18 @@ public void testExplainAppendCommand() throws IOException {
894894
TEST_INDEX_BANK)));
895895
}
896896

897+
@Test
898+
public void testExplainAppendPipeCommand() throws IOException {
899+
String expected = loadExpectedPlan("explain_appendpipe_command.json");
900+
assertJsonEqualsIgnoreId(
901+
expected,
902+
explainQueryToString(
903+
String.format(
904+
Locale.ROOT,
905+
"source=%s | appendpipe [ stats count(balance) as cnt by gender ]",
906+
TEST_INDEX_BANK)));
907+
}
908+
897909
@Test
898910
public void testMvjoinExplain() throws IOException {
899911
String query =
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.remote;
7+
8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
9+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
10+
import static org.opensearch.sql.util.MatcherUtils.rows;
11+
import static org.opensearch.sql.util.MatcherUtils.schema;
12+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
13+
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;
14+
15+
import java.io.IOException;
16+
import java.util.Locale;
17+
import org.json.JSONObject;
18+
import org.junit.Test;
19+
import org.opensearch.sql.ppl.PPLIntegTestCase;
20+
21+
public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase {
22+
@Override
23+
public void init() throws Exception {
24+
super.init();
25+
enableCalcite();
26+
loadIndex(Index.ACCOUNT);
27+
loadIndex(Index.BANK);
28+
}
29+
30+
@Test
31+
public void testAppendPipe() throws IOException {
32+
JSONObject actual =
33+
executeQuery(
34+
String.format(
35+
Locale.ROOT,
36+
"source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ "
37+
+ " sort -sum_age_by_gender ] |"
38+
+ " head 5",
39+
TEST_INDEX_ACCOUNT));
40+
verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string"));
41+
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F"));
42+
}
43+
44+
@Test
45+
public void testAppendDifferentIndex() throws IOException {
46+
JSONObject actual =
47+
executeQuery(
48+
String.format(
49+
Locale.ROOT,
50+
"source=%s | stats sum(age) as sum by gender | append [ source=%s | stats"
51+
+ " sum(age) as bank_sum_age ]",
52+
TEST_INDEX_ACCOUNT,
53+
TEST_INDEX_BANK));
54+
verifySchemaInOrder(
55+
actual,
56+
schema("sum", "bigint"),
57+
schema("gender", "string"),
58+
schema("bank_sum_age", "bigint"));
59+
verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238));
60+
}
61+
62+
@Test
63+
public void testAppendpipeWithMergedColumn() throws IOException {
64+
JSONObject actual =
65+
executeQuery(
66+
String.format(
67+
Locale.ROOT,
68+
"source=%s | stats sum(age) as sum by gender |"
69+
+ " appendpipe [ stats sum(sum) as sum ] | head 5",
70+
TEST_INDEX_ACCOUNT,
71+
TEST_INDEX_ACCOUNT));
72+
verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string"));
73+
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null));
74+
}
75+
76+
@Test
77+
public void testAppendpipeWithConflictTypeColumn() throws IOException {
78+
Exception exception =
79+
assertThrows(
80+
Exception.class,
81+
() ->
82+
executeQuery(
83+
String.format(
84+
Locale.ROOT,
85+
"source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum ="
86+
+ " cast(sum as double) ] | head 5",
87+
TEST_INDEX_ACCOUNT)));
88+
assertTrue(exception.getMessage().contains("due to incompatible types"));
89+
}
90+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
4+
"physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
4+
"physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n"
5+
}
6+
}

0 commit comments

Comments
 (0)