Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_RARE_TOP;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_STREAMSTATS;
Expand Down Expand Up @@ -1317,7 +1318,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
: duplicatedFieldNames.stream()
.map(a -> (RexNode) context.relBuilder.field(a))
.toList();
buildDedupNotNull(context, dedupeFields, allowedDuplication);
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
}
context.relBuilder.join(
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
Expand Down Expand Up @@ -1373,7 +1374,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<RexNode> dedupeFields =
getRightColumnsInJoinCriteria(context.relBuilder, joinCondition);

buildDedupNotNull(context, dedupeFields, allowedDuplication);
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
}
context.relBuilder.join(
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
Expand Down Expand Up @@ -1538,15 +1539,15 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
if (keepEmpty) {
buildDedupOrNull(context, dedupeFields, allowedDuplication);
} else {
buildDedupNotNull(context, dedupeFields, allowedDuplication);
buildDedupNotNull(context, dedupeFields, allowedDuplication, false);
}
return context.relBuilder.peek();
}

private static void buildDedupOrNull(
CalcitePlanContext context, List<RexNode> dedupeFields, Integer allowedDuplication) {
/*
* | dedup 2 a, b keepempty=false
* | dedup 2 a, b keepempty=true
* DropColumns('_row_number_dedup_)
* +- Filter ('_row_number_dedup_ <= n OR isnull('a) OR isnull('b))
* +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
Expand Down Expand Up @@ -1578,7 +1579,10 @@ private static void buildDedupOrNull(
}

private static void buildDedupNotNull(
CalcitePlanContext context, List<RexNode> dedupeFields, Integer allowedDuplication) {
CalcitePlanContext context,
List<RexNode> dedupeFields,
Integer allowedDuplication,
boolean fromJoinMaxOption) {
/*
* | dedup 2 a, b keepempty=false
* DropColumns('_row_number_dedup_)
Expand All @@ -1588,6 +1592,8 @@ private static void buildDedupNotNull(
* +- ...
*/
// Filter (isnotnull('a) AND isnotnull('b))
String rowNumberAlias =
fromJoinMaxOption ? ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP : ROW_NUMBER_COLUMN_FOR_DEDUP;
context.relBuilder.filter(
context.relBuilder.and(dedupeFields.stream().map(context.relBuilder::isNotNull).toList()));
// Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
Expand All @@ -1601,15 +1607,15 @@ private static void buildDedupNotNull(
.partitionBy(dedupeFields)
.orderBy(dedupeFields)
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_FOR_DEDUP);
.as(rowNumberAlias);
context.relBuilder.projectPlus(rowNumber);
RexNode _row_number_dedup_ = context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_DEDUP);
RexNode rowNumberField = context.relBuilder.field(rowNumberAlias);
// Filter ('_row_number_dedup_ <= n)
context.relBuilder.filter(
context.relBuilder.lessThanOrEqual(
_row_number_dedup_, context.relBuilder.literal(allowedDuplication)));
rowNumberField, context.relBuilder.literal(allowedDuplication)));
// DropColumns('_row_number_dedup_)
context.relBuilder.projectExcept(_row_number_dedup_);
context.relBuilder.projectExcept(rowNumberField);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public interface PlanUtils {
/** This is only for the dedup command; do not reuse it in other commands. */
String ROW_NUMBER_COLUMN_FOR_DEDUP = "_row_number_dedup_";

String ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP = "_row_number_join_max_dedup_";
String ROW_NUMBER_COLUMN_FOR_RARE_TOP = "_row_number_rare_top_";
String ROW_NUMBER_COLUMN_FOR_MAIN = "_row_number_main_";
String ROW_NUMBER_COLUMN_FOR_SUBSEARCH = "_row_number_subsearch_";
Expand Down
8 changes: 4 additions & 4 deletions docs/user/ppl/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ Explain query::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X POST localhost:9200/_plugins/_ppl/_explain?format=extended \
... -d '{"query" : "source=state_country | where age>30 | dedup age"}'
... -d '{"query" : "source=state_country | where age>30"}'
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n LogicalFilter(condition=[<=($12, 1)])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5], _id=[$6], _index=[$7], _score=[$8], _maxscore=[$9], _sort=[$10], _routing=[$11], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $5 ORDER BY $5)])\n LogicalFilter(condition=[IS NOT NULL($5)])\n LogicalFilter(condition=[>($5, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, state_country]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..6=[{inputs}], expr#7=[1], expr#8=[<=($t6, $t7)], proj#0..5=[{exprs}], $condition=[$t8])\n EnumerableWindow(window#0=[window(partition {5} order by [5] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age], FILTER->>($5, 30)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n",
"extended": "public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {\n final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get(\"v1stashed\");\n int prevStart;\n int prevEnd;\n final java.util.Comparator comparator = new java.util.Comparator(){\n public int compare(Object[] v0, Object[] v1) {\n final int c;\n c = org.apache.calcite.runtime.Utilities.compareNullsLast((Long) v0[5], (Long) v1[5]);\n if (c != 0) {\n return c;\n }\n return 0;\n }\n\n public int compare(Object o0, Object o1) {\n return this.compare((Object[]) o0, (Object[]) o1);\n }\n\n };\n final org.apache.calcite.runtime.SortedMultiMap multiMap = new org.apache.calcite.runtime.SortedMultiMap();\n v1stashed.scan().foreach(new org.apache.calcite.linq4j.function.Function1() {\n public Object apply(Object[] v) {\n Long key = (Long) v[5];\n multiMap.putMulti(key, v);\n return null;\n }\n public Object apply(Object v) {\n return apply(\n (Object[]) v);\n }\n }\n );\n final java.util.Iterator iterator = multiMap.arrays(comparator);\n final java.util.ArrayList _list = new java.util.ArrayList(\n multiMap.size());\n Long a0w0 = (Long) null;\n while (iterator.hasNext()) {\n final Object[] _rows = (Object[]) iterator.next();\n prevStart = -1;\n prevEnd = 2147483647;\n for (int i = 0; i < _rows.length; (++i)) {\n final Object[] row = (Object[]) _rows[i];\n if (i != prevEnd) {\n int actualStart = i < prevEnd ? 
0 : prevEnd + 1;\n prevEnd = i;\n a0w0 = Long.valueOf(((Number)org.apache.calcite.linq4j.tree.Primitive.of(long.class).numberValueRoundDown((i - 0 + 1))).longValue());\n }\n _list.add(new Object[] {\n row[0],\n row[1],\n row[2],\n row[3],\n row[4],\n row[5],\n a0w0});\n }\n }\n multiMap.clear();\n final org.apache.calcite.linq4j.Enumerable _inputEnumerable = org.apache.calcite.linq4j.Linq4j.asEnumerable(_list);\n final org.apache.calcite.linq4j.AbstractEnumerable child = new org.apache.calcite.linq4j.AbstractEnumerable(){\n public org.apache.calcite.linq4j.Enumerator enumerator() {\n return new org.apache.calcite.linq4j.Enumerator(){\n public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();\n public void reset() {\n inputEnumerator.reset();\n }\n\n public boolean moveNext() {\n while (inputEnumerator.moveNext()) {\n if (org.apache.calcite.runtime.SqlFunctions.toLong(((Object[]) inputEnumerator.current())[6]) <= $L4J$C$_Number_org_apache_calcite_linq4j_tree_Primitive_of_long_class_358aa52b) {\n return true;\n }\n }\n return false;\n }\n\n public void close() {\n inputEnumerator.close();\n }\n\n public Object current() {\n final Object[] current = (Object[]) inputEnumerator.current();\n final Object input_value = current[0];\n final Object input_value0 = current[1];\n final Object input_value1 = current[2];\n final Object input_value2 = current[3];\n final Object input_value3 = current[4];\n final Object input_value4 = current[5];\n return new Object[] {\n input_value,\n input_value0,\n input_value1,\n input_value2,\n input_value3,\n input_value4};\n }\n\n static final long $L4J$C$_Number_org_apache_calcite_linq4j_tree_Primitive_of_long_class_358aa52b = ((Number)org.apache.calcite.linq4j.tree.Primitive.of(long.class).numberValueRoundDown(1)).longValue();\n };\n }\n\n };\n return child.take(10000);\n}\n\n\npublic Class getElementType() {\n return java.lang.Object[].class;\n}\n\n\n"
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n LogicalFilter(condition=[>($5, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, state_country]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age], FILTER->>($5, 30), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n",
"extended": "public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {\n final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get(\"v1stashed\");\n return v1stashed.scan();\n}\n\n\npublic Class getElementType() {\n return java.lang.Object[].class;\n}\n\n\n"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,29 @@ public void supportSearchSargPushDown_timeRange() throws IOException {
}

// Only for Calcite
@Ignore("https://github.com/opensearch-project/OpenSearch/issues/3725")
@Test
public void testJoinWithCriteriaAndMaxOption() throws IOException {
// TODO could be optimized with https://github.com/opensearch-project/OpenSearch/issues/3725
enabledOnlyWhenPushdownIsEnabled();
String query =
"source=opensearch-sql_test_index_bank | join max=1 left=l right=r on"
+ " l.account_number=r.account_number opensearch-sql_test_index_bank";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_join_with_criteria_max_option.json");
assertJsonEqualsIgnoreId(expected, result);
var result = explainQueryYaml(query);
String expected = loadExpectedPlan("explain_join_with_criteria_max_option.yaml");
assertYamlEqualsIgnoreId(expected, result);
}

// Only for Calcite
@Ignore("https://github.com/opensearch-project/OpenSearch/issues/3725")
@Test
public void testJoinWithFieldListAndMaxOption() throws IOException {
// TODO could be optimized with https://github.com/opensearch-project/OpenSearch/issues/3725
enabledOnlyWhenPushdownIsEnabled();
String query =
"source=opensearch-sql_test_index_bank | join type=inner max=1 account_number"
+ " opensearch-sql_test_index_bank";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_join_with_fields_max_option.json");
assertJsonEqualsIgnoreId(expected, result);
var result = explainQueryYaml(query);
String expected = loadExpectedPlan("explain_join_with_fields_max_option.yaml");
assertYamlEqualsIgnoreId(expected, result);
}

// Only for Calcite
Expand Down
11 changes: 9 additions & 2 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public void testStatsByTimeSpan() throws IOException {
TEST_INDEX_BANK)));
}

@Ignore("https://github.com/opensearch-project/OpenSearch/issues/3725")
@Test
public void testDedupPushdown() throws IOException {
String expected = loadExpectedPlan("explain_dedup_push.json");
assertJsonEqualsIgnoreId(
Expand All @@ -504,7 +504,7 @@ public void testDedupKeepEmptyTruePushdown() throws IOException {
+ " | dedup gender KEEPEMPTY=true"));
}

@Ignore("https://github.com/opensearch-project/OpenSearch/issues/3725")
@Test
public void testDedupKeepEmptyFalsePushdown() throws IOException {
String expected = loadExpectedPlan("explain_dedup_keepempty_false_push.json");
assertJsonEqualsIgnoreId(
Expand All @@ -514,6 +514,13 @@ public void testDedupKeepEmptyFalsePushdown() throws IOException {
+ " | dedup gender KEEPEMPTY=false"));
}

/**
 * Explains {@code source=<TEST_INDEX_BANK> | dedup male} in YAML form and compares the output
 * with the stored plan {@code explain_dedup_unsupported_type_no_push.yaml} using
 * {@code assertYamlEqualsIgnoreId}.
 */
@Test
public void testDedupUnsupportedTypeNotPushdown() throws IOException {
  // Load the expected plan first, then run the explain request, as the original did.
  final String expectedPlan = loadExpectedPlan("explain_dedup_unsupported_type_no_push.yaml");
  final String ppl = String.format("source=%s | dedup male", TEST_INDEX_BANK);
  final var actualPlan = explainQueryYaml(ppl);
  assertYamlEqualsIgnoreId(expectedPlan, actualPlan);
}

@Test
public void testSingleFieldRelevanceQueryFunctionExplain() throws IOException {
enabledOnlyWhenPushdownIsEnabled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])
LogicalFilter(condition=[<=($19, 1)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $12 ORDER BY $12)])
LogicalFilter(condition=[IS NOT NULL($12)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..13=[{inputs}], expr#14=[1], expr#15=[<=($t13, $t14)], proj#0..12=[{exprs}], $condition=[$t15])
EnumerableWindow(window#0=[window(partition {12} order by [12] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], FILTER->IS NOT NULL($12)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"male","boost":1.0}},"_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Loading
Loading