Skip to content

Commit 69e230f

Browse files
authored
Making terms agg work with string (#19874)
Signed-off-by: expani <[email protected]>
1 parent 6e20f60 commit 69e230f

File tree

10 files changed

+30
-15
lines changed

10 files changed

+30
-15
lines changed

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.datafusion.search;
1010

11+
import org.apache.arrow.vector.util.Text;
1112
import org.apache.lucene.search.Collector;
1213
import org.apache.lucene.search.CollectorManager;
1314
import org.apache.lucene.search.FieldDoc;
@@ -807,4 +808,14 @@ public Map<String, Object[]> getDFResults() {
807808
return dfResults;
808809
}
809810

811+
@Override
812+
public Comparable<?> convertToComparable(Object rawValue) {
813+
if (rawValue instanceof Number) {
814+
return (Comparable) rawValue;
815+
} else if (rawValue instanceof Text) {
816+
return rawValue.toString();
817+
} else {
818+
throw new IllegalArgumentException("Conversion to Comparable not supported for type " + rawValue.getClass());
819+
}
820+
}
810821
}

server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,24 @@
88

99
package org.opensearch.search.aggregations;
1010

11+
import org.opensearch.search.internal.SearchContext;
12+
1113
import java.util.ArrayList;
1214
import java.util.List;
1315
import java.util.Map;
1416

1517
public interface ShardResultConvertor {
1618

17-
default List<InternalAggregation> convert(Map<String, Object[]> shardResult) {
19+
default List<InternalAggregation> convert(Map<String, Object[]> shardResult, SearchContext searchContext) {
1820
int rows = shardResult.entrySet().stream().findFirst().get().getValue().length;
1921
List<InternalAggregation> internalAggregations = new ArrayList<>();
2022
for (int i = 0; i < rows; i++) {
21-
internalAggregations.add(convertRow(shardResult, i));
23+
internalAggregations.add(convertRow(shardResult, i, searchContext));
2224
}
2325
return internalAggregations;
2426
}
2527

26-
default InternalAggregation convertRow(Map<String, Object[]> shardResult, int row) {
28+
default InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
2729
throw new UnsupportedOperationException("Row conversion not supported");
2830
}
2931

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ public void collect(int doc, long zeroBucket) throws IOException {
729729
}
730730

731731
@Override
732-
public List<InternalAggregation> convert(Map<String, Object[]> shardResult) {
732+
public List<InternalAggregation> convert(Map<String, Object[]> shardResult, SearchContext searchContext) {
733733
// Generate the composite keys
734734
List<Comparable> currentCompositeKey = new ArrayList<>(sourceConfigs.length);
735735
List<CompositeKey> compositeKeys = new ArrayList<>(shardResult.size());
@@ -740,7 +740,7 @@ public List<InternalAggregation> convert(Map<String, Object[]> shardResult) {
740740
}
741741
Object[] values = shardResult.get(sourceConfig.fieldType().name());
742742
// TODO : Would require conversion for certain types,
743-
currentCompositeKey.add((Comparable) values[i]);
743+
currentCompositeKey.add(searchContext.convertToComparable(values[i]));
744744
}
745745
compositeKeys.add(new CompositeKey(currentCompositeKey.toArray(new Comparable[0])));
746746
currentCompositeKey.clear();
@@ -754,7 +754,7 @@ public List<InternalAggregation> convert(Map<String, Object[]> shardResult) {
754754
throw new UnsupportedOperationException(String.format("Aggregation [%s] doesn't support shard result conversion Impl [%s]", subAgg.name(), subAgg.getClass().getName()));
755755
}
756756
ShardResultConvertor convertor = (ShardResultConvertor) subAgg;
757-
subAggs.add(convertor.convertRow(shardResult, row));
757+
subAggs.add(convertor.convertRow(shardResult, row, searchContext));
758758
}
759759
buckets.add(new InternalComposite.InternalBucket(
760760
sourceNames,

server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public void collectStarTreeEntry(int starTreeEntryBit, long bucket) throws IOExc
282282
}
283283

284284
@Override
285-
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row) {
285+
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
286286
Object[] counts = shardResult.get(name + "_count");
287287
Object[] sums = shardResult.get(name + "_sum");
288288
return new InternalAvg(name, ((Number) sums[row]).doubleValue(), ((Number) sums[row]).longValue(), format, metadata());

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public void doReset() {
285285
}
286286

287287
@Override
288-
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row) {
288+
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
289289
Object[] values = shardResult.get(name);
290290
return new InternalMax(name, ((Number) values[row]).doubleValue(), formatter, metadata());
291291
}

server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
276276
}
277277

278278
@Override
279-
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row) {
279+
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
280280
Object[] values = shardResult.get(name);
281281
return new InternalMin(name, ((Number) values[row]).doubleValue(), format, metadata());
282282
}

server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public void doClose() {
220220
}
221221

222222
@Override
223-
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row) {
223+
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
224224
Object[] values = shardResult.get(name);
225225
return new InternalSum(name, ((Number) values[row]).doubleValue(), format, metadata());
226226
}

server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
214214
}
215215

216216
@Override
217-
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row) {
217+
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
218218
Object[] values = shardResult.get(name);
219219
return new InternalValueCount(name, ((Number) values[row]).longValue(), metadata());
220220
}

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,4 +574,9 @@ public void setDFResults(Map<String, Object[]> dfResults) {
574574
public Map<String, Object[]> getDFResults() {
575575
return Collections.emptyMap();
576576
}
577+
578+
// TODO : This should be a part of mapper given by DataFormat or SearchEngine as related to Field type.
579+
public Comparable convertToComparable(Object rawValue) {
580+
throw new UnsupportedOperationException("Engine doesn't implement response value conversion");
581+
}
577582
}

server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323

2424
public class SearchEngineResultConversionUtils {
2525

26-
private static final Logger LOGGER = LogManager.getLogger(SearchEngineResultConversionUtils.class);
27-
2826
public static void convertDFResultGeneric(SearchContext searchContext) {
2927
if (searchContext.aggregations() != null) {
3028
Map<String, Object[]> dfResult = searchContext.getDFResults();
@@ -51,9 +49,8 @@ public static void convertDFResultGeneric(SearchContext searchContext) {
5149
}).toList();
5250

5351
InternalAggregations internalAggregations = InternalAggregations.from(
54-
shardResultConvertors.stream().flatMap(x -> x.convert(dfResult).stream()).collect(Collectors.toList())
52+
shardResultConvertors.stream().flatMap(x -> x.convert(dfResult, searchContext).stream()).collect(Collectors.toList())
5553
);
56-
// LOGGER.info("Internal Aggregations converted {}", internalAggregations.asMap());
5754
searchContext.queryResult().aggregations(internalAggregations);
5855
} catch (IOException e) {
5956
throw new RuntimeException(e);

0 commit comments

Comments
 (0)