
Add ingest pipeline support for pull-based ingestion#20873

Open
imRishN wants to merge 13 commits into opensearch-project:main from imRishN:pipeline-integration

Conversation


@imRishN imRishN commented Mar 15, 2026

Description

Adds final_pipeline execution support to the pull-based ingestion path. Documents are transformed by configured ingest pipelines before being written to Lucene.

  • Pipeline resolution from index settings with dynamic update
  • CompletableFuture sync bridge for async IngestService.executeBulkRequest()
  • Guardrails blocking _id and _routing mutations
  • Zero overhead when no pipeline is configured
  • Pipeline not invoked for delete operations
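The "CompletableFuture sync bridge" in the list above can be illustrated with a minimal sketch. This is a simplified stand-in, not the PR's actual code: `executeAsync` is a hypothetical method standing in for the async `IngestService.executeBulkRequest()`, and the caller blocks on the future with a timeout, which is the pattern the PR uses to run an async pipeline synchronously on the processor thread.

```java
import java.util.concurrent.*;

public class SyncBridgeSketch {
    // Hypothetical async API standing in for IngestService.executeBulkRequest():
    // it runs on another thread and invokes the completion callback when done.
    static void executeAsync(ExecutorService pool, Runnable onCompletion) {
        pool.submit(onCompletion);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = new CompletableFuture<>();

        // The completion callback completes the future; the caller blocks on it
        // with a timeout, mirroring the PR's sync bridge over the async API.
        executeAsync(pool, () -> future.complete(null));
        try {
            future.get(5, TimeUnit.SECONDS);
            System.out.println("pipeline completed");
        } catch (TimeoutException e) {
            System.out.println("pipeline timed out");
        } finally {
            pool.shutdown();
        }
    }
}
```

The key property of this bridge is also its main risk, as the bot review below notes: if the completion callback is never invoked, the caller blocks until the timeout fires.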

Related Issues

Resolves:

  1. [Feature Request] Support ingest pipeline execution in pull-based ingestion #20875
  2. Add IT for final_pipeline support with streaming messages in Pull Based Ingestion #20879
  3. Add IT for field_mapping + final_pipeline combined for Pull Based Ingestion #20880

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

imRishN added 4 commits March 11, 2026 09:58
…based ingestion

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…tion

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>

github-actions bot commented Mar 15, 2026

PR Reviewer Guide 🔍

(Review updated until commit 9a50135)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Add IngestPipelineExecutor core implementation and unit tests

Relevant files:

  • server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java
  • server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java

Sub-PR theme: Wire IngestPipelineExecutor into ingestion engine and processor pipeline

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
  • server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
  • server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java
  • server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java
  • server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java
  • server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java
  • server/src/test/java/org/opensearch/indices/pollingingest/IngestionEngineFactoryTests.java
  • server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java

Sub-PR theme: Add Kafka integration tests for ingest pipeline in pull-based ingestion

Relevant files:

  • plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java

⚡ Recommended focus areas for review

Race Condition

In executePipelines, the dropped flag is set via onDropped callback and then checked after future.get(). However, if onDropped is called but onCompletion is never called (or called with an exception), the future may complete exceptionally while dropped is already true. The current code checks dropped.get() only after a successful future.get(), so a dropped document that also triggers an exception in onCompletion would throw instead of returning null. More critically, if onDropped is called without onCompletion being called, the future will block until timeout. The contract between onDropped and onCompletion callbacks in IngestService should be verified to ensure onCompletion is always called after onDropped.

CompletableFuture<Void> future = new CompletableFuture<>();
AtomicBoolean dropped = new AtomicBoolean(false);

ingestService.executeBulkRequest(
    1,
    Collections.singletonList(indexRequest),
    (slot, e) -> future.completeExceptionally(e),
    (thread, e) -> {
        if (e != null) {
            future.completeExceptionally(e);
        } else {
            future.complete(null);
        }
    },
    slot -> dropped.set(true),
    ThreadPool.Names.WRITE
);

// Block until pipeline execution completes (with timeout)
try {
    future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Ingest pipeline execution was interrupted", e);
} catch (ExecutionException e) {
    throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
}

if (dropped.get()) {
    return null;
}
Blocking Thread

executePipelines blocks a processor thread (via future.get(...)) while waiting for IngestService.executeBulkRequest to complete. If the ingest pipeline uses async processors (e.g., calling external services), this will block the polling/processor thread for up to 30 seconds per document. Under high throughput or slow pipelines, this can exhaust processor threads and stall ingestion entirely. The PR description acknowledges this is a sync bridge, but there is no backpressure or thread-pool isolation to prevent starvation.

// Block until pipeline execution completes (with timeout)
try {
    future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Ingest pipeline execution was interrupted", e);
} catch (ExecutionException e) {
    throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
}
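One mitigation for the thread-starvation concern above is a dedicated, bounded pool for pipeline work with semaphore-based backpressure. This is a hypothetical sketch of that pattern, not code from the PR; the pool size and permit count are illustrative:

```java
import java.util.concurrent.*;

public class BackpressureSketch {
    public static void main(String[] args) throws Exception {
        // A dedicated pool isolates pipeline work from polling/processor threads
        ExecutorService pipelinePool = Executors.newFixedThreadPool(4);
        // A semaphore caps in-flight pipeline executions (backpressure)
        Semaphore inFlight = new Semaphore(8);

        for (int i = 0; i < 16; i++) {
            inFlight.acquire(); // submitters block once 8 documents are in flight
            pipelinePool.submit(() -> {
                try {
                    // stand-in for per-document pipeline execution
                } finally {
                    inFlight.release();
                }
            });
        }
        pipelinePool.shutdown();
        pipelinePool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("processed 16 docs with at most 8 in flight");
    }
}
```

With this shape, a slow or async pipeline degrades throughput on the isolated pool rather than stalling the poller threads outright.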
NoOp Seq Numbers

When a document is dropped by the pipeline, a Engine.NoOp is created with hardcoded seqNo=0 and primaryTerm=1. These values may conflict with actual sequence number tracking in the engine, potentially causing consistency issues or assertion failures. The correct approach would be to use UNASSIGNED_SEQ_NO or obtain the proper sequence number from the engine.

operation = new Engine.NoOp(
    0,
    1,
    Engine.Operation.Origin.PRIMARY,
    System.nanoTime(),
    "Document dropped by ingest pipeline"
);
return new MessageOperation(operation, opType);
Null Supplier Return

The assert on line 47 (`assert ingestService != null`) only fires when assertions are enabled (JVM -ea flag). In production without assertions, a null return from ingestServiceSupplier.get() would propagate silently and cause a NullPointerException later in the IngestPipelineExecutor constructor (which calls Objects.requireNonNull). The assert message is helpful, but the guard should either be a proper null check, or the requireNonNull in IngestPipelineExecutor should be treated as the real guard; as written, the assert is misleading as a safety net.

IngestService ingestService = ingestServiceSupplier.get();
assert ingestService != null : "IngestService supplier returned null. This indicates a initialization ordering issue.";
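The assert-vs-hard-check distinction can be demonstrated with a small standalone sketch (hypothetical names, not the PR's classes): `Objects.requireNonNull` with a message runs unconditionally, regardless of the -ea flag, and fails fast with a clear diagnostic.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class NullGuardSketch {
    // Hypothetical resolver standing in for the ingestServiceSupplier usage
    static String resolve(Supplier<String> supplier) {
        // A hard null check runs in production too, unlike assert
        return Objects.requireNonNull(supplier.get(),
            "IngestService supplier returned null; initialization ordering issue");
    }

    public static void main(String[] args) {
        System.out.println(resolve(() -> "ingest-service"));
        try {
            resolve(() -> null);
        } catch (NullPointerException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```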


github-actions bot commented Mar 15, 2026

PR Code Suggestions ✨

Latest suggestions up to 9a50135

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Complete future immediately when document is dropped

The onItemFailure callback calls future.completeExceptionally(e) but does not also
call future.complete(null) or signal completion in any way. If onCompletion is never
invoked after onItemFailure in some code paths, the future.get() call will block
until the timeout. The onItemFailure callback should complete the future directly to
avoid unnecessary blocking.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [121-134]

 ingestService.executeBulkRequest(
     1,
     Collections.singletonList(indexRequest),
     (slot, e) -> future.completeExceptionally(e),
     (thread, e) -> {
         if (e != null) {
             future.completeExceptionally(e);
         } else {
             future.complete(null);
         }
     },
-    slot -> dropped.set(true),
+    slot -> {
+        dropped.set(true);
+        future.complete(null);
+    },
     ThreadPool.Names.WRITE
 );
Suggestion importance[1-10]: 7


Why: The slot -> dropped.set(true) callback doesn't complete the future, so if onCompletion is not called after onDropped in some code paths, the future will block until timeout. However, looking at the test in IngestPipelineExecutorTests, onDropped is always followed by onCompletion, suggesting the current contract expects onCompletion to always be called. The suggestion is valid as a defensive improvement but may not be strictly necessary given the existing contract.

Medium
Guard against null pipeline executor

When pipelineExecutor is null (e.g., in tests using the old two-argument
MessageProcessor constructor), calling pipelineExecutor.executePipelines(...) will
throw a NullPointerException. A null check should be added before invoking the
executor, or the field should be guaranteed non-null at construction time.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [242-259]

 // Execute ingest pipelines
-try {
-    Map<String, Object> transformedSource = pipelineExecutor.executePipelines(id, sourceMap);
-    if (transformedSource == null) {
-        // Document dropped by pipeline
-        operation = new Engine.NoOp(
-            0,
-            1,
-            Engine.Operation.Origin.PRIMARY,
-            System.nanoTime(),
-            "Document dropped by ingest pipeline"
-        );
-        return new MessageOperation(operation, opType);
+if (pipelineExecutor != null) {
+    try {
+        Map<String, Object> transformedSource = pipelineExecutor.executePipelines(id, sourceMap);
+        if (transformedSource == null) {
+            // Document dropped by pipeline
+            operation = new Engine.NoOp(
+                0,
+                1,
+                Engine.Operation.Origin.PRIMARY,
+                System.nanoTime(),
+                "Document dropped by ingest pipeline"
+            );
+            return new MessageOperation(operation, opType);
+        }
+        sourceMap = transformedSource;
+    } catch (Exception e) {
+        throw new RuntimeException("Ingest pipeline execution failed", e);
     }
-    sourceMap = transformedSource;
-} catch (Exception e) {
-    throw new RuntimeException("Ingest pipeline execution failed", e);
 }
Suggestion importance[1-10]: 3


Why: Looking at the PR code, pipelineExecutor is always passed through constructors and the old two-argument MessageProcessor constructor was removed. All construction paths now require a pipelineExecutor. The suggestion addresses a theoretical null case that doesn't exist in the current codebase, making it low impact.

Low
General
Fix test to not call completion after failure

The test calls onFailure followed by onCompletion with no exception, which means
future.completeExceptionally is called first and then future.complete(null) is
attempted. Since CompletableFuture ignores subsequent completions, the test may not
accurately reflect real failure behavior where onCompletion is not called after
onFailure. The test should only invoke onFailure without calling onCompletion to
properly test the failure path.

server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java [101-118]

 public void testExecutePipelines_Failure() {
     doAnswer(invocation -> {
         BiConsumer<Integer, Exception> onFailure = invocation.getArgument(2);
-        BiConsumer<Thread, Exception> onCompletion = invocation.getArgument(3);
         onFailure.accept(0, new RuntimeException("processor failed"));
-        onCompletion.accept(Thread.currentThread(), null);
         return null;
     }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString());
Suggestion importance[1-10]: 4


Why: The test calls onFailure then onCompletion with no exception, but since CompletableFuture ignores subsequent completions after completeExceptionally, the test still works correctly. However, the suggestion is valid in that it better reflects the real failure scenario and makes the test intent clearer.

Low
Add descriptive message to null check

The constructor now requires a non-null ingestService via Objects.requireNonNull,
but the old no-arg constructor that passed null was removed. Any existing callers
that relied on the two-argument constructor (without IngestService) will now fail
with a NullPointerException rather than a clear error. Ensure all call sites have
been updated, and consider adding a descriptive null-check message.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [68-75]

 public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
     super(engineConfig);
     this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
     this.pipelineExecutor = new IngestPipelineExecutor(
-        Objects.requireNonNull(ingestService),
+        Objects.requireNonNull(ingestService, "IngestService must not be null for IngestionEngine"),
         engineConfig.getIndexSettings().getIndex().getName(),
         engineConfig.getIndexSettings()
     );
Suggestion importance[1-10]: 2


Why: This suggestion only adds a descriptive message to Objects.requireNonNull, which is a minor style improvement with minimal functional impact. The existing code already enforces non-null via Objects.requireNonNull.

Low

Previous suggestions

Suggestions up to commit e0dc3f4
Category | Suggestion | Impact
Possible issue
Avoid race condition on volatile pipeline field

resolvedFinalPipeline is a volatile field that can be updated concurrently by
updateFinalPipeline. Between the hasPipelines() check at the top of executePipelines
and this line, the value could change from non-null to null (or vice versa). Capture
the value in a local variable at the start of the method to ensure consistency
throughout the execution.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [120]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+final String currentFinalPipeline = resolvedFinalPipeline;
+if (currentFinalPipeline == null) {
+    return sourceMap;
+}
+// ... build IndexRequest ...
+indexRequest.setFinalPipeline(currentFinalPipeline);
Suggestion importance[1-10]: 7


Why: This is a valid concurrency concern: resolvedFinalPipeline is volatile and can be updated between the hasPipelines() check and the setFinalPipeline call. Capturing it in a local variable at the start of executePipelines would eliminate this TOCTOU race condition.

Medium
Ensure per-item pipeline failures propagate via future

The failureRef is set in the per-item failure callback, but future.get() is only
checked for ExecutionException (from completeExceptionally). If the per-item failure
callback fires but the completion callback fires without an exception, future.get()
will succeed and the failure check happens after. However, if
future.completeExceptionally is never called (only failureRef is set), the future
completes normally and the failure is caught by the post-get() check. This is
fragile: if the completion callback is never called (e.g., due to a bug or timeout
in the ingest service), the future will block until timeout. More critically, if
both failureRef is set AND future.completeExceptionally is called, the
ExecutionException wraps the completion exception, not the per-item failure.
Consider completing the future exceptionally when a per-item failure is detected to
ensure consistent error propagation.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [130-136]

-ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
+ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> {
+    failureRef.set(e);
+    future.completeExceptionally(e);
+}, (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a potential inconsistency: if only failureRef is set (per-item failure) but future is completed normally, the error handling relies on a post-get() check. Completing the future exceptionally on per-item failure would make error propagation more consistent and robust, though the current code does handle it via failureRef.get() after future.get().

Low
Use correct sequence number for dropped document NoOp

Using hardcoded seqNo=0 and primaryTerm=1 for the Engine.NoOp operation may conflict
with actual sequence number tracking in the engine, potentially causing consistency
issues. The sequence number and primary term should be obtained from the engine's
current state (e.g., UNASSIGNED_SEQ_NO and the actual primary term) to avoid
corrupting sequence number accounting.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [247-254]

 operation = new Engine.NoOp(
-    0,
-    1,
+    UNASSIGNED_SEQ_NO,
+    engine.config().getPrimaryTermSupplier().getAsLong(),
     Engine.Operation.Origin.PRIMARY,
     System.nanoTime(),
     "Document dropped by ingest pipeline"
 );
 return new MessageOperation(operation, opType);
Suggestion importance[1-10]: 6


Why: Using hardcoded seqNo=0 and primaryTerm=1 for Engine.NoOp could conflict with the engine's sequence number tracking. However, the improved_code references engine which is a field of MessageProcessor, not directly accessible in getOperation, so the suggested fix may not compile as-is without additional changes.

Low
General
Replace fixed sleep with reliable async assertion

Using Thread.sleep(5000) is a fragile approach for waiting in integration tests, as
it may be too short on slow CI environments or unnecessarily slow on fast ones. The
same pattern is repeated in testPipelineMutatingIdIsBlocked (with 10 seconds) and
testFieldMappingWithDropPipeline. Consider using waitForState with a condition that
checks the document count, or at minimum use the existing waitForState helper with a
timeout to make the test more reliable and faster.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [85-97]

 public void testFinalPipelineDropsDocument() throws Exception {
     createPipeline("drop_pipeline", "{\"processors\": [{\"drop\": {}}]}");
 
     produceData("1", "alice", "25");
     produceData("2", "bob", "30");
 
     createIndexWithPipeline("drop_pipeline", 1, 0);
 
-    Thread.sleep(5000);
-    refresh(indexName);
-    SearchResponse response = client().prepareSearch(indexName).get();
-    assertThat(response.getHits().getTotalHits().value(), is(0L));
+    // Wait briefly then assert no documents were indexed
+    assertBusy(() -> {
+        refresh(indexName);
+        SearchResponse response = client().prepareSearch(indexName).get();
+        assertThat(response.getHits().getTotalHits().value(), is(0L));
+    }, 15, TimeUnit.SECONDS);
 }
Suggestion importance[1-10]: 5

__

Why: Using Thread.sleep in integration tests is fragile and can cause flaky tests on slow CI environments. The suggestion to use assertBusy is a valid improvement for test reliability, though verifying "no documents" with a busy-wait is inherently tricky since the condition may be true before messages are even processed.

Low
Suggestions up to commit 04af53e
Category | Suggestion | Impact
Possible issue
Fix race condition in pipeline failure handling

The failureRef is set in the per-item failure callback, but future.get() will
succeed (not throw) even when a per-item failure occurs. After future.get() returns
normally, failureRef.get() is checked, but if the completion callback fires before
the failure callback sets failureRef, there is a race condition. Additionally, when
failureRef is set, the future should be completed to avoid blocking until timeout.
Consider completing the future in the failure callback as well, or using the failure
to complete the future exceptionally.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [130-136]

-ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
+ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> {
+    if (e != null) {
+        failureRef.set(e);
+    }
+}, (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
+    } else if (failureRef.get() != null) {
+        future.completeExceptionally(failureRef.get());
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies a real race condition: the per-item failure callback sets failureRef but doesn't complete the future, so if the completion callback fires before failureRef is set, the future completes normally and the failure check after future.get() could miss it. The improved code properly integrates failure handling into the completion callback.

Medium
Prevent TOCTOU race on volatile pipeline field

The resolvedFinalPipeline field is volatile and is read twice: once in
hasPipelines() and once here. Between these two reads, a concurrent call to
updateFinalPipeline could set it to null, causing a NullPointerException or
incorrect behavior. Capture the value in a local variable at the start of
executePipelines to ensure consistency throughout the method.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [120]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+final String currentFinalPipeline = resolvedFinalPipeline;
+if (currentFinalPipeline == null) {
+    return sourceMap;
+}
+// ... use currentFinalPipeline instead of resolvedFinalPipeline below
+indexRequest.setFinalPipeline(currentFinalPipeline);
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a TOCTOU race on the volatile resolvedFinalPipeline field — it's read in hasPipelines() and again later in executePipelines. Capturing it in a local variable at the start of the method is a valid and standard fix for this pattern.

Low
General
Replace fixed sleeps with bounded polling assertions

Using Thread.sleep with fixed durations (5000ms, 10000ms) to wait for negative
conditions (asserting zero documents) is fragile and can cause flaky tests on slow
CI environments or false positives on fast ones. Consider using a waitForState with
a timeout that polls and verifies the condition, or at minimum use a consistent
helper that retries the assertion.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [93-97]

-public void testFinalPipelineDropsDocument() throws Exception {
-    ...
-    Thread.sleep(5000);
+// Instead of Thread.sleep(5000), use a bounded wait with a stable check:
+// For "drop" scenarios, wait for the poller to have processed messages, then assert 0 docs.
+// Example approach:
+assertBusy(() -> {
     refresh(indexName);
     SearchResponse response = client().prepareSearch(indexName).get();
     assertThat(response.getHits().getTotalHits().value(), is(0L));
-}
-...
-public void testPipelineMutatingIdIsBlocked() throws Exception {
-    ...
-    Thread.sleep(10000);
-    refresh(indexName);
-    SearchResponse response = client().prepareSearch(indexName).get();
-    assertThat(response.getHits().getTotalHits().value(), is(0L));
-}
+}, 30, TimeUnit.SECONDS);
Suggestion importance[1-10]: 5


Why: Using Thread.sleep with hardcoded durations for negative condition checks is a known source of flaky tests. The suggestion to use assertBusy with a timeout is a valid improvement for test reliability, though the existing_code snippet doesn't exactly match the PR diff format (it spans two separate methods).

Low
Ensure non-null IngestService at construction

The constructor now requires a non-null IngestService via Objects.requireNonNull,
but the test file passes mock(IngestService.class) which is fine. However, the old
no-arg constructor that passed null has been removed, and callers that previously
relied on null being acceptable will now throw NullPointerException at construction
time rather than gracefully degrading. Ensure all call sites (including production
code paths where IngestService may not yet be initialized) always provide a valid
non-null IngestService.

server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngine.java [68-75]

 public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
     super(engineConfig);
     this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
+    Objects.requireNonNull(ingestService, "IngestService must not be null");
     this.pipelineExecutor = new IngestPipelineExecutor(
-        Objects.requireNonNull(ingestService),
+        ingestService,
         engineConfig.getIndexSettings().getIndex().getName(),
         engineConfig.getIndexSettings()
     );
Suggestion importance[1-10]: 2


Why: The improved_code is functionally equivalent to the existing code since IngestPipelineExecutor's constructor already calls Objects.requireNonNull(ingestService). Adding a redundant null check in IngestionEngine provides no additional safety and the suggestion is essentially a no-op improvement.

Low
Suggestions up to commit 28c9752
Category | Suggestion | Impact
Possible issue
Prevent race condition on volatile pipeline field reads

The field resolvedFinalPipeline is volatile and is read twice: once in
hasPipelines() and once here. Between these two reads, a concurrent call to
updateFinalPipeline could set it to null, causing a NullPointerException or
incorrect behavior. Capture the value in a local variable at the start of
executePipelines to ensure consistency throughout the method.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [116]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+final String currentFinalPipeline = resolvedFinalPipeline;
+if (currentFinalPipeline == null) {
+    return sourceMap;
+}
+// ... use currentFinalPipeline instead of resolvedFinalPipeline below
+indexRequest.setFinalPipeline(currentFinalPipeline);
Suggestion importance[1-10]: 7


Why: The resolvedFinalPipeline volatile field is read multiple times in executePipelines, and a concurrent updateFinalPipeline call between reads could cause inconsistency or NPE. Capturing the value in a local variable at the start of the method is a correct and important fix for thread safety.

Medium
Fix potential race condition in pipeline failure handling

The failureRef is set in the per-item failure callback, but future.get() is called
after without checking if the future completed exceptionally due to the completion
callback. If the completion callback fires with null error but failureRef is set,
the code correctly throws after future.get(). However, if the completion callback
fires before failureRef is set (race condition in async execution), the failure may
be missed. Consider completing the future exceptionally directly in the per-item
failure callback to ensure the failure is always propagated.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [126-132]

-ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
+ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> {
+    if (e != null) {
+        failureRef.set(e);
+        future.completeExceptionally(e);
+    }
+}, (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 6


Why: The suggestion identifies a valid potential race condition where failureRef could be set after future.get() returns if the completion callback fires before the per-item failure callback. Completing the future exceptionally in the per-item failure callback ensures the failure is always propagated. However, in practice, IngestService.executeBulkRequest likely calls the per-item failure before the completion callback, making this a low-probability issue.

Low
General
Replace fixed sleep with reliable assertion in tests

Using Thread.sleep(5000) is a fragile approach for verifying that no documents are
indexed, as it relies on a fixed wait time that may be insufficient in slow
environments or wasteful in fast ones. Consider using a waitForState with a timeout
that asserts the count remains zero, or at minimum use a longer, configurable sleep.
This pattern appears in both testFinalPipelineDropsDocument and
testFieldMappingWithDropPipeline.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [93-96]

-Thread.sleep(5000);
-refresh(indexName);
-SearchResponse response = client().prepareSearch(indexName).get();
-assertThat(response.getHits().getTotalHits().value(), is(0L));
+// Wait a reasonable time and verify no documents are indexed
+assertBusy(() -> {
+    refresh(indexName);
+    SearchResponse response = client().prepareSearch(indexName).get();
+    assertThat(response.getHits().getTotalHits().value(), is(0L));
+}, 30, java.util.concurrent.TimeUnit.SECONDS);
Suggestion importance[1-10]: 5


Why: Using Thread.sleep(5000) is fragile in integration tests and can cause flakiness. Using assertBusy with a timeout is a more robust approach, though the test is specifically verifying a negative condition (no documents indexed), which makes assertBusy slightly less natural but still better than a fixed sleep.

Low
Add null check for indexSettings parameter

The indexSettings parameter is not null-checked before use, which will throw a
NullPointerException with no helpful message if null is passed. Add a
Objects.requireNonNull check for consistency with the other parameters and to
provide a clear error message.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [56-61]

 public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
     this.ingestService = Objects.requireNonNull(ingestService);
     this.index = Objects.requireNonNull(index);
+    Objects.requireNonNull(indexSettings, "indexSettings must not be null");
     indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
     updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
 }
Suggestion importance[1-10]: 3


Why: While adding a null check for indexSettings improves consistency and provides a clearer error message, this is a minor defensive programming improvement. The NPE would still be thrown on the next line without the check, just with a less descriptive message.

Low
Suggestions up to commit b771d73
Category | Suggestion | Impact
Possible issue
Volatile field read is not thread-safe across method

Since executePipelines is only called when hasPipelines() returns true (i.e.,
resolvedFinalPipeline != null), the null check here is redundant. However,
resolvedFinalPipeline is volatile and could theoretically be set to null between the
hasPipelines() check and this line due to a concurrent dynamic settings update.
Capture the value in a local variable at the start of the method to ensure
consistency throughout the execution.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [116]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+public Map<String, Object> executePipelines(String id, Map<String, Object> sourceMap) throws Exception {
+    final String finalPipeline = resolvedFinalPipeline;
+    if (finalPipeline == null) {
+        return sourceMap;
+    }
 
+    IndexRequest indexRequest = new IndexRequest(index);
+    indexRequest.id(id);
+    indexRequest.source(sourceMap);
+
+    indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+    indexRequest.setFinalPipeline(finalPipeline);
+    indexRequest.isPipelineResolved(true);
+
Suggestion importance[1-10]: 7


Why: This is a valid thread-safety concern — resolvedFinalPipeline is volatile and could change between the hasPipelines() check and its use later in the method. Capturing it in a local variable at the start of executePipelines is a correct and important fix to ensure consistent behavior during concurrent dynamic settings updates.

Medium
Unify failure handling into the future completion path

The failureRef is set in the per-item failure callback, but future.complete(null) is
called in the completion callback regardless of whether failureRef was set. This
means a per-item failure will not cause future.completeExceptionally, so the
future.get() call will succeed and the failure check happens after — but if the
completion callback also fires an exception, both paths compete. More critically, if
failureRef is set but future completes normally, the exception is only caught after
the blocking get(), which is correct but fragile. Consider completing the future
exceptionally when failureRef is set to make the failure path consistent and avoid
the dual-check pattern.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [126-132]

 ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
+    } else if (failureRef.get() != null) {
+        future.completeExceptionally(failureRef.get());
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a potential race condition where failureRef is set but future completes normally, requiring a post-get() check. Completing the future exceptionally when failureRef is set would make the failure path more consistent and eliminate the dual-check pattern. This is a valid improvement to error handling robustness.

Low
General
Pipeline error stage is unused in error handling

The pipeline exception is wrapped in a new RuntimeException and re-thrown, but the
outer process() method catches Exception and wraps it again in another
RuntimeException. This double-wrapping makes it harder to identify the root cause.
The pipeline failure should propagate with the original cause preserved, or the
error strategy should be applied at this level using the PIPELINE error stage that
was added to IngestionErrorStrategy.ErrorStage.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [259-261]

 } catch (Exception e) {
-    throw new RuntimeException("Ingest pipeline execution failed", e);
+    throw new RuntimeException("Ingest pipeline execution failed: " + e.getMessage(), e);
 }
Suggestion importance[1-10]: 3


Why: The improved_code only changes the exception message string, which doesn't address the actual concern raised about double-wrapping or using the PIPELINE error stage. The suggestion content describes a more meaningful change than what the improved_code demonstrates, making it inconsistent.

Low
Replace fixed sleep with reliable wait mechanism

Using Thread.sleep(5000) as a fixed wait is fragile — it may be too short on slow CI
environments or unnecessarily slow otherwise. The same test class uses
waitForState() for polling-based assertions. Consider using a waitForState() with a
timeout that asserts the count remains 0, or at minimum use a longer sleep with a
comment explaining the rationale.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [85-97]

 public void testFinalPipelineDropsDocument() throws Exception {
-    ...
-    Thread.sleep(5000);
+    createPipeline("drop_pipeline", "{\"processors\": [{\"drop\": {}}]}");
+
+    produceData("1", "alice", "25");
+    produceData("2", "bob", "30");
+
+    createIndexWithPipeline("drop_pipeline", 1, 0);
+
+    // Wait briefly to allow any potential indexing to occur, then assert no docs indexed
+    Thread.sleep(8000);
     refresh(indexName);
     SearchResponse response = client().prepareSearch(indexName).get();
     assertThat(response.getHits().getTotalHits().value(), is(0L));
 }
Suggestion importance[1-10]: 3


Why: The improved_code still uses Thread.sleep (just with a longer duration of 8000ms instead of 5000ms), which doesn't meaningfully address the fragility concern. The suggestion content recommends using waitForState() but the improved code doesn't implement that approach.

Low
Suggestions up to commit be0f220

@github-actions
Contributor

❌ Gradle check result for 6771f6e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

imRishN added 2 commits March 15, 2026 20:49
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 9d488ed

@imRishN imRishN requested review from a team and sohami as code owners March 15, 2026 16:00
@github-actions
Contributor

✅ Gradle check result for 9d488ed: SUCCESS

@codecov

codecov bot commented Mar 15, 2026

Codecov Report

❌ Patch coverage is 89.77273% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.22%. Comparing base (8f8f7b5) to head (9d488ed).
⚠️ Report is 20 commits behind head on main.

Files with missing lines | Patch % | Lines
.../indices/pollingingest/IngestPipelineExecutor.java | 86.66% | 6 Missing and 2 partials ⚠️
.../indices/pollingingest/IngestionEngineFactory.java | 66.66% | 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #20873      +/-   ##
============================================
- Coverage     73.31%   73.22%   -0.09%     
+ Complexity    72248    72165      -83     
============================================
  Files          5795     5796       +1     
  Lines        330044   330134      +90     
  Branches      47641    47648       +7     
============================================
- Hits         241975   241748     -227     
- Misses        68609    68984     +375     
+ Partials      19460    19402      -58     

☔ View full report in Codecov by Sentry.

}

/**
* Resolves pipeline names from index settings. Called lazily on first document and cached.
Contributor

Are we resolving the pipelines lazily on first document? It seems like we only resolve when the MessageProcessor is initialized? I feel what we do now (resolve on initialization) is better.

Member Author

@imRishN imRishN Mar 23, 2026

Yeah fixing the comment


// Block until pipeline execution completes (with timeout)
try {
future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Contributor

I'm wondering if it would be better to add synchronous execution support in IngestService, something like

executePipelineSync(..) {
  CountDownLatch latch = new CountDownLatch(1);
  // execute the pipeline (ex: innerExecute(..))
  latch.await()
}

If this is possible, we could possibly execute the pipelines on the same thread avoiding the thread handoff. For async pipelines, it would still continue to wait for the result to be available.

What do you think? Have we already explored this path and run into any other challenges?

Member Author

Hmm, executing on same thread would avoid the handoff overhead. But there are a few things I considered -

  1. IngestService is a core class used by all push-based indexing. Adding a sync execution might modify a stable interface and would need deeper review, which could be beyond the scope of this PR. Additionally, runBulkRequestInBatch() handles batching, metrics tracking, pipeline chaining, index change detection, and slot management. While we could expose a sync path through all of that, it looks non-trivial. Wdyt?
  2. For async processors, we'd still need a latch/future to block for those cases. The internal Pipeline.execute() -> IngestDocument.executePipeline() chain is fundamentally callback-based
  3. And there seems to be no practical impact for most processors (lightweight, simpler ones), as execution time dominates the context-switch cost

Even with above nuances, this could be a valid optimization and can be taken up as a follow up when we benchmark our changes. Can create a tracking issue for this. Let me know your thoughts
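For reference, the callback-to-sync bridge discussed in this thread can be sketched roughly as below. This is a minimal, self-contained illustration with hypothetical names (`executeAsync`, `executeSync`, `SyncBridgeSketch`); it mimics the shape of an async per-item-failure/completion callback API rather than the actual IngestService signature:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class SyncBridgeSketch {

    // Hypothetical stand-in for an async API shaped like IngestService.executeBulkRequest:
    // per-item failures go to one callback, overall completion to another, on a worker thread.
    static void executeAsync(String doc, BiConsumer<Integer, Exception> onItemFailure, Consumer<String> onCompletion) {
        new Thread(() -> {
            // trivial stand-in for the pipeline transformation
            onCompletion.accept(doc.toUpperCase());
        }).start();
    }

    // Bridges the async callbacks into a blocking call with a timeout. A per-item
    // failure completes the future exceptionally, unifying both failure paths.
    static String executeSync(String doc) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<Exception> failureRef = new AtomicReference<>();

        executeAsync(doc, (slot, e) -> failureRef.set(e), transformed -> {
            Exception failure = failureRef.get();
            if (failure != null) {
                future.completeExceptionally(failure);
            } else {
                future.complete(transformed);
            }
        });

        return future.get(30, TimeUnit.SECONDS); // block the processor thread, bounded by a timeout
    }

    public static void main(String[] args) throws Exception {
        System.out.println(executeSync("doc-1")); // prints DOC-1
    }
}
```

A CountDownLatch-based variant would look the same except the failure would be stashed and rethrown after `latch.await()`; the future approach keeps the timeout and exception propagation in one call.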

updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));

// Register dynamic settings listener for final_pipeline updates
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
Contributor

Should we register the listener in the constructor before resolvePipelineNames can be called so we don't miss any update?

Member Author

Yes, good point. This also helped to remove some cluttered code.

String indexName = engine.config().getIndexSettings().getIndex().getName();
this.engine = engine;
this.index = indexName;
this.pipelineExecutor = new IngestPipelineExecutor(ingestService, indexName);
Contributor

Is it possible to create a single instance of IngestPipelineExecutor at the IngestionEngineFactory layer and pass it all the way through? Thinking if it can help us avoid duplicate settings update consumer registration across writer threads.

Member Author

Good suggestion. Refactored to create a single IngestPipelineExecutor instance in IngestionEngine and pass it through the chain instead of IngestService. This has also eliminated duplicate listener registrations.

Also, as we discussed in prior PRs, this has in a way cleaned up the wiring, and the intermediate layers no longer know about IngestService at all. The wiring now looks like this:

IngestionEngine (creates single IngestPipelineExecutor) → DefaultStreamPoller (passes executor through) → PartitionedBlockingQueueContainer (passes executor through) → N × MessageProcessorRunnable (all share the same executor)

IngestService is now only referenced in IngestionEngine and IngestPipelineExecutor itself. The rest of the pull-based path is decoupled from it.
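The sharing pattern described above can be sketched as follows. This is a simplified illustration with hypothetical, stripped-down types (the real classes take many more dependencies); the point is that one executor instance is constructed at the engine layer, so its settings listener is registered exactly once, and every writer thread receives the same reference:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in: the real class registers a final_pipeline settings
// listener in its constructor, which is why only one instance should exist.
class IngestPipelineExecutor {
}

// Each writer thread holds a reference to the shared executor.
class MessageProcessorRunnable implements Runnable {
    private final IngestPipelineExecutor executor;

    MessageProcessorRunnable(IngestPipelineExecutor executor) {
        this.executor = executor;
    }

    @Override
    public void run() {
        // transform each polled message via the shared executor
    }
}

// Intermediate layer: passes the executor through without knowing about IngestService.
class PartitionedBlockingQueueContainer {
    final List<MessageProcessorRunnable> processors = new ArrayList<>();

    PartitionedBlockingQueueContainer(IngestPipelineExecutor executor, int numProcessors) {
        for (int i = 0; i < numProcessors; i++) {
            processors.add(new MessageProcessorRunnable(executor)); // all share one instance
        }
    }
}

public class WiringSketch {
    public static void main(String[] args) {
        IngestPipelineExecutor shared = new IngestPipelineExecutor(); // created once, in the engine
        PartitionedBlockingQueueContainer container = new PartitionedBlockingQueueContainer(shared, 4);
        System.out.println("processors sharing one executor: " + container.processors.size()); // prints 4
    }
}
```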

this.engine = engine;
this.index = indexName;
this.pipelineExecutor = new IngestPipelineExecutor(ingestService, indexName);
this.pipelineExecutor.resolvePipelineNames(engine.config().getIndexSettings());
Contributor

Is it possible to call resolvePipelineNames inside the IngestPipelineExecutor constructor instead of exposing it outside?

Member Author

Done. Resolution is now fully encapsulated in the constructor

* synchronously by bridging IngestService's async callback API with CompletableFuture.
* Only {@code final_pipeline} is supported.
*/
public class IngestPipelineExecutor {
Contributor

We can highlight in the javadocs that ingest pipelines/processors on the pull-based ingestion flow do not require the INGEST role and execute the transformations on the current node (the request is not forwarded to ingest nodes).

Member Author

Yeah missed adding that. Added now

POLLING,
PROCESSING
PROCESSING,
PIPELINE
Contributor

Are we using the new error stage anywhere? PIPELINE will be a part of the PROCESSING stage, maybe we can avoid the new error stage?

Member Author

Added this for fine-grained tracking, but removing it for now in the context of this PR. Will review whether it is needed later.

imRishN added 2 commits March 20, 2026 12:59
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…estion

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit be0f220

@github-actions
Contributor

❌ Gradle check result for be0f220: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit b771d73

@github-actions
Contributor

❌ Gradle check result for b771d73: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

…ionEngine

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 28c9752

@github-actions
Contributor

❌ Gradle check result for 28c9752: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 04af53e

@github-actions
Contributor

Persistent review updated to latest commit e0dc3f4

@github-actions
Contributor

❌ Gradle check result for e0dc3f4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 9a50135

@github-actions
Contributor

❌ Gradle check result for 9a50135: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
