diff --git a/.gitignore b/.gitignore index 0a784701375d9..83eff29224279 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,4 @@ testfixtures_shared/ # build files generated doc-tools/missing-doclet/bin/ +/sandbox/plugins/engine-datafusion/target/ diff --git a/libs/common/src/main/java/org/opensearch/common/CheckedTriFunction.java b/libs/common/src/main/java/org/opensearch/common/CheckedTriFunction.java index 7898226b751f7..53d2adb3951b8 100644 --- a/libs/common/src/main/java/org/opensearch/common/CheckedTriFunction.java +++ b/libs/common/src/main/java/org/opensearch/common/CheckedTriFunction.java @@ -8,11 +8,14 @@ package org.opensearch.common; +import org.opensearch.common.annotation.ExperimentalApi; + /** * A {@link TriFunction}-like interface which allows throwing checked exceptions. * * @opensearch.internal */ +@ExperimentalApi @FunctionalInterface public interface CheckedTriFunction { R apply(S s, T t, U u) throws E; diff --git a/sandbox/libs/analytics-framework/build.gradle b/sandbox/libs/analytics-framework/build.gradle index 13e3d008f0a16..8748528a48dce 100644 --- a/sandbox/libs/analytics-framework/build.gradle +++ b/sandbox/libs/analytics-framework/build.gradle @@ -15,6 +15,7 @@ def calciteVersion = '1.41.0' dependencies { + compileOnly project(':server') api "org.apache.calcite:calcite-core:${calciteVersion}" // Calcite's expression tree and Enumerable runtime — required by calcite-core API api "org.apache.calcite:calcite-linq4j:${calciteVersion}" @@ -35,7 +36,7 @@ dependencies { testingConventions.enabled = false -// analytics-framework does not depend on server +// analytics-framework depends on server for SearchAnalyticsBackEndPlugin SPI tasks.named('forbiddenApisMain').configure { replaceSignatureFiles 'jdk-signatures' failOnMissingClasses = false diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultBatch.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultBatch.java new file mode 100644 index 0000000000000..d062bcfe079af --- /dev/null +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultBatch.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.backend; + +import java.util.List; + +/** + * Read-only view of a single record batch. Provides field names, row count, + * and positional access to field values. + * + * @opensearch.internal + */ +public interface EngineResultBatch { + + /** + * Ordered list of field (column) names in this batch. + */ + List getFieldNames(); + + /** + * Number of rows in this batch. + */ + int getRowCount(); + + /** + * Returns the value at the given row index for the named field. 
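As a point of reference, here is a minimal, illustrative sketch of how a consumer might walk a batch using only the accessors above. The batch instance is assumed to come from an EngineResultStream, and the BatchPrinter helper is hypothetical, not part of this change:

```java
import org.opensearch.analytics.backend.EngineResultBatch;

// Hypothetical helper, illustrative only: prints every field/row of one batch.
final class BatchPrinter {
    static void printBatch(EngineResultBatch batch) {
        for (String field : batch.getFieldNames()) {             // ordered column names
            for (int row = 0; row < batch.getRowCount(); row++) {
                Object value = batch.getFieldValue(field, row);  // may be null
                System.out.println(field + "[" + row + "] = " + value);
            }
        }
    }
}
```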
+ * + * @param fieldName column name + * @param rowIndex zero-based row index + * @return the value (may be null) + */ + Object getFieldValue(String fieldName, int rowIndex); +} diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultBatchIterator.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultBatchIterator.java new file mode 100644 index 0000000000000..1de5bbd5b64c5 --- /dev/null +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultBatchIterator.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.backend; + +import java.util.Iterator; + +/** + * Single-pass iterator over record batches from an {@link EngineResultStream}. + * + * @opensearch.internal + */ +public interface EngineResultBatchIterator extends Iterator {} diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultStream.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultStream.java new file mode 100644 index 0000000000000..7c189b4079889 --- /dev/null +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineResultStream.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.backend; + +/** + * A closeable stream of record batches returned by engine execution. + * Callers iterate batches via the returned iterator and MUST close the stream + * when done to release native resources. + * + * @opensearch.internal + */ +public interface EngineResultStream extends AutoCloseable { + + /** + * Returns an iterator over the record batches in this stream. + * Each call returns the same iterator instance — the stream is single-pass. + */ + EngineResultBatchIterator iterator(); + + @Override + void close(); +} diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/jni/NativeHandle.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/jni/NativeHandle.java new file mode 100644 index 0000000000000..f1131432a2950 --- /dev/null +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/jni/NativeHandle.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.backend.jni; + +import java.lang.ref.Cleaner; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Base class for type-safe native pointer wrappers. + * Provides automatic resource management and prevents use-after-close errors. + * Subclasses must implement {@link #doClose()} to release native resources. + * Cleaner is used to ensure resources are cleaned up even if the object is not explicitly closed. 
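To make the handle contract concrete, here is a toy subclass under stated assumptions: the printed message stands in for a real native free call, and the 42L pointer is fake. The real subclasses introduced by this change are ReaderHandle and StreamHandle further below.

```java
import org.opensearch.analytics.backend.jni.NativeHandle;

// Toy subclass, illustrative only: non-zero pointer in, doClose() releases exactly once.
final class ExampleHandle extends NativeHandle {

    ExampleHandle(long ptr) {
        super(ptr); // throws IllegalArgumentException if ptr == NULL_POINTER
    }

    @Override
    protected void doClose() {
        // A real handle would call a native free function here (see ReaderHandle/StreamHandle).
        System.out.println("released native resource at " + ptr);
    }

    public static void main(String[] args) {
        try (ExampleHandle handle = new ExampleHandle(42L)) {
            System.out.println("using pointer " + handle.getPointer()); // valid while open
        }
        // Calling getPointer() here would throw IllegalStateException: the handle is closed,
        // and the registered Cleaner is only a safety net for handles that were never closed.
    }
}
```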
+ */ +public abstract class NativeHandle implements AutoCloseable { + + protected final long ptr; + private final AtomicBoolean closed = new AtomicBoolean(false); + protected static final long NULL_POINTER = 0L; + private final Cleaner.Cleanable cleanable; + + private static final Cleaner CLEANER = Cleaner.create(); + + /** + * Creates a new native handle. + * @param ptr the native pointer (must not be 0) + * @throws IllegalArgumentException if ptr is 0 + */ + protected NativeHandle(long ptr) { + if (ptr == NULL_POINTER) { + throw new IllegalArgumentException("Null native pointer"); + } + this.ptr = ptr; + this.cleanable = CLEANER.register(this, new CleanupAction(ptr, this::doClose)); + } + + /** + * Ensures the handle is still open. + * @throws IllegalStateException if the handle has been closed + */ + public void ensureOpen() { + if (closed.get()) { + throw new IllegalStateException("Handle already closed"); + } + } + + /** + * Gets the native pointer value. + * @return the native pointer + * @throws IllegalStateException if the handle has been closed + */ + public long getPointer() { + ensureOpen(); + return ptr; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + cleanable.clean(); + } + } + + /** + * Releases the native resource. + * Called once when the handle is closed. + * Subclasses must implement this to free native memory. + */ + protected abstract void doClose(); + + /** + * Cleans up the native resource. + * Called by the cleaner when the handle is garbage collected. + */ + private static final class CleanupAction implements Runnable { + private final long ptr; + private final Runnable doClose; + + CleanupAction(long ptr, Runnable doClose) { + this.ptr = ptr; + this.doClose = doClose; + } + + @Override + public void run() { + doClose.run(); + } + } +} diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsBackEndPlugin.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java similarity index 78% rename from sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsBackEndPlugin.java rename to sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java index 454c6c17bd7f0..a942c70f0328d 100644 --- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsBackEndPlugin.java +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java @@ -10,18 +10,20 @@ import org.apache.calcite.sql.SqlOperatorTable; import org.opensearch.analytics.backend.EngineBridge; +import org.opensearch.plugins.SearchBackEndPlugin; /** * SPI extension point for back-end query engines (DataFusion, Lucene, etc.). * @opensearch.internal */ -public interface AnalyticsBackEndPlugin { +public interface AnalyticsSearchBackendPlugin extends SearchBackEndPlugin { /** Unique engine name (e.g., "lucene", "datafusion"). */ String name(); /** JNI boundary for executing serialized plans, or null for engines without native execution. */ - EngineBridge bridge(); + EngineBridge bridge(); // TODO this doesn't have context / index shard init /** Supported functions as a Calcite operator table, or null if the back-end adds no functions. 
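For orientation, a hedged sketch of how implementations registered under META-INF/services (as the DataFusion back-end does below) could be discovered with the standard ServiceLoader mechanism. The framework's actual loading path is not shown in this change, so treat this as an assumption:

```java
import java.util.ServiceLoader;

import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin;

// Illustrative discovery loop; assumes back-ends ship a
// META-INF/services/org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin entry.
public final class BackendDiscoveryExample {
    public static void main(String[] args) {
        for (AnalyticsSearchBackendPlugin backend : ServiceLoader.load(AnalyticsSearchBackendPlugin.class)) {
            System.out.println("analytics back-end: " + backend.name());
        }
    }
}
```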
*/ SqlOperatorTable operatorTable(); + } diff --git a/sandbox/plugins/analytics-backend-datafusion/build.gradle b/sandbox/plugins/analytics-backend-datafusion/build.gradle index 61fec92b7219d..89929e691d7c9 100644 --- a/sandbox/plugins/analytics-backend-datafusion/build.gradle +++ b/sandbox/plugins/analytics-backend-datafusion/build.gradle @@ -16,6 +16,9 @@ dependencies { // Shared types and SPI interfaces (EngineBridge, AnalyticsBackEndPlugin, etc.) // Also provides calcite-core transitively via api. api project(':sandbox:libs:analytics-framework') + + implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}" + implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}" } // TODO: Remove once back-end is built out with test suite diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionBridge.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionBridge.java index 97b4326361a0c..a61afaeea8fcb 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionBridge.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionBridge.java @@ -14,6 +14,8 @@ /** * DataFusion EngineBridge implementation. * Uses a byte[] representing serialized plan to execute. + * // TODO : we need a stateful engine, not just a bridge, evaluate + * // switch to SearchExecEngine */ public class DataFusionBridge implements EngineBridge { // S=byte[] (Substrait), H=Long (stream pointer), L=RelNode (logical plan) diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java index 79f4f834bfdb4..7987b2d16d0c0 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java @@ -9,19 +9,93 @@ package org.opensearch.be.datafusion; import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.analytics.backend.EngineBridge; -import org.opensearch.analytics.spi.AnalyticsBackEndPlugin; +import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.SearchExecEngine; +import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.Client; +import org.opensearch.watcher.ResourceWatcherService; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; 
+import java.util.List; +import java.util.function.Supplier; /** - * DataFusion native execution engine plugin. + * Main plugin class for the DataFusion native engine integration. + *

+ * Initializes the {@link DataFusionService} at node startup and creates + * per-shard {@link DatafusionSearchExecEngine} instances via the + * {@link AnalyticsSearchBackendPlugin} SPI. */ -public class DataFusionPlugin extends Plugin implements AnalyticsBackEndPlugin { +public class DataFusionPlugin extends Plugin implements AnalyticsSearchBackendPlugin { + + private static final Logger logger = LogManager.getLogger(DataFusionPlugin.class); + + /** Memory pool limit for the DataFusion runtime. */ + public static final Setting DATAFUSION_MEMORY_POOL_LIMIT = Setting.longSetting( + "datafusion.memory_pool_limit_bytes", + Runtime.getRuntime().maxMemory() / 4, + 0L, + Setting.Property.NodeScope + ); - /** Creates a new DataFusion plugin. */ - public DataFusionPlugin() {} + /** Spill memory limit — when exceeded, DataFusion spills to disk. */ + public static final Setting DATAFUSION_SPILL_MEMORY_LIMIT = Setting.longSetting( + "datafusion.spill_memory_limit_bytes", + Runtime.getRuntime().maxMemory() / 8, + 0L, + Setting.Property.NodeScope + ); - private final DataFusionBridge bridge = new DataFusionBridge(); + private final Settings settings; + private volatile DataFusionService dataFusionService; + + public DataFusionPlugin(Settings settings) { + this.settings = settings; + } + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + long memoryPoolLimit = DATAFUSION_MEMORY_POOL_LIMIT.get(settings); + long spillMemoryLimit = DATAFUSION_SPILL_MEMORY_LIMIT.get(settings); + String spillDir = environment.dataFiles()[0].getParent().resolve("tmp").toAbsolutePath().toString(); + + dataFusionService = new DataFusionService(memoryPoolLimit, spillDir, spillMemoryLimit); + dataFusionService.start(); + logger.info("DataFusion plugin initialized — memory pool {}B, spill limit {}B", memoryPoolLimit, spillMemoryLimit); + + return Collections.singletonList(dataFusionService); + } @Override public String name() { @@ -30,11 +104,38 @@ public String name() { @Override public EngineBridge bridge() { - return bridge; + return null; // TODO decide between bridge and SearchExecEngine } @Override public SqlOperatorTable operatorTable() { return null; } + + @Override + public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException { + return new DatafusionReaderManager(format, shardPath); + } + + @Override + public SearchExecEngine createSearchExecEngine(DataFormat format, ShardPath shardPath) throws IOException { + if (dataFusionService == null) { + throw new IllegalStateException("DataFusionPlugin.createComponents() has not been called yet"); + } + return new DatafusionSearchExecEngine(dataFusionService.getNativeRuntime(), format); + } + + /** + * Data formats this plugin can handle. Used by CompositeEngine to route queries. 
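For reference, the two node-scope settings read in createComponents() could be tuned as in the sketch below. The values are arbitrary, and this assumes the settings are registered with the node; registration is not shown in this change.

```java
import org.opensearch.common.settings.Settings;

// Illustrative values only; the defaults are maxMemory/4 and maxMemory/8 respectively.
final class DataFusionSettingsExample {
    static Settings nodeSettings() {
        return Settings.builder()
            .put("datafusion.memory_pool_limit_bytes", 2L * 1024 * 1024 * 1024) // 2 GiB pool
            .put("datafusion.spill_memory_limit_bytes", 512L * 1024 * 1024)     // spill past 512 MiB
            .build();
    }
}
```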
+ */ + public List getSupportedFormats() { + return null; // TODO : List.of("parquet"); + } + + @Override + public void close() throws IOException { + if (dataFusionService != null) { + dataFusionService.close(); + } + } } diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java new file mode 100644 index 0000000000000..2cf1811a8b436 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; + +import java.io.IOException; + +/** + * Node-level service managing the DataFusion native runtime lifecycle. + *

+ * All per-shard {@link DatafusionSearchExecEngine} instances share the single + * Tokio runtime and memory pool owned by this service. The service loads the + * native JNI library on start and tears down the runtime on stop/close. + */ +public class DataFusionService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(DataFusionService.class); + private static final String NATIVE_LIBRARY_NAME = "opensearch_datafusion_jni"; + + private final long memoryPoolLimit; + private final String spillDirectory; + private final long spillMemoryLimit; + + /** Handle to the native DataFusion global runtime (Tokio + memory pool). */ + private volatile NativeRuntimeHandle runtimeHandle; + + /** + * Creates a new DataFusionService. + * + * @param memoryPoolLimit maximum bytes for the DataFusion memory pool + * @param spillDirectory directory for spill files when memory is exceeded + * @param spillMemoryLimit maximum bytes before spilling to disk + */ + public DataFusionService(long memoryPoolLimit, String spillDirectory, long spillMemoryLimit) { + this.memoryPoolLimit = memoryPoolLimit; + this.spillDirectory = spillDirectory; + this.spillMemoryLimit = spillMemoryLimit; + } + + @Override + protected void doStart() { + logger.info("Starting DataFusion service — loading native library [{}]", NATIVE_LIBRARY_NAME); + try { + System.loadLibrary(NATIVE_LIBRARY_NAME); + } catch (UnsatisfiedLinkError e) { + throw new IllegalStateException("Failed to load native library: " + NATIVE_LIBRARY_NAME, e); + } + + // TODO: initialize Tokio runtime and memory pool via NativeBridge + // long ptr = NativeBridge.createGlobalRuntime(memoryPoolLimit, spillDirectory, spillMemoryLimit); + long ptr = 1L; // placeholder until NativeBridge is wired + this.runtimeHandle = new NativeRuntimeHandle(ptr); + logger.info("DataFusion service started"); + } + + @Override + protected void doStop() { + logger.info("Stopping DataFusion service"); + releaseRuntime(); + } + + @Override + protected void doClose() throws IOException { + releaseRuntime(); + } + + /** + * Returns the handle to the native DataFusion global runtime. + * All consumers should hold this reference and call {@link NativeRuntimeHandle#get()} + * at JNI invocation time to obtain the current live pointer. + * + * @throws IllegalStateException if the service has not been started + */ + public NativeRuntimeHandle getNativeRuntime() { + NativeRuntimeHandle handle = runtimeHandle; + if (handle == null) { + throw new IllegalStateException("DataFusionService has not been started"); + } + return handle; + } + + /** + * Returns the cache manager for per-shard cache management. + * Used by DatafusionReaderManager to evict stale entries on file deletion. 
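Putting the lifecycle together, a hedged sketch of how a node might drive this service. The limits and spill path are made up, and doStart() only succeeds if the opensearch_datafusion_jni library is resolvable on java.library.path.

```java
import org.opensearch.be.datafusion.DataFusionService;

// Illustrative lifecycle sketch; all values are arbitrary.
final class DataFusionServiceExample {
    public static void main(String[] args) throws Exception {
        DataFusionService service = new DataFusionService(
            1024L * 1024 * 1024,     // memory pool limit (bytes)
            "/tmp/datafusion-spill", // spill directory
            256L * 1024 * 1024       // spill threshold (bytes)
        );
        service.start();             // loads the JNI library and creates the runtime handle
        try {
            long runtimePtr = service.getNativeRuntime().get(); // live pointer for JNI calls
            System.out.println("runtime pointer: " + runtimePtr);
            // hand the handle (not the raw long) to per-shard engines
        } finally {
            service.close();         // releases the native runtime
        }
    }
}
```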
+ */ + // TODO: uncomment when CacheManager class is available + // public CacheManager getCacheManager() { return cacheManager; } + + private void releaseRuntime() { + NativeRuntimeHandle handle = runtimeHandle; + if (handle != null) { + handle.close(); + runtimeHandle = null; + logger.info("DataFusion native runtime released"); + } + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionContext.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionContext.java new file mode 100644 index 0000000000000..d9a85ef04edb0 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionContext.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.opensearch.be.datafusion.jni.StreamHandle; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.IndexFilterTree; +import org.opensearch.search.SearchExecutionContext; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.IOException; + +/** + * DataFusion-specific search execution context. + *

+ * Carries the DataFusion query plan, engine searcher, optional {@link IndexFilterTree}, + * and the native result stream handle after execution. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DatafusionContext implements SearchExecutionContext { + + private final ShardSearchRequest request; + private final SearchShardTarget shardTarget; + private final DatafusionSearcher engineSearcher; + private final NativeRuntimeHandle nativeRuntime; + private DatafusionQuery datafusionQuery; + private IndexFilterTree filterTree; + private StreamHandle streamHandle; + + public DatafusionContext( + ShardSearchRequest request, + SearchShardTarget shardTarget, + DatafusionReader reader, + NativeRuntimeHandle nativeRuntime + ) throws IOException { + this.request = request; + this.shardTarget = shardTarget; + this.engineSearcher = new DatafusionSearcher(reader.getReaderHandle()); + this.nativeRuntime = nativeRuntime; + } + + @Override + public ShardSearchRequest request() { + return request; + } + + @Override + public SearchShardTarget shardTarget() { + return shardTarget; + } + + @Override + public void close() throws IOException { + try { + if (streamHandle != null) { + streamHandle.close(); + streamHandle = null; + } + } finally { + try { + if (filterTree != null) { + filterTree.close(); + } + } finally { + engineSearcher.close(); + } + } + } + + // DataFusion-specific + + public DatafusionSearcher getEngineSearcher() { + return engineSearcher; + } + + /** + * Returns the live native runtime pointer for JNI calls. + */ + public long getRuntimePtr() { + return nativeRuntime.get(); + } + + public DatafusionQuery getDatafusionQuery() { + return datafusionQuery; + } + + public void setDatafusionQuery(DatafusionQuery query) { + this.datafusionQuery = query; + } + + public IndexFilterTree getFilterTree() { + return filterTree; + } + + public void setFilterTree(IndexFilterTree filterTree) { + this.filterTree = filterTree; + } + + /** + * Returns the native result stream handle, or {@code null} if execution has not completed. + */ + public StreamHandle getStreamHandle() { + return streamHandle; + } + + /** + * Sets the native result stream handle after query execution. + */ + public void setStreamHandle(StreamHandle streamHandle) { + this.streamHandle = streamHandle; + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionQuery.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionQuery.java new file mode 100644 index 0000000000000..4d7fde7c6c503 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionQuery.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +/** + * Represents a DataFusion query — wraps substrait plan bytes and execution metadata. 
+ */ +public class DatafusionQuery { + + private final String indexName; + private final byte[] substraitBytes; + private boolean fetchPhase; + + public DatafusionQuery(String indexName, byte[] substraitBytes) { + this.indexName = indexName; + this.substraitBytes = substraitBytes; + } + + public String getIndexName() { + return indexName; + } + + public byte[] getSubstraitBytes() { + return substraitBytes; + } + + public boolean isFetchPhase() { + return fetchPhase; + } + + public void setFetchPhase(boolean fetchPhase) { + this.fetchPhase = fetchPhase; + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReader.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReader.java new file mode 100644 index 0000000000000..25e5185a731ba --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReader.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.be.datafusion.jni.ReaderHandle; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.WriterFileSet; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +/** + * DataFusion reader for JNI operations. + *

+ * Each reader represents a point-in-time snapshot of parquet/arrow files for a shard. + * Created from a catalog snapshot during refresh; closed when associated catalog snapshot is removed + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DatafusionReader implements Closeable { + + private static final Logger logger = LogManager.getLogger(DatafusionReader.class); + private final String directoryPath; + private final ReaderHandle readerHandle; + + /** + * @param directoryPath shard data directory + * @param files The file metadata collection + */ + public DatafusionReader(String directoryPath, Collection files) { + this.directoryPath = directoryPath; + String[] fileNames = new String[0]; + if (files != null) { + fileNames = files.stream().flatMap(writerFileSet -> writerFileSet.files().stream()).toArray(String[]::new); + } + readerHandle = new ReaderHandle(directoryPath, fileNames); + } + + @Override + public void close() throws IOException { + readerHandle.close(); + logger.debug("DatafusionReader closed for [{}]", directoryPath); + } + + /** + * Returns the type-safe handle to the native reader. + * Callers should hold this reference and call + * {@link ReaderHandle#getPointer()} only at JNI invocation time. + */ + public ReaderHandle getReaderHandle() { + return readerHandle; + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReaderManager.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReaderManager.java new file mode 100644 index 0000000000000..04160413e26bb --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReaderManager.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.CatalogSnapshot; +import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.shard.ShardPath; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Manages {@link DatafusionReader} instances (native memory). + *

+ * Acquire returns a DatafusionReader with incremented ref count; + * release decrements it. On refresh, a new reader is swapped in + * atomically from the updated catalog snapshot. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DatafusionReaderManager implements EngineReaderManager { + + Map readers = new HashMap<>(); + private final DataFormat dataFormat; + private final String directoryPath; + + public DatafusionReaderManager(DataFormat dataFormat, ShardPath shardPath) { + this.dataFormat = dataFormat; + directoryPath = shardPath.getDataPath().resolve(dataFormat.name()).toString(); + } + + @Override + public DatafusionReader getReader(CatalogSnapshot catalogSnapshot) throws IOException { + if (readers.containsKey(catalogSnapshot)) { + return readers.get(catalogSnapshot); + } + throw new IOException("No DataFusion reader available"); + } + + @Override + public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException { + readers.remove(catalogSnapshot).close(); + } + + @Override + public void onFilesDeleted(Collection files) throws IOException { + // TODO: evict deleted files from cache manager + } + + @Override + public void onFilesAdded(Collection files) throws IOException { + // TODO: Add new files to cache manager + } + + @Override + public void beforeRefresh() throws IOException {} + + @Override + public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException { + if (!didRefresh) return; + // This catalog snapshot is already present in the reader manager + if (readers.containsKey(catalogSnapshot)) { + return; + } + DatafusionReader reader = new DatafusionReader(directoryPath, catalogSnapshot.getSearchableFiles(dataFormat.name())); + readers.put(catalogSnapshot, reader); + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionResultStream.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionResultStream.java new file mode 100644 index 0000000000000..a5e1dc79786e0 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionResultStream.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.opensearch.analytics.backend.EngineResultBatch; +import org.opensearch.analytics.backend.EngineResultBatchIterator; +import org.opensearch.analytics.backend.EngineResultStream; +import org.opensearch.be.datafusion.jni.NativeBridge; +import org.opensearch.be.datafusion.jni.StreamHandle; +import org.opensearch.common.annotation.ExperimentalApi; + +import java.util.NoSuchElementException; + +/** + * {@link EngineResultStream} backed by a native DataFusion record batch stream. + *

+ * Reads Arrow record batches from the native stream via JNI and exposes them + * as {@link EngineResultBatch} instances. The stream is single-pass; calling + * {@link #iterator()} multiple times returns the same iterator. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DatafusionResultStream implements EngineResultStream { + + private final StreamHandle streamHandle; + private volatile BatchIterator iteratorInstance; + + public DatafusionResultStream(StreamHandle streamHandle) { + this.streamHandle = streamHandle; + } + + @Override + public EngineResultBatchIterator iterator() { + if (iteratorInstance == null) { + iteratorInstance = new BatchIterator(streamHandle); + } + return iteratorInstance; + } + + @Override + public void close() { + streamHandle.close(); + } + + /** + * Iterator that pulls Arrow record batches from the native stream via JNI. + * Each call to {@link #next()} returns a batch wrapping the current Arrow data. + */ + static class BatchIterator implements EngineResultBatchIterator { + + private final StreamHandle streamHandle; + private Boolean hasNext; + + BatchIterator(StreamHandle streamHandle) { + this.streamHandle = streamHandle; + } + + @Override + public boolean hasNext() { + if (hasNext == null) { + long arrowArrayAddr = NativeBridge.streamNext(streamHandle.getStreamPtr(), streamHandle.getPointer()); + hasNext = arrowArrayAddr != 0; + // TODO: if hasNext, import ArrowArray into VectorSchemaRoot and cache for next() + } + return hasNext; + } + + @Override + public EngineResultBatch next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + hasNext = null; + // TODO: return batch wrapping the imported VectorSchemaRoot + throw new UnsupportedOperationException("Arrow C Data import not yet wired"); + } + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearchExecEngine.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearchExecEngine.java new file mode 100644 index 0000000000000..ea5deba39de0f --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearchExecEngine.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.SearchExecEngine; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.IOException; + +/** + * DataFusion-backed search execution engine. + *

+ * Converts logical plan fragments to Substrait, executes them via the native + * DataFusion runtime, and returns results as a {@link DatafusionResultStream}. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DatafusionSearchExecEngine implements SearchExecEngine { + + private final NativeRuntimeHandle nativeRuntime; + + public DatafusionSearchExecEngine(NativeRuntimeHandle nativeRuntime, DataFormat dataFormat) { + this.nativeRuntime = nativeRuntime; + } + + @Override + public byte[] convertFragment(Object fragment) { + // TODO: wire Substrait conversion (RelNode → Substrait bytes) + throw new UnsupportedOperationException("Substrait conversion not yet wired"); + } + + @Override + public DatafusionContext createContext( + Object reader, + byte[] plan, + ShardSearchRequest request, + SearchShardTarget shardTarget, + SearchShardTask task + ) throws IOException { + DatafusionReader dfReader = (DatafusionReader) reader; + DatafusionContext context = new DatafusionContext(request, shardTarget, dfReader, nativeRuntime); + context.setDatafusionQuery(new DatafusionQuery("", plan)); + return context; + } + + @Override + public DatafusionResultStream execute(DatafusionContext context) throws IOException { + DatafusionSearcher searcher = context.getEngineSearcher(); + searcher.search(context); + return new DatafusionResultStream(context.getStreamHandle()); + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearcher.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearcher.java new file mode 100644 index 0000000000000..b9f9d61e76aa1 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearcher.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.opensearch.be.datafusion.jni.NativeBridge; +import org.opensearch.be.datafusion.jni.ReaderHandle; +import org.opensearch.be.datafusion.jni.StreamHandle; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.EngineSearcher; + +import java.io.IOException; + +/** + * DataFusion searcher — executes substrait query plans against a native DataFusion reader. + *
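Tying the pieces above together, a hedged end-to-end sketch of the query path. The reader, request, target, task, and Substrait plan bytes are assumed to come from the surrounding search infrastructure, and batch consumption will throw until the Arrow C Data import is wired:

```java
// Illustrative only; types match those used by DatafusionSearchExecEngine above.
static void runQuery(NativeRuntimeHandle runtime, DataFormat format, DatafusionReader reader,
                     byte[] substraitPlan, ShardSearchRequest request, SearchShardTarget target,
                     SearchShardTask task) throws IOException {
    DatafusionSearchExecEngine engine = new DatafusionSearchExecEngine(runtime, format);
    DatafusionContext ctx = engine.createContext(reader, substraitPlan, request, target, task);
    try (DatafusionResultStream stream = engine.execute(ctx)) {
        EngineResultBatchIterator batches = stream.iterator(); // single-pass
        while (batches.hasNext()) {
            EngineResultBatch batch = batches.next(); // not yet wired in this change
            // consume batch ...
        }
    } finally {
        ctx.close();
    }
}
```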

+ * After {@link #search}, the result stream handle is available on the context + * via {@link DatafusionContext#getStreamHandle()}. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DatafusionSearcher implements EngineSearcher { + + private final ReaderHandle readerHandle; + + public DatafusionSearcher(ReaderHandle readerHandle) { + this.readerHandle = readerHandle; + } + + @Override + public void search(DatafusionContext context) throws IOException { + if (context.getFilterTree() == null) { + searchVanilla(context); + } else { + searchWithFilterTree(context); + } + } + + private void searchWithFilterTree(DatafusionContext context) { + throw new UnsupportedOperationException("Indexed query path not yet wired"); + } + + private void searchVanilla(DatafusionContext context) throws IOException { + DatafusionQuery query = context.getDatafusionQuery(); + if (query == null) { + throw new IllegalStateException("DatafusionQuery must be set before search"); + } + long streamPtr = NativeBridge.executeQuery( + readerHandle.getPointer(), + query.getIndexName(), + query.getSubstraitBytes(), + context.getRuntimePtr() + ); + context.setStreamHandle(new StreamHandle(streamPtr, context.getRuntimePtr())); + } + + /** + * Returns the type-safe handle to the native reader. + * Call {@link ReaderHandle#getPointer()} only at JNI invocation time + * to get the raw pointer with a liveness check. + */ + public ReaderHandle getReaderHandle() { + return readerHandle; + } + + @Override + public void close() { + // ReaderHandle lifecycle is owned by DatafusionReader / EngineReaderManager, + // not by the searcher. Do not close it here. + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/NativeRuntimeHandle.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/NativeRuntimeHandle.java new file mode 100644 index 0000000000000..77af5ff83e1d9 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/NativeRuntimeHandle.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; + +/** + * Thread-safe wrapper around a native runtime pointer. + *

+ * Encapsulates the raw {@code long} so it cannot be copied or used after + * the runtime is destroyed. All consumers obtain the pointer via {@link #get()} + * which performs a liveness check on every call. + *
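In practice, callers hold the handle and resolve the raw pointer only at the call site; a small illustrative fragment (dataFusionService is assumed):

```java
// Illustrative: resolve the pointer per JNI call instead of caching the raw long.
NativeRuntimeHandle runtime = dataFusionService.getNativeRuntime();
if (runtime.isOpen()) {
    long ptr = runtime.get(); // throws IllegalStateException once the runtime is closed
    // e.g. NativeBridge.executeQuery(readerPtr, tableName, planBytes, ptr);
}
```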

+ * Implements {@link Closeable} so it integrates with try-with-resources, + * {@code IOUtils.close()}, and leak detection infrastructure. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class NativeRuntimeHandle implements Closeable { + + private volatile long pointer; + + /** + * Creates a handle wrapping the given native pointer. + * + * @param pointer the native runtime pointer (must be non-zero) + * @throws IllegalArgumentException if pointer is zero + */ + public NativeRuntimeHandle(long pointer) { + if (pointer == 0L) { + throw new IllegalArgumentException("Cannot create NativeRuntimeHandle with null pointer"); + } + this.pointer = pointer; + } + + /** + * Returns the native runtime pointer, checking that it is still live. + * + * @throws IllegalStateException if the handle has been closed + */ + public long get() { + long ptr = pointer; + if (ptr == 0L) { + throw new IllegalStateException("Native runtime handle has been closed"); + } + return ptr; + } + + /** + * Returns true if the handle has not been closed. + */ + public boolean isOpen() { + return pointer != 0L; + } + + /** + * Releases the native runtime. Idempotent and thread-safe. + * After this call, {@link #get()} will throw. + */ + @Override + public synchronized void close() { + long ptr = pointer; + if (ptr != 0L) { + // TODO: NativeBridge.closeGlobalRuntime(ptr); + pointer = 0L; + } + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/NativeBridge.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/NativeBridge.java new file mode 100644 index 0000000000000..20caa6cbd3251 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/NativeBridge.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion.jni; + +/** + * Core JNI bridge to native DataFusion library. + * All native method declarations are centralized here. + */ +public final class NativeBridge { + + static { + // TODO : NativeLibraryLoader.load("opensearch_datafusion_jni"); + } + + private NativeBridge() {} + + public static native long createDatafusionReader(String path, String[] files); + + public static native void closeDatafusionReader(long ptr); + + public static native long createGlobalRuntime(long memoryLimit, long cacheManagerPtr, String spillDir, long spillLimit); + + public static native void closeGlobalRuntime(long ptr); + + /** + * Executes a substrait plan against the given reader and returns a stream pointer. + * + * @param readerPtr native reader pointer + * @param tableName table name for registration with DataFusion + * @param substraitPlan serialized substrait plan bytes + * @param runtimePtr native runtime pointer + * @return native stream pointer (caller must close via {@link #streamClose}) + */ + public static native long executeQuery(long readerPtr, String tableName, byte[] substraitPlan, long runtimePtr); + + /** + * Returns the Arrow schema address for the given stream. + * + * @param streamPtr native stream pointer + * @return ArrowSchema C Data Interface address + */ + public static native long streamGetSchema(long streamPtr); + + /** + * Loads the next record batch from the stream. 
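For context, the intended call sequence around streamNext, as used by DatafusionResultStream above, would look roughly like the fragment below; runtimePtr and streamPtr are assumed to come from the runtime handle and executeQuery respectively:

```java
// Illustrative drain loop; Arrow C Data import of each returned address is not wired in this change.
long arrowArrayAddr;
while ((arrowArrayAddr = NativeBridge.streamNext(runtimePtr, streamPtr)) != 0L) {
    // import the ArrowArray at arrowArrayAddr and hand rows to the caller
}
NativeBridge.streamClose(streamPtr);
```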
+ * + * @param runtimePtr native runtime pointer + * @param streamPtr native stream pointer + * @return ArrowArray C Data Interface address, or 0 if end-of-stream + */ + public static native long streamNext(long runtimePtr, long streamPtr); + + /** + * Closes the native stream and releases associated resources. + * + * @param streamPtr native stream pointer + */ + public static native void streamClose(long streamPtr); +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/ReaderHandle.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/ReaderHandle.java new file mode 100644 index 0000000000000..fed2b8601b845 --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/ReaderHandle.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion.jni; + +import org.opensearch.analytics.backend.jni.NativeHandle; + +/** + * Type-safe handle for native reader. + */ +public final class ReaderHandle extends NativeHandle { + + public ReaderHandle(String path, String[] files) { + super(NativeBridge.createDatafusionReader(path, files)); + } + + /** + * Closes the datafusion reader and releases any associated resources. + */ + @Override + protected void doClose() { + NativeBridge.closeDatafusionReader(ptr); + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/StreamHandle.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/StreamHandle.java new file mode 100644 index 0000000000000..53b380867e90b --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/StreamHandle.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.datafusion.jni; + +import org.opensearch.analytics.backend.jni.NativeHandle; + +/** + * Type-safe handle for a native DataFusion result stream. + * Wraps the stream pointer returned by {@link NativeBridge#executeQuery}. + */ +public final class StreamHandle extends NativeHandle { + + private final long streamPtr; + + public StreamHandle(long ptr, long streamPtr) { + super(ptr); + this.streamPtr = streamPtr; + } + + public long getStreamPtr() { + return streamPtr; + } + + @Override + protected void doClose() { + NativeBridge.streamClose(ptr); + } +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/package-info.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/package-info.java new file mode 100644 index 0000000000000..6a8481365c71c --- /dev/null +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/jni/package-info.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * JNI bridge layer for DataFusion native library integration. + * + *

This package provides:
+ *   • Type-safe native handle wrappers ({@link org.opensearch.be.datafusion.jni.ReaderHandle})
+ *   • Centralized native method declarations ({@link org.opensearch.be.datafusion.jni.NativeBridge})
+ * + */ +package org.opensearch.be.datafusion.jni; diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/package-info.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/package-info.java index dccab0e7fb8a7..07ffaf562b3f0 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/package-info.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/package-info.java @@ -6,7 +6,4 @@ * compatible open source license. */ -/** - * DataFusion native execution engine back-end plugin. - */ package org.opensearch.be.datafusion; diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsBackEndPlugin b/sandbox/plugins/analytics-backend-datafusion/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin similarity index 100% rename from sandbox/plugins/analytics-backend-datafusion/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsBackEndPlugin rename to sandbox/plugins/analytics-backend-datafusion/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin diff --git a/sandbox/plugins/analytics-backend-lucene/build.gradle b/sandbox/plugins/analytics-backend-lucene/build.gradle new file mode 100644 index 0000000000000..c0413a6c6d41a --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/build.gradle @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +apply plugin: 'opensearch.internal-cluster-test' + +opensearchplugin { + description = 'OpenSearch plugin providing Lucene-based search execution engine' + classname = 'org.opensearch.lucene.LuceneSearchEnginePlugin' +} + +dependencies { + // Shared types and SPI interfaces (EngineBridge, AnalyticsBackEndPlugin, etc.) + // Also provides calcite-core transitively via api. + api project(':sandbox:libs:analytics-framework') + + implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}" + implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}" +} + +test { + systemProperty 'tests.security.manager', 'false' +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneEngineSearcher.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneEngineSearcher.java new file mode 100644 index 0000000000000..6cd3605499c07 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneEngineSearcher.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.EngineSearcher; + +import java.io.IOException; +import java.util.List; + +/** + * Lucene-backed engine searcher. + *

+ * This class is stateless with respect to active queries + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneEngineSearcher implements EngineSearcher { + + private final IndexSearcher indexSearcher; + private final DirectoryReader directoryReader; + + public LuceneEngineSearcher(IndexSearcher indexSearcher, DirectoryReader directoryReader) { + this.indexSearcher = indexSearcher; + this.directoryReader = directoryReader; + } + + /** + * Execute: create a Weight from the query, register it on the + * context's lifecycle manager, and store the key + segment metadata + * on the context for JNI callbacks. + */ + @Override + public void search(LuceneSearchContext context) throws IOException { + Query query = context.getQuery(); + if (query == null) { + throw new IllegalStateException("No query set on LuceneSearchContext"); + } + Query rewritten = indexSearcher.rewrite(query); + Weight weight = indexSearcher.createWeight(rewritten, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + List leaves = directoryReader.leaves(); + // TODO : Complete the wiring for search execution + + } + + public IndexSearcher getIndexSearcher() { + return indexSearcher; + } + + public DirectoryReader getDirectoryReader() { + return directoryReader; + } + + @Override + public void close() {} +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexFilterContext.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexFilterContext.java new file mode 100644 index 0000000000000..796a1c3cdcf17 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexFilterContext.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.CollectorQueryLifecycleManager; +import org.opensearch.index.engine.exec.IndexFilterContext; + +import java.io.IOException; +import java.util.List; + +/** + * Lucene-specific index filter context. + *
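The TODO in LuceneEngineSearcher.search above would typically be completed with the standard Lucene per-leaf pattern; a hedged sketch, not this change's code (Scorer and DocIdSetIterator imports would be needed):

```java
// One scorer per segment, segment-local doc IDs, no scoring (COMPLETE_NO_SCORES).
for (LeafReaderContext leaf : leaves) {
    Scorer scorer = weight.scorer(leaf);
    if (scorer == null) {
        continue; // no matches in this segment
    }
    DocIdSetIterator it = scorer.iterator();
    for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
        // doc is segment-local; leaf.docBase + doc is the shard-wide doc ID
    }
}
```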

+ * Holds the Weight (per-query), and manages per-segment scorers/collectors. + * One context per (query, reader) pair. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneIndexFilterContext implements IndexFilterContext { + + private final Weight weight; + private final List leaves; + private final CollectorQueryLifecycleManager collectorManager = new CollectorQueryLifecycleManager(); + + public LuceneIndexFilterContext(Query query, DirectoryReader reader) throws IOException { + IndexSearcher searcher = new IndexSearcher(reader); + Query rewritten = searcher.rewrite(query); + this.weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + this.leaves = reader.leaves(); + } + + @Override + public int segmentCount() { + return leaves.size(); + } + + @Override + public int segmentMaxDoc(int segmentOrd) { + return leaves.get(segmentOrd).reader().maxDoc(); + } + + Weight getWeight() { + return weight; + } + + List getLeaves() { + return leaves; + } + + /** + * Returns the collector lifecycle manager + */ + public CollectorQueryLifecycleManager getCollectorManager() { + return collectorManager; + } + + @Override + public void close() { + collectorManager.close(); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexFilterProvider.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexFilterProvider.java new file mode 100644 index 0000000000000..9aae1e997b2b2 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexFilterProvider.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorer; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.CollectorQueryLifecycleManager; +import org.opensearch.index.engine.exec.IndexFilterProvider; +import org.opensearch.index.engine.exec.SegmentCollector; + +import java.io.IOException; +import java.util.BitSet; + +/** + * Lucene-backed {@link IndexFilterProvider}. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneIndexFilterProvider implements IndexFilterProvider { + + @Override + public LuceneIndexFilterContext createContext(Query query, DirectoryReader reader) throws IOException { + return new LuceneIndexFilterContext(query, reader); + } + + /** + * Creates a collector for the given segment and registers it in the + * context's {@link CollectorQueryLifecycleManager}. + * + * @return an int key that identifies this collector across JNI + */ + @Override + public int createCollector(LuceneIndexFilterContext context, int segmentOrd, int minDoc, int maxDoc) { + SegmentCollector collector = createCollectorInternal(context, segmentOrd, minDoc, maxDoc); + return context.getCollectorManager().registerCollector(collector); + } + + /** + * Collects matching doc IDs for the collector identified by {@code key}. 
+ */ + public long[] collectDocs(LuceneIndexFilterContext context, int key, int minDoc, int maxDoc) { + return context.getCollectorManager().collectDocs(key, minDoc, maxDoc); + } + + /** + * Releases the collector identified by {@code key}. + */ + public void releaseCollector(LuceneIndexFilterContext context, int key) { + context.getCollectorManager().releaseCollector(key); + } + + @Override + public void close() {} + + private SegmentCollector createCollectorInternal(LuceneIndexFilterContext context, int segmentOrd, int minDoc, int maxDoc) { + try { + Scorer scorer = context.getWeight().scorer(context.getLeaves().get(segmentOrd)); + if (scorer == null) { + return EMPTY_COLLECTOR; + } + return new LuceneSegmentCollector(scorer.iterator(), minDoc, maxDoc); + } catch (IOException e) { + return EMPTY_COLLECTOR; + } + } + + private static final SegmentCollector EMPTY_COLLECTOR = (min, max) -> new long[0]; + + private static class LuceneSegmentCollector implements SegmentCollector { + private final DocIdSetIterator iterator; + private final int collectorMinDoc; + private final int collectorMaxDoc; + private int currentDoc = -1; + + LuceneSegmentCollector(DocIdSetIterator iterator, int minDoc, int maxDoc) { + this.iterator = iterator; + this.collectorMinDoc = minDoc; + this.collectorMaxDoc = maxDoc; + } + + @Override + public long[] collectDocs(int minDoc, int maxDoc) { + int effectiveMin = Math.max(minDoc, collectorMinDoc); + int effectiveMax = Math.min(maxDoc, collectorMaxDoc); + if (effectiveMin >= effectiveMax) { + return new long[0]; + } + + BitSet bitset = new BitSet(effectiveMax - effectiveMin); + try { + int docId = currentDoc; + if (docId == DocIdSetIterator.NO_MORE_DOCS || docId >= collectorMaxDoc) { + return new long[0]; + } + if (docId < effectiveMin) { + docId = iterator.advance(effectiveMin); + } + while (docId != DocIdSetIterator.NO_MORE_DOCS && docId < effectiveMax) { + bitset.set(docId - effectiveMin); + docId = iterator.nextDoc(); + } + currentDoc = docId; + } catch (IOException e) { + return new long[0]; + } + return bitset.toLongArray(); + } + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java new file mode 100644 index 0000000000000..46ea0dc1c2359 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.ReferenceManager; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.CatalogSnapshot; +import org.opensearch.index.engine.exec.EngineReaderManager; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Lucene implementation of {@link EngineReaderManager}. + *

+ * Wraps Lucene's {@link ReferenceManager} for {@link DirectoryReader}. + * Acquire increments the ref count on the current reader; + * release decrements it — same pattern as {@code DatafusionReaderManager}. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneReaderManager implements EngineReaderManager { + + Map readers = new HashMap<>(); + DataFormat dataFormat; + + @SuppressWarnings("unchecked") + public LuceneReaderManager(DataFormat dataFormat) { + this.dataFormat = dataFormat; + } + + /** Called when files are deleted after merges. */ + public void onFilesDeleted(Collection files) throws IOException { + // no-op + } + + @Override + public void onFilesAdded(Collection files) throws IOException { + // no-op + } + + @Override + public DirectoryReader getReader(CatalogSnapshot catalogSnapshot) throws IOException { + return readers.get(catalogSnapshot); + } + + @Override + public void beforeRefresh() throws IOException { + + } + + @Override + public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException { + if (readers.containsKey(catalogSnapshot)) { + return; + } + readers.put(catalogSnapshot, (DirectoryReader) catalogSnapshot.getReader(dataFormat)); + } + + @Override + public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException { + readers.remove(catalogSnapshot).close(); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchContext.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchContext.java new file mode 100644 index 0000000000000..2851d2759b180 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchContext.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.search.SearchExecutionContext; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.IOException; + +/** + * Lucene-specific search execution context. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneSearchContext implements SearchExecutionContext { + + private final ShardSearchRequest request; + private final SearchShardTarget shardTarget; + + private final DirectoryReader reader; + private final LuceneEngineSearcher searcher; + private Query query; + + public LuceneSearchContext(ShardSearchRequest request, SearchShardTarget shardTarget, DirectoryReader reader) throws IOException { + this.reader = reader; + IndexSearcher indexSearcher = new IndexSearcher(reader); + this.searcher = new LuceneEngineSearcher(indexSearcher, reader); + this.request = request; + this.shardTarget = shardTarget; + } + + public Query getQuery() { + return query; + } + + public DirectoryReader getReader() { + return reader; + } + + public void setQuery(Query query) { + this.query = query; + } + + /** + * Returns the number of segments for the registered weight. 
+ */ + public int getSegmentCount() { + return -1; + } + + /** + * Returns the max doc array for all segments of the registered weight. + */ + public int[] getSegmentMaxDocs() { + return null; + } + + @Override + public ShardSearchRequest request() { + return request; + } + + @Override + public SearchShardTarget shardTarget() { + return shardTarget; + } + + @Override + public void close() throws IOException { + searcher.close(); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java new file mode 100644 index 0000000000000..9de3cf5d53cfe --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.calcite.sql.SqlOperatorTable; +import org.opensearch.analytics.backend.EngineBridge; +import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexFilterProvider; +import org.opensearch.index.engine.exec.SourceProvider; +import org.opensearch.index.shard.ShardPath; + +import java.io.IOException; +import java.util.List; + +/** + * Plugin providing Lucene as an index filter or source provider. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneSearchEnginePlugin implements AnalyticsSearchBackendPlugin { + + @Override + public String name() { + return "lucene-analytics-backend"; + } + + @Override + public EngineBridge bridge() { + return null; + } + + @Override + public SqlOperatorTable operatorTable() { + return null; + } + + @Override + public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException { + return new LuceneReaderManager(format); + } + + @Override + public IndexFilterProvider createIndexFilterProvider(DataFormat format, ShardPath shardPath) throws IOException { + return new LuceneIndexFilterProvider(); + } + + @Override + public SourceProvider createSourceProvider(DataFormat format, ShardPath shardPath) throws IOException { + return new LuceneSourceProvider(); + } + + @Override + public List getSupportedFormats() { + return List.of(); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchExecEngine.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchExecEngine.java new file mode 100644 index 0000000000000..c899fdbe9263c --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchExecEngine.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.SearchExecEngine; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.IOException; + +/** + * Lucene-backed search execution engine. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneSearchExecEngine implements SearchExecEngine { + + @Override + public Query convertFragment(Object fragment) { + if (fragment instanceof Query) { + return (Query) fragment; + } + throw new UnsupportedOperationException("Expected Lucene Query, got " + fragment.getClass().getSimpleName()); + } + + @Override + public LuceneSearchContext createContext( + Object reader, + Query plan, + ShardSearchRequest request, + SearchShardTarget shardTarget, + SearchShardTask task + ) throws IOException { + DirectoryReader directoryReader = (DirectoryReader) reader; + return new LuceneSearchContext(request, shardTarget, directoryReader); + } + + @Override + public Void execute(LuceneSearchContext context) throws IOException { + DirectoryReader reader = context.getReader(); + LuceneEngineSearcher searcher = new LuceneEngineSearcher(new IndexSearcher(reader), reader); + try { + searcher.search(context); + } finally { + searcher.close(); + } + return null; // TODO : figure out this path or remove this class for now + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSourceContext.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSourceContext.java new file mode 100644 index 0000000000000..bf495f4220fb5 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSourceContext.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.SourceContext; + +import java.io.IOException; + +/** + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneSourceContext implements SourceContext { + + private final Object query; + private final DirectoryReader reader; + private final IndexSearcher searcher; + + public LuceneSourceContext(Object query, DirectoryReader reader) { + this.query = query; + this.reader = reader; + this.searcher = new IndexSearcher(reader); + } + + @Override + public Object query() { + return query; + } + + public DirectoryReader getReader() { + return reader; + } + + public IndexSearcher getSearcher() { + return searcher; + } + + @Override + public void close() throws IOException {} +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSourceProvider.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSourceProvider.java new file mode 100644 index 0000000000000..d2de84add4880 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSourceProvider.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.DirectoryReader; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.SourceProvider; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +/** + * Lucene-backed {@link SourceProvider}. + *

+ * Executes the full query+scan+filter in Lucene and streams back + * projections/aggregation results to the primary engine (DataFusion). + *

+ * Used when all queried fields are Lucene-indexed and Lucene can + * fully resolve the query more efficiently than scanning parquet. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneSourceProvider implements SourceProvider { + + @Override + public LuceneSourceContext createContext(Object query, DirectoryReader reader) throws IOException { + return new LuceneSourceContext(query, reader); + } + + @Override + public Iterator execute(LuceneSourceContext context) throws IOException { + // TODO: execute query via context.getSearcher(), collect results, return iterator + return Collections.emptyIterator(); + } + + @Override + public void close() {} +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/package-info.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/package-info.java new file mode 100644 index 0000000000000..f34e1c6276645 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin b/sandbox/plugins/analytics-backend-lucene/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin new file mode 100644 index 0000000000000..53330f0ac02ef --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/resources/META-INF/services/org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin @@ -0,0 +1 @@ +org.opensearch.be.lucene.LuceneSearchEnginePlugin diff --git a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/AnalyticsPlugin.java b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/AnalyticsPlugin.java index 1191e4215afb2..9d4132031aab6 100644 --- a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/AnalyticsPlugin.java +++ b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/AnalyticsPlugin.java @@ -17,7 +17,7 @@ import org.opensearch.analytics.exec.DefaultPlanExecutor; import org.opensearch.analytics.exec.QueryPlanExecutor; import org.opensearch.analytics.schema.OpenSearchSchemaBuilder; -import org.opensearch.analytics.spi.AnalyticsBackEndPlugin; +import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Module; @@ -54,12 +54,12 @@ public class AnalyticsPlugin extends Plugin implements ExtensiblePlugin { */ public AnalyticsPlugin() {} - private final List backEnds = new ArrayList<>(); + private final List backEnds = new ArrayList<>(); private SqlOperatorTable operatorTable; @Override public void loadExtensions(ExtensionLoader loader) { - backEnds.addAll(loader.loadExtensions(AnalyticsBackEndPlugin.class)); + backEnds.addAll(loader.loadExtensions(AnalyticsSearchBackendPlugin.class)); operatorTable = aggregateOperatorTables(); } @@ -77,7 +77,10 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - return List.of(new 
DefaultPlanExecutor(backEnds), new DefaultEngineContext(clusterService, operatorTable)); + return List.of( + new DefaultPlanExecutor(backEnds, null/* TODO: pass indices service */, clusterService), + new DefaultEngineContext(clusterService, operatorTable) + ); } @Override @@ -92,7 +95,7 @@ public Collection createGuiceModules() { private SqlOperatorTable aggregateOperatorTables() { List tables = new ArrayList<>(); - for (AnalyticsBackEndPlugin backEnd : backEnds) { + for (AnalyticsSearchBackendPlugin backEnd : backEnds) { SqlOperatorTable table = backEnd.operatorTable(); if (table != null) { tables.add(table); diff --git a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java index a766466fc7b47..1c3b904faeca4 100644 --- a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java +++ b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java @@ -9,37 +9,107 @@ package org.opensearch.analytics.exec; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.analytics.spi.AnalyticsBackEndPlugin; +import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.DataFormatAwareEngine; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.SearchExecEngine; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; /** * {@link QueryPlanExecutor} default implementation. + *

+ * Acquires a {@link DataFormatAwareEngine.DataFormatAwareReader} on the latest catalog snapshot, + * then routes plan fragments to the appropriate {@link SearchExecEngine} per data format. + * The composite reader holds the snapshot reference alive for the duration of the search. */ public class DefaultPlanExecutor implements QueryPlanExecutor> { private static final Logger logger = LogManager.getLogger(DefaultPlanExecutor.class); + private final Map backEnds; + private final IndicesService indicesService; + private final ClusterService clusterService; - /** - * Creates a plan executor with the given back-end plugins. - * - * @param backEnds registered back-end engine plugins - */ - public DefaultPlanExecutor(List backEnds) { - // TODO: use back-ends + public DefaultPlanExecutor(List plugins, IndicesService indicesService, ClusterService clusterService) { + this.backEnds = new LinkedHashMap<>(); + for (AnalyticsSearchBackendPlugin plugin : plugins) { + this.backEnds.put(plugin.name(), plugin); + } + this.indicesService = indicesService; + this.clusterService = clusterService; } + @SuppressWarnings("unchecked") @Override public Iterable execute(RelNode logicalFragment, Object context) { - RelNode fragment = logicalFragment; - int fieldCount = fragment.getRowType().getFieldCount(); + // TODO : wire this properly , this is just to give an idea of flow + AnalyticsSearchBackendPlugin plugin = selectBackEnd(); + String tableName = extractTableName(logicalFragment); + DataFormatAwareEngine dataFormatAwareEngine = resolveCompositeEngine(tableName); - logger.debug("[DefaultPlanExecutor] Executing fragment with {} fields: {}", fieldCount, fragment.explain()); + List formats = plugin.getSupportedFormats(); + DataFormat format = formats.get(0); - // Stub: return empty result set. - return new ArrayList<>(); + // Acquire composite reader — incRefs the latest catalog snapshot. + // Closing the reader decRefs the snapshot, allowing file cleanup. 
+ try (DataFormatAwareEngine.DataFormatAwareReader dataFormatAwareReader = dataFormatAwareEngine.acquireReader()) { + Object reader = dataFormatAwareReader.getReader(format); + SearchExecEngine searchEngine = dataFormatAwareEngine.getSearchExecEngine(format); + Object plan = searchEngine.convertFragment(logicalFragment); + var engineContext = searchEngine.createContext(reader, plan, null, null, null); + Object result = searchEngine.execute(engineContext); + + // TODO: consume result stream into rows + logger.info("[DefaultPlanExecutor] Executed via [{}]", plugin.name()); + return new ArrayList<>(); + } catch (Exception e) { + throw new RuntimeException("Execution failed for [" + plugin.name() + "]", e); + } + } + + // TODO: Placeholder logic + static String extractTableName(RelNode node) { + if (node instanceof TableScan) { + List qn = node.getTable().getQualifiedName(); + return qn.get(qn.size() - 1); + } + for (RelNode input : node.getInputs()) { + String name = extractTableName(input); + if (name != null) return name; + } + throw new IllegalArgumentException("No TableScan found in plan fragment"); + } + + // TODO: Placeholder logic + private DataFormatAwareEngine resolveCompositeEngine(String indexName) { + IndexMetadata meta = clusterService.state().metadata().index(indexName); + if (meta == null) throw new IllegalArgumentException("Index [" + indexName + "] not found"); + IndexService indexService = indicesService.indexService(meta.getIndex()); + if (indexService == null) throw new IllegalStateException("Index [" + indexName + "] not on this node"); + Set shardIds = indexService.shardIds(); + if (shardIds.isEmpty()) throw new IllegalStateException("No shards for [" + indexName + "]"); + IndexShard shard = indexService.getShardOrNull(shardIds.iterator().next()); + if (shard == null) throw new IllegalStateException("Shard not found"); + DataFormatAwareEngine ce = shard.getCompositeEngine(); + if (ce == null) throw new IllegalStateException("No CompositeEngine on shard"); + return ce; + } + + // TODO: Placeholder logic + private AnalyticsSearchBackendPlugin selectBackEnd() { + if (backEnds.isEmpty()) throw new IllegalStateException("No back-end plugins registered"); + return backEnds.values().iterator().next(); } } diff --git a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/DefaultPlanExecutorTests.java b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/DefaultPlanExecutorTests.java index a61246f3dfc41..51a9b39c8dab4 100644 --- a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/DefaultPlanExecutorTests.java +++ b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/DefaultPlanExecutorTests.java @@ -45,7 +45,7 @@ public void setUp() throws Exception { * Test that execute() does not throw for a valid fragment. */ public void testExecuteDoesNotThrowForValidFragment() { - DefaultPlanExecutor service = new DefaultPlanExecutor(List.of()); + DefaultPlanExecutor service = new DefaultPlanExecutor(List.of(), null, null); RelNode fragment = createRelNodeWithFieldCount(3); Object context = new Object(); @@ -58,7 +58,7 @@ public void testExecuteDoesNotThrowForValidFragment() { * Test that execute() works with a multi-field fragment. 
*/ public void testExecuteWithMultiFieldFragment() { - DefaultPlanExecutor service = new DefaultPlanExecutor(List.of()); + DefaultPlanExecutor service = new DefaultPlanExecutor(List.of(), null, null); int fieldCount = 5; RelNode fragment = createRelNodeWithFieldCount(fieldCount); @@ -72,7 +72,7 @@ public void testExecuteWithMultiFieldFragment() { * Test that execute() works with a single-field fragment. */ public void testExecuteWithSingleFieldFragment() { - DefaultPlanExecutor service = new DefaultPlanExecutor(List.of()); + DefaultPlanExecutor service = new DefaultPlanExecutor(List.of(), null, null); RelNode fragment = createRelNodeWithFieldCount(1); Object context = new Object(); diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index c59d2bdbbaf89..0aa358fc71f89 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -733,7 +733,8 @@ public static final IndexShard newIndexShard( indexService.getRefreshMutex(), clusterService.getClusterApplierService(), MergedSegmentPublisher.EMPTY, - ReferencedSegmentsPublisher.EMPTY + ReferencedSegmentsPublisher.EMPTY, + null // TODO ); } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 1a4b14ddef9ba..2dc861b54f94a 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -46,6 +46,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; +import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.TriFunction; import org.opensearch.common.annotation.ExperimentalApi; @@ -74,6 +75,7 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -741,7 +743,8 @@ public IndexService newIndexService( Consumer replicator, Function segmentReplicationStatsProvider, Supplier clusterDefaultMaxMergeAtOnceSupplier, - ClusterMergeSchedulerConfig clusterMergeSchedulerConfig + ClusterMergeSchedulerConfig clusterMergeSchedulerConfig, + CheckedTriFunction compositeEngineFactorySupplier ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -814,7 +817,8 @@ public IndexService newIndexService( replicator, segmentReplicationStatsProvider, clusterDefaultMaxMergeAtOnceSupplier, - clusterMergeSchedulerConfig + clusterMergeSchedulerConfig, + compositeEngineFactorySupplier ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 2a862dd94b43e..174168057b985 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.routing.ShardRouting; import 
org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; +import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.annotation.PublicApi; @@ -78,6 +79,7 @@ import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.MergedSegmentWarmerFactory; +import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory; import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.fielddata.IndexFieldDataService; import org.opensearch.index.mapper.MapperService; @@ -209,6 +211,12 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private volatile TimeValue refreshInterval; private volatile boolean shardLevelRefreshEnabled; private final IndexStorePlugin.StoreFactory storeFactory; + private final CheckedTriFunction< + ShardPath, + MapperService, + IndexSettings, + DataFormatAwareEngineFactory, + IOException> compositeEngineFactorySupplier; @InternalApi public IndexService( @@ -255,7 +263,8 @@ public IndexService( Consumer replicator, Function segmentReplicationStatsProvider, Supplier clusterDefaultMaxMergeAtOnceSupplier, - ClusterMergeSchedulerConfig clusterMergeSchedulerConfig + ClusterMergeSchedulerConfig clusterMergeSchedulerConfig, + CheckedTriFunction compositeEngineFactorySupplier ) { super(indexSettings); this.storeFactory = storeFactory; @@ -366,6 +375,7 @@ public IndexService( startIndexLevelRefreshTask(); } } + this.compositeEngineFactorySupplier = compositeEngineFactorySupplier; } @InternalApi @@ -454,7 +464,8 @@ public IndexService( s -> {}, (shardId) -> ReplicationStats.empty(), clusterDefaultMaxMergeAtOnce, - clusterMergeSchedulerConfig + clusterMergeSchedulerConfig, + null ); } @@ -775,6 +786,9 @@ protected void closeInternal() { directoryFactory ); eventListener.onStoreCreated(shardId); + DataFormatAwareEngineFactory dataFormatAwareEngineFactory = compositeEngineFactorySupplier != null + ? compositeEngineFactorySupplier.apply(path, mapperService, this.indexSettings) + : null; indexShard = new IndexShard( routing, this.indexSettings, @@ -813,7 +827,8 @@ protected void closeInternal() { refreshMutex, clusterService.getClusterApplierService(), this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null, - this.indexSettings.isSegRepEnabledOrRemoteNode() ? referencedSegmentsPublisher : null + this.indexSettings.isSegRepEnabledOrRemoteNode() ? referencedSegmentsPublisher : null, + dataFormatAwareEngineFactory ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java new file mode 100644 index 0000000000000..9fc7905487e55 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java @@ -0,0 +1,200 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.engine; + +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.CatalogSnapshot; +import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory; +import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexFilterProvider; +import org.opensearch.index.engine.exec.SearchExecEngine; +import org.opensearch.index.engine.exec.SourceProvider; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Owns all reader managers, lazily creates search engines, index filter providers + * and source providers per data format. + *
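+ * A minimal lookup sketch (illustrative only; {@code engine} and {@code format} are
+ * placeholders for an existing instance and a registered format):
+ * <pre>{@code
+ * var searchEngine   = engine.getSearchExecEngine(format);   // created lazily, memoized per format
+ * var filterProvider = engine.getIndexFilterProvider(format);
+ * var sourceProvider = engine.getSourceProvider(format);
+ * }</pre>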

+ * Instances are created by {@link DataFormatAwareEngineFactory}. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DataFormatAwareEngine implements Closeable { + + private final Map> readerManagers; + private final Map, IOException>> engineSuppliers; + private final Map, IOException>> indexFilterProviderSuppliers; + private final Map, IOException>> sourceProviderSuppliers; + private volatile CatalogSnapshot latestSnapshot; + + /** + * Constructs a new CompositeEngine with pre-built maps. + * Prefer using {@link DataFormatAwareEngineFactory#create()}. + */ + public DataFormatAwareEngine( + Map> readerManagers, + Map, IOException>> engineSuppliers, + Map, IOException>> indexFilterProviderSuppliers, + Map, IOException>> sourceProviderSuppliers + ) { + this.readerManagers = readerManagers; + this.engineSuppliers = engineSuppliers; + this.indexFilterProviderSuppliers = indexFilterProviderSuppliers; + this.sourceProviderSuppliers = sourceProviderSuppliers; + } + + public EngineReaderManager getReaderManager(DataFormat format) { + return readerManagers.get(format); + } + + public SearchExecEngine getSearchExecEngine(DataFormat format) throws IOException { + return getFromSupplier(engineSuppliers, format, "search exec engine"); + } + + public IndexFilterProvider getIndexFilterProvider(DataFormat format) throws IOException { + return getFromSupplier(indexFilterProviderSuppliers, format, "index filter provider"); + } + + public SourceProvider getSourceProvider(DataFormat format) throws IOException { + return getFromSupplier(sourceProviderSuppliers, format, "source provider"); + } + + private T getFromSupplier(Map> suppliers, DataFormat format, String label) + throws IOException { + CheckedSupplier supplier = suppliers.get(format); + if (supplier == null) { + throw new IllegalArgumentException("No " + label + " registered for format: " + format.name()); + } + return supplier.get(); + } + + /** + * Called by the catalog snapshot lifecycle listener after a refresh + * to update the latest searchable snapshot. + */ + public void setLatestSnapshot(CatalogSnapshot snapshot) { + CatalogSnapshot prev = this.latestSnapshot; + this.latestSnapshot = snapshot; + if (prev != null) { + prev.decRef(); + } + } + + /** + * Acquires a DataFormatAwareReader on the latest catalog snapshot. + * The snapshot is incRef'd; the caller MUST close the returned + * {@link DataFormatAwareReader} when done, which decRef's the snapshot. + */ + public DataFormatAwareReader acquireReader() throws IOException { + CatalogSnapshot snapshot = latestSnapshot; + if (snapshot == null) { + throw new IllegalStateException("No catalog snapshot available"); + } + return acquireReader(snapshot); + } + + /** + * Acquires a composite reader on a specific catalog snapshot. + */ + public DataFormatAwareReader acquireReader(CatalogSnapshot catalogSnapshot) throws IOException { + catalogSnapshot.incRef(); + try { + Map readers = new HashMap<>(); + for (Map.Entry> entry : readerManagers.entrySet()) { + Object reader = entry.getValue().getReader(catalogSnapshot); + if (reader != null) { + readers.put(entry.getKey(), reader); + } + } + return new DataFormatAwareReader(catalogSnapshot, readers); + } catch (Exception e) { + catalogSnapshot.decRef(); + throw e; + } + } + + /** + * A catalog-snapshot-backed data-format aware reader providing per-format reader access. + * Closing this reader releases the catalog snapshot reference. 
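+ * An illustrative acquire/use/close sketch (assumes {@code engine} is the owning
+ * {@link DataFormatAwareEngine} and {@code format} is a registered format):
+ * <pre>{@code
+ * try (DataFormatAwareEngine.DataFormatAwareReader reader = engine.acquireReader()) {
+ *     Object formatReader = reader.getReader(format); // e.g. a DirectoryReader for the Lucene backend
+ *     // search against formatReader; the snapshot stays referenced until close()
+ * }
+ * }</pre>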
+ */ + @ExperimentalApi + public static class DataFormatAwareReader implements Closeable { + private final CatalogSnapshot catalogSnapshot; + private final Map readers; + + DataFormatAwareReader(CatalogSnapshot catalogSnapshot, Map readers) { + this.catalogSnapshot = catalogSnapshot; + this.readers = readers; + } + + public Object getReader(DataFormat format) { + return readers.get(format); + } + + public CatalogSnapshot getCatalogSnapshot() { + return catalogSnapshot; + } + + @Override + public void close() { + catalogSnapshot.decRef(); + } + } + + @Override + public void close() throws IOException { + List exceptions = new ArrayList<>(); + closeSupplierInstances(engineSuppliers.values(), exceptions); + closeSupplierInstances(indexFilterProviderSuppliers.values(), exceptions); + closeSupplierInstances(sourceProviderSuppliers.values(), exceptions); + for (EngineReaderManager rm : readerManagers.values()) { + if (rm instanceof Closeable) { + try { + ((Closeable) rm).close(); + } catch (Exception e) { + exceptions.add(e); + } + } + } + if (exceptions.isEmpty() == false) { + IOException ioException = new IOException("Failed to close CompositeEngine resources"); + for (Exception e : exceptions) { + ioException.addSuppressed(e); + } + throw ioException; + } + } + + /** + * Attempts to retrieve each memoized instance and close it if it implements {@link Closeable}. + * Suppliers that were never invoked will return quickly from the memoize wrapper. + */ + private static void closeSupplierInstances(Collection> suppliers, List exceptions) { + for (CheckedSupplier supplier : suppliers) { + try { + T instance = supplier.get(); + if (instance instanceof Closeable) { + ((Closeable) instance).close(); + } + } catch (Exception e) { + exceptions.add(e); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/IndexFilterTree.java b/server/src/main/java/org/opensearch/index/engine/IndexFilterTree.java new file mode 100644 index 0000000000000..c918aeaa5c704 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/IndexFilterTree.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Boolean tree structure for multi-engine query decomposition. + *

+ * Wraps the root node and provides compact array + * serialization for JNI transport to the Rust layer. + *

+ * + * @opensearch.experimental + */ +@ExperimentalApi +public class IndexFilterTree implements Closeable { + + // TODO + @Override + public void close() throws IOException {} +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshot.java b/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshot.java index 90207e58cd1f5..80abcb59eccbe 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshot.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.index.engine.dataformat.DataFormat; import java.io.IOException; import java.util.Collection; @@ -133,4 +134,6 @@ public CatalogSnapshot cloneNoAcquire() { * @param b additional boolean parameter for implementation-specific behavior */ public abstract void setUserData(Map userData, boolean b); + + public abstract Object getReader(DataFormat dataFormat); } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshotLifecycleListener.java b/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshotLifecycleListener.java new file mode 100644 index 0000000000000..e0a40709acf33 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshotLifecycleListener.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; + +/** + * Unified lifecycle listener for catalog snapshots. + *

+ * Combines refresh notifications (create/update) and delete notifications + * into a single interface so plugins only need to wire one listener. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface CatalogSnapshotLifecycleListener { + + /** Singleton that silently ignores every callback. */ + CatalogSnapshotLifecycleListener NOOP = new CatalogSnapshotLifecycleListener() { + @Override + public void beforeRefresh() {} + + @Override + public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) {} + + @Override + public void onDeleted(CatalogSnapshot catalogSnapshot) {} + }; + + /** + * Called before a refresh operation. + */ + void beforeRefresh() throws IOException; + + /** + * Called after a refresh operation with the resulting catalog snapshot. + * @param didRefresh whether the refresh actually occurred + * @param catalogSnapshot the current catalog snapshot with file information + */ + void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException; + + /** + * Called when a catalog snapshot is deleted. + * @param catalogSnapshot the snapshot being deleted + */ + void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/CollectorQueryLifecycleManager.java b/server/src/main/java/org/opensearch/index/engine/exec/CollectorQueryLifecycleManager.java new file mode 100644 index 0000000000000..da24f5d7757e5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/CollectorQueryLifecycleManager.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manages the lifecycle of {@link SegmentCollector} instances for a single query. + *

+ * Provides a JNI-friendly primitives-only API: callers receive an {@code int} key + * from {@link #registerCollector} and use it to invoke {@link #collectDocs} and + * {@link #releaseCollector}. Java owns the collector state; the native (Rust) side + * only holds lightweight int keys. + *
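+ * A sketch of the expected call sequence (illustrative; {@code collector} comes from the
+ * owning backend, {@code minDoc}/{@code maxDoc} bound the segment range):
+ * <pre>{@code
+ * int key = manager.registerCollector(collector);   // Java keeps the state, native code keeps only the key
+ * long[] matches = manager.collectDocs(key, minDoc, maxDoc);
+ * manager.releaseCollector(key);                    // or rely on close() as the safety net
+ * }</pre>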

+ * One manager is created per query and closed when the query finishes. + * {@link #close()} acts as a safety net, releasing any collectors that were not + * explicitly released by the caller. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CollectorQueryLifecycleManager implements Closeable { + + private final AtomicInteger nextKey = new AtomicInteger(1); + private final Map collectors = new ConcurrentHashMap<>(); + + /** + * Registers a collector and returns its int key. + * + * @param collector the segment collector to manage + * @return a unique key that identifies this collector + */ + public int registerCollector(SegmentCollector collector) { + int key = nextKey.getAndIncrement(); + collectors.put(key, collector); + return key; + } + + /** + * Collects matching document IDs for the collector identified by {@code key}. + * + * @param key the collector key returned by {@link #registerCollector} + * @param minDoc inclusive lower bound + * @param maxDoc exclusive upper bound + * @return packed {@code long[]} bitset of matching doc IDs, or empty array if key is invalid + */ + public long[] collectDocs(int key, int minDoc, int maxDoc) { + SegmentCollector collector = collectors.get(key); + if (collector == null) { + return new long[0]; + } + return collector.collectDocs(minDoc, maxDoc); + } + + /** + * Releases the collector identified by {@code key}, closing it and + * removing it from the registry. + * + * @param key the collector key returned by {@link #registerCollector} + */ + public void releaseCollector(int key) { + SegmentCollector collector = collectors.remove(key); + if (collector != null) { + collector.close(); + } + } + + /** + * Closes all remaining collectors. Acts as a safety net for any + * collectors that were not explicitly released. + */ + @Override + public void close() { + for (SegmentCollector collector : collectors.values()) { + collector.close(); + } + collectors.clear(); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java new file mode 100644 index 0000000000000..b05fc42d65f84 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.DataFormatAwareEngine; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.PluginsService; +import org.opensearch.plugins.SearchBackEndPlugin; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Factory that discovers {@link SearchBackEndPlugin}s via + * {@link PluginsService} and builds the per-format reader managers and + * memoizing suppliers consumed by {@link DataFormatAwareEngine}. + *
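+ * A construction sketch (illustrative only; the collaborators are whatever the shard
+ * already has available):
+ * <pre>{@code
+ * DataFormatAwareEngineFactory factory =
+ *     new DataFormatAwareEngineFactory(pluginsService, shardPath, mapperService, indexSettings);
+ * DataFormatAwareEngine engine = factory.create();
+ * }</pre>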

+ * This keeps DataformatAwareEngine decoupled from the plugin layer. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DataFormatAwareEngineFactory { + + private final Map> readerManagers = new HashMap<>(); + private final Map, IOException>> engineSuppliers = new HashMap<>(); + private final Map, IOException>> indexFilterProviderSuppliers = + new HashMap<>(); + private final Map, IOException>> sourceProviderSuppliers = new HashMap<>(); + + private final IndexFileDeleter indexFileDeleter; + + public DataFormatAwareEngineFactory( + PluginsService pluginsService, + ShardPath shardPath, + MapperService mapperService, + IndexSettings indexSettings + ) throws IOException { + for (SearchBackEndPlugin plugin : pluginsService.filterPlugins(SearchBackEndPlugin.class)) { + for (DataFormat format : plugin.getSupportedFormats()) { + // TODO: use mapperService and indexSettings to filter formats relevant to this index + readerManagers.put(format, plugin.createReaderManager(format, shardPath)); + engineSuppliers.put(format, memoize(format, f -> plugin.createSearchExecEngine(f, shardPath))); + indexFilterProviderSuppliers.put(format, memoize(format, f -> plugin.createIndexFilterProvider(f, shardPath))); + sourceProviderSuppliers.put(format, memoize(format, f -> plugin.createSourceProvider(f, shardPath))); + } + } + this.indexFileDeleter = new IndexFileDeleter(null, shardPath); + } + + /** + * Wraps a {@link CheckedFunction} factory into a thread-safe memoizing supplier + * using double-checked locking. The factory is invoked at most once. + */ + private static CheckedSupplier memoize(DataFormat format, CheckedFunction factory) { + return new CheckedSupplier<>() { + private volatile T instance; + + @Override + public T get() throws IOException { + T result = instance; + if (result != null) { + return result; + } + synchronized (this) { + result = instance; + if (result != null) { + return result; + } + result = factory.apply(format); + instance = result; + return result; + } + } + }; + } + + /** + * Creates a new {@link DataFormatAwareEngine} populated with the discovered + * reader managers and memoizing suppliers. + */ + public DataFormatAwareEngine create() { + return new DataFormatAwareEngine(readerManagers, engineSuppliers, indexFilterProviderSuppliers, sourceProviderSuppliers); + } + + /** + * Creates a {@link CatalogSnapshotLifecycleListener} that routes events + * through the {@link IndexFileDeleter} and fans out to the given reader managers. + * + * @param readerManagers the per-format reader managers that receive notifications + */ + public CatalogSnapshotLifecycleListener createCatalogSnapshotListener(Map> readerManagers) { + return new DataFormatEngineCatalogSnapshotListener(readerManagers, indexFileDeleter); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatEngineCatalogSnapshotListener.java b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatEngineCatalogSnapshotListener.java new file mode 100644 index 0000000000000..85e247bd29fd1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatEngineCatalogSnapshotListener.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +/** + * Routes {@link CatalogSnapshotLifecycleListener} events through the + * {@link IndexFileDeleter} and then fans out to the per-format + * {@link EngineReaderManager}s. + *

+ * Keeps lifecycle orchestration separate from the engine's component + * registry responsibilities. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class DataFormatEngineCatalogSnapshotListener implements CatalogSnapshotLifecycleListener { + + private final Map> readerManagers; + private final IndexFileDeleter indexFileDeleter; + + public DataFormatEngineCatalogSnapshotListener( + Map> readerManagers, + IndexFileDeleter indexFileDeleter + ) { + this.readerManagers = readerManagers; + this.indexFileDeleter = indexFileDeleter; + } + + @Override + public void beforeRefresh() throws IOException { + for (CatalogSnapshotLifecycleListener listener : readerManagers.values()) { + listener.beforeRefresh(); + } + } + + @Override + public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException { + Map> newFiles = indexFileDeleter.addFileReferences(catalogSnapshot); + if (newFiles.isEmpty() == false) { + notifyFilesAdded(newFiles); + } + for (CatalogSnapshotLifecycleListener listener : readerManagers.values()) { + listener.afterRefresh(didRefresh, catalogSnapshot); + } + } + + @Override + public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException { + Map> deletedFiles = indexFileDeleter.removeFileReferences(catalogSnapshot); + if (deletedFiles.isEmpty() == false) { + notifyFilesDeleted(deletedFiles); + } + for (CatalogSnapshotLifecycleListener listener : readerManagers.values()) { + listener.onDeleted(catalogSnapshot); + } + } + + private void notifyFilesAdded(Map> filesByFormat) throws IOException { + for (Map.Entry> entry : filesByFormat.entrySet()) { + EngineReaderManager rm = readerManagers.get(entry.getKey()); + if (rm != null) { + rm.onFilesAdded(entry.getValue()); + } + } + } + + private void notifyFilesDeleted(Map> filesByFormat) throws IOException { + for (Map.Entry> entry : filesByFormat.entrySet()) { + EngineReaderManager rm = readerManagers.get(entry.getKey()); + if (rm != null) { + rm.onFilesDeleted(entry.getValue()); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/EngineReaderManager.java b/server/src/main/java/org/opensearch/index/engine/exec/EngineReaderManager.java new file mode 100644 index 0000000000000..b420dd6299471 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/EngineReaderManager.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; + +/** + * Engine-agnostic reader manager. + *

+ * For Lucene, wraps {@code ReferenceManager}. + * For pluggable engines, wraps the engine-specific reader lifecycle. + * + * @param the reader type managed by this instance + * @opensearch.experimental + */ +@ExperimentalApi +public interface EngineReaderManager extends CatalogSnapshotLifecycleListener, FilesListener { + T getReader(CatalogSnapshot catalogSnapshot) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/EngineSearcher.java b/server/src/main/java/org/opensearch/index/engine/exec/EngineSearcher.java new file mode 100644 index 0000000000000..bc5385d180bbb --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/EngineSearcher.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lease.Releasable; +import org.opensearch.search.SearchExecutionContext; + +import java.io.IOException; + +/** + * Engine-agnostic searcher interface. + *

+ * Each engine implementation provides its own searcher that knows how to + * execute queries against its reader. The searcher is acquired from + * {@link SearchExecEngine} and used to execute searches against a + * point-in-time snapshot. + * + * @param the context type this searcher operates on + * @opensearch.experimental + */ +@ExperimentalApi +public interface EngineSearcher extends Releasable { + + /** + * Execute a search using this searcher, populating results on the context. + */ + void search(C context) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/FileMetadata.java b/server/src/main/java/org/opensearch/index/engine/exec/FileMetadata.java new file mode 100644 index 0000000000000..71b85e0c2a4c6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/FileMetadata.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.util.Objects; + +/** + * Represents metadata for a file in the index, including its data format and filename. + * Files can be in different formats (e.g., "lucene", "metadata") and this class provides + * a unified way to represent and serialize file information across the system. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class FileMetadata { + + /** + * Delimiter used to separate filename and data format in serialized form. + */ + public static final String DELIMITER = ":::"; + private static final String METADATA_KEY = "metadata"; + + private final String file; + private final String dataFormat; + + /** + * Constructs a FileMetadata with explicit data format and filename. + * + * @param dataFormat the data format identifier (e.g., "lucene", "metadata") + * @param file the filename + */ + public FileMetadata(String dataFormat, String file) { + this.file = file; + this.dataFormat = dataFormat; + } + + /** + * Constructs a FileMetadata by parsing a serialized data-format-aware filename. + * The format is "filename:::dataFormat". If no delimiter is present and the filename + * starts with "metadata", it's treated as a metadata file. Otherwise, defaults to "lucene". + * + * @param dataFormatAwareFile the serialized filename with optional data format + */ + public FileMetadata(String dataFormatAwareFile) { + if (!dataFormatAwareFile.contains(DELIMITER) && dataFormatAwareFile.startsWith(METADATA_KEY)) { + this.dataFormat = "metadata"; + this.file = dataFormatAwareFile; + return; + } + String[] parts = dataFormatAwareFile.split(DELIMITER); + this.dataFormat = (parts.length == 1) ? "lucene" : parts[1]; + this.file = parts[0]; + } + + /** + * Serializes this FileMetadata to a string in the format "filename:::dataFormat". + * + * @return the serialized representation + */ + public String serialize() { + return file + DELIMITER + dataFormat; + } + + @Override + public String toString() { + return serialize(); + } + + /** + * Returns the filename. + * + * @return the filename + */ + public String file() { + return file; + } + + /** + * Returns the data format identifier. 
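+ * For example (illustrative file names), {@code new FileMetadata("_0.cfs:::lucene").dataFormat()}
+ * and {@code new FileMetadata("_0.cfs").dataFormat()} both return {@code "lucene"}, while
+ * {@code new FileMetadata("metadata_1").dataFormat()} returns {@code "metadata"}.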
+ * + * @return the data format (e.g., "lucene", "metadata") + */ + public String dataFormat() { + return dataFormat; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + FileMetadata that = (FileMetadata) o; + return Objects.equals(file, that.file) && Objects.equals(dataFormat, that.dataFormat); + } + + @Override + public int hashCode() { + return Objects.hash(file, dataFormat); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/FilesListener.java b/server/src/main/java/org/opensearch/index/engine/exec/FilesListener.java new file mode 100644 index 0000000000000..7c6b69acbe9cf --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/FilesListener.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; +import java.util.Collection; + +@ExperimentalApi +public interface FilesListener { + void onFilesDeleted(Collection files) throws IOException; + + void onFilesAdded(Collection files) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexFileDeleter.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexFileDeleter.java new file mode 100644 index 0000000000000..61507b7ffe9d7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexFileDeleter.java @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.DataFormatAwareEngine; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.shard.ShardPath; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracks per-format file reference counts and computes which files are newly + * added or fully dereferenced after catalog snapshot changes. + *
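Before the implementation that follows, here is a sketch of the ref-count bookkeeping this javadoc describes. The snapshots (s1, s2), files (f1–f3), and shardPath are placeholders, and the per-format grouping assumes segregateFilesByFormat is filled in (it is still a TODO below).

```java
// Suppose snapshot s1 references files {f1, f2} and snapshot s2 references {f2, f3}
// for the same data format; all names here are hypothetical.
IndexFileDeleter deleter = new IndexFileDeleter(s1, shardPath);            // f1 -> 1, f2 -> 1

// A refresh produces s2: reference the new snapshot's files first ...
Map<DataFormat, Collection<String>> added = deleter.addFileReferences(s2); // {format -> [f3]}
                                                                           // f1 -> 1, f2 -> 2, f3 -> 1
// ... then release the snapshot being replaced.
Map<DataFormat, Collection<String>> removable = deleter.removeFileReferences(s1); // {format -> [f1]}
                                                                           // f2 -> 1, f3 -> 1
// The caller (DataFormatAwareEngine) routes `added` to onFilesAdded(...) and
// `removable` to onFilesDeleted(...) on the reader managers for that format.
```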

+ * This class does not notify reader managers itself — it returns the + * computed change sets so the caller ({@link DataFormatAwareEngine}) + * can route notifications to the appropriate reader managers. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class IndexFileDeleter { + + private final Map> fileRefCounts = new ConcurrentHashMap<>(); + + public IndexFileDeleter(CatalogSnapshot initialCatalogSnapshot, ShardPath shardPath) throws IOException { + if (initialCatalogSnapshot != null) { + addFileReferences(initialCatalogSnapshot); + deleteUnreferencedFiles(shardPath); + } + } + + /** + * Increments reference counts for all files in the snapshot. + * + * @return files whose reference count went from 0 → 1 (newly added), grouped by format. + * Returns an empty map when there are no new files. + */ + public synchronized Map> addFileReferences(CatalogSnapshot snapshot) { + Map> dfSegregatedFiles = segregateFilesByFormat(snapshot); + Map> dfNewFiles = new HashMap<>(); + + for (Map.Entry> entry : dfSegregatedFiles.entrySet()) { + DataFormat dataFormat = entry.getKey(); + Collection newFiles = new HashSet<>(); + Map dfFileRefCounts = fileRefCounts.computeIfAbsent(dataFormat, k -> new HashMap<>()); + Collection files = entry.getValue(); + for (String file : files) { + AtomicInteger refCount = dfFileRefCounts.computeIfAbsent(file, k -> new AtomicInteger(0)); + if (refCount.incrementAndGet() == 1) { + newFiles.add(file); + } + } + if (newFiles.isEmpty() == false) { + dfNewFiles.put(dataFormat, newFiles); + } + } + + return dfNewFiles.isEmpty() ? Collections.emptyMap() : dfNewFiles; + } + + /** + * Decrements reference counts for all files in the snapshot. + * + * @return files whose reference count reached 0 (ready for deletion), grouped by format. + * Returns an empty map when there are no files to delete. + */ + public synchronized Map> removeFileReferences(CatalogSnapshot snapshot) { + Map> dfSegregatedFiles = segregateFilesByFormat(snapshot); + Map> dfFilesToDelete = new HashMap<>(); + + for (Map.Entry> entry : dfSegregatedFiles.entrySet()) { + DataFormat dataFormat = entry.getKey(); + Collection filesToDelete = new HashSet<>(); + Map dfFileRefCounts = fileRefCounts.get(dataFormat); + if (dfFileRefCounts != null) { + Collection files = entry.getValue(); + for (String file : files) { + AtomicInteger refCount = dfFileRefCounts.get(file); + if (refCount != null && refCount.decrementAndGet() == 0) { + dfFileRefCounts.remove(file); + filesToDelete.add(file); + } + } + } + if (filesToDelete.isEmpty() == false) { + dfFilesToDelete.put(dataFormat, filesToDelete); + } + } + + return dfFilesToDelete.isEmpty() ? 
Collections.emptyMap() : dfFilesToDelete; + } + + private Map> segregateFilesByFormat(CatalogSnapshot snapshot) { + Map> dfSegregatedFiles = new HashMap<>(); + // TODO + return dfSegregatedFiles; + } + + private void deleteUnreferencedFiles(ShardPath shardPath) throws IOException { + // TODO + } + + @Override + public String toString() { + return "IndexFileDeleter{fileRefCounts=" + fileRefCounts + "}"; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexFilterContext.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexFilterContext.java new file mode 100644 index 0000000000000..415cecec55129 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexFilterContext.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; + +/** + * @opensearch.experimental + */ +@ExperimentalApi +public interface IndexFilterContext extends Closeable { + + int segmentCount(); + + int segmentMaxDoc(int segmentOrd); +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexFilterProvider.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexFilterProvider.java new file mode 100644 index 0000000000000..2d5224c48d162 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexFilterProvider.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Provides index-level filtering (partition pruning, segment filtering) for a given data format. + * + * @param the query type (e.g. Lucene Query) + * @param the context type + * @param the engine-specific reader type + * @opensearch.experimental + */ +@ExperimentalApi +public interface IndexFilterProvider extends Closeable { + + C createContext(Q query, ReaderT reader) throws IOException; + + int createCollector(C context, int segmentOrd, int minDoc, int maxDoc); + + long[] collectDocs(C context, int collectorKey, int minDoc, int maxDoc); + + void releaseCollector(C context, int collectorKey); +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/SearchExecEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/SearchExecEngine.java new file mode 100644 index 0000000000000..a78645054b5b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/SearchExecEngine.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
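For orientation, a sketch of how a caller might drive the collector-key API defined by IndexFilterProvider just above. It assumes the context type exposes the IndexFilterContext methods; `provider`, `query`, and `reader` are placeholders for engine-specific instances, not names from this PR.

```java
try (IndexFilterContext ctx = provider.createContext(query, reader)) {     // assumes C extends IndexFilterContext
    for (int ord = 0; ord < ctx.segmentCount(); ord++) {
        int maxDoc = ctx.segmentMaxDoc(ord);
        int key = provider.createCollector(ctx, ord, 0, maxDoc);
        try {
            long[] bits = provider.collectDocs(ctx, key, 0, maxDoc);       // packed bitset of matching docs
            // ... hand `bits` to the consuming engine ...
        } finally {
            provider.releaseCollector(ctx, key);                           // always release the collector
        }
    }
}
```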
+ */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.DataFormatAwareEngine; +import org.opensearch.search.SearchExecutionContext; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Shard-level search execution engine interface. + * + * @param the engine-specific context type + * @param the engine-native plan type (e.g. byte[] for substrait) + * @param the result stream type returned by {@link #execute} + * @opensearch.experimental + */ +@ExperimentalApi +public interface SearchExecEngine extends Closeable { + + /** + * Converts a logical plan fragment into the engine's native plan format. + */ + default T convertFragment(Object fragment) { + throw new UnsupportedOperationException("convertFragment not supported by " + getClass().getSimpleName()); + } + + /** + * Creates a search context bound to the given reader and plan. + * The reader is provided by {@link DataFormatAwareEngine} + * which owns all reader managers. + */ + C createContext(Object reader, T plan, ShardSearchRequest request, SearchShardTarget shardTarget, SearchShardTask task) + throws IOException; + + /** + * Executes the plan held by the context and returns the result stream. + */ + S execute(C context) throws IOException; + + @Override + default void close() throws IOException {} +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/SegmentCollector.java b/server/src/main/java/org/opensearch/index/engine/exec/SegmentCollector.java new file mode 100644 index 0000000000000..772244d88436f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/SegmentCollector.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; + +/** + * A per-segment document collector returned by + * {@link IndexFilterProvider#createCollector}. + *
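A rough sketch of the shard-level call sequence SearchExecEngine is meant to support, modeled on the mock engine in the tests further down. The context/plan/stream type-parameter order follows the @param docs above, and binding the context type to SearchExecutionContext is an assumption; `runFragment` is a hypothetical helper.

```java
static <Ctx extends SearchExecutionContext, S> S runFragment(
    SearchExecEngine<Ctx, byte[], S> engine,
    Object reader,                        // acquired from the DataFormatAwareEngine
    Object fragment,
    ShardSearchRequest request,
    SearchShardTarget target,
    SearchShardTask task
) throws IOException {
    byte[] plan = engine.convertFragment(fragment);                  // logical fragment -> native plan
    try (Ctx ctx = engine.createContext(reader, plan, request, target, task)) {
        S results = engine.execute(ctx);
        // If the result type is lazy/streaming, drain it here before the context closes.
        return results;
    }
}
```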

+ * Callers should use try-with-resources to ensure cleanup. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface SegmentCollector extends Closeable { + + /** + * Collect matching document IDs in the given range. + * + * @param minDoc inclusive lower bound + * @param maxDoc exclusive upper bound + * @return packed {@code long[]} bitset of matching doc IDs + */ + long[] collectDocs(int minDoc, int maxDoc); + + @Override + default void close() {} +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/SourceContext.java b/server/src/main/java/org/opensearch/index/engine/exec/SourceContext.java new file mode 100644 index 0000000000000..7bbfaadec8957 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/SourceContext.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; + +/** + * Context for a source provider execution. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface SourceContext extends Closeable { + + Object query(); +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/SourceProvider.java b/server/src/main/java/org/opensearch/index/engine/exec/SourceProvider.java new file mode 100644 index 0000000000000..ddddcd4157940 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/SourceProvider.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +/** + * Provides source-field data for a given data format. 
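One way a caller might consume a SegmentCollector under try-with-resources, as the SegmentCollector javadoc above suggests. Treating the long[] as a FixedBitSet-style layout (64 docs per word, bit i set when doc i matches) is an assumption, and `forEachMatch` is an invented helper.

```java
static void forEachMatch(SegmentCollector collector, int maxDoc, java.util.function.IntConsumer onMatch) {
    try (collector) {                                   // close() releases per-segment state
        long[] bits = collector.collectDocs(0, maxDoc);
        for (int word = 0; word < bits.length; word++) {
            long w = bits[word];
            while (w != 0) {
                onMatch.accept((word << 6) + Long.numberOfTrailingZeros(w));
                w &= w - 1;                             // clear the lowest set bit
            }
        }
    }
}
```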
+ * + * @param the context type + * @param the result batch type + * @param the engine-specific reader type + * @opensearch.experimental + */ +@ExperimentalApi +public interface SourceProvider extends Closeable { + + C createContext(Object query, ReaderT reader) throws IOException; + + Iterator execute(C context) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1c155c897acba..44d99b06b8bf0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -129,6 +129,7 @@ import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.CommitStats; +import org.opensearch.index.engine.DataFormatAwareEngine; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.Engine.GetResult; import org.opensearch.index.engine.EngineBackedIndexer; @@ -144,6 +145,7 @@ import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.engine.Segment; import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory; import org.opensearch.index.engine.exec.Indexer; import org.opensearch.index.fielddata.FieldDataStats; import org.opensearch.index.fielddata.ShardFieldData; @@ -316,6 +318,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex private final AtomicReference currentEngineReference = new AtomicReference<>(); + private final AtomicReference currentCompositeEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; final EngineConfigFactory engineConfigFactory; @@ -404,6 +407,8 @@ Runnable getGlobalCheckpointSyncer() { // Used to limit the number of concurrent translog tasks. When the semaphore is exhausted, serial recovery is used. private static final Semaphore translogConcurrentRecoverySemaphore = new Semaphore(1000); + private final DataFormatAwareEngineFactory dataFormatAwareEngineFactory; + @InternalApi public IndexShard( final ShardRouting shardRouting, @@ -443,7 +448,8 @@ public IndexShard( final Object refreshMutex, final ClusterApplierService clusterApplierService, @Nullable final MergedSegmentPublisher mergedSegmentPublisher, - @Nullable final ReferencedSegmentsPublisher referencedSegmentsPublisher + @Nullable final ReferencedSegmentsPublisher referencedSegmentsPublisher, + @Nullable final DataFormatAwareEngineFactory dataFormatAwareEngineFactory ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -569,6 +575,10 @@ public boolean shouldCache(Query query) { startRefreshTask(); } } + this.dataFormatAwareEngineFactory = dataFormatAwareEngineFactory; + if (dataFormatAwareEngineFactory != null) { + this.currentCompositeEngineReference.set(dataFormatAwareEngineFactory.create()); + } } /** @@ -2204,6 +2214,20 @@ public Engine.Searcher acquireSearcher(String source) { return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); } + /** + * Returns the current CompositeEngine, or null if no optimized index is active. 
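A sketch of how a search path might go from the shard to a per-format reader, mirroring the acquireReader()/getReader(format) flow exercised in the tests below; `indexShard` and `format` are placeholders, and the exact exception signatures are not pinned down by this diff.

```java
DataFormatAwareEngine engine = indexShard.getCompositeEngine();
if (engine != null) {                                            // null when no optimized index is active
    try (DataFormatAwareEngine.DataFormatAwareReader reader = engine.acquireReader()) {
        Object formatReader = reader.getReader(format);          // engine-specific reader for this format
        // ... hand formatReader to the matching SearchExecEngine via createContext(...) ...
    }
}
```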
+ */ + public DataFormatAwareEngine getCompositeEngine() { + return currentCompositeEngineReference.get(); + } + + /** + * Sets the CompositeEngine for this shard (called during shard initialization for optimized indexes). + */ + public void setCompositeEngine(DataFormatAwareEngine dataFormatAwareEngine) { + currentCompositeEngineReference.set(dataFormatAwareEngine); + } + private void markSearcherAccessed() { lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 16229f12c60a8..5bd14d499dc6d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -63,6 +63,7 @@ import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.annotation.PublicApi; @@ -123,6 +124,7 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.engine.NoOpEngine; import org.opensearch.index.engine.ReadOnlyEngine; +import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory; import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.flush.FlushStats; import org.opensearch.index.get.GetStats; @@ -146,6 +148,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; @@ -424,6 +427,12 @@ public class IndicesService extends AbstractLifecycleComponent private volatile int defaultMaxMergeAtOnce; private final StatusCounterStats statusCounterStats; private final ClusterMergeSchedulerConfig clusterMergeSchedulerConfig; + private final CheckedTriFunction< + ShardPath, + MapperService, + IndexSettings, + DataFormatAwareEngineFactory, + IOException> compositeEngineFactorySupplier; @Override protected void doStart() { @@ -609,6 +618,12 @@ protected void closeInternal() { MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING, this::onClusterLevelForceMergeMBPerSecUpdate ); + this.compositeEngineFactorySupplier = (shardPath, mapperService, indexSettings) -> new DataFormatAwareEngineFactory( + pluginsService, + shardPath, + mapperService, + indexSettings + ); } @InternalApi @@ -1109,6 +1124,7 @@ private synchronized IndexService createIndexService( for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } + return indexModule.newIndexService( indexCreationContext, nodeEnv, @@ -1136,7 +1152,8 @@ private synchronized IndexService createIndexService( replicator, segmentReplicationStatsProvider, this::getClusterDefaultMaxMergeAtOnce, - clusterMergeSchedulerConfig + clusterMergeSchedulerConfig, + compositeEngineFactorySupplier ); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java new file mode 100644 index 0000000000000..a0b1dfb10e0fe --- /dev/null +++ 
b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexFilterProvider; +import org.opensearch.index.engine.exec.SearchExecEngine; +import org.opensearch.index.engine.exec.SourceProvider; +import org.opensearch.index.shard.ShardPath; + +import java.io.IOException; +import java.util.List; + +/** + * Interface for back-end query engines. + * + * @opensearch.internal + */ +public interface SearchBackEndPlugin { + + String name(); + + List getSupportedFormats(); + + EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException; + + /** + * Create a search execution engine. Return null if this plugin is an index provider only. + */ + default SearchExecEngine createSearchExecEngine(DataFormat format, ShardPath shardPath) throws IOException { + return null; + } + + /** + * Create an index filter provider. Return null if this plugin is a search engine only. + */ + default IndexFilterProvider createIndexFilterProvider(DataFormat format, ShardPath shardPath) throws IOException { + return null; + } + + /** + * Create a source provider. Return null if this plugin does not provide source data. + *
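For orientation, a minimal hypothetical implementation of this SPI that only contributes a reader manager and leans on the default methods above. ParquetBackEndPlugin, ParquetDataFormat, ParquetReader, and ParquetReaderManager are invented names, and the generic signatures are guesses since the type parameters are not shown in this diff.

```java
public class ParquetBackEndPlugin implements SearchBackEndPlugin {

    private final DataFormat parquet = new ParquetDataFormat();       // hypothetical format

    @Override
    public String name() {
        return "parquet-backend";
    }

    @Override
    public List<DataFormat> getSupportedFormats() {
        return List.of(parquet);
    }

    @Override
    public EngineReaderManager<ParquetReader> createReaderManager(DataFormat format, ShardPath shardPath) throws IOException {
        return new ParquetReaderManager(shardPath);                   // tracks readers for this shard's files
    }

    // createSearchExecEngine, createIndexFilterProvider and createSourceProvider keep their
    // default null implementations: this plugin only provides the index/reader side.
}
```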

+ * A source provider executes the full query+scan+filter and streams back + * result batches (projections, aggregations) to the primary engine. + */ + default SourceProvider createSourceProvider(DataFormat format, ShardPath shardPath) throws IOException { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/search/SearchExecutionContext.java b/server/src/main/java/org/opensearch/search/SearchExecutionContext.java new file mode 100644 index 0000000000000..2368d7992b7b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/SearchExecutionContext.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.Closeable; + +/** + * Engine-agnostic search execution context. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface SearchExecutionContext extends Closeable { + + ShardSearchRequest request(); + + SearchShardTarget shardTarget(); +} diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d3637aac98ae6..57ba262b790ea 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -281,7 +281,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { s -> {}, null, () -> TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE, - mockClusterMergeSchedulerConfig + mockClusterMergeSchedulerConfig, + null ); } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java index 117ce798494f2..f076442ececd3 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java @@ -9,15 +9,23 @@ package org.opensearch.index.engine.dataformat; import org.opensearch.Version; +import org.opensearch.action.search.SearchShardTask; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.DataFormatAwareEngine; +import org.opensearch.index.engine.exec.CatalogSnapshot; +import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.SearchExecEngine; import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; +import org.opensearch.search.SearchExecutionContext; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -409,4 +417,414 @@ public > IndexingExecutionEngin return (IndexingExecutionEngine) new MockIndexingExecutionEngine(dataFormat); } } + + /** + * write → refresh → catalog snapshot → DataFormatAwareEngine → acquireReader → search. 
+ */ + public void testWritePathToSearchExecEngine() throws IOException { + MockDataFormat format = new MockDataFormat(); + MockIndexingExecutionEngine indexEngine = new MockIndexingExecutionEngine(format); + + Writer w = indexEngine.createWriter(1L); + MockDocumentInput d1 = indexEngine.newDocumentInput(); + d1.addField(mock(MappedFieldType.class), "Alice"); + d1.setRowId("_row_id", 0); + w.addDoc(d1); + MockDocumentInput d2 = indexEngine.newDocumentInput(); + d2.addField(mock(MappedFieldType.class), "Bob"); + d2.setRowId("_row_id", 1); + w.addDoc(d2); + WriterFileSet fs = w.flush().getWriterFileSet(format).get(); + w.close(); + + RefreshResult refreshResult = indexEngine.refresh(RefreshInput.builder().addWriterFileSet(fs).build()); + MockCatalogSnapshot snapshot = new MockCatalogSnapshot(1L, refreshResult.refreshedSegments(), format); + + MockReaderManager readerManager = new MockReaderManager(format.name()); + readerManager.afterRefresh(true, snapshot); + + DataFormatAwareEngine dataFormatAwareEngine = new DataFormatAwareEngine(Map.of(format, readerManager), Map.of(), Map.of(), Map.of()); + // setLatestSnapshot incRefs snapshot (refcount: 1 initial + 1 engine = 2) + dataFormatAwareEngine.setLatestSnapshot(snapshot); + + // acquireReader incRefs again (refcount: 3) + try (DataFormatAwareEngine.DataFormatAwareReader cr = dataFormatAwareEngine.acquireReader()) { + MockReader reader = (MockReader) cr.getReader(format); + assertNotNull(reader); + assertEquals(2, reader.totalRows); + + MockSearchExecEngine searchEngine = new MockSearchExecEngine(); + String plan = searchEngine.convertFragment("SELECT * FROM hits"); + MockSearchContext ctx = searchEngine.createContext(reader, plan, null, null, null); + List results = searchEngine.execute(ctx); + assertEquals(2, results.size()); + ctx.close(); + } + // cr.close() decRefs. Snapshot still alive — engine owns the construction ref. + assertTrue(snapshot.tryIncRef()); + snapshot.decRef(); // undo probe + } + + /** + * Search holds snapshot alive while refresh replaces it. + *

+ * Timeline: + * 1. new s1 → refcount = 1 (construction) + * 2. setLatestSnapshot(s1) → refcount = 1 (engine takes over construction ref) + * 3. acquireReader() → refcount = 2 (search adds ref) + * 4. setLatestSnapshot(s2) → s1 refcount = 1 (engine releases s1) + * 5. readerManager.onDeleted(s1) → reader closed, but s1 alive (search ref) + * 6. compositeReader.close() → s1 refcount = 0 → dead + */ + public void testSearchHoldsSnapshotAliveWhileRefreshDeletesFiles() throws IOException { + MockDataFormat format = new MockDataFormat(); + MockIndexingExecutionEngine indexEngine = new MockIndexingExecutionEngine(format); + + // Batch 1 + Writer w1 = indexEngine.createWriter(1L); + MockDocumentInput d1 = indexEngine.newDocumentInput(); + d1.addField(mock(MappedFieldType.class), "Alice"); + d1.setRowId("_row_id", 0); + w1.addDoc(d1); + WriterFileSet fs1 = w1.flush().getWriterFileSet(format).get(); + w1.close(); + + RefreshResult rr1 = indexEngine.refresh(RefreshInput.builder().addWriterFileSet(fs1).build()); + MockCatalogSnapshot snapshot1 = new MockCatalogSnapshot(1L, rr1.refreshedSegments(), format); + + MockReaderManager readerManager = new MockReaderManager(format.name()); + readerManager.afterRefresh(true, snapshot1); + + DataFormatAwareEngine dataFormatAwareEngine = new DataFormatAwareEngine(Map.of(format, readerManager), Map.of(), Map.of(), Map.of()); + dataFormatAwareEngine.setLatestSnapshot(snapshot1); // takes over construction ref, refcount: 1 + + // Search acquires reader — refcount: 2 + DataFormatAwareEngine.DataFormatAwareReader dataFormatAwareReader = dataFormatAwareEngine.acquireReader(); + MockReader searchReader = (MockReader) dataFormatAwareReader.getReader(format); + assertEquals(1, searchReader.totalRows); + + // New refresh arrives — setLatestSnapshot(s2) decRefs s1 → refcount: 1 + Writer w2 = indexEngine.createWriter(2L); + MockDocumentInput d2 = indexEngine.newDocumentInput(); + d2.addField(mock(MappedFieldType.class), "Bob"); + d2.setRowId("_row_id", 1); + w2.addDoc(d2); + WriterFileSet fs2 = w2.flush().getWriterFileSet(format).get(); + w2.close(); + + RefreshResult rr2 = indexEngine.refresh(RefreshInput.builder().addWriterFileSet(fs1).addWriterFileSet(fs2).build()); + MockCatalogSnapshot snapshot2 = new MockCatalogSnapshot(2L, rr2.refreshedSegments(), format); + readerManager.afterRefresh(true, snapshot2); + dataFormatAwareEngine.setLatestSnapshot(snapshot2); // s1 refcount: 1 (only search ref) + + // Old snapshot deleted from reader manager — reader closes + readerManager.onDeleted(snapshot1); + assertTrue("Reader for snapshot1 closed in reader manager", searchReader.closed); + + // But snapshot1 still alive — search holds the last ref + assertTrue("Snapshot1 alive while search holds ref", snapshot1.tryIncRef()); + snapshot1.decRef(); // undo probe + + // Search completes — s1 refcount: 0 → dead + dataFormatAwareReader.close(); + assertFalse("Snapshot1 dead after search releases", snapshot1.tryIncRef()); + + // Snapshot 2 still works + try (DataFormatAwareEngine.DataFormatAwareReader cr2 = dataFormatAwareEngine.acquireReader()) { + MockReader r2 = (MockReader) cr2.getReader(format); + assertEquals(2, r2.totalRows); + } + } + + /** + * CompositeReader provides per-format reader access from a single catalog snapshot. 
+ */ + public void testCompositeReaderMultiFormat() throws IOException { + MockDataFormat format1 = new MockDataFormat(); + DataFormat format2 = new DataFormat() { + @Override + public String name() { + return "mock-lucene"; + } + + @Override + public long priority() { + return 50L; + } + + @Override + public Set supportedFields() { + return Set.of(); + } + }; + + MockReaderManager rm1 = new MockReaderManager(format1.name()); + MockReaderManager rm2 = new MockReaderManager(format2.name()); + + Path dir = createTempDir(); + WriterFileSet wfs1 = WriterFileSet.builder().directory(dir).writerGeneration(1L).addFile("data.parquet").addNumRows(10).build(); + WriterFileSet wfs2 = WriterFileSet.builder().directory(dir).writerGeneration(1L).addFile("data.lucene").addNumRows(10).build(); + Segment seg = Segment.builder(0L).addSearchableFiles(format1, wfs1).addSearchableFiles(format2, wfs2).build(); + MockCatalogSnapshot snapshot = new MockCatalogSnapshot(1L, List.of(seg), format1) { + @Override + public Collection getSearchableFiles(String dataFormat) { + if ("mock-lucene".equals(dataFormat)) return List.of(wfs2); + return super.getSearchableFiles(dataFormat); + } + + @Override + public Set getDataFormats() { + return Set.of(format1.name(), format2.name()); + } + }; + + rm1.afterRefresh(true, snapshot); + rm2.afterRefresh(true, snapshot); + + DataFormatAwareEngine dataFormatAwareEngine = new DataFormatAwareEngine(Map.of(format1, rm1, format2, rm2), Map.of(), Map.of(), Map.of()); + dataFormatAwareEngine.setLatestSnapshot(snapshot); + + try (DataFormatAwareEngine.DataFormatAwareReader cr = dataFormatAwareEngine.acquireReader()) { + MockReader r1 = (MockReader) cr.getReader(format1); + MockReader r2 = (MockReader) cr.getReader(format2); + assertNotNull(r1); + assertNotNull(r2); + assertEquals(10, r1.totalRows); + assertEquals(10, r2.totalRows); + assertTrue(r1.fileNames.contains("data.parquet")); + assertTrue(r2.fileNames.contains("data.lucene")); + } + } + + /** + * afterRefresh(false) is a no-op; duplicate afterRefresh for same snapshot reuses reader. + */ + public void testRefreshEdgeCases() throws IOException { + MockDataFormat format = new MockDataFormat(); + MockIndexingExecutionEngine indexEngine = new MockIndexingExecutionEngine(format); + + Writer w = indexEngine.createWriter(1L); + MockDocumentInput d = indexEngine.newDocumentInput(); + d.addField(mock(MappedFieldType.class), "x"); + d.setRowId("_row_id", 0); + w.addDoc(d); + WriterFileSet fs = w.flush().getWriterFileSet(format).get(); + w.close(); + + RefreshResult rr = indexEngine.refresh(RefreshInput.builder().addWriterFileSet(fs).build()); + MockCatalogSnapshot snapshot = new MockCatalogSnapshot(1L, rr.refreshedSegments(), format); + + MockReaderManager rm = new MockReaderManager(format.name()); + + rm.afterRefresh(false, snapshot); + assertNull(rm.getReader(snapshot)); + assertEquals(0, rm.readerCount()); + + rm.afterRefresh(true, snapshot); + assertNotNull(rm.getReader(snapshot)); + assertEquals(1, rm.readerCount()); + + MockReader first = rm.getReader(snapshot); + rm.afterRefresh(true, snapshot); + assertSame(first, rm.getReader(snapshot)); + assertEquals(1, rm.readerCount()); + } + + /** + * File add/delete notifications propagate through reader manager. 
+ */ + public void testFileLifecycleNotifications() throws IOException { + MockReaderManager rm = new MockReaderManager("mock-columnar"); + + rm.onFilesAdded(List.of("a.parquet", "b.parquet")); + assertEquals(2, rm.addedFiles.size()); + assertTrue(rm.addedFiles.contains("a.parquet")); + + rm.onFilesDeleted(List.of("a.parquet")); + assertEquals(1, rm.deletedFiles.size()); + assertTrue(rm.deletedFiles.contains("a.parquet")); + } + + static class MockReader { + final List fileNames; + final long totalRows; + boolean closed; + + MockReader(List fileNames, long totalRows) { + this.fileNames = fileNames; + this.totalRows = totalRows; + } + + void close() { + closed = true; + } + } + + static class MockSearchContext implements SearchExecutionContext { + final String plan; + final long totalRows; + + MockSearchContext(String plan, long totalRows) { + this.plan = plan; + this.totalRows = totalRows; + } + + @Override + public ShardSearchRequest request() { + return null; + } + + @Override + public SearchShardTarget shardTarget() { + return null; + } + + @Override + public void close() {} + } + + static class MockSearchExecEngine implements SearchExecEngine> { + @Override + public String convertFragment(Object fragment) { + return "PLAN:" + fragment; + } + + @Override + public MockSearchContext createContext( + Object reader, + String plan, + ShardSearchRequest request, + SearchShardTarget shardTarget, + SearchShardTask task + ) { + MockReader r = (MockReader) reader; + return new MockSearchContext(plan, r.totalRows); + } + + @Override + public List execute(MockSearchContext context) { + List rows = new ArrayList<>(); + for (int i = 0; i < context.totalRows; i++) { + rows.add(new Object[] { "row_" + i }); + } + return rows; + } + } + + static class MockReaderManager implements EngineReaderManager { + private final String formatName; + private final Map readers = new HashMap<>(); + final List addedFiles = new ArrayList<>(); + final List deletedFiles = new ArrayList<>(); + + MockReaderManager(String formatName) { + this.formatName = formatName; + } + + @Override + public MockReader getReader(CatalogSnapshot snapshot) { + return readers.get(snapshot); + } + + int readerCount() { + return readers.size(); + } + + @Override + public void beforeRefresh() {} + + @Override + public void afterRefresh(boolean didRefresh, CatalogSnapshot snapshot) { + if (didRefresh == false || readers.containsKey(snapshot)) return; + Collection files = snapshot.getSearchableFiles(formatName); + List allFiles = new ArrayList<>(); + long totalRows = 0; + for (WriterFileSet wfs : files) { + allFiles.addAll(wfs.files()); + totalRows += wfs.numRows(); + } + readers.put(snapshot, new MockReader(allFiles, totalRows)); + } + + @Override + public void onDeleted(CatalogSnapshot snapshot) { + MockReader reader = readers.remove(snapshot); + if (reader != null) reader.close(); + } + + @Override + public void onFilesDeleted(Collection files) { + deletedFiles.addAll(files); + } + + @Override + public void onFilesAdded(Collection files) { + addedFiles.addAll(files); + } + } + + static class MockCatalogSnapshot extends CatalogSnapshot { + private final List segments; + private final MockDataFormat format; + + MockCatalogSnapshot(long generation, List segments, MockDataFormat format) { + super("mock-snapshot", generation, 1L); + this.segments = segments; + this.format = format; + } + + @Override + public Map getUserData() { + return Map.of(); + } + + @Override + public long getId() { + return generation; + } + + @Override + public List 
getSegments() { + return segments; + } + + @Override + public Collection getSearchableFiles(String dataFormat) { + List result = new ArrayList<>(); + for (Segment seg : segments) { + WriterFileSet wfs = seg.dfGroupedSearchableFiles().get(dataFormat); + if (wfs != null) result.add(wfs); + } + return result; + } + + @Override + public Set getDataFormats() { + return Set.of(format.name()); + } + + @Override + public long getLastWriterGeneration() { + return generation; + } + + @Override + public String serializeToString() { + return "mock-snapshot-" + generation; + } + + @Override + public void setCatalogSnapshotMap(Map map) {} + + @Override + public void setUserData(Map userData, boolean b) {} + + @Override + public Object getReader(DataFormat dataFormat) { + return null; + } + + @Override + protected void closeInternal() {} + } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7e236cf911060..5c85762448adb 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -805,7 +805,8 @@ protected IndexShard newShard( new Object(), clusterService.getClusterApplierService(), mergedSegmentPublisher, - ReferencedSegmentsPublisher.EMPTY + ReferencedSegmentsPublisher.EMPTY, + null // TODO ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {