Skip to content

Commit 3d548b9

Browse files
authored
Support push down sort on aggregation measure for more than one agg calls (opensearch-project#4759)
Signed-off-by: Lantao Jin <[email protected]>
1 parent 5523932 commit 3d548b9

31 files changed

+254
-146
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan;
7+
8+
import org.apache.calcite.plan.Contexts;
9+
import org.apache.calcite.plan.RelRule;
10+
import org.apache.calcite.rel.core.RelFactories;
11+
import org.apache.calcite.tools.RelBuilderFactory;
12+
import org.immutables.value.Value;
13+
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
14+
15+
public interface OpenSearchRuleConfig extends RelRule.Config {
16+
17+
/** Return a custom RelBuilderFactory for creating OpenSearchRelBuilder */
18+
@Override
19+
@Value.Default
20+
default RelBuilderFactory relBuilderFactory() {
21+
return CalciteToolsHelper.proto(Contexts.of(RelFactories.DEFAULT_STRUCT));
22+
}
23+
}

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
105105

106106
/** Rule configuration. */
107107
@Value.Immutable
108-
public interface Config extends RelRule.Config {
108+
public interface Config extends OpenSearchRuleConfig {
109109
Config GROUP_MERGE =
110110
ImmutablePPLAggGroupMergeRule.Config.builder()
111111
.build()

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ private RexNode aliasMaybe(RelBuilder builder, RexNode node, String alias) {
240240

241241
/** Rule configuration. */
242242
@Value.Immutable
243-
public interface Config extends RelRule.Config {
243+
public interface Config extends OpenSearchRuleConfig {
244244
Config SUM_CONVERTER =
245245
ImmutablePPLAggregateConvertRule.Config.builder()
246246
.build()

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.calcite.tools.FrameworkConfig;
8484
import org.apache.calcite.tools.Frameworks;
8585
import org.apache.calcite.tools.RelBuilder;
86+
import org.apache.calcite.tools.RelBuilderFactory;
8687
import org.apache.calcite.tools.RelRunner;
8788
import org.apache.calcite.util.Holder;
8889
import org.apache.calcite.util.Util;
@@ -127,6 +128,10 @@ public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFac
127128
}
128129
}
129130

131+
public static RelBuilderFactory proto(final Context context) {
132+
return (cluster, schema) -> new OpenSearchRelBuilder(context, cluster, schema);
133+
}
134+
130135
/**
131136
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
132137
* Frameworks.BasePrepareAction)}. The purpose is the method {@link

integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ public void test() throws IOException {
7070
}
7171
logger.info("Running Query{}", i);
7272
String ppl = sanitize(loadFromFile("clickbench/queries/q" + i + ".ppl"));
73-
timing(summary, "q" + i, ppl);
7473
// V2 gets unstable scripts, ignore them when comparing plan
7574
if (isCalciteEnabled()) {
7675
String expected = loadExpectedPlan("clickbench/q" + i + ".yaml");
7776
assertYamlEqualsIgnoreId(expected, explainQueryYaml(ppl));
7877
}
78+
timing(summary, "q" + i, ppl);
7979
}
8080
}
8181
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,24 @@ public void testExplainSortOnMeasureMultiTermsWithScript() throws IOException {
11861186
+ " sort `count()`"));
11871187
}
11881188

1189+
@Test
1190+
public void testExplainSortOnMeasureComplex() throws IOException {
1191+
enabledOnlyWhenPushdownIsEnabled();
1192+
String expected = loadExpectedPlan("explain_agg_sort_on_measure_complex1.yaml");
1193+
assertYamlEqualsIgnoreId(
1194+
expected,
1195+
explainQueryYaml(
1196+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false sum(balance),"
1197+
+ " count() as c, dc(employer) by state | sort - c"));
1198+
expected = loadExpectedPlan("explain_agg_sort_on_measure_complex2.yaml");
1199+
assertYamlEqualsIgnoreId(
1200+
expected,
1201+
explainQueryYaml(
1202+
"source=opensearch-sql_test_index_account | eval new_state = lower(state) | stats"
1203+
+ " bucket_nullable=false sum(balance), count(), dc(employer) as d by gender,"
1204+
+ " new_state | sort - d"));
1205+
}
1206+
11891207
@Test
11901208
public void testExplainCompositeMultiBucketsAutoDateThenSortOnMeasureNotPushdown()
11911209
throws IOException {
@@ -1238,15 +1256,15 @@ public void testExplainCompositeRangeAutoDateThenSortOnMeasureNotPushdown() thro
12381256
}
12391257

12401258
@Test
1241-
public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() throws IOException {
1259+
public void testExplainMultipleCollationsWithSortOnOneMeasureNotPushDown() throws IOException {
12421260
enabledOnlyWhenPushdownIsEnabled();
12431261
String expected =
12441262
loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml");
12451263
assertYamlEqualsIgnoreId(
12461264
expected,
12471265
explainQueryYaml(
12481266
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1249-
+ " sum(balance) as s by state | sort c"));
1267+
+ " sum(balance) as s by state | sort c, state"));
12501268
expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push2.yaml");
12511269
assertYamlEqualsIgnoreId(
12521270
expected,
@@ -1255,6 +1273,17 @@ public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() thro
12551273
+ " sum(balance) as s by state | sort c, s"));
12561274
}
12571275

1276+
@Test
1277+
public void testExplainSortOnMeasureMultiBucketsNotMultiTermsNotPushDown() throws IOException {
1278+
enabledOnlyWhenPushdownIsEnabled();
1279+
String expected = loadExpectedPlan("explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml");
1280+
assertYamlEqualsIgnoreId(
1281+
expected,
1282+
explainQueryYaml(
1283+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1284+
+ " sum(balance) as s by state, span(age, 5) | sort c"));
1285+
}
1286+
12581287
@Test
12591288
public void testExplainEvalMax() throws IOException {
12601289
String expected = loadExpectedPlan("explain_eval_max.json");

integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,4 +1174,55 @@ public void testStatsSpanSortOnMeasureMultiTermsWithScript() throws IOException
11741174
resetQueryBucketSize();
11751175
}
11761176
}
1177+
1178+
@Test
1179+
public void testStatsSortOnMeasureComplex() throws IOException {
1180+
try {
1181+
setQueryBucketSize(5);
1182+
JSONObject response =
1183+
executeQuery(
1184+
String.format(
1185+
"source=%s | stats bucket_nullable=false sum(balance), count() as c, dc(employer)"
1186+
+ " as d by state | sort - c | head 5",
1187+
TEST_INDEX_ACCOUNT));
1188+
verifySchema(
1189+
response,
1190+
schema("sum(balance)", null, "bigint"),
1191+
schema("c", null, "bigint"),
1192+
schema("d", null, "bigint"),
1193+
schema("state", null, "string"));
1194+
System.out.println(response);
1195+
verifyDataRows(
1196+
response,
1197+
rows(782199, 30, 30, "TX"),
1198+
rows(732523, 28, 28, "MD"),
1199+
rows(657957, 27, 27, "ID"),
1200+
rows(541575, 25, 25, "ME"),
1201+
rows(643489, 25, 25, "AL"));
1202+
response =
1203+
executeQuery(
1204+
String.format(
1205+
"source=%s | eval new_state = lower(state) | stats bucket_nullable=false"
1206+
+ " sum(balance), count() as c, dc(employer) as d by gender, new_state | sort"
1207+
+ " - d | head 5",
1208+
TEST_INDEX_ACCOUNT));
1209+
verifySchema(
1210+
response,
1211+
schema("sum(balance)", null, "bigint"),
1212+
schema("c", null, "bigint"),
1213+
schema("d", null, "bigint"),
1214+
schema("gender", null, "string"),
1215+
schema("new_state", null, "string"));
1216+
System.out.println(response);
1217+
verifyDataRows(
1218+
response,
1219+
rows(484567, 18, 18, "M", "md"),
1220+
rows(376394, 17, 17, "M", "id"),
1221+
rows(505688, 17, 17, "F", "tx"),
1222+
rows(375409, 16, 16, "M", "me"),
1223+
rows(432776, 15, 15, "M", "ok"));
1224+
} finally {
1225+
resetQueryBucketSize();
1226+
}
1227+
}
11771228
}

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q10.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,4 @@ calcite:
88
LogicalFilter(condition=[IS NOT NULL($68)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, hits]])
1010
physical: |
11-
EnumerableLimit(fetch=[10000])
12-
EnumerableLimit(fetch=[10])
13-
EnumerableSort(sort0=[$1], dir0=[DESC-nulls-last])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum(AdvEngineID)=SUM($0),c=COUNT(),avg(ResolutionWidth)=AVG($2),dc(UserID)=COUNT(DISTINCT $3)), PROJECT->[sum(AdvEngineID), c, avg(ResolutionWidth), dc(UserID), RegionID]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"RegionID":{"terms":{"field":"RegionID","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(AdvEngineID)":{"sum":{"field":"AdvEngineID"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"dc(UserID)":{"cardinality":{"field":"UserID"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum(AdvEngineID)=SUM($0),c=COUNT(),avg(ResolutionWidth)=AVG($2),dc(UserID)=COUNT(DISTINCT $3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[sum(AdvEngineID), c, avg(ResolutionWidth), dc(UserID), RegionID], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"RegionID":{"terms":{"field":"RegionID","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(AdvEngineID)":{"sum":{"field":"AdvEngineID"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"dc(UserID)":{"cardinality":{"field":"UserID"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q23.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,4 @@ calcite:
99
LogicalFilter(condition=[AND(ILIKE($97, '%Google%', '\'), <>($63, ''), NOT(ILIKE($26, '%.google.%', '\')))])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, hits]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableLimit(fetch=[10])
14-
EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
15-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, SearchPhrase, UserID, Title], FILTER->AND(ILIKE($3, '%Google%', '\'), <>($1, ''), NOT(ILIKE($0, '%.google.%', '\'))), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT(),dc(UserID)=COUNT(DISTINCT $1)), PROJECT->[c, dc(UserID), SearchPhrase]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"wildcard":{"Title":{"wildcard":"*Google*","case_insensitive":true,"boost":1.0}}},{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"bool":{"must_not":[{"wildcard":{"URL":{"wildcard":"*.google.*","case_insensitive":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","SearchPhrase","UserID","Title"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"SearchPhrase":{"terms":{"field":"SearchPhrase","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"dc(UserID)":{"cardinality":{"field":"UserID"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, SearchPhrase, UserID, Title], FILTER->AND(ILIKE($3, '%Google%', '\'), <>($1, ''), NOT(ILIKE($0, '%.google.%', '\'))), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT(),dc(UserID)=COUNT(DISTINCT $1)), SORT_AGG_METRICS->[1 DESC LAST], PROJECT->[c, dc(UserID), SearchPhrase], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"wildcard":{"Title":{"wildcard":"*Google*","case_insensitive":true,"boost":1.0}}},{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"bool":{"must_not":[{"wildcard":{"URL":{"wildcard":"*.google.*","case_insensitive":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","SearchPhrase","UserID","Title"],"excludes":[]},"aggregations":{"SearchPhrase":{"terms":{"field":"SearchPhrase","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"dc(UserID)":{"cardinality":{"field":"UserID"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q31.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,4 @@ calcite:
99
LogicalFilter(condition=[<>($63, '')])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, hits]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableLimit(fetch=[10])
14-
EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
15-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[SearchPhrase, SearchEngineID, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($0, ''), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), SearchEngineID, ClientIP]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"SearchEngineID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["SearchPhrase","SearchEngineID","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"SearchEngineID":{"terms":{"field":"SearchEngineID","missing_bucket":false,"order":"asc"}}},{"ClientIP":{"terms":{"field":"ClientIP","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[SearchPhrase, SearchEngineID, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($0, ''), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), SearchEngineID, ClientIP], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"SearchEngineID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["SearchPhrase","SearchEngineID","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"SearchEngineID|ClientIP":{"multi_terms":{"terms":[{"field":"SearchEngineID"},{"field":"ClientIP"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)