Skip to content

Commit 93fcf57

Browse files
authored
Fixing end to end flow for pure aggregations (#19494)
* Integrate aggregators to convert result from datafusion Signed-off-by: expani <[email protected]> * Initialised bigArrays and queryCollManagers for DatafusionContext Signed-off-by: expani <[email protected]> * Refactored to set agg result within utility Signed-off-by: expani <[email protected]> --------- Signed-off-by: expani <[email protected]>
1 parent 0a9b038 commit 93fcf57

File tree

6 files changed

+48
-34
lines changed

6 files changed

+48
-34
lines changed

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.logging.log4j.Logger;
1717
import org.opensearch.action.search.SearchShardTask;
1818
import org.opensearch.common.lease.Releasables;
19+
import org.opensearch.common.util.BigArrays;
1920
import org.opensearch.datafusion.core.DefaultRecordBatchStream;
2021
import org.opensearch.datafusion.search.DatafusionContext;
2122
import org.opensearch.datafusion.search.DatafusionQuery;
@@ -33,6 +34,7 @@
3334
import org.opensearch.search.SearchShardTarget;
3435
import org.opensearch.search.aggregations.SearchResultsCollector;
3536
import org.opensearch.search.internal.ReaderContext;
37+
import org.opensearch.search.internal.SearchContext;
3638
import org.opensearch.search.internal.ShardSearchRequest;
3739
import org.opensearch.search.query.QueryPhaseExecutor;
3840
import org.opensearch.vectorized.execution.search.DataFormat;
@@ -57,7 +59,7 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu
5759

5860
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException {
5961
this.dataFormat = dataFormat;
60-
this.datafusionReaderManager = new DatafusionReaderManager("/Users/gbh/Downloads/res", formatCatalogSnapshot);
62+
this.datafusionReaderManager = new DatafusionReaderManager("/Users/anijainc/Desktop/BLRBackups/AOS_Search/Mustang/res", formatCatalogSnapshot);
6163
this.datafusionService = dataFusionService;
6264
}
6365

@@ -72,8 +74,8 @@ public QueryPhaseExecutor<DatafusionContext> getQueryPhaseExecutor() {
7274
}
7375

7476
@Override
75-
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task) throws IOException {
76-
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this);
77+
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays) throws IOException {
78+
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays);
7779
// Parse source
7880
datafusionContext.datafusionQuery(new DatafusionQuery(request.source().queryPlanIR(), new ArrayList<>()));
7981
return datafusionContext;

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.opensearch.search.suggest.SuggestionSearchContext;
5858
import org.opensearch.vectorized.execution.search.spi.RecordBatchStream;
5959

60+
import java.util.HashMap;
6061
import java.util.List;
6162
import java.util.Map;
6263

@@ -77,6 +78,8 @@ public class DatafusionContext extends SearchContext {
7778
private DatafusionQuery datafusionQuery;
7879
private Map<String, Object[]> dfResults;
7980
private SearchContextAggregations aggregations;
81+
private final BigArrays bigArrays;
82+
private final Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers = new HashMap<>();
8083

8184
/**
8285
* Constructor
@@ -90,7 +93,8 @@ public DatafusionContext(
9093
ShardSearchRequest request,
9194
SearchShardTarget searchShardTarget,
9295
SearchShardTask task,
93-
DatafusionEngine engine) {
96+
DatafusionEngine engine,
97+
BigArrays bigArrays) {
9498
this.readerContext = readerContext;
9599
this.indexShard = readerContext.indexShard();
96100
this.request = request;
@@ -108,6 +112,7 @@ public DatafusionContext(
108112
false, // reevaluate the usage
109113
false // specific to lucene
110114
);
115+
this.bigArrays = bigArrays;
111116
}
112117

113118
/**
@@ -383,7 +388,7 @@ public SimilarityService similarityService() {
383388

384389
@Override
385390
public BigArrays bigArrays() {
386-
return null;
391+
return bigArrays;
387392
}
388393

389394
@Override
@@ -736,7 +741,7 @@ public long getRelativeTimeInMillis() {
736741

737742
@Override
738743
public Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers() {
739-
return Map.of();
744+
return queryCollectorManagers;
740745
}
741746

742747
@Override

server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.action.search.SearchShardTask;
1212
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.common.util.BigArrays;
1314
import org.opensearch.search.SearchShardTarget;
1415
import org.opensearch.search.internal.ReaderContext;
1516
import org.opensearch.search.internal.SearchContext;
@@ -44,7 +45,7 @@ public abstract class SearchExecEngine<C extends SearchContext, S extends Engine
4445
/**
4546
* Create a search context for this engine
4647
*/
47-
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task) throws IOException;
48+
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays) throws IOException;
4849

4950
/**
5051
* execute

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,15 +1281,15 @@ private SearchContext createContext(
12811281
boolean isStreamSearch,
12821282
SearchExecEngine searchExecEngine
12831283
) throws IOException {
1284-
//final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout, false, isStreamSearch);
1284+
//final DefaultSearchContext originalContext = createSearchContext(readerContext, request, defaultSearchTimeout, false, isStreamSearch);
12851285

12861286
SearchShardTarget shardTarget = new SearchShardTarget(
12871287
clusterService.localNode().getId(),
12881288
readerContext.indexShard().shardId(),
12891289
request.getClusterAlias(),
12901290
OriginalIndices.NONE
12911291
);
1292-
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task);
1292+
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays);
12931293
try {
12941294
if (request.scroll() != null) {
12951295
context.scrollContext().scroll = request.scroll();

server/src/main/java/org/opensearch/search/query/QueryPhase.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
166166
// boolean rescore = executeInternal(searchContext, queryPhaseSearcher);
167167

168168
// Post process
169-
final InternalAggregations internalAggregations = SearchEngineResultConversionUtils.convertDFResultGeneric(searchContext);
170-
LOGGER.info("InternalAggregation created is {}", internalAggregations.asList());
171-
searchContext.queryResult().aggregations(internalAggregations);
169+
SearchEngineResultConversionUtils.convertDFResultGeneric(searchContext);
172170

173171
// if (rescore) { // only if we do a regular search
174172
// rescoreProcessor.process(searchContext);

server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.opensearch.search.query;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.opensearch.search.aggregations.Aggregator;
1214
import org.opensearch.search.aggregations.InternalAggregations;
1315
import org.opensearch.search.aggregations.ShardResultConvertor;
@@ -21,35 +23,41 @@
2123

2224
public class SearchEngineResultConversionUtils {
2325

24-
public static InternalAggregations convertDFResultGeneric(SearchContext searchContext) {
25-
Map<String, Object[]> dfResult = searchContext.getDFResults();
26+
private static final Logger LOGGER = LogManager.getLogger(SearchEngineResultConversionUtils.class);
2627

27-
// Create aggregators which will process the result from DataFusion
28-
try {
28+
public static void convertDFResultGeneric(SearchContext searchContext) {
29+
if (searchContext.aggregations() != null) {
30+
Map<String, Object[]> dfResult = searchContext.getDFResults();
2931

30-
List<Aggregator> aggregators = new ArrayList<>();
32+
// Create aggregators which will process the result from DataFusion
33+
try {
3134

32-
if (searchContext.aggregations().factories().hasGlobalAggregator()) {
33-
aggregators.addAll(searchContext.aggregations().factories().createTopLevelGlobalAggregators(searchContext));
34-
}
35+
List<Aggregator> aggregators = new ArrayList<>();
3536

36-
if (searchContext.aggregations().factories().hasNonGlobalAggregator()) {
37-
aggregators.addAll(searchContext.aggregations().factories().createTopLevelNonGlobalAggregators(searchContext));
38-
}
37+
if (searchContext.aggregations().factories().hasGlobalAggregator()) {
38+
aggregators.addAll(searchContext.aggregations().factories().createTopLevelGlobalAggregators(searchContext));
39+
}
3940

40-
List<ShardResultConvertor> shardResultConvertors = aggregators.stream().map(x -> {
41-
if (x instanceof ShardResultConvertor) {
42-
return ((ShardResultConvertor) x);
43-
} else {
44-
throw new UnsupportedOperationException("Aggregator doesn't support converting results from shard: " + x);
41+
if (searchContext.aggregations().factories().hasNonGlobalAggregator()) {
42+
aggregators.addAll(searchContext.aggregations().factories().createTopLevelNonGlobalAggregators(searchContext));
4543
}
46-
}).toList();
4744

48-
return InternalAggregations.from(
49-
shardResultConvertors.stream().flatMap(x -> x.convert(dfResult).stream()).collect(Collectors.toList())
50-
);
51-
} catch (IOException e) {
52-
throw new RuntimeException(e);
45+
List<ShardResultConvertor> shardResultConvertors = aggregators.stream().map(x -> {
46+
if (x instanceof ShardResultConvertor) {
47+
return ((ShardResultConvertor) x);
48+
} else {
49+
throw new UnsupportedOperationException("Aggregator doesn't support converting results from shard: " + x);
50+
}
51+
}).toList();
52+
53+
InternalAggregations internalAggregations = InternalAggregations.from(
54+
shardResultConvertors.stream().flatMap(x -> x.convert(dfResult).stream()).collect(Collectors.toList())
55+
);
56+
LOGGER.info("Internal Aggregations converted {}", internalAggregations.asMap());
57+
searchContext.queryResult().aggregations(internalAggregations);
58+
} catch (IOException e) {
59+
throw new RuntimeException(e);
60+
}
5361
}
5462
}
5563

0 commit comments

Comments
 (0)