Skip to content

Make BulkItemRequest immutable#20831

Draft
msfroh wants to merge 2 commits intoopensearch-project:mainfrom
msfroh:make_bulk_item_request_immutable
Draft

Make BulkItemRequest immutable#20831
msfroh wants to merge 2 commits intoopensearch-project:mainfrom
msfroh:make_bulk_item_request_immutable

Conversation

@msfroh
Copy link
Contributor

@msfroh msfroh commented Mar 10, 2026

Description

I was talking with @itschrispeck about some JIT optimization issues in BulkItemRequest's serialization. While looking at the code, the volatile keyword on the primaryResponse field made me cringe. Why is a BulkItemRequest mutable at all?

It turns out that we modify the existing BulkItemRequest instances on the primary shard. These modified requests are send to the replicas.

This change makes BulkItemRequest immutable. The primary execution context collects all of the primary responses, then produces a new BulkShardRequest that gets forwarded to replicas.

Related Issues

N/A

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Contributor

github-actions bot commented Mar 10, 2026

PR Reviewer Guide 🔍

(Review updated until commit 93814fc)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Make BulkItemRequest fields final and remove abort/setPrimaryResponse methods

Relevant files:

  • server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java
  • server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java

Sub-PR theme: Collect primary responses in context and produce new BulkShardRequest for replicas

Relevant files:

  • server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java
  • server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java
  • server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java

⚡ Recommended focus areas for review

Aborted Items Lost

The old code had an abort() method on BulkItemRequest that would set a failed primary response directly on the item, and findNextNonAborted would skip items that already had an aborted response. With the new immutable design, there is no longer a way to pre-abort items before execution. The testAbortedSkipped and testSkipBulkIndexRequestIfAborted tests were removed. It should be verified that the abort/pre-rejection path is still handled correctly elsewhere, or that this use case is intentionally dropped.

private int findNextNonAborted(int startIndex) {
    final int length = request.items().length;
    while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) {
        startIndex++;
    }
    return startIndex;
}

private static boolean isAborted(BulkItemResponse response) {
    return response != null && response.isFailed() && response.getFailure().isAborted();
}

/** move to the next item to execute */
private void advance() {
    assert currentItemState == ItemProcessingState.COMPLETED || currentIndex == -1
        : "moving to next but current item wasn't completed (state: " + currentItemState + ")";
    currentItemState = ItemProcessingState.INITIAL;
    currentIndex = findNextNonAborted(currentIndex + 1);
    retryCounter = 0;
    requestToExecute = null;
    executionResult = null;
    assert assertInvariants(ItemProcessingState.INITIAL);
}

/** gets the current, untranslated item request */
public DocWriteRequest<?> getCurrent() {
    return getCurrentItem().request();
}

public BulkShardRequest getBulkShardRequest() {
    BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
    for (int i = 0; i < newRequests.length; i++) {
        BulkItemRequest oldRequest = request.items()[i];
        newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
    }
    return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
Null primaryResponse

In buildShardResponse, primaryResponses is passed directly to BulkShardResponse. If any item was skipped or not completed (e.g., due to an abort or early exit), the corresponding entry in primaryResponses will be null. This could cause NullPointerExceptions downstream when iterating over responses.

public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) {
    assert hasMoreOperationsToExecute() == false;
    return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize);
}
getBulkShardRequest Overhead

getBulkShardRequest() creates a new BulkItemRequest[] array and a new BulkShardRequest every time it is called. In tests it is called once per test, but if called multiple times in production code paths, this could be wasteful. Consider caching the result or ensuring it is only called once after all operations complete.

public BulkShardRequest getBulkShardRequest() {
    BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
    for (int i = 0; i < newRequests.length; i++) {
        BulkItemRequest oldRequest = request.items()[i];
        newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
    }
    return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
}

@github-actions
Copy link
Contributor

github-actions bot commented Mar 10, 2026

PR Code Suggestions ✨

Latest suggestions up to 93814fc

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Avoid mutating original request items array

When markAsCompleted updates request.items()[currentIndex] with a new
BulkItemRequest (for translated requests like updates), the getBulkShardRequest()
method later reads oldRequest.request() from request.items()[i]. This means the
translated requestToExecute is correctly captured in the items array. However,
primaryResponses[currentIndex] is stored separately and then combined in
getBulkShardRequest(). This is consistent, but the update to request.items() is
still mutating the original BulkShardRequest's items array, which undermines the
immutability goal. Consider storing translated requests in a separate array instead
of mutating request.items().

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [342-345]

+// In BulkPrimaryExecutionContext, add a field:
+private final DocWriteRequest<?>[] translatedRequests;
+
+// In constructor:
+this.translatedRequests = new DocWriteRequest<?>[request.items().length];
+
+// In markAsCompleted:
 if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
-    request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
+    translatedRequests[currentIndex] = requestToExecute;
 }
 primaryResponses[currentIndex] = translatedResponse;
 
+// In getBulkShardRequest:
+BulkItemRequest oldRequest = request.items()[i];
+DocWriteRequest<?> effectiveRequest = translatedRequests[i] != null ? translatedRequests[i] : oldRequest.request();
+newRequests[i] = new BulkItemRequest(oldRequest.id(), effectiveRequest, primaryResponses[i]);
+
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that request.items()[currentIndex] is still being mutated in markAsCompleted, which partially undermines the immutability goal of the PR. However, the improved_code is a multi-part snippet spanning multiple methods and fields, making it harder to evaluate precisely. The concern is valid but the implementation is complex and the getBulkShardRequest() method already reads from request.items() to get the (potentially mutated) request, so the current approach is at least consistent.

Low
Guard against premature bulk request snapshot

The getBulkShardRequest() method creates a new BulkShardRequest on every call, which
is expensive and may produce inconsistent snapshots if called multiple times during
processing. Since this method is called both during processing (to update item
requests) and at the end to retrieve results, consider only building the new request
once after all operations are complete, or caching the result after processing is
done.

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [133-139]

 public BulkShardRequest getBulkShardRequest() {
+    assert hasMoreOperationsToExecute() == false : "getBulkShardRequest() called before all operations are complete";
     BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
     for (int i = 0; i < newRequests.length; i++) {
         BulkItemRequest oldRequest = request.items()[i];
         newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
     }
     return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
 }
Suggestion importance[1-10]: 4

__

Why: Adding an assertion to prevent getBulkShardRequest() from being called before all operations complete is a reasonable defensive check. However, looking at the test code, getBulkShardRequest() is called after assertFalse(context.hasMoreOperationsToExecute()), so this is more of a safety guard than fixing a real bug. The improvement is minor.

Low
Possible issue
Verify request identity preserved after immutability refactor

The loop iterates over completedRequest[0].items() but calls verify with
eq(updateRequest) where updateRequest is cast from item.request(). Since
getBulkShardRequest() now creates new BulkItemRequest objects, the updateRequest
reference from the completed request may differ from the original UpdateRequest used
in when(updateHelper.prepare(...)). Verify that the UpdateRequest identity is
preserved through the new BulkItemRequest construction to ensure Mockito's eq()
matcher works correctly.

server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java [1232-1243]

 for (BulkItemRequest item : completedRequest[0].items()) {
+    assertNotNull(item.getPrimaryResponse());
     assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);
 
-    // this assertion is based on the assumption that all bulk item requests are updates and are hence calling
-    // UpdateRequest::prepareRequest
     UpdateRequest updateRequest = (UpdateRequest) item.request();
+    // Ensure the same UpdateRequest instance is used for verification
+    assertSame(updateRequest, items.stream()
+        .filter(orig -> orig.id() == item.id())
+        .findFirst().get().request());
     verify(updateHelper, times(updateRequest.retryOnConflict() + 1)).prepare(
         eq(updateRequest),
         any(IndexShard.class),
         any(LongSupplier.class)
     );
 }
Suggestion importance[1-10]: 5

__

Why: The concern about UpdateRequest identity through the new BulkItemRequest construction is valid - since getBulkShardRequest() creates new BulkItemRequest objects wrapping the same DocWriteRequest references, the eq() matcher should still work as object identity is preserved. However, the improved_code adds unnecessary complexity with a stream lookup. The suggestion raises a legitimate concern but the actual risk is low since the request references are passed through unchanged.

Low

Previous suggestions

Suggestions up to commit 1201d59
CategorySuggestion                                                                                                                                    Impact
General
Guard against incomplete state in request builder

This method is called to get the bulk shard request, but it creates a new
BulkShardRequest on every invocation, which could be expensive if called multiple
times. More critically, primaryResponses[i] may be null for items that haven't been
processed yet, which could cause issues downstream if callers expect all responses
to be populated. Consider documenting this behavior or asserting that all responses
are set before calling this method.

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [132-139]

 public BulkShardRequest getBulkShardRequest() {
+    assert hasMoreOperationsToExecute() == false : "getBulkShardRequest called before all operations are completed";
     BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
     for (int i = 0; i < newRequests.length; i++) {
         BulkItemRequest oldRequest = request.items()[i];
         newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
     }
     return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
 }
Suggestion importance[1-10]: 5

__

Why: Adding an assertion that hasMoreOperationsToExecute() == false before building the request is a valid defensive check, as primaryResponses[i] could be null for unprocessed items. This mirrors the pattern already used in buildShardResponse.

Low
Defensive copy prevents internal array mutation

The primaryResponses array is passed directly to BulkShardResponse, which may allow
external mutation of the internal state. A defensive copy should be used to prevent
unintended modifications to the array after the response is built.

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [351-354]

 public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) {
     assert hasMoreOperationsToExecute() == false;
-    return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize);
+    return new BulkShardResponse(request.shardId(), Arrays.copyOf(primaryResponses, primaryResponses.length), serviceTimeEWMAInNanos, nodeQueueSize);
 }
Suggestion importance[1-10]: 3

__

Why: While defensive copying is a good practice, primaryResponses is a private final field only accessible within this class, and BulkShardResponse likely treats it as immutable. The risk of external mutation is low in this context.

Low
Verify response array alignment with item array

When a translated request (e.g., update translated to index) replaces the original
item in request.items(), the primaryResponses array is indexed by currentIndex which
aligns with the original request.items() array. However, getBulkShardRequest()
iterates over request.items() and pairs each with primaryResponses[i], so the
replacement of request.items()[currentIndex] with a new BulkItemRequest (without a
response) and then separately storing the response in primaryResponses[currentIndex]
is correct. This is fine, but worth verifying that getBulkShardRequest() is only
called after all operations complete to avoid mismatched null responses.

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [342-345]

+if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
+    request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
+}
+primaryResponses[currentIndex] = translatedResponse;
 
-
Suggestion importance[1-10]: 1

__

Why: The existing_code and improved_code are identical, making this suggestion a verification request rather than an actual code change. It offers no actionable improvement to the PR.

Low
Suggestions up to commit 0450fb9
CategorySuggestion                                                                                                                                    Impact
General
Guard against incomplete state in bulk request retrieval

getBulkShardRequest() creates a new BulkShardRequest on every call, which is
expensive if called multiple times. Additionally, primaryResponses[i] may be null
for items that haven't been processed yet, which could cause issues downstream if
callers expect all responses to be populated. Consider documenting this behavior or
asserting that all responses are set before calling this method.

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [132-139]

 public BulkShardRequest getBulkShardRequest() {
+    assert hasMoreOperationsToExecute() == false : "getBulkShardRequest called before all items were processed";
     BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
     for (int i = 0; i < newRequests.length; i++) {
         BulkItemRequest oldRequest = request.items()[i];
         newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
     }
     return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
 }
Suggestion importance[1-10]: 5

__

Why: Adding an assertion that hasMoreOperationsToExecute() == false before building the request is a valid defensive check, similar to how buildShardResponse already does this. It prevents misuse when primaryResponses may have null entries for unprocessed items.

Low
Defensive copy prevents internal array mutation

The primaryResponses array is passed directly to BulkShardResponse, which may allow
external mutation of the internal state. A defensive copy should be used to prevent
the caller from modifying the array after the response is built.

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java [351-354]

 public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) {
     assert hasMoreOperationsToExecute() == false;
-    return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize);
+    return new BulkShardResponse(request.shardId(), Arrays.copyOf(primaryResponses, primaryResponses.length), serviceTimeEWMAInNanos, nodeQueueSize);
 }
Suggestion importance[1-10]: 3

__

Why: While defensive copying is a good practice, primaryResponses is a private final field and BulkShardResponse likely stores it internally. The risk of external mutation is low in this context, making this a minor improvement.

Low

@github-actions
Copy link
Contributor

❌ Gradle check result for 0450fb9: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

I was talking with @itschrispeck about some JIT optimization issues in
BulkItemRequest's serialization. While looking at the code, the
`volatile` keyword on the `primaryResponse` field made me cringe. Why
is a `BulkItemRequest` mutable at all?

It turns out that we modify the existing `BulkItemRequest` instances
on the primary shard. These modified requests are send to the replicas.

This change makes `BulkItemRequest` immutable. The primary execution
context collects all of the primary responses, then produces a new
`BulkShardRequest` that gets forwarded to replicas.

Signed-off-by: Michael Froh <msfroh@apache.org>
@msfroh msfroh force-pushed the make_bulk_item_request_immutable branch from 0450fb9 to 1201d59 Compare March 19, 2026 23:31
@github-actions
Copy link
Contributor

Persistent review updated to latest commit 1201d59

@github-actions
Copy link
Contributor

❌ Gradle check result for 1201d59: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

These tests relied on the assumption that the BulkShardRequest would be
mutated on the primary.

In particular, there were some ridiculous tests that were verifying
that the output was unchanged from the input, when the output was the
same object as the input, which had been changed in place.

Signed-off-by: Michael Froh <msfroh@apache.org>
@github-actions
Copy link
Contributor

Persistent review updated to latest commit 93814fc

*
* @opensearch.internal
*/
public class BulkItemRequest implements Writeable, Accountable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like a broken record (no pun intended) because I suggest this all the time, but can you make this a record?

@github-actions
Copy link
Contributor

❌ Gradle check result for 93814fc: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants