Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ testfixtures_shared/

# build files generated
doc-tools/missing-doclet/bin/
/sandbox/plugins/engine-datafusion/target/
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.opensearch.common;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* A {@link TriFunction}-like interface which allows throwing checked exceptions.
*
* @opensearch.internal
*/
@ExperimentalApi
@FunctionalInterface
public interface CheckedTriFunction<S, T, U, R, E extends Exception> {
R apply(S s, T t, U u) throws E;
Expand Down
3 changes: 2 additions & 1 deletion sandbox/libs/analytics-framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
def calciteVersion = '1.41.0'

dependencies {
compileOnly project(':server')
api "org.apache.calcite:calcite-core:${calciteVersion}"
// Calcite's expression tree and Enumerable runtime — required by calcite-core API
api "org.apache.calcite:calcite-linq4j:${calciteVersion}"
Expand All @@ -35,7 +36,7 @@ dependencies {

testingConventions.enabled = false

// analytics-framework does not depend on server
// analytics-framework depends on server for SearchAnalyticsBackEndPlugin SPI
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we let the bridge talk to the engine ? if we need to remove this dependency.

We can come back to this after we figure out if context is required in analytics plugin for delegates.

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
failOnMissingClasses = false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend;

import java.util.List;

/**
* Read-only view of a single record batch. Provides field names, row count,
* and positional access to field values.
*
* @opensearch.internal
*/
public interface EngineResultBatch {

/**
* Ordered list of field (column) names in this batch.
*/
List<String> getFieldNames();

/**
* Number of rows in this batch.
*/
int getRowCount();

/**
* Returns the value at the given row index for the named field.
*
* @param fieldName column name
* @param rowIndex zero-based row index
* @return the value (may be null)
*/
Object getFieldValue(String fieldName, int rowIndex);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend;

import java.util.Iterator;

/**
* Single-pass iterator over record batches from an {@link EngineResultStream}.
*
* @opensearch.internal
*/
public interface EngineResultBatchIterator extends Iterator<EngineResultBatch> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend;

/**
* A closeable stream of record batches returned by engine execution.
* Callers iterate batches via the returned iterator and MUST close the stream
* when done to release native resources.
*
* @opensearch.internal
*/
public interface EngineResultStream extends AutoCloseable {

/**
* Returns an iterator over the record batches in this stream.
* Each call returns the same iterator instance — the stream is single-pass.
*/
EngineResultBatchIterator iterator();

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend.jni;

import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Base class for type-safe native pointer wrappers.
* Provides automatic resource management and prevents use-after-close errors.
* Subclasses must implement {@link #doClose()} to release native resources.
* Cleaner is used to ensure resources are cleaned up even if the object is not explicitly closed.
*/
public abstract class NativeHandle implements AutoCloseable {

protected final long ptr;
private final AtomicBoolean closed = new AtomicBoolean(false);
protected static final long NULL_POINTER = 0L;
private final Cleaner.Cleanable cleanable;

private static final Cleaner CLEANER = Cleaner.create();

/**
* Creates a new native handle.
* @param ptr the native pointer (must not be 0)
* @throws IllegalArgumentException if ptr is 0
*/
protected NativeHandle(long ptr) {
if (ptr == NULL_POINTER) {
throw new IllegalArgumentException("Null native pointer");
}
this.ptr = ptr;
this.cleanable = CLEANER.register(this, new CleanupAction(ptr, this::doClose));
}

/**
* Ensures the handle is still open.
* @throws IllegalStateException if the handle has been closed
*/
public void ensureOpen() {
if (closed.get()) {
throw new IllegalStateException("Handle already closed");
}
}

/**
* Gets the native pointer value.
* @return the native pointer
* @throws IllegalStateException if the handle has been closed
*/
public long getPointer() {
ensureOpen();
return ptr;
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
cleanable.clean();
}
}

/**
* Releases the native resource.
* Called once when the handle is closed.
* Subclasses must implement this to free native memory.
*/
protected abstract void doClose();

/**
* Cleans up the native resource.
* Called by the cleaner when the handle is garbage collected.
*/
private static final class CleanupAction implements Runnable {
private final long ptr;
private final Runnable doClose;

CleanupAction(long ptr, Runnable doClose) {
this.ptr = ptr;
this.doClose = doClose;
}

@Override
public void run() {
doClose.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@

import org.apache.calcite.sql.SqlOperatorTable;
import org.opensearch.analytics.backend.EngineBridge;
import org.opensearch.plugins.SearchBackEndPlugin;

/**
* SPI extension point for back-end query engines (DataFusion, Lucene, etc.).
* @opensearch.internal
*/
public interface AnalyticsBackEndPlugin {
public interface AnalyticsSearchBackendPlugin extends SearchBackEndPlugin {
/** Unique engine name (e.g., "lucene", "datafusion"). */
String name();

/** JNI boundary for executing serialized plans, or null for engines without native execution. */
EngineBridge<?, ?, ?> bridge();
EngineBridge<?, ?, ?> bridge(); // TODO this doesn't have context / index shard init

/** Supported functions as a Calcite operator table, or null if the back-end adds no functions. */
SqlOperatorTable operatorTable();

}
3 changes: 3 additions & 0 deletions sandbox/plugins/analytics-backend-datafusion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ dependencies {
// Shared types and SPI interfaces (EngineBridge, AnalyticsBackEndPlugin, etc.)
// Also provides calcite-core transitively via api.
api project(':sandbox:libs:analytics-framework')

implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}"
implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

// TODO: Remove once back-end is built out with test suite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
/**
* DataFusion EngineBridge implementation.
* Uses a byte[] representing serialized plan to execute.
* // TODO : we need a stateful engine, not just a bridge, evaluate
* // switch to SearchExecEngine
*/
public class DataFusionBridge implements EngineBridge<byte[], Long, RelNode> {
// S=byte[] (Substrait), H=Long (stream pointer), L=RelNode (logical plan)
Expand Down
Loading
Loading