Skip to content

Commit 0a9b038

Browse files
Changes to make plugin contexts work with source parse
Signed-off-by: bharath-techie <[email protected]>
1 parent eba3575 commit 0a9b038

File tree

2 files changed

+27
-14
lines changed

2 files changed

+27
-14
lines changed

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public class DatafusionContext extends SearchContext {
7676
private final QueryShardContext queryShardContext;
7777
private DatafusionQuery datafusionQuery;
7878
private Map<String, Object[]> dfResults;
79+
private SearchContextAggregations aggregations;
80+
7981
/**
8082
* Constructor
8183
* @param readerContext The reader context
@@ -224,7 +226,7 @@ public ScrollContext scrollContext() {
224226

225227
@Override
226228
public SearchContextAggregations aggregations() {
227-
return null;
229+
return aggregations;
228230
}
229231

230232
/**
@@ -233,7 +235,8 @@ public SearchContextAggregations aggregations() {
233235
*/
234236
@Override
235237
public SearchContext aggregations(SearchContextAggregations aggregations) {
236-
return null;
238+
this.aggregations = aggregations;
239+
return this;
237240
}
238241

239242
/**
@@ -790,4 +793,5 @@ public void setDFResults(Map<String, Object[]> dfResults) {
790793
public Map<String, Object[]> getDFResults() {
791794
return dfResults;
792795
}
796+
793797
}

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -825,15 +825,15 @@ private SearchPhaseResult executeQueryPhase(
825825
);
826826
try (
827827
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
828-
//SearchContext context = createContext(readerContext, request, task, true, isStreamSearch)
829-
830828
// Get engine-specific executor and context
831829
// TODO : move this logic to work with Lucene
832830

833-
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task);
831+
SearchContext context = createContext(readerContext, request, task, true, isStreamSearch, searchExecEngine);
832+
834833
//SearchContext context = createContext(readerContext, request, task, true)
835834
) {
836-
835+
// TODO : this is not correct - need to tie source to plugin context above
836+
//context.aggregations(context1.aggregations());
837837
// TODO Execute plan here
838838
// TODO : figure out how to tie this
839839
byte[] substraitQuery = request.source().queryPlanIR();
@@ -1270,17 +1270,26 @@ final SearchContext createContext(
12701270
SearchShardTask task,
12711271
boolean includeAggregations
12721272
) throws IOException {
1273-
return createContext(readerContext, request, task, includeAggregations, false);
1273+
return createContext(readerContext, request, task, includeAggregations, false, null);
12741274
}
12751275

12761276
private SearchContext createContext(
12771277
ReaderContext readerContext,
12781278
ShardSearchRequest request,
12791279
SearchShardTask task,
12801280
boolean includeAggregations,
1281-
boolean isStreamSearch
1281+
boolean isStreamSearch,
1282+
SearchExecEngine searchExecEngine
12821283
) throws IOException {
1283-
final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout, false, isStreamSearch);
1284+
//final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout, false, isStreamSearch);
1285+
1286+
SearchShardTarget shardTarget = new SearchShardTarget(
1287+
clusterService.localNode().getId(),
1288+
readerContext.indexShard().shardId(),
1289+
request.getClusterAlias(),
1290+
OriginalIndices.NONE
1291+
);
1292+
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task);
12841293
try {
12851294
if (request.scroll() != null) {
12861295
context.scrollContext().scroll = request.scroll();
@@ -1546,10 +1555,10 @@ private void processFailure(ReaderContext context, Exception exc) {
15461555
}
15471556
}
15481557

1549-
private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) {
1558+
private void parseSource(SearchContext context, SearchSourceBuilder source, boolean includeAggregations) {
15501559
// nothing to parse...
15511560
if (source == null) {
1552-
context.evaluateRequestShouldUseConcurrentSearch();
1561+
// context.evaluateRequestShouldUseConcurrentSearch(); // TODO : specific to default search context
15531562
return;
15541563
}
15551564

@@ -1706,7 +1715,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
17061715
if (context.scrollContext() == null && !(context.readerContext() instanceof PitReaderContext)) {
17071716
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context or PIT context");
17081717
}
1709-
context.sliceBuilder(source.slice());
1718+
// context.sliceBuilder(source.slice()); // TODO : specific to default search context
17101719
}
17111720

17121721
if (source.storedFields() != null) {
@@ -1740,13 +1749,13 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
17401749
final CollapseContext collapseContext = source.collapse().build(queryShardContext);
17411750
context.collapse(collapseContext);
17421751
}
1743-
context.evaluateRequestShouldUseConcurrentSearch();
1752+
// context.evaluateRequestShouldUseConcurrentSearch(); // TODO : specific to default search context
17441753
if (source.profile()) {
17451754
final Function<Query, Collection<Supplier<ProfileMetric>>> pluginProfileMetricsSupplier = (query) -> pluginProfilers.stream()
17461755
.flatMap(p -> p.getQueryProfileMetrics(context, query).stream())
17471756
.toList();
17481757
Profilers profilers = new Profilers(context.searcher(), context.shouldUseConcurrentSearch(), pluginProfileMetricsSupplier);
1749-
context.setProfilers(profilers);
1758+
// context.setProfilers(profilers); // TODO : specific to default search context
17501759
}
17511760

17521761
if (context.getStarTreeIndexEnabled() && StarTreeQueryHelper.isStarTreeSupported(context)) {

0 commit comments

Comments
 (0)