Skip to content

Commit 6dfe93d

Browse files
native interface abstractions and indexing integration with sandbox plugins (#20926)
Signed-off-by: bharath-techie <bharath78910@gmail.com>
1 parent b3d572b commit 6dfe93d

File tree

68 files changed

+4166
-133
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+4166
-133
lines changed

libs/common/src/main/java/org/opensearch/common/annotation/processor/ApiAnnotationProcessor.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,20 +85,20 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
8585
Set.of(PublicApi.class, ExperimentalApi.class, DeprecatedApi.class)
8686
);
8787

88-
// for (var element : elements) {
89-
// validate(element);
90-
//
91-
// if (!checkPackage(element)) {
92-
// continue;
93-
// }
94-
//
95-
// // Skip all not-public elements
96-
// checkPublicVisibility(null, element);
97-
//
98-
// if (element instanceof TypeElement) {
99-
// process((TypeElement) element);
100-
// }
101-
// }
88+
// for (var element : elements) {
89+
// validate(element);
90+
//
91+
// if (!checkPackage(element)) {
92+
// continue;
93+
// }
94+
//
95+
// // Skip all not-public elements
96+
// checkPublicVisibility(null, element);
97+
//
98+
// if (element instanceof TypeElement) {
99+
// process((TypeElement) element);
100+
// }
101+
// }
102102

103103
return false;
104104
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionAnalyticsBackend.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,8 @@
1414
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1515

1616
/**
17-
* SPI adapter that delegates to the real {@link DataFusionPlugin} instance.
18-
*
19-
* <p>{@code PluginsService} extension discovery requires either a no-arg constructor
20-
* or a constructor taking the enclosing plugin. Since {@code DataFusionPlugin}
21-
* itself needs a {@code Settings} constructor for plugin loading, this thin
22-
* adapter bridges the two requirements.
17+
* SPI adapter that delegates to the real {@link DataFusionPlugin} instance,
18+
* or to a child backend plugin if one was discovered via ExtensiblePlugin.
2319
*/
2420
public class DataFusionAnalyticsBackend implements AnalyticsBackEndPlugin {
2521

@@ -29,18 +25,32 @@ public DataFusionAnalyticsBackend(DataFusionPlugin plugin) {
2925
this.plugin = plugin;
3026
}
3127

28+
private AnalyticsBackEndPlugin delegate() {
29+
// If a child plugin (e.g. sandbox analytics-backend-datafusion) registered,
30+
// delegate to it so it can override the bridge implementation.
31+
if (plugin.getChildBackends().isEmpty() == false) {
32+
return plugin.getChildBackends().get(0);
33+
}
34+
return plugin;
35+
}
36+
3237
@Override
3338
public String name() {
3439
return plugin.name();
3540
}
3641

3742
@Override
3843
public EngineBridge<?, ?, ?> bridge(CatalogSnapshot snapshot) {
39-
return plugin.bridge(snapshot);
44+
return delegate().bridge(snapshot);
4045
}
4146

4247
@Override
4348
public SqlOperatorTable operatorTable() {
44-
return plugin.operatorTable();
49+
return delegate().operatorTable();
50+
}
51+
52+
@Override
53+
public boolean supportsSearchExecEngine() {
54+
return delegate().supportsSearchExecEngine();
4555
}
4656
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import org.opensearch.index.engine.exec.FileMetadata;
3636
import org.opensearch.index.shard.ShardPath;
3737
import org.opensearch.plugins.ActionPlugin;
38+
import org.opensearch.plugins.ExtensiblePlugin;
3839
import org.opensearch.plugins.Plugin;
40+
import org.opensearch.plugins.SearchAnalyticsBackEndPlugin;
3941
import org.opensearch.plugins.SearchEnginePlugin;
4042
import org.opensearch.plugins.spi.vectorized.DataFormat;
4143
import org.opensearch.plugins.spi.vectorized.DataSourceCodec;
@@ -72,9 +74,13 @@
7274
* Main plugin class for OpenSearch DataFusion integration.
7375
*
7476
*/
75-
public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin, AnalyticsBackEndPlugin {
77+
public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin, AnalyticsBackEndPlugin, ExtensiblePlugin, SearchAnalyticsBackEndPlugin {
7678

7779
private DataFusionService dataFusionService;
80+
81+
public DataFusionService getDataFusionService() {
82+
return dataFusionService;
83+
}
7884
private final boolean isDataFusionEnabled;
7985

8086
/**
@@ -224,6 +230,56 @@ public String name() {
224230
public SqlOperatorTable operatorTable() {
225231
return null;
226232
}
233+
234+
// Forward AnalyticsBackEndPlugin extensions from child plugins (e.g. analytics-backend-datafusion)
235+
private final List<AnalyticsBackEndPlugin> childBackends = new ArrayList<>();
236+
237+
@Override
238+
public void loadExtensions(ExtensionLoader loader) {
239+
for (AnalyticsBackEndPlugin ext : loader.loadExtensions(AnalyticsBackEndPlugin.class)) {
240+
// Inject ourselves so child backends can access the DataFusionService
241+
if (ext instanceof ParentAware) {
242+
((ParentAware) ext).setParentPlugin(this);
243+
}
244+
childBackends.add(ext);
245+
}
246+
}
247+
248+
public List<AnalyticsBackEndPlugin> getChildBackends() {
249+
return childBackends;
250+
}
251+
252+
/** Marker interface for child backends that need the parent plugin. */
253+
// ---- SearchAnalyticsBackEndPlugin (delegates to child backend if available) ----
254+
255+
private SearchAnalyticsBackEndPlugin getChildSearchBackend() {
256+
for (AnalyticsBackEndPlugin child : childBackends) {
257+
if (child instanceof SearchAnalyticsBackEndPlugin) {
258+
return (SearchAnalyticsBackEndPlugin) child;
259+
}
260+
}
261+
return null;
262+
}
263+
264+
@Override
265+
public org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager<?> createReaderManager(
266+
org.opensearch.plugins.spi.vectorized.DataFormat format, ShardPath shardPath) throws IOException {
267+
SearchAnalyticsBackEndPlugin child = getChildSearchBackend();
268+
if (child != null) return child.createReaderManager(format, shardPath);
269+
return null;
270+
}
271+
272+
@Override
273+
public org.opensearch.index.engine.exec.SearchExecEngine<?, ?> createSearchExecEngine(
274+
org.opensearch.plugins.spi.vectorized.DataFormat format, ShardPath shardPath) throws IOException {
275+
SearchAnalyticsBackEndPlugin child = getChildSearchBackend();
276+
if (child != null) return child.createSearchExecEngine(format, shardPath);
277+
return null;
278+
}
279+
280+
public interface ParentAware {
281+
void setParentPlugin(DataFusionPlugin parent);
282+
}
227283
//
228284
// @Override
229285
// public List<Setting<?>> getSettings() {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.backend.jni;
10+
11+
import java.lang.ref.Cleaner;
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
14+
/**
15+
* Base class for type-safe native pointer wrappers.
16+
* Provides automatic resource management and prevents use-after-close errors.
17+
* Subclasses must implement {@link #doClose()} to release native resources.
18+
*/
19+
public abstract class NativeHandle implements AutoCloseable {
20+
21+
protected final long ptr;
22+
private final AtomicBoolean closed = new AtomicBoolean(false);
23+
protected static final long NULL_POINTER = 0L;
24+
private final Cleaner.Cleanable cleanable;
25+
26+
private static final Cleaner CLEANER = Cleaner.create();
27+
28+
protected NativeHandle(long ptr) {
29+
if (ptr == NULL_POINTER) {
30+
throw new IllegalArgumentException("Null native pointer");
31+
}
32+
this.ptr = ptr;
33+
this.cleanable = CLEANER.register(this, new CleanupAction(ptr, this::doClose));
34+
}
35+
36+
public void ensureOpen() {
37+
if (closed.get()) {
38+
throw new IllegalStateException("Handle already closed");
39+
}
40+
}
41+
42+
public long getPointer() {
43+
ensureOpen();
44+
return ptr;
45+
}
46+
47+
@Override
48+
public void close() {
49+
if (closed.compareAndSet(false, true)) {
50+
cleanable.clean();
51+
}
52+
}
53+
54+
protected abstract void doClose();
55+
56+
private static final class CleanupAction implements Runnable {
57+
private final long ptr;
58+
private final Runnable doClose;
59+
60+
CleanupAction(long ptr, Runnable doClose) {
61+
this.ptr = ptr;
62+
this.doClose = doClose;
63+
}
64+
65+
@Override
66+
public void run() {
67+
doClose.run();
68+
}
69+
}
70+
}

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsBackEndPlugin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
import org.opensearch.analytics.backend.EngineBridge;
1313
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1414

15+
1516
/**
16-
* SPI extension point for back-end query engines (DataFusion, Lucene, etc.).
17+
* SPI extension point for back-end query engines (DataFusion, Lucene, etc.)
1718
* @opensearch.internal
1819
*/
1920
public interface AnalyticsBackEndPlugin {
@@ -25,4 +26,9 @@ public interface AnalyticsBackEndPlugin {
2526

2627
/** Supported functions as a Calcite operator table, or null if the back-end adds no functions. */
2728
SqlOperatorTable operatorTable();
29+
30+
/** Whether this backend supports the SearchExecEngine path via CompositeEngine. */
31+
default boolean supportsSearchExecEngine() {
32+
return false;
33+
}
2834
}

sandbox/plugins/analytics-backend-datafusion/build.gradle

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,28 @@
99
opensearchplugin {
1010
description = 'DataFusion native execution engine plugin for the query engine.'
1111
classname = 'org.opensearch.be.datafusion.DataFusionPlugin'
12-
extendedPlugins = ['analytics-engine']
12+
extendedPlugins = ['engine-datafusion']
1313
}
1414

1515
dependencies {
16-
// Shared types and SPI interfaces (EngineBridge, AnalyticsBackEndPlugin, etc.)
17-
// Also provides calcite-core transitively via api.
18-
api project(':sandbox:libs:analytics-framework')
16+
// All compileOnly — provided by parent classloaders at runtime
17+
compileOnly project(':sandbox:libs:analytics-framework')
18+
compileOnly project(':plugins:engine-datafusion')
19+
compileOnly project(':libs:opensearch-vectorized-exec-spi')
20+
compileOnly "org.apache.arrow:arrow-memory-core:18.3.0"
21+
compileOnly "org.apache.arrow:arrow-vector:18.3.0"
22+
compileOnly "org.apache.arrow:arrow-c-data:18.3.0"
23+
compileOnly "org.apache.arrow:arrow-format:18.3.0"
24+
compileOnly "org.apache.calcite:calcite-core:1.41.0"
25+
compileOnly "com.google.protobuf:protobuf-java:${versions.protobuf}"
26+
27+
// Substrait — only new jars we bundle
28+
implementation('io.substrait:core:0.67.0') { transitive = false }
29+
implementation('io.substrait:isthmus:0.67.0') { transitive = false }
1930
}
2031

21-
// TODO: Remove once back-end is built out with test suite
2232
testingConventions.enabled = false
33+
34+
tasks.withType(JavaCompile).configureEach {
35+
options.compilerArgs -= '-Werror'
36+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.be.datafusion;
10+
11+
import org.apache.calcite.sql.SqlOperatorTable;
12+
import org.opensearch.analytics.backend.EngineBridge;
13+
import org.opensearch.analytics.spi.AnalyticsBackEndPlugin;
14+
import org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager;
15+
import org.opensearch.index.engine.exec.SearchExecEngine;
16+
import org.opensearch.index.engine.exec.WriterFileSet;
17+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
18+
import org.opensearch.index.shard.ShardPath;
19+
import org.opensearch.plugins.SearchAnalyticsBackEndPlugin;
20+
import org.opensearch.plugins.spi.vectorized.DataFormat;
21+
22+
import java.io.IOException;
23+
import java.util.Collection;
24+
import java.util.List;
25+
26+
public class DataFusionAnalyticsBackend
27+
implements AnalyticsBackEndPlugin, SearchAnalyticsBackEndPlugin, org.opensearch.datafusion.DataFusionPlugin.ParentAware {
28+
29+
private org.opensearch.datafusion.DataFusionPlugin parentPlugin;
30+
31+
public DataFusionAnalyticsBackend() {}
32+
33+
@Override
34+
public void setParentPlugin(org.opensearch.datafusion.DataFusionPlugin parent) {
35+
this.parentPlugin = parent;
36+
}
37+
38+
private long getRuntimePtr() {
39+
return parentPlugin.getDataFusionService().getRuntimePointer();
40+
}
41+
42+
@Override
43+
public String name() {
44+
return "DataFusion";
45+
}
46+
47+
@Override
48+
public EngineBridge<?, ?, ?> bridge(CatalogSnapshot snapshot) {
49+
Collection<WriterFileSet> files = snapshot.getSearchableFiles("parquet");
50+
if (files.isEmpty() || files.stream().allMatch(wfs -> wfs.getFiles().isEmpty())) {
51+
throw new IllegalStateException("No parquet files available in catalog snapshot");
52+
}
53+
String dir = files.stream().findFirst().map(WriterFileSet::getDirectory).orElse("");
54+
DatafusionReader reader = new DatafusionReader(dir, files);
55+
return new SandboxDataFusionBridge(getRuntimePtr(), reader);
56+
}
57+
58+
@Override
59+
public SqlOperatorTable operatorTable() {
60+
return null;
61+
}
62+
63+
@Override
64+
public boolean supportsSearchExecEngine() {
65+
return true;
66+
}
67+
68+
// ---- SearchAnalyticsBackEndPlugin ----
69+
70+
@Override
71+
public List<DataFormat> getSupportedFormats() {
72+
return List.of(DataFormat.PARQUET);
73+
}
74+
75+
@Override
76+
public CatalogSnapshotAwareReaderManager<?> createReaderManager(DataFormat format, ShardPath shardPath) throws IOException {
77+
return new DatafusionReaderManager(format, shardPath);
78+
}
79+
80+
@Override
81+
public SearchExecEngine<?, ?> createSearchExecEngine(DataFormat format, ShardPath shardPath) throws IOException {
82+
return new DatafusionSearchExecEngine(getRuntimePtr());
83+
}
84+
}

0 commit comments

Comments
 (0)