Skip to content

Commit 847aa7e

Browse files
Add changes for searcher integration
Co-authored-by: Arpit Bandejiya <[email protected]> Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 19db171 commit 847aa7e

File tree

8 files changed

+97
-14
lines changed

8 files changed

+97
-14
lines changed

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public List<DataFormat> getSupportedFormats() {
125125
public SearchExecEngine<DatafusionContext, DatafusionSearcher,
126126
DatafusionReaderManager, DatafusionQuery>
127127
createEngine(DataFormat dataFormat,Collection<FileMetadata> formatCatalogSnapshot) throws IOException {
128-
return new DatafusionEngine(dataFormat, formatCatalogSnapshot);
128+
return new DatafusionEngine(dataFormat, formatCatalogSnapshot, dataFusionService);
129129
}
130130

131131
/**

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ public CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionCo
161161
return engine.executeSubstraitQuery(sessionContextId, substraitPlanBytes);
162162
}
163163

164+
/**
 * Exposes the pointer of the global runtime environment held by this service.
 *
 * @return the value reported by {@code globalRuntimeEnv.getPointer()}
 */
public long getRuntimePointer() {
165+
return globalRuntimeEnv.getPointer();
166+
}
164167
/**
165168
* Close the session context and clean up resources
166169
*

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,15 @@
88

99
package org.opensearch.datafusion;
1010

11+
import org.apache.arrow.memory.RootAllocator;
12+
import org.apache.arrow.vector.FieldVector;
13+
import org.apache.arrow.vector.VectorSchemaRoot;
14+
import org.apache.arrow.vector.types.pojo.Field;
15+
import org.apache.logging.log4j.LogManager;
16+
import org.apache.logging.log4j.Logger;
1117
import org.opensearch.action.search.SearchShardTask;
1218
import org.opensearch.common.lease.Releasables;
19+
import org.opensearch.datafusion.core.DefaultRecordBatchStream;
1320
import org.opensearch.datafusion.search.DatafusionContext;
1421
import org.opensearch.datafusion.search.DatafusionQuery;
1522
import org.opensearch.datafusion.search.DatafusionQueryPhaseExecutor;
@@ -23,6 +30,7 @@
2330
import org.opensearch.index.engine.EngineSearcherSupplier;
2431
import org.opensearch.index.engine.SearchExecEngine;
2532
import org.opensearch.index.engine.exec.FileMetadata;
33+
import org.opensearch.search.aggregations.SearchResultsCollector;
2634
import org.opensearch.search.internal.ReaderContext;
2735
import org.opensearch.search.internal.ShardSearchRequest;
2836
import org.opensearch.search.query.QueryPhaseExecutor;
@@ -33,17 +41,23 @@
3341
import java.io.UncheckedIOException;
3442
import java.util.ArrayList;
3543
import java.util.Collection;
44+
import java.util.HashMap;
45+
import java.util.Map;
3646
import java.util.function.Function;
3747

3848
public class DatafusionEngine extends SearchExecEngine<DatafusionContext, DatafusionSearcher,
3949
DatafusionReaderManager, DatafusionQuery> {
4050

51+
private static final Logger logger = LogManager.getLogger(DatafusionEngine.class);
52+
4153
private DataFormat dataFormat;
4254
private DatafusionReaderManager datafusionReaderManager;
55+
private DataFusionService datafusionService;
4356

44-
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot) throws IOException {
57+
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException {
4558
this.dataFormat = dataFormat;
4659
this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
60+
this.datafusionService = dataFusionService;
4761
}
4862

4963
@Override
@@ -60,7 +74,7 @@ public QueryPhaseExecutor<DatafusionContext> getQueryPhaseExecutor() {
6074
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTask task) throws IOException {
6175
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, task, this);
6276
// Parse source
63-
datafusionContext.datafusionQuery(new DatafusionQuery(request.source().getSubstraitBytes(), new ArrayList<>()));
77+
datafusionContext.datafusionQuery(new DatafusionQuery(request.source().queryPlanIR(), new ArrayList<>()));
6478
return datafusionContext;
6579
}
6680

@@ -138,4 +152,48 @@ public CatalogSnapshotAwareRefreshListener getRefreshListener(Engine.SearcherSco
138152
public boolean assertSearcherIsWarmedUp(String source, Engine.SearcherScope scope) {
139153
return false;
140154
}
155+
156+
@Override
157+
public Map<String, Object[]> execute(DatafusionContext context) {
158+
159+
Map<String, Object[]> finalRes = new HashMap<>();
160+
try {
161+
DatafusionSearcher datafusionSearcher = context.getEngineSearcher();
162+
long streamPointer = datafusionSearcher.search(context.getDatafusionQuery());
163+
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
164+
RecordBatchStream stream = new RecordBatchStream(streamPointer, datafusionService.getRuntimePointer() , allocator);
165+
166+
// We can have some collectors passed like this which can collect the results and convert to InternalAggregation
167+
// Is the possible? need to check
168+
169+
SearchResultsCollector<RecordBatchStream> collector = new SearchResultsCollector<RecordBatchStream>() {
170+
@Override
171+
public void collect(RecordBatchStream value) {
172+
VectorSchemaRoot root = value.getVectorSchemaRoot();
173+
for (Field field : root.getSchema().getFields()) {
174+
String filedName = field.getName();
175+
FieldVector fieldVector = root.getVector(filedName);
176+
Object[] fieldValues = new Object[fieldVector.getValueCount()];
177+
for (int i = 0; i < fieldVector.getValueCount(); i++) {
178+
fieldValues[i] = fieldVector.getObject(i);
179+
}
180+
finalRes.put(filedName, fieldValues);
181+
}
182+
}
183+
};
184+
185+
while (stream.loadNextBatch().join()) {
186+
collector.collect(stream);
187+
}
188+
189+
logger.info("Final Results:");
190+
for (Map.Entry<String, Object[]> entry : finalRes.entrySet()) {
191+
logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
192+
}
193+
194+
} catch (Exception exception) {
195+
logger.error("Failed to execute Substrait query plan", exception);
196+
}
197+
return finalRes;
198+
}
141199
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/RecordBatchStream.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,22 @@
2929
*/
3030
public class RecordBatchStream {
3131

32-
private final SessionContext context;
3332
private final long streamPointer;
3433
private final BufferAllocator allocator;
3534
private final CDataDictionaryProvider dictionaryProvider;
3635
private boolean initialized = false;
3736
private VectorSchemaRoot vectorSchemaRoot = null;
37+
private long runtimePtr;
3838

3939
/**
4040
* Creates a new RecordBatchStream for the given stream pointer
41-
* @param ctx the session context
42-
* @param streamId pointer to the native stream
41+
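* @param streamId pointer identifying the stream to read from
* @param runtimePtr runtime pointer passed to {@code next} each time a batch is loaded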
4342
* @param allocator memory allocator for Arrow vectors
4443
*/
45-
public RecordBatchStream(SessionContext ctx, long streamId, BufferAllocator allocator) {
46-
this.context = ctx;
44+
public RecordBatchStream(long streamId, long runtimePtr, BufferAllocator allocator) {
4745
this.streamPointer = streamId;
4846
this.allocator = allocator;
47+
this.runtimePtr = runtimePtr;
4948
this.dictionaryProvider = new CDataDictionaryProvider();
5049
}
5150

@@ -99,7 +98,7 @@ private void ensureInitialized() {
9998
*/
10099
public CompletableFuture<Boolean> loadNextBatch() {
101100
ensureInitialized();
102-
long runtimePointer = context.getRuntime();
101+
long runtimePointer = this.runtimePtr;
103102
CompletableFuture<Boolean> result = new CompletableFuture<>();
104103
next(runtimePointer, streamPointer, (errString, arrowArrayAddress) -> {
105104
if (ErrorUtil.containsError(errString)) {

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector<
4848
}
4949
}
5050

51+
/**
 * Executes the query's Substrait bytes through {@code DataFusionQueryJNI.executeSubstraitQuery},
 * using the cache pointer of this searcher's reader.
 *
 * @param datafusionQuery query whose Substrait bytes are executed
 * @return the {@code long} produced by the JNI call
 *         (NOTE(review): callers appear to treat this as a stream pointer; confirm)
 */
@Override
52+
public long search(DatafusionQuery datafusionQuery) {
53+
return DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes());
54+
}
55+
5156
public DatafusionReader getReader() {
5257
return reader;
5358
}

server/src/main/java/org/opensearch/index/engine/EngineSearcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.search.aggregations.SearchResultsCollector;
1414

1515
import java.io.IOException;
16+
import java.io.UnsupportedEncodingException;
1617
import java.util.List;
1718

1819
@ExperimentalApi
@@ -30,4 +31,8 @@ public interface EngineSearcher<Q,C> extends Releasable {
3031
default void search(Q query, List<SearchResultsCollector<C>> collectors) throws IOException {
3132
throw new UnsupportedOperationException();
3233
}
34+
35+
/**
 * Single-argument search variant that yields a {@code long} result.
 * This default implementation is unsupported and always throws; implementations
 * that provide the operation must override it.
 *
 * @param query the query to run
 * @return never returns normally in this default implementation
 * @throws UnsupportedOperationException always, unless overridden
 */
default long search(Q query) throws IOException {
36+
throw new UnsupportedOperationException();
37+
}
3338
}

server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.search.query.QueryPhaseExecutor;
1818

1919
import java.io.IOException;
20+
import java.util.Map;
2021

2122
/**
2223
* Generic read engine interface that provides searcher operations and query phase execution
@@ -43,4 +44,10 @@ public abstract class SearchExecEngine<C extends SearchContext, S extends Engine
4344
* Create a search context for this engine
4445
*/
4546
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTask task) throws IOException;
47+
48+
/**
49+
* Executes the query associated with the given search context and returns its results.
50+
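* @return the execution results, keyed by name with one array of values per key (the DataFusion engine keys them by field name)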
51+
*/
52+
public abstract Map<String, Object[]> execute(C context) throws IOException;
4653
}

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,8 @@ private SearchPhaseResult executeQueryPhase(
813813
// Up to this point the flow is generic; for DataFusion we need to abstract this out and obtain the read-engine-specific implementation.
814814
// it could be reusing existing
815815
final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
816-
SearchExecEngine<?, ?, ?, ?> searchExecEngine = readerContext.indexShard()
816+
@SuppressWarnings("unchecked")
817+
SearchExecEngine searchExecEngine = readerContext.indexShard()
817818
.getIndexingExecutionCoordinator()
818819
.getPrimaryReadEngine();
819820

@@ -828,10 +829,10 @@ private SearchPhaseResult executeQueryPhase(
828829
) {
829830

830831
// TODO Execute plan here
832+
// TODO : figure out how to tie this
831833
byte[] substraitQuery = request.source().queryPlanIR();
832834
if (substraitQuery != null) {
833-
SearchExecutionEngine searchExecutionEngine = readerContext.indexShard().getSearchExecutionEngine();
834-
Map<String, Object[]> result = searchExecutionEngine.execute(substraitQuery);
835+
Map<String, Object[]> result = searchExecEngine.execute(context);
835836
context.setDFResults(result);
836837
}
837838

@@ -841,9 +842,14 @@ private SearchPhaseResult executeQueryPhase(
841842
}
842843
final long afterQueryTime;
843844
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
845+
// TODO check for this
846+
// @SuppressWarnings("unchecked")
847+
// QueryPhaseExecutor<SearchContext> queryPhaseExecutor =
848+
// (QueryPhaseExecutor<SearchContext>) searchExecEngine.getQueryPhaseExecutor();
849+
844850
//QueryPhaseExecutor<?> queryPhaseExecutor = readEngine.getQueryPhaseExecutor();
845-
//boolean success = queryPhaseExecutor.execute(context);
846-
//loadOrExecuteQueryPhase(request, context);
851+
// boolean success = queryPhaseExecutor.execute(context);
852+
loadOrExecuteQueryPhase(request, context);
847853
queryPhase.execute(context);
848854
// loadOrExecuteQueryPhase(request, context);
849855
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {

0 commit comments

Comments
 (0)