
Add sandbox plugin for composite indexing execution engine#20909

Open
alchemist51 wants to merge 1 commit into opensearch-project:main from alchemist51:ciee

Conversation


alchemist51 (Contributor) commented Mar 18, 2026

Description

This PR introduces the composite-engine sandbox plugin that implements the CompositeIndexingExecutionEngine — the orchestration layer for multi-format indexing as described in RFC #20644.

The composite engine enables an index to write documents to multiple storage formats (e.g., Lucene + Parquet) simultaneously through a single IndexingExecutionEngine interface. Format plugins register via the ExtensiblePlugin SPI, and the composite engine delegates writes, refresh, and file management to each per-format engine.

Note: we are using the ExtensiblePlugin SPI model temporarily. Once the Dataformat Registry is introduced, we should be able to remove this model.
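At a high level, the fan-out can be sketched as below. FormatEngine and CompositeEngineSketch are illustrative stand-ins, not the actual IndexingExecutionEngine API from this PR:

```java
import java.util.List;

// Illustrative stand-in for the real per-format engine interface in this PR.
interface FormatEngine {
    void addDoc(String doc);
    void refresh();
}

// The composite engine presents a single engine interface while fanning
// writes and refreshes out to a primary plus zero or more secondaries.
class CompositeEngineSketch implements FormatEngine {
    private final FormatEngine primary;
    private final List<FormatEngine> secondaries;

    CompositeEngineSketch(FormatEngine primary, List<FormatEngine> secondaries) {
        this.primary = primary;
        this.secondaries = secondaries;
    }

    @Override
    public void addDoc(String doc) {
        primary.addDoc(doc);                 // primary format first (e.g. Lucene)
        for (FormatEngine e : secondaries) { // then each secondary (e.g. Parquet)
            e.addDoc(doc);
        }
    }

    @Override
    public void refresh() {
        primary.refresh();
        for (FormatEngine e : secondaries) {
            e.refresh();
        }
    }
}
```

The caller sees one engine; the delegation order (primary first, then secondaries) matches the description in this PR.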

What's included

New sandbox plugin: sandbox/plugins/composite-engine

  • CompositeEnginePlugin — ExtensiblePlugin entry point that discovers DataFormatPlugin implementations at node bootstrap, validates index settings, and creates the composite engine. Registers three index settings:
    • index.composite.enabled (default false)
    • index.composite.primary_data_format (default "lucene")
    • index.composite.secondary_data_formats (default [])
  • CompositeIndexingExecutionEngine — Orchestrates indexing across a primary and zero or more secondary per-format engines. Handles writer creation, refresh (flush all writers → build segments → delegate per-format refresh), file deletion, and document input creation.
  • CompositeDataFormat — A DataFormat wrapper over the constituent formats. Uses Long.MIN_VALUE priority so concrete formats take precedence.
  • CompositeDocumentInput — Broadcasts addField, setRowId, and other metadata operations to all per-format DocumentInput instances. Releases the writer back to the pool on close().
  • CompositeWriter — Delegates addDoc, flush, sync, and close to each per-format writer (primary first, then secondaries). Implements Lock for pool checkout semantics.
  • CompositeDataFormatWriterPool — Thread-safe pool of CompositeWriter instances with lock-based checkout/release and a checkoutAll for flush.
  • RowIdGenerator — Generates monotonically increasing row IDs for cross-format document synchronization within a writer's segment scope.
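As a rough sketch of the row-ID synchronization idea (the PR's actual RowIdGenerator may differ in naming and reset semantics), an atomic counter per writer scope is enough to give every format the same row ID for the same document:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a monotonic row-ID generator: each writer's
// segment scope hands out increasing IDs so the same document maps to
// the same row across Lucene, Parquet, and any other formats.
class RowIdGeneratorSketch {
    private final AtomicLong next = new AtomicLong();

    long nextRowId() {
        return next.getAndIncrement();
    }

    // E.g. at the start of a new segment scope.
    void reset() {
        next.set(0);
    }
}
```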

New sandbox lib: sandbox/libs/composite-engine-lib

  • ConcurrentQueue — Striped concurrent queue using thread-affinity hashing to reduce contention across concurrent indexing threads.
  • LockableConcurrentQueue — Extends ConcurrentQueue with tryLock-based polling so writers can be checked out without blocking.
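A minimal sketch of the striping idea, assuming stripes are selected by hashing the current thread; the real ConcurrentQueue will differ in detail (stripe selection, work stealing, locking):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of thread-affinity striping: each thread hashes to a fixed
// stripe, so concurrent producers rarely contend on the same queue.
class StripedQueueSketch<T> {
    private final ConcurrentLinkedQueue<T>[] stripes;

    @SuppressWarnings("unchecked")
    StripedQueueSketch(int stripeCount) {
        stripes = new ConcurrentLinkedQueue[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ConcurrentLinkedQueue<>();
        }
    }

    private ConcurrentLinkedQueue<T> stripeForCurrentThread() {
        return stripes[(int) (Thread.currentThread().getId() % stripes.length)];
    }

    void add(T item) {
        stripeForCurrentThread().add(item);
    }

    T poll() {
        // Try the local stripe first, then fall back to the others.
        T item = stripeForCurrentThread().poll();
        if (item != null) {
            return item;
        }
        for (ConcurrentLinkedQueue<T> stripe : stripes) {
            item = stripe.poll();
            if (item != null) {
                return item;
            }
        }
        return null;
    }
}
```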

How format plugins integrate

Format plugins (e.g., Parquet) extend this plugin by:

  1. Declaring extendedPlugins = ['composite-engine'] in their build.gradle
  2. Implementing DataFormatPlugin
  3. Being discovered automatically by the ExtensiblePlugin SPI during node bootstrap
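Conceptually, the extension point is just an interface the composite plugin can discover; the sketch below is a hypothetical simplification of DataFormatPlugin, which in the PR may declare more methods:

```java
// Hypothetical simplification of the DataFormatPlugin extension point.
interface DataFormatPluginSketch {
    String formatName();
}

// A secondary-format plugin (e.g. Parquet) implements the extension point
// and is picked up via the ExtensiblePlugin SPI at node bootstrap, after
// which the composite engine can delegate writes to it.
class ParquetFormatPluginSketch implements DataFormatPluginSketch {
    @Override
    public String formatName() {
        return "parquet";
    }
}
```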

Related issues

Resolves part of #20876

Check List

  • New functionality includes testing
  • New functionality has been documented
  • All classes are annotated with @ExperimentalApi
  • No BWC tests required (sandbox/experimental)
  • Commits are signed off (DCO)


github-actions bot commented Mar 18, 2026

PR Reviewer Guide 🔍

(Review updated until commit ea0817e)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Add striped concurrent queue library (ConcurrentQueue and LockableConcurrentQueue)

Relevant files:

  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/ConcurrentQueue.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/Lockable.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/package-info.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/ConcurrentQueueTests.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockableConcurrentQueueTests.java

Sub-PR theme: Refactor DataFormat to abstract class and add DataformatAwareLockableWriterPool

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java
  • server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
  • server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/DataformatAwareLockableWriterPoolTests.java

Sub-PR theme: Add composite-engine sandbox plugin with CompositeIndexingExecutionEngine

Relevant files:

  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/RowIdGenerator.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDocumentInputTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/RowIdGeneratorTests.java
  • sandbox/plugins/composite-engine/README.md

⚡ Recommended focus areas for review

Race Condition in checkoutAll

In checkoutAll(), writers are locked one-by-one in a loop over this (the iterator snapshot), then a synchronized block removes them. Between the initial lock and the synchronized removal, a concurrent getAndLock() could have already polled and locked a writer from availableWriters, meaning it won't be in availableWriters anymore but is still in writers. The logic isRegistered(writer) && writers.remove(writer) followed by availableWriters.remove(writer) may miss writers that are currently checked out by another thread, leading to incomplete flush coverage.

public List<W> checkoutAll() {
    ensureOpen();
    List<W> lockedWriters = new ArrayList<>();
    List<W> checkedOutWriters = new ArrayList<>();
    for (W writer : this) {
        writer.lock();
        lockedWriters.add(writer);
    }
    synchronized (this) {
        for (W writer : lockedWriters) {
            try {
                if (isRegistered(writer) && writers.remove(writer)) {
                    availableWriters.remove(writer);
                    checkedOutWriters.add(writer);
                }
            } finally {
                writer.unlock();
            }
        }
    }
    return Collections.unmodifiableList(checkedOutWriters);
}
Null Return on Empty Refresh

refresh() returns null when no new segments are produced (line 219). Callers must handle a null RefreshResult, but the interface contract may not document this. This could cause NullPointerExceptions in callers that don't check for null, and it is inconsistent with the secondary engines' refresh(), which returns an empty RefreshResult.

if (newSegmentList.isEmpty()) {
    logger.debug("No new segments produced from flush");
    return null;
}
New Segments Not Propagated on Refresh

In refresh(), after flushing all writers and calling writer.close(), the per-format engine.refresh() calls are made with an empty RefreshInput (no segments). The newly produced segments in newSegmentList are never passed to the per-format engines. This means per-format engines may not be aware of the new segments, potentially causing inconsistency between the composite segment list and the per-format engine state.

// Delegate refresh to each per-format engine
RefreshInput emptyInput = RefreshInput.builder().build();
primaryEngine.refresh(emptyInput);
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
    engine.refresh(emptyInput);
}

return new RefreshResult(refreshedSegments);
No Partial Failure Rollback

In addDoc(), if the primary write succeeds but a secondary write fails, the method returns the failure result without rolling back the primary write. This leaves the data in an inconsistent state across formats — the primary has the document but one or more secondaries do not. There is no compensation or abort mechanism for the primary writer in this case.

public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
    // Write to primary first
    WriteResult primaryResult = primaryWriter.getValue().addDoc(doc.getPrimaryInput());
    switch (primaryResult) {
        case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryWriter.getKey().name());
        case WriteResult.Failure f -> {
            logger.debug("Failed to add document in primary format [{}]", primaryWriter.getKey().name());
            return primaryResult;
        }
    }

    // Then write to each secondary — keyed lookup by DataFormat (equals/hashCode based on name)
    Map<DataFormat, DocumentInput<?>> secondaryInputs = doc.getSecondaryInputs();
    for (Map.Entry<DataFormat, DocumentInput<?>> inputEntry : secondaryInputs.entrySet()) {
        DataFormat format = inputEntry.getKey();
        Writer<DocumentInput<?>> writer = secondaryWritersByFormat.get(format);
        if (writer == null) {
            logger.warn("No writer found for secondary format [{}], skipping", format.name());
            continue;
        }
        WriteResult result = writer.addDoc(inputEntry.getValue());
        switch (result) {
            case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", format.name());
            case WriteResult.Failure f -> {
                logger.debug("Failed to add document in secondary format [{}]", format.name());
                return result;
            }
        }
    }

    return primaryResult;
}
Double-Close Throws Exception

close() throws AlreadyClosedException if called a second time. This is unusual for Closeable implementations — the standard contract (per Closeable Javadoc) is that calling close() more than once has no effect. This could cause issues in try-with-resources or other idiomatic Java patterns.

public void close() throws IOException {
    if (closed.trySet(true) == false) {
        throw new AlreadyClosedException("DataformatAwareLockableWriterPool is already closed");
    }
}
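For reference, the Closeable contract is usually satisfied with a compare-and-set guard so that repeated close() calls are no-ops; a minimal sketch (not the PR's code):

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of an idempotent close(): the first call releases resources,
// later calls do nothing, matching the Closeable Javadoc contract.
class IdempotentCloseSketch implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            // release resources exactly once here
        }
        // already closed: no effect, no exception
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

This pattern keeps the pool safe inside try-with-resources and other idiomatic cleanup paths.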


github-actions bot commented Mar 18, 2026

PR Code Suggestions ✨

Latest suggestions up to ea0817e

Explore these optional code suggestions:

Possible issue
Preserve lock on successfully checked-out writers

In checkoutAll, writers that are successfully checked out are unlocked in the
finally block, but the callers (e.g., refresh) expect to hold the lock on the
checked-out writers. Writers that are not checked out (e.g., already unregistered)
should be unlocked, but writers that are successfully checked out should remain
locked until the caller explicitly releases them.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [110-131]

 public List<W> checkoutAll() {
     ensureOpen();
     List<W> lockedWriters = new ArrayList<>();
     List<W> checkedOutWriters = new ArrayList<>();
     for (W writer : this) {
         writer.lock();
         lockedWriters.add(writer);
     }
     synchronized (this) {
         for (W writer : lockedWriters) {
-            try {
-                if (isRegistered(writer) && writers.remove(writer)) {
-                    availableWriters.remove(writer);
-                    checkedOutWriters.add(writer);
-                }
-            } finally {
+            if (isRegistered(writer) && writers.remove(writer)) {
+                availableWriters.remove(writer);
+                checkedOutWriters.add(writer);
+            } else {
                 writer.unlock();
             }
         }
     }
     return Collections.unmodifiableList(checkedOutWriters);
 }
Suggestion importance[1-10]: 8


Why: This is a real bug: the finally block unconditionally unlocks all writers including those successfully checked out, but callers like refresh in CompositeIndexingExecutionEngine expect to hold the lock on checked-out writers. The improved code correctly only unlocks writers that were not checked out.

Impact: Medium
Unlock entry before adding to queue

The addAndUnlock method adds the entry to the queue and then unlocks it. However,
between queue.add(entry) and entry.unlock(), another thread calling lockAndPoll
could see the entry in the queue but fail to lock it (since it's still locked),
causing a spurious retry loop. The entry should be unlocked before being added to
the queue so that it is immediately acquirable by polling threads.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
+    entry.unlock();
     queue.add(entry);
-    entry.unlock();
     addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 7


Why: The suggestion points to a real ordering issue: if an entry is added to the queue while still locked, a polling thread could find it but fail tryLock, causing unnecessary retries. However, the file path in the suggestion is wrong (the class is in LockableConcurrentQueue.java under libs/concurrent-queue), and the actual code lines are in that file at lines 74-78. The logic concern is valid but the window is very small since unlock immediately follows add.

Impact: Medium
Fix race condition in initialization check

The initialize method has a race condition: two threads could both pass the null
check before either sets writerSupplier, resulting in double initialization. Use a
synchronized block or SetOnce (already imported) to make this check-and-set atomic.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [68-73]

-public void initialize(Supplier<W> writerSupplier) {
+public synchronized void initialize(Supplier<W> writerSupplier) {
     if (this.writerSupplier != null) {
         throw new IllegalStateException("DataformatAwareLockableWriterPool is already initialized");
     }
     this.writerSupplier = Objects.requireNonNull(writerSupplier, "writerSupplier must not be null");
 }
Suggestion importance[1-10]: 6


Why: The race condition in initialize is a real concurrency issue since two threads could both pass the null check before either sets writerSupplier. Adding synchronized is a straightforward fix, though in practice initialize is typically called once during construction.

Impact: Low
General
Return actual inputs instead of null

getFinalInput() always returns null, which may cause NullPointerException in callers
that expect a valid list of per-format document inputs. It should return a list
containing the primary and all secondary document inputs so callers can access the
finalized inputs for each format.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java [76-78]

 @Override
 public List<? extends DocumentInput<?>> getFinalInput() {
-    return null;
+    List<DocumentInput<?>> inputs = new ArrayList<>();
+    inputs.add(primaryDocumentInput);
+    inputs.addAll(secondaryDocumentInputs.values());
+    return Collections.unmodifiableList(inputs);
 }
Suggestion importance[1-10]: 4


Why: While returning null from getFinalInput() could be problematic for callers, the existing code and tests explicitly assert null is returned, suggesting this is intentional for now (with a TODO-like design). The suggestion may be premature without knowing the full contract of getFinalInput.

Impact: Low

Previous suggestions

Suggestions up to commit 1839651
Possible issue
Checked-out writers should remain locked

In checkoutAll, writers that are locked but not successfully checked out (e.g.,
already removed from writers) are unlocked in the finally block, which is correct.
However, writers that ARE successfully checked out are also unlocked in the finally
block before being returned to the caller. This means the caller receives writers
that are no longer locked, defeating the purpose of "checkout". Writers added to
checkedOutWriters should remain locked and only be unlocked by the caller after use.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [110-131]

 public List<W> checkoutAll() {
     ensureOpen();
     List<W> lockedWriters = new ArrayList<>();
     List<W> checkedOutWriters = new ArrayList<>();
     for (W writer : this) {
         writer.lock();
         lockedWriters.add(writer);
     }
     synchronized (this) {
         for (W writer : lockedWriters) {
-            try {
-                if (isRegistered(writer) && writers.remove(writer)) {
-                    availableWriters.remove(writer);
-                    checkedOutWriters.add(writer);
-                }
-            } finally {
+            if (isRegistered(writer) && writers.remove(writer)) {
+                availableWriters.remove(writer);
+                checkedOutWriters.add(writer);
+            } else {
                 writer.unlock();
             }
         }
     }
     return Collections.unmodifiableList(checkedOutWriters);
 }
Suggestion importance[1-10]: 9


Why: This is a critical correctness bug: the finally block unconditionally unlocks all writers including those successfully checked out, so callers receive unlocked writers. The refresh method in CompositeIndexingExecutionEngine calls checkoutAll() and then calls writer.setFlushPending() and writer.flush() expecting exclusive access, which is broken if the writers are already unlocked. The improved code correctly only unlocks writers that were NOT checked out.

Impact: High
Unlock entry before adding to queue

The addAndUnlock method adds the entry to the queue and then unlocks it. However,
between queue.add(entry) and entry.unlock(), another thread calling lockAndPoll
could see the entry in the queue but fail to acquire its lock (since it's still
locked), causing unnecessary retries. The unlock should happen before adding to the
queue so the entry is immediately acquirable when visible.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
+    entry.unlock();
     queue.add(entry);
-    entry.unlock();
     addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies a potential issue: if an entry is added to the queue while still locked, a polling thread could see it but fail tryLock, causing unnecessary retries. Unlocking before adding ensures the entry is immediately acquirable. However, this is a design trade-off (the current order ensures the counter increment signals availability after unlock), so the impact is moderate.

Impact: Medium
Release writer on document input creation failure

If primaryEngine.newDocumentInput() or any secondaryEngine.newDocumentInput() throws
an exception, the writer obtained from writerPool.getAndLock() will never be
released back to the pool, causing a resource leak. The writer checkout should be
wrapped in a try-catch to ensure it is released on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [257-271]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.isFlushPending() == false && writer.isAborted() == false : "CompositeWriter has pending flush: "
+                + writer.isFlushPending()
+                + " aborted="
+                + writer.isAborted();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.isFlushPending() == false && writer.isAborted() == false : "CompositeWriter has pending flush: "
-            + writer.isFlushPending()
-            + " aborted="
-            + writer.isAborted();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 7


Why: This is a valid resource leak concern: if newDocumentInput() throws on the primary or secondary engine, the locked writer is never returned to the pool. The fix correctly wraps the creation in a try-catch to ensure the writer is released on failure.

Impact: Medium
Synchronize initialization to prevent race condition

The initialize method has a race condition: two threads could both pass the null
check before either sets writerSupplier, resulting in double initialization. The
check-then-act should be synchronized or use an atomic compare-and-set to be
thread-safe.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [68-73]

-public void initialize(Supplier<W> writerSupplier) {
+public synchronized void initialize(Supplier<W> writerSupplier) {
     if (this.writerSupplier != null) {
         throw new IllegalStateException("DataformatAwareLockableWriterPool is already initialized");
     }
     this.writerSupplier = Objects.requireNonNull(writerSupplier, "writerSupplier must not be null");
 }
Suggestion importance[1-10]: 6


Why: The race condition in initialize is real but low-risk in practice since initialization is typically called once during engine construction. Adding synchronized is a simple and correct fix to prevent double-initialization under concurrent access.

Impact: Low
Suggestions up to commit 2823348
Possible issue
Guard against null writer from pool

If getAndLock() returns null (which it can when the pool is exhausted and
fetchWriter fails), the subsequent releaseAndUnlock(null) call in the onClose
callback will throw a NullPointerException. The result of getAndLock() should be
null-checked before proceeding, or the method should guarantee a non-null writer.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [246-259]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = dataFormatWriterPool.getAndLock();
+    if (writer == null) {
+        throw new IllegalStateException("Failed to acquire a CompositeWriter from the pool");
+    }
     DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
     Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
     for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
         secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
     }
     return new CompositeDocumentInput(
         primaryEngine.getDataFormat(),
         primaryInput,
         secondaryInputMap,
         () -> dataFormatWriterPool.releaseAndUnlock(writer)
     );
 }
Suggestion importance[1-10]: 7


Why: The getAndLock() method can return null when the pool is empty and fetchWriter fails, and passing null to releaseAndUnlock in the onClose callback would cause a NullPointerException. Adding a null check here prevents a real runtime error.

Impact: Medium
Prevent indefinite lock hold on uncheckout writers

In checkoutAll, writers that are locked but fail the isRegistered check are unlocked
in the finally block without being added to checkedOutWriters. However, these
writers were locked by the current thread and are not returned to the caller, so
they remain locked indefinitely, causing a deadlock or resource leak. Writers that
are not checked out should be explicitly unlocked only if they were not already
handled.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java [101-124]

 public List<CompositeWriter> checkoutAll() {
     ensureOpen();
     List<CompositeWriter> lockedWriters = new ArrayList<>();
     List<CompositeWriter> checkedOutWriters = new ArrayList<>();
     for (CompositeWriter writer : this) {
         writer.lock();
         lockedWriters.add(writer);
     }
     synchronized (this) {
         for (CompositeWriter writer : lockedWriters) {
             try {
-                // Release this writer if it's no longer managed by this pool; otherwise, check it out.
                 if (isRegistered(writer) && writers.remove(writer)) {
                     availableWriters.remove(writer);
                     writer.setFlushPending();
                     checkedOutWriters.add(writer);
+                } else {
+                    // Not checked out — unlock immediately so it isn't held indefinitely
+                    writer.unlock();
                 }
-            } finally {
+            } catch (Exception e) {
                 writer.unlock();
+                throw e;
             }
         }
     }
     return Collections.unmodifiableList(checkedOutWriters);
 }
Suggestion importance[1-10]: 6


Why: Looking at the existing code, the finally block does call writer.unlock() for all writers including those not checked out, so the concern about indefinite lock holding is partially addressed. However, the suggestion correctly points out that checked-out writers are unlocked in the finally block even though they should remain locked for the caller — this is a real logic issue worth addressing.

Impact: Low
Fix race condition in add-and-unlock ordering

The addAndUnlock method in LockableConcurrentQueue adds the entry to the queue and
then unlocks it. However, between queue.add(entry) and entry.unlock(), another
thread calling lockAndPoll could acquire the entry's lock via tryLock while the
entry is still considered "locked" by the releasing thread, leading to a race
condition. The unlock should happen before adding to the queue, or the counter
increment should happen before the unlock to ensure visibility.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java [74-78]

 public void addAndUnlock(T entry) {
+    addAndUnlockCounter.incrementAndGet();
     queue.add(entry);
     entry.unlock();
-    addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 5


Why: The suggestion moves addAndUnlockCounter.incrementAndGet() before queue.add(entry), but this doesn't actually fix a race condition — it introduces a different ordering issue where the counter is incremented before the entry is visible in the queue. The actual concern about the ordering between queue.add and entry.unlock is valid but the proposed fix doesn't clearly resolve it either.

Impact: Low
General
Return actual inputs instead of null

getFinalInput() always returns null, which may cause NullPointerException in callers
that expect a valid list of per-format document inputs. This method should return
the actual list of all constituent document inputs (primary + secondaries) to
fulfill the contract of DocumentInput.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java [76-78]

 @Override
 public List<? extends DocumentInput<?>> getFinalInput() {
-    return null;
+    List<DocumentInput<?>> inputs = new ArrayList<>();
+    inputs.add(primaryDocumentInput);
+    inputs.addAll(secondaryDocumentInputs.values());
+    return Collections.unmodifiableList(inputs);
 }
Suggestion importance[1-10]: 4


Why: The getFinalInput() returning null could cause NPEs in callers, but the test testGetFinalInputReturnsNull explicitly asserts null is returned, suggesting this is intentional behavior for now. The suggestion may be valid long-term but contradicts the current design intent.

Impact: Low
Suggestions up to commit e68f39f
Possible issue
Handle null writer returned from pool

If dataFormatWriterPool.getAndLock() returns null (which it can when no writer is
available), the code proceeds to create document inputs and then passes null to
releaseAndUnlock, which will throw a NullPointerException. A null check should be
added after getAndLock().

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [250-263]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = dataFormatWriterPool.getAndLock();
+    if (writer == null) {
+        throw new IllegalStateException("No available writer in the pool");
+    }
     DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
     Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
     for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
         secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
     }
     return new CompositeDocumentInput(
         primaryEngine.getDataFormat(),
         primaryInput,
         secondaryInputMap,
         () -> dataFormatWriterPool.releaseAndUnlock(writer)
     );
 }
Suggestion importance[1-10]: 8


Why: getAndLock() can return null when no writer is available (as documented in LockableConcurrentQueue.lockAndPoll()), and passing null to releaseAndUnlock would cause a NullPointerException. This is a real bug that needs to be addressed.

Impact: Medium
Fix race condition in add-then-unlock ordering

The addAndUnlock method adds the entry to the queue and then unlocks it. This
creates a race condition: between queue.add(entry) and entry.unlock(), another
thread calling lockAndPoll could poll the entry and attempt tryLock() on it, which
would fail because the entry is still locked by the current thread. The unlock
should happen before adding to the queue, or the entry should be added in an
unlocked state.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/LockableConcurrentQueue.java [75-79]

 public void addAndUnlock(T entry) {
+    entry.unlock();
     queue.add(entry);
-    entry.unlock();
     addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 7


Why: The current ordering adds the entry to the queue while it's still locked, which means a polling thread could see the entry but fail tryLock() unnecessarily. Unlocking before adding ensures the entry is immediately acquirable when visible. However, the lockAndPoll loop handles this gracefully via the addAndUnlockCounter retry mechanism, so the impact is a performance concern rather than a correctness bug.

Medium
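The unlock-before-publish ordering can be shown with a minimal sketch (hypothetical `Entry`/`UnlockBeforeAdd` names, not the plugin's `LockableConcurrentQueue`): an entry is created locked, and because `addAndUnlock` unlocks it before enqueueing, any poller that sees it in the queue can acquire it immediately.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the ordering fix: unlock the entry before it becomes
// visible in the queue, so a concurrent lockAndPoll never sees a locked entry.
public class UnlockBeforeAdd {
    static class Entry {
        final AtomicBoolean locked = new AtomicBoolean(true); // born locked
        boolean tryLock() { return locked.compareAndSet(false, true); }
        void unlock() { locked.set(false); }
    }

    static final ConcurrentLinkedQueue<Entry> queue = new ConcurrentLinkedQueue<>();

    static void addAndUnlock(Entry e) {
        e.unlock();   // unlock first ...
        queue.add(e); // ... then publish: any poller can tryLock() right away
    }

    static Entry lockAndPoll() {
        Entry e = queue.poll();
        if (e != null && e.tryLock()) {
            return e;
        }
        return null;  // the real code retries via addAndUnlockCounter; elided here
    }

    public static void main(String[] args) {
        addAndUnlock(new Entry());
        System.out.println(lockAndPoll() != null); // entry is immediately acquirable
    }
}
```

Note the trade-off the review identifies: with the original add-then-unlock order this sketch's `lockAndPoll` could observe the entry and fail `tryLock()`, forcing a retry; the counter mechanism makes that safe but not free.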
Guard writer release against closed pool state

The releaseAndUnlock method adds the writer back to the pool even when the pool is
already closed. If close() is called concurrently, a writer could be returned to a
closed pool, potentially causing resource leaks or unexpected behavior. An
ensureOpen() check (or a check on closed) should be added before returning the
writer to the queue.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java [87-94]

 public void releaseAndUnlock(CompositeWriter state) {
     assert !state.isFlushPending() && !state.isAborted() : "CompositeWriter has pending flush: "
         + state.isFlushPending()
         + " aborted="
         + state.isAborted();
     assert isRegistered(state) : "CompositeDocumentWriterPool doesn't know about this CompositeWriter";
+    if (closed) {
+        state.unlock();
+        return;
+    }
     availableWriters.addAndUnlock(state);
 }
Suggestion importance[1-10]: 6


Why: This is a valid concern about a potential resource leak when releaseAndUnlock is called on a closed pool. However, the close() method only sets a flag and doesn't prevent further operations, so this is a real but edge-case issue. The fix is reasonable but the impact is moderate.

Low
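The closed-pool guard amounts to checking a flag before re-queuing. A minimal sketch, with hypothetical names and a `volatile` flag standing in for whatever `close()` actually sets:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: releasing into a closed pool discards the writer
// (in real code, also unlocking/closing it) instead of re-queuing it.
public class ClosedPoolGuard {
    static final ConcurrentLinkedQueue<String> available = new ConcurrentLinkedQueue<>();
    static volatile boolean closed = false;

    static void releaseAndUnlock(String writer) {
        if (closed) {
            return; // do not return a writer to a closed pool
        }
        available.add(writer);
    }

    public static void main(String[] args) {
        releaseAndUnlock("w1");
        closed = true;
        releaseAndUnlock("w2");
        System.out.println(available.size()); // only w1 made it back into the pool
    }
}
```

A flag check like this is still racy against a concurrent `close()` (the flag can flip between the check and the add); a fully safe version would need the same lock or drain step that `close()` uses, which is likely why the reviewer rated the impact as moderate.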
General
Return union of all formats' supported fields

The method is documented as returning the "Union of all constituent formats'
supported fields" but actually only returns the first format's fields. This is
misleading and functionally incorrect — fields supported by secondary formats will
not be reported, potentially causing those formats to never receive field writes.
The implementation should return the union of all formats' supported fields.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java [69-76]

 @Override
 public Set<FieldTypeCapabilities> supportedFields() {
-    // Union of all constituent formats' supported fields
-    // TODO:: Post the changes done in mappings, we will relook this
     if (dataFormats.isEmpty()) {
         return Set.of();
     }
-    return dataFormats.get(0).supportedFields();
+    Set<FieldTypeCapabilities> union = new java.util.HashSet<>();
+    for (DataFormat format : dataFormats) {
+        union.addAll(format.supportedFields());
+    }
+    return Collections.unmodifiableSet(union);
 }
Suggestion importance[1-10]: 5


Why: The comment says "Union of all constituent formats' supported fields" but the implementation only returns the first format's fields. The code itself has a TODO acknowledging this is incomplete, so the suggestion is valid but the issue is already known and intentionally deferred.

Low
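The union itself is a one-loop set merge. A minimal sketch with `String` standing in for `FieldTypeCapabilities` (hypothetical `SupportedFieldsUnion` name; the real method operates on `DataFormat` instances):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the proposed fix: supportedFields() as the union of
// every constituent format's fields, not just the first format's.
public class SupportedFieldsUnion {
    static Set<String> union(List<Set<String>> perFormatFields) {
        Set<String> all = new HashSet<>();
        for (Set<String> fields : perFormatFields) {
            all.addAll(fields); // accumulate each format's contribution
        }
        return Collections.unmodifiableSet(all);
    }

    public static void main(String[] args) {
        Set<String> u = union(List.of(Set.of("keyword", "long"), Set.of("long", "vector")));
        System.out.println(u.size()); // overlapping "long" is deduplicated
    }
}
```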
Suggestions up to commit 8425a85 (none shown)
Suggestions up to commit b9627f1
Category | Suggestion | Impact
Possible issue
Re-register writer on release after flush checkout

The releaseAndUnlock method does not re-register the writer into the writers set
after checkoutAll removes it via writers.remove(writer). Once a writer is checked
out for flush and then released, it will no longer be tracked by the pool, causing
isRegistered to return false and the assert to fail on subsequent releases. Writers
checked out via checkoutAll should be re-added to writers before being returned to
the available queue.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java [87-94]

 public void releaseAndUnlock(CompositeWriter state) {
     assert !state.isFlushPending() && !state.isAborted() : "CompositeWriter has pending flush: "
         + state.isFlushPending()
         + " aborted="
         + state.isAborted();
+    synchronized (this) {
+        writers.add(state); // re-register if removed during checkoutAll
+    }
     assert isRegistered(state) : "CompositeDocumentWriterPool doesn't know about this CompositeWriter";
     availableWriters.addAndUnlock(state);
 }
Suggestion importance[1-10]: 7


Why: The checkoutAll method removes writers from the writers set via writers.remove(writer), but releaseAndUnlock asserts isRegistered(state) which checks writers.contains(state). After a flush cycle, releasing a writer would fail the assertion since it was removed. The fix to re-add to writers before the assert is logically sound and addresses a real bug.

Medium
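The re-registration invariant can be captured in a small single-threaded sketch (hypothetical names; the real pool synchronizes access to the tracking set): `checkoutAll()` drains the tracking set for a flush, so a later release must re-add the writer before the registration assert runs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: checkoutAll() removes writers from the tracking set,
// so releaseAndUnlock() must re-add them before asserting registration.
public class ReRegisterOnRelease {
    static final Set<String> writers = new HashSet<>();   // tracked writers
    static final Set<String> available = new HashSet<>(); // ready for reuse

    static Set<String> checkoutAll() {
        Set<String> out = new HashSet<>(writers);
        writers.clear(); // writers leave the tracking set for the flush cycle
        return out;
    }

    static void releaseAndUnlock(String w) {
        writers.add(w);  // re-register if removed during checkoutAll
        assert writers.contains(w) : "pool doesn't know about this writer";
        available.add(w);
    }

    public static void main(String[] args) {
        writers.add("w0");
        for (String w : checkoutAll()) {
            releaseAndUnlock(w); // without the re-add, the assert would fail here
        }
        System.out.println(writers.contains("w0"));
    }
}
```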
Ensure writer is closed on flush exception

If writer.flush() or any segmentBuilder operation throws an exception,
writer.close() will not be called, leaking the writer resource. The writer.close()
call should be placed in a finally block to ensure it is always executed.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [189-207]

 for (CompositeWriter writer : dataFormatWriters) {
-    FileInfos fileInfos = writer.flush();
-    ...
-    writer.close();
-    if (hasFiles) {
-        newSegmentList.add(segmentBuilder.build());
+    try {
+        FileInfos fileInfos = writer.flush();
+        Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
+        boolean hasFiles = false;
+        for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
+            segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
+            hasFiles = true;
+        }
+        if (hasFiles) {
+            newSegmentList.add(segmentBuilder.build());
+        }
+    } finally {
+        writer.close();
     }
 }
Suggestion importance[1-10]: 7


Why: If writer.flush() throws an exception, writer.close() is never called, leaking the underlying writer resources. Wrapping the flush logic in a try-finally block is a valid resource management fix that prevents potential resource leaks in error scenarios.

Medium
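The try/finally guarantee is easy to demonstrate in isolation. A minimal sketch with a hypothetical `Writer` whose `flush()` can throw, showing that `close()` still runs:

```java
import java.util.List;

// Hypothetical sketch of the try/finally fix: close() runs even when flush()
// throws, so a failing writer cannot leak its resources.
public class CloseOnFlushFailure {
    static class Writer {
        final boolean failOnFlush;
        boolean closed = false;
        Writer(boolean failOnFlush) { this.failOnFlush = failOnFlush; }
        void flush() { if (failOnFlush) throw new RuntimeException("flush failed"); }
        void close() { closed = true; }
    }

    static void flushAll(List<Writer> writers) {
        for (Writer w : writers) {
            try {
                w.flush();
            } finally {
                w.close(); // always reached, mirroring the suggested finally block
            }
        }
    }

    public static void main(String[] args) {
        Writer bad = new Writer(true);
        try {
            flushAll(List.of(bad));
        } catch (RuntimeException e) {
            System.out.println("flush threw, closed=" + bad.closed);
        }
    }
}
```

One caveat the suggestion leaves open: the exception still aborts the loop, so writers after the failing one are neither flushed nor closed. A more thorough fix would collect failures (e.g. via suppressed exceptions) and close every writer before rethrowing.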
Unlock entry before adding to queue

The entry is added to the queue and then unlocked, creating a window where another
thread calling lockAndPoll could dequeue and attempt to tryLock the entry before it
is unlocked, causing the poll to fail and potentially miss the entry. The entry
should be unlocked before being added to the queue so it is immediately available
for polling.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/LockableConcurrentQueue.java [75-79]

 public void addAndUnlock(T entry) {
+    entry.unlock();
     queue.add(entry);
-    entry.unlock();
     addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 6


Why: The current order (add then unlock) creates a race window where lockAndPoll could dequeue the entry and fail tryLock since it's still locked. Unlocking before adding ensures the entry is immediately pollable. However, the addAndUnlockCounter increment after queue.add still serves as a signal, and the lockAndPoll retry loop partially mitigates this, so the impact is moderate.

Low
General
Fix misleading comment on supported fields union

The method is documented as returning the "union of all constituent formats'
supported fields" but actually only returns the first format's fields, silently
ignoring all others. This means capabilities of secondary formats are invisible to
the engine. Until the TODO is resolved, the comment should accurately reflect the
current behavior to avoid misleading callers.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java [69-76]

 @Override
 public Set<FieldTypeCapabilities> supportedFields() {
-    // Union of all constituent formats' supported fields
-    // TODO:: Post the changes done in mappings, we will relook this
+    // NOTE: Currently returns only the primary (first) format's supported fields.
+    // TODO: Post the changes done in mappings, compute the union of all constituent formats' supported fields.
     if (dataFormats.isEmpty()) {
         return Set.of();
     }
     return dataFormats.get(0).supportedFields();
 }
Suggestion importance[1-10]: 2


Why: This is purely a comment/documentation change that doesn't affect functionality. While the comment is indeed misleading, the suggestion only updates the comment text without changing any code behavior, making it a very low-impact improvement.

Low

@github-actions (Contributor)

❌ Gradle check result for 392e1fd: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions (Contributor)

Persistent review updated to latest commit b3f4e8a

@github-actions (Contributor)

❌ Gradle check result for b3f4e8a: FAILURE


@github-actions (Contributor)

Persistent review updated to latest commit ae4560a

@github-actions (Contributor)

Persistent review updated to latest commit 03e124c

@github-actions (Contributor)

❌ Gradle check result for 03e124c: FAILURE


@github-actions (Contributor)

Persistent review updated to latest commit 82ffb04

@github-actions (Contributor)

Persistent review updated to latest commit dc8d029

@github-actions (Contributor)

❌ Gradle check result for dc8d029: FAILURE


@github-actions (Contributor)

Persistent review updated to latest commit 6687b5d

@github-actions (Contributor)

❌ Gradle check result for 6687b5d: FAILURE


@github-actions (Contributor)

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 238416a.

Path | Line | Severity | Description
sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java | 208 | low | Multiple [COMPOSITE_DEBUG] log statements at debug level expose internal segment counts, writer generations, and file sets. If debug logging is accidentally enabled in production, this could disclose internal storage topology details. This appears to be intentional development instrumentation left in production code rather than malicious, but warrants cleanup before release.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions (Contributor)

Persistent review updated to latest commit 238416a

@alchemist51 added the Indexing label (Indexing, Bulk Indexing and anything related to indexing) on Mar 19, 2026
@github-actions (Contributor)

Persistent review updated to latest commit 61cd8e4

@github-actions (Contributor)

❌ Gradle check result for 61cd8e4: FAILURE


@github-actions (Contributor)

Persistent review updated to latest commit 2e5d2fa

@github-actions (Contributor)

Persistent review updated to latest commit f685976

@github-actions (Contributor)

Persistent review updated to latest commit 5214646

@github-actions (Contributor)

Persistent review updated to latest commit 166e609

@github-actions (Contributor)

Persistent review updated to latest commit 2823348

@github-actions (Contributor)

❌ Gradle check result for 2823348: FAILURE


@github-actions (Contributor)

Persistent review updated to latest commit 1839651

@alchemist51 requested a review from Bukhtawar on March 21, 2026 14:47
@github-actions (Contributor)

❌ Gradle check result for 1839651: FAILURE


@github-actions (Contributor)

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit d4d2e6f.

Path | Line | Severity | Description
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java | 318 | medium | Removes a deliberate thread-safety guard: the method `supportsConcurrentSegmentSearch()` previously returned `filterDuplicateText == false` with an explicit comment that `DuplicateByteSequenceSpotter` is stateful and not thread-safe. It now unconditionally returns `true`, enabling concurrent segment search even when `filterDuplicateText` is active. This could cause data races, incorrect aggregation results, or crashes. The corresponding CHANGELOG entry ('Disable concurrent search for filter duplicates in significant_text') was also silently removed, suggesting the revert was intentional but unexplained.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 1 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
@github-actions (Contributor)

Persistent review updated to latest commit ea0817e

@github-actions (Contributor)

❌ Gradle check result for ea0817e: FAILURE



Labels

Indexing (Indexing, Bulk Indexing and anything related to indexing), lucene, skip-changelog

3 participants