Skip to content

Commit 60bf40d

Browse files
committed
Look up and build the engine bridge implementation by engine name.
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 6dfe93d commit 60bf40d

File tree

10 files changed

+107
-22
lines changed

10 files changed

+107
-22
lines changed

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_streamNex
748748
with_jni_env(|env| {
749749
match result {
750750
Ok(Some(batch)) => {
751+
log_info!("[RUST streamNext] Batch produced: {} rows, {} columns, schema: {:?}",
752+
batch.num_rows(), batch.num_columns(), batch.schema().fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>());
751753
// Convert to FFI
752754
let struct_array: StructArray = batch.into();
753755
let array_data = struct_array.into_data();
@@ -756,6 +758,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_streamNex
756758
set_action_listener_ok_global(env, &listener_ref, ffi_array_ptr as jlong);
757759
}
758760
Ok(None) => {
761+
log_info!("[RUST streamNext] End of stream reached");
759762
// End of stream
760763
set_action_listener_ok_global(env, &listener_ref, 0);
761764
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionAnalyticsBackend.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.analytics.backend.EngineBridge;
1313
import org.opensearch.analytics.spi.AnalyticsBackEndPlugin;
1414
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
15+
import org.opensearch.index.engine.exec.coord.CompositeEngine;
1516

1617
/**
1718
* SPI adapter that delegates to the real {@link DataFusionPlugin} instance,
@@ -40,8 +41,8 @@ public String name() {
4041
}
4142

4243
@Override
43-
public EngineBridge<?, ?, ?> bridge(CatalogSnapshot snapshot) {
44-
return delegate().bridge(snapshot);
44+
public EngineBridge<?, ?, ?> bridge(CompositeEngine engine, CatalogSnapshot snapshot) {
45+
return delegate().bridge(engine, snapshot);
4546
}
4647

4748
@Override

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionBridge.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.analytics.backend.EngineBridge;
3232
import org.opensearch.core.action.ActionListener;
3333
import org.opensearch.datafusion.jni.NativeBridge;
34+
import org.opensearch.datafusion.search.DatafusionContext;
3435
import org.opensearch.datafusion.search.DatafusionReader;
3536

3637
import java.io.Closeable;

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.env.NodeEnvironment;
3434
import org.opensearch.index.engine.SearchExecEngine;
3535
import org.opensearch.index.engine.exec.FileMetadata;
36+
import org.opensearch.index.engine.exec.coord.CompositeEngine;
3637
import org.opensearch.index.shard.ShardPath;
3738
import org.opensearch.plugins.ActionPlugin;
3839
import org.opensearch.plugins.ExtensiblePlugin;
@@ -212,7 +213,8 @@ public String name() {
212213
}
213214

214215
@Override
215-
public EngineBridge<?, ?, ?> bridge(CatalogSnapshot snapshot) {
216+
public EngineBridge<?, ?, ?> bridge(CompositeEngine engine, CatalogSnapshot snapshot) {
217+
DatafusionEngine dfEngine = (DatafusionEngine) engine.getEngine(name());
216218
long runtimePointer = dataFusionService.getRuntimePointer();
217219
Collection<WriterFileSet> files = snapshot.getSearchableFiles("parquet");
218220
// Derive directory path from the first WriterFileSet, or use empty string if no files

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsBackEndPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.calcite.sql.SqlOperatorTable;
1212
import org.opensearch.analytics.backend.EngineBridge;
1313
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
14+
import org.opensearch.index.engine.exec.coord.CompositeEngine;
1415

1516

1617
/**
@@ -22,7 +23,7 @@ public interface AnalyticsBackEndPlugin {
2223
String name();
2324

2425
/** JNI boundary for executing serialized plans, or null for engines without native execution. */
25-
EngineBridge<?, ?, ?> bridge(CatalogSnapshot snapshot);
26+
EngineBridge<?, ?, ?> bridge(CompositeEngine engine, CatalogSnapshot snapshot);
2627

2728
/** Supported functions as a Calcite operator table, or null if the back-end adds no functions. */
2829
SqlOperatorTable operatorTable();

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackend.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import org.apache.calcite.sql.SqlOperatorTable;
1212
import org.opensearch.analytics.backend.EngineBridge;
1313
import org.opensearch.analytics.spi.AnalyticsBackEndPlugin;
14+
import org.opensearch.datafusion.DatafusionEngine;
1415
import org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager;
1516
import org.opensearch.index.engine.exec.SearchExecEngine;
1617
import org.opensearch.index.engine.exec.WriterFileSet;
1718
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
19+
import org.opensearch.index.engine.exec.coord.CompositeEngine;
1820
import org.opensearch.index.shard.ShardPath;
1921
import org.opensearch.plugins.SearchAnalyticsBackEndPlugin;
2022
import org.opensearch.plugins.spi.vectorized.DataFormat;
@@ -45,14 +47,12 @@ public String name() {
4547
}
4648

4749
@Override
48-
public EngineBridge<?, ?, ?> bridge(CatalogSnapshot snapshot) {
49-
Collection<WriterFileSet> files = snapshot.getSearchableFiles("parquet");
50-
if (files.isEmpty() || files.stream().allMatch(wfs -> wfs.getFiles().isEmpty())) {
51-
throw new IllegalStateException("No parquet files available in catalog snapshot");
50+
public EngineBridge<?, ?, ?> bridge(CompositeEngine engine, CatalogSnapshot snapshot) {
51+
try {
52+
return new SandboxDataFusionBridge(getRuntimePtr(), (DatafusionReader) engine.getReader(name(), snapshot));
53+
} catch (IOException e) {
54+
throw new RuntimeException(e);
5255
}
53-
String dir = files.stream().findFirst().map(WriterFileSet::getDirectory).orElse("");
54-
DatafusionReader reader = new DatafusionReader(dir, files);
55-
return new SandboxDataFusionBridge(getRuntimePtr(), reader);
5656
}
5757

5858
@Override

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager;
1616
import org.opensearch.index.engine.exec.IndexFilterProvider;
1717
import org.opensearch.index.engine.exec.SourceProvider;
18+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
19+
import org.opensearch.index.engine.exec.coord.CompositeEngine;
1820
import org.opensearch.index.shard.ShardPath;
1921
import org.opensearch.plugins.SearchAnalyticsBackEndPlugin;
2022
import org.opensearch.plugins.spi.vectorized.DataFormat;
@@ -36,7 +38,7 @@ public String name() {
3638
}
3739

3840
@Override
39-
public EngineBridge<?, ?, ?> bridge() {
41+
public EngineBridge<?, ?, ?> bridge(CompositeEngine engine, CatalogSnapshot snapshot) {
4042
return null;
4143
}
4244

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,21 @@ public Iterable<Object[]> execute(RelNode logicalFragment, Object context) {
8888

8989
CompositeEngine engine = (CompositeEngine) indexShard.getIndexer();
9090

91-
// Prefer SearchExecEngine path if supported
92-
if (plugin.supportsSearchExecEngine() && engine.getSearchBackendFactory() != null) {
93-
logger.info("[DefaultPlanExecutor] Using SearchExecEngine path for back-end [{}]", plugin.name());
94-
try {
95-
return executeViaSearchExecEngine(engine, logicalFragment);
96-
} catch (Exception e) {
97-
throw new RuntimeException("SearchExecEngine execution failed for [" + plugin.name() + "]", e);
98-
}
99-
}
91+
// // Prefer SearchExecEngine path if supported
92+
// if (plugin.supportsSearchExecEngine() && engine.getSearchBackendFactory() != null) {
93+
// logger.info("[DefaultPlanExecutor] Using SearchExecEngine path for back-end [{}]", plugin.name());
94+
// try {
95+
// return executeViaSearchExecEngine(engine, logicalFragment);
96+
// } catch (Exception e) {
97+
// throw new RuntimeException("SearchExecEngine execution failed for [" + plugin.name() + "]", e);
98+
// }
99+
// }
100100

101101
// Bridge path
102102
try (CompositeEngine.ReleasableRef<CatalogSnapshot> snapshot = engine.acquireSnapshot()) {
103103
EngineBridge<byte[], ? extends EngineResultStream, RelNode> bridge =
104-
(EngineBridge<byte[], ? extends EngineResultStream, RelNode>) plugin.bridge(snapshot.getRef());
104+
(EngineBridge<byte[], ? extends EngineResultStream, RelNode>) plugin.bridge(engine, snapshot.getRef());
105+
105106
byte[] converted = bridge.convertFragment(logicalFragment);
106107

107108
List<Object[]> rows = new ArrayList<>();

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/ClickBenchRestIT.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,17 @@ private void bulkInsertTestData() throws IOException {
117117
// logger.info("Bulk inserted test data into {} index", HITS_INDEX);
118118
}
119119

120+
// Expected results for each query based on the 10-doc test dataset.
121+
// Each entry: queryId -> expected single-row values (in column order).
122+
private static final Map<String, List<Object>> EXPECTED_RESULTS = Map.of(
123+
"q1", List.of(10), // COUNT(*)
124+
"q2", List.of(2), // COUNT(*) WHERE AdvEngineID != 0 (docs 1,7)
125+
"q3", List.of(5, 10), // SUM(AdvEngineID)=5, COUNT(*)=10
126+
"q4", List.of(435090932899640449L), // MAX(UserID)
127+
"q5", List.of(10), // COUNT(DISTINCT UserID) - all 10 unique
128+
"q6", List.of(7) // COUNT(DISTINCT SearchPhrase) - 6 non-empty + 1 empty = 7
129+
);
130+
120131
public void testClickBenchQueries() throws Exception {
121132
createHitsIndex();
122133
bulkInsertTestData();
@@ -164,6 +175,29 @@ private void runClickBenchQuery(String queryId) throws Exception {
164175
List<String> columns = (List<String>) responseMap.get("columns");
165176
assertFalse("Columns should not be empty for " + queryId, columns.isEmpty());
166177

178+
@SuppressWarnings("unchecked")
179+
List<List<Object>> rows = (List<List<Object>>) responseMap.get("rows");
180+
logger.info("RESULT {}: columns={}, rows={}", queryId, columns, rows.size());
181+
for (int i = 0; i < Math.min(rows.size(), 10); i++) {
182+
logger.info(" row[{}]: {}", i, rows.get(i));
183+
}
184+
if (rows.size() > 10) {
185+
logger.info(" ... ({} more rows)", rows.size() - 10);
186+
}
187+
188+
// Assert expected results
189+
List<Object> expected = EXPECTED_RESULTS.get(queryId);
190+
if (expected != null) {
191+
assertEquals("Expected exactly 1 row for " + queryId, 1, rows.size());
192+
List<Object> actualRow = rows.get(0);
193+
assertEquals("Column count mismatch for " + queryId, expected.size(), actualRow.size());
194+
for (int i = 0; i < expected.size(); i++) {
195+
long expectedVal = ((Number) expected.get(i)).longValue();
196+
long actualVal = ((Number) actualRow.get(i)).longValue();
197+
assertEquals(queryId + " column " + columns.get(i) + " (index " + i + ")", expectedVal, actualVal);
198+
}
199+
}
200+
167201
logger.info("SUCCESS {}: {} columns", queryId, columns.size());
168202
}
169203

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.opensearch.index.translog.listener.TranslogEventListener;
9898
import org.opensearch.indices.pollingingest.PollingIngestStats;
9999
import org.opensearch.plugins.PluginsService;
100+
import org.opensearch.plugins.SearchAnalyticsBackEndPlugin;
100101
import org.opensearch.plugins.SearchEnginePlugin;
101102
import org.opensearch.search.suggest.completion.CompletionStats;
102103
import org.opensearch.plugins.spi.vectorized.DataFormat;
@@ -173,6 +174,8 @@ public class CompositeEngine implements LifecycleAware, Closeable, Indexer, Chec
173174
private final List<CatalogSnapshotAwareRefreshListener> catalogSnapshotAwareRefreshListeners = new ArrayList<>();
174175
private final Map<String, List<FileDeletionListener>> fileDeletionListeners = new HashMap<>();
175176
private final Map<DataFormat, List<SearchExecEngine<?, ?, ?, ?>>> readEngines = new HashMap<>();
177+
private final Map<String, SearchExecEngine<?, ?, ?, ?>> readEnginesByName = new HashMap<>();
178+
private final Map<String, CatalogSnapshotAwareReaderManager<?>> readerManagersByName = new HashMap<>();
176179
@Nullable
177180
private final SearchBackendFactory searchBackendFactory;
178181
private final MergeScheduler mergeScheduler;
@@ -406,6 +409,7 @@ public void onFailure(String reason, Exception ex) {
406409

407410
currentSearchEngines.add(newSearchEngine);
408411
readEngines.put(dataFormat, currentSearchEngines);
412+
readEnginesByName.put(dataFormat.getName(), newSearchEngine);
409413

410414
// TODO : figure out how to do internal and external refresh listeners
411415
// Maybe external refresh should be managed in opensearch core and plugins should always give
@@ -434,6 +438,17 @@ public void onFailure(String reason, Exception ex) {
434438
}
435439
this.searchBackendFactory = backendFactory;
436440

441+
// Populate name-keyed reader managers from SearchAnalyticsBackEndPlugins
442+
for (SearchAnalyticsBackEndPlugin backendPlugin : pluginsService.filterPlugins(SearchAnalyticsBackEndPlugin.class)) {
443+
for (DataFormat format : backendPlugin.getSupportedFormats()) {
444+
try {
445+
readerManagersByName.put(backendPlugin.name(), backendPlugin.createReaderManager(format, shardPath));
446+
} catch (Exception e) {
447+
logger.warn("Failed to create reader manager for plugin [{}], format [{}]", backendPlugin.name(), format, e);
448+
}
449+
}
450+
}
451+
437452
success = true;
438453
} catch (IOException | TranslogCorruptedException e) {
439454
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -605,6 +620,31 @@ public Object getReader(DataFormat format, CatalogSnapshot snapshot) throws IOEx
605620
return rm != null ? rm.getReader(snapshot) : null;
606621
}
607622

623+
public Object getEngine(String name) {
624+
return readEnginesByName.get(name);
625+
}
626+
627+
/**
628+
* Returns the catalog-snapshot-aware reader manager registered under the given name, or null if none is registered.
629+
*/
630+
@Nullable
631+
public CatalogSnapshotAwareReaderManager<?> getReaderManager(String name) {
632+
return readerManagersByName.get(name);
633+
}
634+
635+
/**
636+
* Returns a reader for the given name and catalog snapshot, or null if no reader manager is registered under that name.
637+
*/
638+
@Nullable
639+
public Object getReader(String name, CatalogSnapshot snapshot) throws IOException {
640+
CatalogSnapshotAwareReaderManager<?> rm = getReaderManager(name);
641+
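// HACK: manually invoke afterRefresh so the reader manager sees this snapshot before getReader is called; presumably the manager is not yet wired in as a refresh listener - TODO confirm and remove.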
642+
if (rm != null) {
643+
rm.afterRefresh(true, snapshot);
644+
}
645+
return rm != null ? rm.getReader(snapshot) : null;
646+
}
647+
608648
/**
609649
* Returns a reader for the given format using the current catalog snapshot.
610650
* TODO: return composite reader that spans all formats

0 commit comments

Comments
 (0)