Merged
20 changes: 20 additions & 0 deletions sandbox/libs/analytics-framework/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# analytics-framework

Shared library containing the SPI interfaces and core types for the analytics engine. All plugins depend on this library — it defines the contracts but contains no implementation logic.

## SPI Interfaces

- **`QueryPlanExecutorPlugin`** — Factory for creating a `QueryPlanExecutor` from discovered back-end plugins.
- **`AnalyticsBackEndPlugin`** — Extension point for native execution engines (DataFusion, Lucene, etc.). Exposes engine name, bridge, and capabilities.
- **`AnalyticsFrontEndPlugin`** — Marker interface for query language front-ends (PPL, SQL). Discovered by the hub for lifecycle tracking.
- **`SchemaProvider`** — Functional interface that builds a Calcite `SchemaPlus` from cluster state.

## Core Types

- **`QueryPlanExecutor`** — Executes a Calcite `RelNode` plan fragment and returns result rows.
- **`EngineBridge<Fragment, Stream, LogicalPlan>`** — JNI/native boundary for engine-specific plan conversion and execution (e.g., a Substrait plan in, a handle to a native Arrow result stream out).
- **`EngineCapabilities`** — Declares supported operators and functions. Used by the push-down planner to decide what gets absorbed into engine-executed boundary nodes vs. what stays in Calcite's in-process execution.
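
As a rough illustration of how a push-down planner might consult `EngineCapabilities`, the sketch below mirrors its exact-class membership check using stand-in types. Everything here is hypothetical (`PushDownSketch`, `Node`, `Scan`, `Filter`, `Project`, `CustomFilter`, `pushDownDepth`); the real class works on Calcite `RelNode` classes and `SqlOperator` instances, and the actual planner logic is not part of this change.

```java
import java.util.List;
import java.util.Set;

/** Illustrative only: stand-in types mimic the exact-class check in EngineCapabilities. */
final class PushDownSketch {

    // Hypothetical stand-ins for Calcite RelNode classes.
    interface Node {}
    static class Scan implements Node {}
    static class Filter implements Node {}
    static class Project implements Node {}
    // A subclass, to show that exact-class matching does not cover subtypes.
    static final class CustomFilter extends Filter {}

    private final Set<Class<? extends Node>> supportedOperators;

    PushDownSketch(Set<Class<? extends Node>> supportedOperators) {
        this.supportedOperators = Set.copyOf(supportedOperators);
    }

    /** Same rule as EngineCapabilities#supportsOperator: membership by exact runtime class. */
    boolean supportsOperator(Node node) {
        return supportedOperators.contains(node.getClass());
    }

    /** Number of leading operators (leaf first) that the engine could absorb. */
    int pushDownDepth(List<Node> leafToRoot) {
        int depth = 0;
        for (Node node : leafToRoot) {
            if (!supportsOperator(node)) {
                break;
            }
            depth++;
        }
        return depth;
    }

    public static void main(String[] args) {
        PushDownSketch caps = new PushDownSketch(Set.of(Scan.class, Filter.class));
        List<Node> plan = List.of(new Scan(), new Filter(), new Project());
        System.out.println("operators absorbed: " + caps.pushDownDepth(plan));
        System.out.println("subclass supported: " + caps.supportsOperator(new CustomFilter()));
    }
}
```

With `Scan` and `Filter` declared as supported, the scan and filter would be absorbed and the projection would stay in-process. The `CustomFilter` case is worth noting: because the check uses `node.getClass()`, a subclass of a supported operator is not itself treated as supported unless it is listed explicitly.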

## Dependencies

Depends on Calcite and Arrow only. There is no dependency on the OpenSearch server module.
419 changes: 419 additions & 0 deletions sandbox/libs/analytics-framework/build.gradle

Large diffs are not rendered by default.

@@ -0,0 +1 @@
0dd7b4be638f0cea174f78cc851322b64d813a1e
2,261 changes: 2,261 additions & 0 deletions sandbox/libs/analytics-framework/licenses/calcite-core-LICENSE.txt

Large diffs are not rendered by default.

@@ -0,0 +1,5 @@
Apache Calcite
Copyright 2012-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend;

/**
 * JNI boundary interface between the query planner (Java) and a native
 * execution engine (e.g., DataFusion/Rust).
 *
 * <p>The bridge has two responsibilities:
 * <ol>
 *   <li>{@link #convertFragment} — serialise a logical plan fragment into
 *       the engine's wire format (e.g., Substrait bytes).</li>
 *   <li>{@link #execute} — hand the serialised plan to the native engine
 *       and obtain an opaque handle to the result stream that lives
 *       entirely in native memory.</li>
 * </ol>
 *
 * <p>Arrow data never crosses the JNI boundary into the JVM heap.
 * Consumers read from the native stream via Arrow Flight or
 * direct native-memory access using the returned handle.
 *
 * @param <Fragment> serialised plan type (e.g., {@code byte[]} for Substrait)
 * @param <Stream> result stream handle
 * @param <LogicalPlan> logical plan type (e.g., Calcite {@code RelNode})
 * @opensearch.internal
 */
public interface EngineBridge<Fragment, Stream, LogicalPlan> {

    /**
     * Converts a logical plan fragment into the native engine's serialised
     * format.
     *
     * @param fragment the logical plan subtree to serialise
     * @return the serialised plan in the engine's wire format
     */
    Fragment convertFragment(LogicalPlan fragment);

    /**
     * Submits the serialised plan to the native engine for execution and
     * returns an opaque handle to the result stream.
     *
     * <p>The returned handle is a pointer into native memory (e.g., a
     * {@code long} address of a Rust {@code RecordBatchStream}). The
     * caller must eventually close the stream through a corresponding
     * native call to avoid leaking resources.
     *
     * @param fragment the serialised plan produced by {@link #convertFragment}
     * @return an opaque handle to the native result stream
     */
    Stream execute(Fragment fragment);
}
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Declares what the custom engine supports using Calcite's own types.
 */
public class EngineCapabilities {

    private final Set<Class<? extends RelNode>> supportedOperators;
    private final Set<SqlOperator> supportedFunctions;

    /**
     * Creates capabilities from explicit operator and function sets.
     *
     * @param supportedOperators relational operator classes the engine can execute
     * @param supportedFunctions scalar and aggregate functions the engine supports
     */
    public EngineCapabilities(Set<Class<? extends RelNode>> supportedOperators, Set<SqlOperator> supportedFunctions) {
        this.supportedOperators = Set.copyOf(supportedOperators);
        this.supportedFunctions = Set.copyOf(supportedFunctions);
    }

    /** Returns capabilities covering standard Calcite logical operators and all built-in functions. */
    public static EngineCapabilities defaultCapabilities() {
        return new EngineCapabilities(
            Set.of(LogicalTableScan.class, LogicalFilter.class, LogicalAggregate.class, LogicalSort.class),
            new HashSet<>(SqlStdOperatorTable.instance().getOperatorList())
        );
    }

    /**
     * Returns {@code true} if the engine can execute the given relational operator.
     *
     * @param node the relational operator to check
     */
    public boolean supportsOperator(RelNode node) {
        // Exact class match: a subclass of a supported operator is not treated as supported.
        return supportedOperators.contains(node.getClass());
    }

    /**
     * Returns {@code true} if every scalar function in the expression tree is supported.
     *
     * @param expression the row expression tree to check
     */
    public boolean supportsAllFunctions(RexNode expression) {
        if (expression == null) {
            return true;
        }
        // Nodes the visitor does not override (e.g., literals, input refs) yield null, which counts as supported.
        Boolean result = expression.accept(new FunctionSupportVisitor());
        return result == null || result;
    }

    private class FunctionSupportVisitor extends RexVisitorImpl<Boolean> {
        FunctionSupportVisitor() {
            super(true);
        }

        @Override
        public Boolean visitCall(RexCall call) {
            if (!supportedFunctions.contains(call.getOperator())) {
                return false;
            }
            for (RexNode operand : call.getOperands()) {
                Boolean childResult = operand.accept(this);
                if (childResult != null && !childResult) {
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Returns {@code true} if every aggregate function in the list is supported.
     *
     * @param aggCalls the aggregate calls to check
     */
    public boolean supportsAllAggFunctions(List<AggregateCall> aggCalls) {
        if (aggCalls == null || aggCalls.isEmpty()) {
            return true;
        }
        for (AggregateCall aggCall : aggCalls) {
            if (!supportedFunctions.contains(aggCall.getAggregation())) {
                return false;
            }
        }
        return true;
    }
}
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Back-end engine abstractions: bridge interface and capability declarations.
*/
package org.opensearch.analytics.backend;
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.exec;

/**
 * Executes a logical query plan fragment against the underlying data store.
 *
 * @opensearch.internal
 */
@FunctionalInterface
public interface QueryPlanExecutor<LogicalPlan, Stream> {

    /**
     * Executes the given logical fragment and returns result rows.
     *
     * @param plan the logical subtree to execute
     * @param context execution context (opaque Object to avoid server dependency)
     * @return rows produced by the engine
     */
    Stream execute(LogicalPlan plan, Object context);
}
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Query plan execution interfaces.
*/
package org.opensearch.analytics.exec;
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.schema;

import org.apache.calcite.schema.SchemaPlus;

/**
 * Provides a Calcite {@link SchemaPlus} from the current cluster state.
 *
 * @opensearch.internal
 */
@FunctionalInterface
public interface SchemaProvider {

    /**
     * Builds a Calcite {@link SchemaPlus} from the given cluster state.
     *
     * @param clusterState the current cluster state (opaque Object to avoid
     *                     server dependency in the library)
     * @return a SchemaPlus with tables derived from index mappings
     */
    SchemaPlus buildSchema(Object clusterState);
}
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Schema construction from OpenSearch cluster metadata.
*/
package org.opensearch.analytics.schema;
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.spi;

import org.opensearch.analytics.backend.EngineBridge;
import org.opensearch.analytics.backend.EngineCapabilities;

/**
 * SPI extension point for back-end query engines (DataFusion, Lucene, etc.).
 * @opensearch.internal
 */
public interface AnalyticsBackEndPlugin {
    /** Unique engine name (e.g., "lucene", "datafusion"). */
    String name();

    /** JNI boundary for executing serialized plans, or null for engines without native execution. */
    EngineBridge<?, ?, ?> bridge();

    /** Engine capabilities describing supported operators/functions, or null. */
    EngineCapabilities getEngineCapabilities();
}
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* SPI extension points for analytics back-end and front-end plugins.
*/
package org.opensearch.analytics.spi;
7 changes: 4 additions & 3 deletions sandbox/libs/build.gradle
@@ -20,13 +20,14 @@ subprojects {
     project.afterEvaluate {
       configurations.all { Configuration conf ->
         dependencies.matching { it instanceof ProjectDependency }.all { ProjectDependency dep ->
-          Project depProject = dep.dependencyProject
+          Project depProject = project.project(dep.path)
           if (depProject != null
-            && false == depProject.path.equals(':libs:opensearch-core')
+            && (false == depProject.path.equals(':libs:opensearch-core') &&
+                false == depProject.path.equals(':libs:opensearch-common'))
             && depProject.path.startsWith(':libs')) {
             throw new InvalidUserDataException("projects in :libs " +
               "may not depend on other projects libs except " +
-              ":libs:opensearch-core but " +
+              ":libs:opensearch-core or :libs:opensearch-common but " +
               "${project.path} depends on ${depProject.path}")
           }
         }
16 changes: 16 additions & 0 deletions sandbox/plugins/analytics-backend-datafusion/README.md
@@ -0,0 +1,16 @@
# analytics-backend-datafusion

DataFusion native execution engine plugin. Implements `AnalyticsBackEndPlugin` to provide a back-end that can execute query plan fragments via JNI.

## What it does

Exposes a `DataFusionBridge` (an `EngineBridge` with `byte[]` as the serialized plan type) that converts Calcite `RelNode` fragments into a serialized plan format and executes them through a native Rust/DataFusion library. Currently a stub.

## How it fits in

Declares `extendedPlugins = ['analytics-engine']` so the hub discovers it as an `AnalyticsBackEndPlugin`. The hub passes all discovered back-ends to the `QueryPlanExecutorPlugin` during executor creation. The executor will eventually use the bridge and capabilities to route plan fragments to the appropriate engine.

## Key classes

- **`DataFusionPlugin`** — The `AnalyticsBackEndPlugin` SPI implementation. Reports `name() = "datafusion"`.
- **`DataFusionBridge`** — The `EngineBridge` implementation for native execution, with `byte[]` as the serialized plan type.
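
The convert-then-execute flow of a bridge can be sketched without any native code. The example below is not the actual `DataFusionBridge` (which this README describes as a stub and which is not shown here). It repeats the three-parameter `EngineBridge` interface so the sketch is self-contained, and everything else is hypothetical: `BridgeSketch`, `InMemoryBridge`, the `close` and `openStreamCount` methods, the use of a `String` in place of a Calcite `RelNode`, and a map in place of native memory.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: an in-memory stand-in for a native engine bridge. */
final class BridgeSketch {

    /** Copy of the interface shape from analytics-framework, repeated for self-containment. */
    interface EngineBridge<Fragment, Stream, LogicalPlan> {
        Fragment convertFragment(LogicalPlan fragment);

        Stream execute(Fragment fragment);
    }

    /** Hypothetical bridge: String stands in for RelNode, a map stands in for native memory. */
    static final class InMemoryBridge implements EngineBridge<byte[], Long, String> {
        private final Map<Long, byte[]> openStreams = new ConcurrentHashMap<>();
        private final AtomicLong nextHandle = new AtomicLong(1);

        @Override
        public byte[] convertFragment(String fragment) {
            // A real bridge would emit the engine's wire format (e.g., Substrait bytes).
            return fragment.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Long execute(byte[] fragment) {
            // The handle is opaque to callers; here it is just a counter keyed into the map.
            long handle = nextHandle.getAndIncrement();
            openStreams.put(handle, fragment.clone());
            return handle;
        }

        /** Hypothetical release call; the EngineBridge interface does not define one. */
        boolean close(long handle) {
            return openStreams.remove(handle) != null;
        }

        int openStreamCount() {
            return openStreams.size();
        }
    }

    public static void main(String[] args) {
        InMemoryBridge bridge = new InMemoryBridge();
        byte[] serialized = bridge.convertFragment("scan(logs) -> filter(status = 500)");
        Long handle = bridge.execute(serialized);
        System.out.println("handle=" + handle + ", open=" + bridge.openStreamCount());
        bridge.close(handle);
        System.out.println("open after close=" + bridge.openStreamCount());
    }
}
```

The explicit `close` step reflects the interface's note that the caller must eventually release the native stream. How the real bridge exposes that release call is not specified in this change.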
22 changes: 22 additions & 0 deletions sandbox/plugins/analytics-backend-datafusion/build.gradle
@@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

opensearchplugin {
  description = 'DataFusion native execution engine plugin for the query engine.'
  classname = 'org.opensearch.be.datafusion.DataFusionPlugin'
  extendedPlugins = ['analytics-engine']
}

dependencies {
  // Shared types and SPI interfaces (EngineBridge, EngineCapabilities, AnalyticsBackEndPlugin, etc.)
  // Also provides calcite-core transitively via api.
  api project(':sandbox:libs:analytics-framework')
}

// TODO: Remove once back-end is built out with test suite
testingConventions.enabled = false