Skip to content

Commit 23407cb

Browse files
authored
Fix pushing down filter with nested field of the text type (#3645)
* Fix pushing down filter with nested field of the text type Signed-off-by: Heng Qian <qianheng@amazon.com> * fix UT Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 730f066 commit 23407cb

File tree

6 files changed

+85
-46
lines changed

6 files changed

+85
-46
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled : true
7+
plugins.calcite.fallback.allowed : false
8+
9+
---
10+
teardown:
11+
- do:
12+
query.settings:
13+
body:
14+
transient:
15+
plugins.calcite.enabled : false
16+
plugins.calcite.fallback.allowed : true
17+
18+
---
19+
"Push down filter with nested field":
20+
- skip:
21+
features:
22+
- headers
23+
- do:
24+
bulk:
25+
index: test
26+
refresh: true
27+
body:
28+
- '{"index": {}}'
29+
- '{"log": {"url": {"message": "/e2e/h/zap"} } }'
30+
- do:
31+
headers:
32+
Content-Type: 'application/json'
33+
ppl:
34+
body:
35+
query: 'source=test | where log.url.message = "/e2e/h/zap"'
36+
- match: {"total": 1}
37+
- match: {"schema": [{"name": "log", "type": "struct"}]}
38+
- match: {"datarows": [[{"url": {"message": "/e2e/h/zap"}}]]}

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.opensearch.search.aggregations.support.ValueType;
5454
import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder;
5555
import org.opensearch.search.sort.SortOrder;
56-
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
56+
import org.opensearch.sql.data.type.ExprType;
5757
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression;
5858
import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser;
5959
import org.opensearch.sql.opensearch.response.agg.MetricParser;
@@ -107,14 +107,14 @@ private AggregateAnalyzer() {}
107107
public static Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser> analyze(
108108
Aggregate aggregate,
109109
List<String> schema,
110-
Map<String, OpenSearchDataType> typeMapping,
110+
Map<String, ExprType> fieldTypes,
111111
List<String> outputFields)
112112
throws ExpressionNotAnalyzableException {
113113
requireNonNull(aggregate, "aggregate");
114114
try {
115115
List<Integer> groupList = aggregate.getGroupSet().asList();
116116
FieldExpressionCreator fieldExpressionCreator =
117-
fieldIndex -> new NamedFieldExpression(fieldIndex, schema, typeMapping);
117+
fieldIndex -> new NamedFieldExpression(fieldIndex, schema, fieldTypes);
118118
// Process all aggregate calls
119119
Pair<Builder, List<MetricParser>> builderAndParser =
120120
processAggregateCalls(
@@ -261,8 +261,7 @@ private static CompositeValuesSourceBuilder<?> createTermsSourceBuilder(
261261
.field(groupExpr.getReferenceForTermQuery());
262262

263263
// Time-type values are converted to LONG in ExpressionAggregationScript::execute
264-
if (List.of(TIMESTAMP, TIME, DATE)
265-
.contains(groupExpr.getOpenSearchDataType().getExprCoreType())) {
264+
if (List.of(TIMESTAMP, TIME, DATE).contains(groupExpr.getExprType())) {
266265
sourceBuilder.userValuetypeHint(ValueType.LONG);
267266
}
268267

opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import org.opensearch.index.query.QueryBuilder;
6666
import org.opensearch.index.query.RangeQueryBuilder;
6767
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
68-
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
68+
import org.opensearch.sql.data.type.ExprType;
6969
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType.MappingType;
7070
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
7171

@@ -117,18 +117,18 @@ private PredicateAnalyzer() {}
117117
*
118118
* @param expression expression to analyze
119119
* @param schema current schema of scan operator
120-
* @param typeMapping mapping of OpenSearch field name to OpenSearchDataType
120+
* @param filedTypes mapping of OpenSearch field name to ExprType, nested fields are flattened
121121
* @return search query which can be used to query OS cluster
122122
* @throws ExpressionNotAnalyzableException when expression can't be processed by this analyzer
123123
*/
124124
public static QueryBuilder analyze(
125-
RexNode expression, List<String> schema, Map<String, OpenSearchDataType> typeMapping)
125+
RexNode expression, List<String> schema, Map<String, ExprType> filedTypes)
126126
throws ExpressionNotAnalyzableException {
127127
requireNonNull(expression, "expression");
128128
try {
129129
// visits expression tree
130130
QueryExpression queryExpression =
131-
(QueryExpression) expression.accept(new Visitor(schema, typeMapping));
131+
(QueryExpression) expression.accept(new Visitor(schema, filedTypes));
132132

133133
if (queryExpression != null && queryExpression.isPartial()) {
134134
throw new UnsupportedOperationException(
@@ -145,17 +145,17 @@ public static QueryBuilder analyze(
145145
private static class Visitor extends RexVisitorImpl<Expression> {
146146

147147
List<String> schema;
148-
Map<String, OpenSearchDataType> typeMapping;
148+
Map<String, ExprType> filedTypes;
149149

150-
private Visitor(List<String> schema, Map<String, OpenSearchDataType> typeMapping) {
150+
private Visitor(List<String> schema, Map<String, ExprType> filedTypes) {
151151
super(true);
152152
this.schema = schema;
153-
this.typeMapping = typeMapping;
153+
this.filedTypes = filedTypes;
154154
}
155155

156156
@Override
157157
public Expression visitInputRef(RexInputRef inputRef) {
158-
return new NamedFieldExpression(inputRef, schema, typeMapping);
158+
return new NamedFieldExpression(inputRef, schema, filedTypes);
159159
}
160160

161161
@Override
@@ -973,12 +973,11 @@ static boolean isCastExpression(Expression exp) {
973973
static final class NamedFieldExpression implements TerminalExpression {
974974

975975
private final String name;
976-
private final OpenSearchDataType type;
976+
private final ExprType type;
977977

978-
NamedFieldExpression(
979-
int refIndex, List<String> schema, Map<String, OpenSearchDataType> typeMapping) {
978+
NamedFieldExpression(int refIndex, List<String> schema, Map<String, ExprType> filedTypes) {
980979
this.name = refIndex >= schema.size() ? null : schema.get(refIndex);
981-
this.type = typeMapping.get(name);
980+
this.type = filedTypes.get(name);
982981
}
983982

984983
private NamedFieldExpression() {
@@ -987,10 +986,10 @@ private NamedFieldExpression() {
987986
}
988987

989988
private NamedFieldExpression(
990-
RexInputRef ref, List<String> schema, Map<String, OpenSearchDataType> typeMapping) {
989+
RexInputRef ref, List<String> schema, Map<String, ExprType> filedTypes) {
991990
this.name =
992991
(ref == null || ref.getIndex() >= schema.size()) ? null : schema.get(ref.getIndex());
993-
this.type = typeMapping.get(name);
992+
this.type = filedTypes.get(name);
994993
}
995994

996995
private NamedFieldExpression(RexLiteral literal) {
@@ -1002,12 +1001,12 @@ String getRootName() {
10021001
return name;
10031002
}
10041003

1005-
OpenSearchDataType getOpenSearchDataType() {
1004+
ExprType getExprType() {
10061005
return type;
10071006
}
10081007

10091008
boolean isTextType() {
1010-
return type != null && type.getMappingType() == OpenSearchDataType.MappingType.Text;
1009+
return type != null && type instanceof OpenSearchTextType;
10111010
}
10121011

10131012
String toKeywordSubField() {

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.search.aggregations.AggregationBuilder;
3030
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
3131
import org.opensearch.sql.common.setting.Settings;
32+
import org.opensearch.sql.data.type.ExprType;
3233
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
3334
import org.opensearch.sql.opensearch.planner.physical.EnumerableIndexScanRule;
3435
import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules;
@@ -87,9 +88,9 @@ public CalciteLogicalIndexScan pushDownFilter(Filter filter) {
8788
try {
8889
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
8990
List<String> schema = this.getRowType().getFieldNames();
90-
Map<String, OpenSearchDataType> typeMapping = this.osIndex.getFieldOpenSearchTypes();
91+
Map<String, ExprType> filedTypes = this.osIndex.getFieldTypes();
9192
QueryBuilder filterBuilder =
92-
PredicateAnalyzer.analyze(filter.getCondition(), schema, typeMapping);
93+
PredicateAnalyzer.analyze(filter.getCondition(), schema, filedTypes);
9394
newScan.pushDownContext.add(
9495
PushDownAction.of(
9596
PushDownType.FILTER,
@@ -133,10 +134,10 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate) {
133134
try {
134135
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(aggregate.getRowType());
135136
List<String> schema = this.getRowType().getFieldNames();
136-
Map<String, OpenSearchDataType> typeMapping = this.osIndex.getFieldOpenSearchTypes();
137+
Map<String, ExprType> fieldTypes = this.osIndex.getFieldTypes();
137138
List<String> outputFields = aggregate.getRowType().getFieldNames();
138139
final Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser> aggregationBuilder =
139-
AggregateAnalyzer.analyze(aggregate, schema, typeMapping, outputFields);
140+
AggregateAnalyzer.analyze(aggregate, schema, fieldTypes, outputFields);
140141
Map<String, OpenSearchDataType> extendedTypeMapping =
141142
aggregate.getRowType().getFieldList().stream()
142143
.collect(

opensearch/src/test/java/org/opensearch/sql/opensearch/request/AggregateAnalyzerTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.commons.lang3.tuple.Pair;
2828
import org.junit.jupiter.api.Test;
2929
import org.opensearch.search.aggregations.AggregationBuilder;
30+
import org.opensearch.sql.data.type.ExprType;
3031
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
3132
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType.MappingType;
3233
import org.opensearch.sql.opensearch.request.AggregateAnalyzer.ExpressionNotAnalyzableException;
@@ -41,7 +42,7 @@ class AggregateAnalyzerTest {
4142

4243
private final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
4344
private final List<String> schema = List.of("a", "b", "c");
44-
final Map<String, OpenSearchDataType> typeMapping =
45+
final Map<String, ExprType> fieldTypes =
4546
Map.of(
4647
"a",
4748
OpenSearchDataType.of(MappingType.Integer),
@@ -124,7 +125,7 @@ void analyze_aggCall_simple() throws ExpressionNotAnalyzableException {
124125
createMockAggregate(
125126
List.of(countCall, avgCall, sumCall, minCall, maxCall), ImmutableBitSet.of());
126127
Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser> result =
127-
AggregateAnalyzer.analyze(aggregate, schema, typeMapping, outputFields);
128+
AggregateAnalyzer.analyze(aggregate, schema, fieldTypes, outputFields);
128129
assertEquals(
129130
"[{\"cnt\":{\"value_count\":{\"field\":\"_index\"}}},"
130131
+ " {\"avg\":{\"avg\":{\"field\":\"a\"}}},"
@@ -204,7 +205,7 @@ void analyze_aggCall_extended() throws ExpressionNotAnalyzableException {
204205
createMockAggregate(
205206
List.of(varSampCall, varPopCall, stddevSampCall, stddevPopCall), ImmutableBitSet.of());
206207
Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser> result =
207-
AggregateAnalyzer.analyze(aggregate, schema, typeMapping, outputFields);
208+
AggregateAnalyzer.analyze(aggregate, schema, fieldTypes, outputFields);
208209
assertEquals(
209210
"[{\"var_samp\":{\"extended_stats\":{\"field\":\"a\",\"sigma\":2.0}}},"
210211
+ " {\"var_pop\":{\"extended_stats\":{\"field\":\"a\",\"sigma\":2.0}}},"
@@ -242,7 +243,7 @@ void analyze_groupBy() throws ExpressionNotAnalyzableException {
242243
List<String> outputFields = List.of("a", "b", "cnt");
243244
Aggregate aggregate = createMockAggregate(List.of(aggCall), ImmutableBitSet.of(0, 1));
244245
Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser> result =
245-
AggregateAnalyzer.analyze(aggregate, schema, typeMapping, outputFields);
246+
AggregateAnalyzer.analyze(aggregate, schema, fieldTypes, outputFields);
246247

247248
assertEquals(
248249
"[{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":["
@@ -282,7 +283,7 @@ void analyze_aggCall_TextWithoutKeyword() {
282283
ExpressionNotAnalyzableException exception =
283284
assertThrows(
284285
ExpressionNotAnalyzableException.class,
285-
() -> AggregateAnalyzer.analyze(aggregate, schema, typeMapping, List.of("sum")));
286+
() -> AggregateAnalyzer.analyze(aggregate, schema, fieldTypes, List.of("sum")));
286287
assertEquals("[field] must not be null: [sum]", exception.getCause().getMessage());
287288
}
288289

@@ -306,7 +307,7 @@ void analyze_groupBy_TextWithoutKeyword() {
306307
ExpressionNotAnalyzableException exception =
307308
assertThrows(
308309
ExpressionNotAnalyzableException.class,
309-
() -> AggregateAnalyzer.analyze(aggregate, schema, typeMapping, outputFields));
310+
() -> AggregateAnalyzer.analyze(aggregate, schema, fieldTypes, outputFields));
310311
assertEquals("[field] must not be null", exception.getCause().getMessage());
311312
}
312313

0 commit comments

Comments
 (0)