Skip to content

Commit 1201d59

Browse files
committed
Make BulkItemRequest immutable
I was talking with @itschrispeck about some JIT optimization issues in BulkItemRequest's serialization. While looking at the code, the `volatile` keyword on the `primaryResponse` field made me cringe. Why is a `BulkItemRequest` mutable at all? It turns out that we modify the existing `BulkItemRequest` instances on the primary shard. These modified requests are send to the replicas. This change makes `BulkItemRequest` immutable. The primary execution context collects all of the primary responses, then produces a new `BulkShardRequest` that gets forwarded to replicas. Signed-off-by: Michael Froh <msfroh@apache.org>
1 parent e5b3fa3 commit 1201d59

File tree

5 files changed

+49
-216
lines changed

5 files changed

+49
-216
lines changed

server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,12 @@
3636
import org.apache.lucene.util.RamUsageEstimator;
3737
import org.opensearch.action.DocWriteRequest;
3838
import org.opensearch.common.Nullable;
39-
import org.opensearch.core.common.Strings;
4039
import org.opensearch.core.common.io.stream.StreamInput;
4140
import org.opensearch.core.common.io.stream.StreamOutput;
4241
import org.opensearch.core.common.io.stream.Writeable;
4342
import org.opensearch.core.index.shard.ShardId;
44-
import org.opensearch.core.xcontent.MediaTypeRegistry;
4543

4644
import java.io.IOException;
47-
import java.util.Objects;
4845

4946
/**
5047
* Transport request for a Single bulk item
@@ -55,9 +52,9 @@ public class BulkItemRequest implements Writeable, Accountable {
5552

5653
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);
5754

58-
private int id;
59-
private DocWriteRequest<?> request;
60-
private volatile BulkItemResponse primaryResponse;
55+
private final int id;
56+
private final DocWriteRequest<?> request;
57+
private final BulkItemResponse primaryResponse;
6158

6259
/**
6360
* @param shardId the shard id
@@ -71,13 +68,20 @@ public class BulkItemRequest implements Writeable, Accountable {
7168
} else {
7269
primaryResponse = new BulkItemResponse(shardId, in);
7370
}
71+
} else {
72+
primaryResponse = null;
7473
}
7574
}
7675

7776
// NOTE: public for testing only
7877
public BulkItemRequest(int id, DocWriteRequest<?> request) {
78+
this(id, request, null);
79+
}
80+
81+
BulkItemRequest(int id, DocWriteRequest<?> request, BulkItemResponse primaryResponse) {
7982
this.id = id;
8083
this.request = request;
84+
this.primaryResponse = primaryResponse;
8185
}
8286

8387
public int id() {
@@ -97,38 +101,6 @@ BulkItemResponse getPrimaryResponse() {
97101
return primaryResponse;
98102
}
99103

100-
void setPrimaryResponse(BulkItemResponse primaryResponse) {
101-
this.primaryResponse = primaryResponse;
102-
}
103-
104-
/**
105-
* Abort this request, and store a {@link org.opensearch.action.bulk.BulkItemResponse.Failure} response.
106-
*
107-
* @param index The concrete index that was resolved for this request
108-
* @param cause The cause of the rejection (may not be null)
109-
* @throws IllegalStateException If a response already exists for this request
110-
*/
111-
public void abort(String index, Exception cause) {
112-
if (primaryResponse == null) {
113-
final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(), Objects.requireNonNull(cause), true);
114-
setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure));
115-
} else {
116-
assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted() : "response ["
117-
+ Strings.toString(MediaTypeRegistry.JSON, primaryResponse)
118-
+ "]; cause ["
119-
+ cause
120-
+ "]";
121-
if (primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()) {
122-
primaryResponse.getFailure().getCause().addSuppressed(cause);
123-
} else {
124-
throw new IllegalStateException(
125-
"aborting item that with response [" + primaryResponse + "] that was previously processed",
126-
cause
127-
);
128-
}
129-
}
130-
}
131-
132104
@Override
133105
public void writeTo(StreamOutput out) throws IOException {
134106
out.writeVInt(id);

server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,6 @@ public long getTerm() {
421421
/**
422422
* Whether this failure is the result of an <em>abort</em>.
423423
* If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.
424-
* @see BulkItemRequest#abort(String, Exception)
425424
*/
426425
public boolean isAborted() {
427426
return aborted;

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@
4545

4646
import java.util.Arrays;
4747

48-
import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_QUEUE_SIZE;
49-
import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_SERVICE_TIME_IN_NANOS;
50-
5148
/**
5249
* This is a utility class that holds the per request state needed to perform bulk operations on the primary.
5350
* More specifically, it maintains an index to the current executing bulk item, which allows execution
@@ -91,13 +88,15 @@ enum ItemProcessingState {
9188
private int currentIndex = -1;
9289

9390
private ItemProcessingState currentItemState;
94-
private DocWriteRequest requestToExecute;
91+
private DocWriteRequest<?> requestToExecute;
9592
private BulkItemResponse executionResult;
9693
private int retryCounter;
94+
private final BulkItemResponse[] primaryResponses;
9795

9896
BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
9997
this.request = request;
10098
this.primary = primary;
99+
this.primaryResponses = new BulkItemResponse[request.items().length];
101100
advance();
102101
}
103102

@@ -131,7 +130,12 @@ public DocWriteRequest<?> getCurrent() {
131130
}
132131

133132
public BulkShardRequest getBulkShardRequest() {
134-
return request;
133+
BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
134+
for (int i = 0; i < newRequests.length; i++) {
135+
BulkItemRequest oldRequest = request.items()[i];
136+
newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
137+
}
138+
return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
135139
}
136140

137141
/** returns the result of the request that has been executed on the shard */
@@ -145,21 +149,11 @@ public int getRetryCounter() {
145149
return retryCounter;
146150
}
147151

148-
/** returns true if the current request has been executed on the primary */
149-
public boolean isOperationExecuted() {
150-
return currentItemState == ItemProcessingState.EXECUTED;
151-
}
152-
153152
/** returns true if the request needs to wait for a mapping update to arrive from the cluster-manager */
154153
public boolean requiresWaitingForMappingUpdate() {
155154
return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE;
156155
}
157156

158-
/** returns true if the current request should be retried without waiting for an external event */
159-
public boolean requiresImmediateRetry() {
160-
return currentItemState == ItemProcessingState.IMMEDIATE_RETRY;
161-
}
162-
163157
/**
164158
* returns true if the current request has been completed and it's result translated to a user
165159
* facing response
@@ -206,17 +200,18 @@ public IndexShard getPrimary() {
206200
}
207201

208202
/**
209-
* sets the request that should actually be executed on the primary. This can be different then the request
203+
* sets the request that should actually be executed on the primary. This can be different from the request
210204
* received from the user (specifically, an update request is translated to an indexing or delete request).
211205
*/
212-
public void setRequestToExecute(DocWriteRequest writeRequest) {
206+
public void setRequestToExecute(DocWriteRequest<?> writeRequest) {
213207
assert assertInvariants(ItemProcessingState.INITIAL);
214208
requestToExecute = writeRequest;
215209
currentItemState = ItemProcessingState.TRANSLATED;
216210
assert assertInvariants(ItemProcessingState.TRANSLATED);
217211
}
218212

219213
/** returns the request that should be executed on the shard. */
214+
@SuppressWarnings("unchecked")
220215
public <T extends DocWriteRequest<T>> T getRequestToExecute() {
221216
assert assertInvariants(ItemProcessingState.TRANSLATED);
222217
return (T) requestToExecute;
@@ -252,7 +247,7 @@ public void markOperationAsNoOp(DocWriteResponse response) {
252247
public void failOnMappingUpdate(Exception cause) {
253248
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
254249
currentItemState = ItemProcessingState.EXECUTED;
255-
final DocWriteRequest docWriteRequest = getCurrentItem().request();
250+
final DocWriteRequest<?> docWriteRequest = getCurrentItem().request();
256251
executionResult = new BulkItemResponse(
257252
getCurrentItem().id(),
258253
docWriteRequest.opType(),
@@ -267,7 +262,7 @@ public void failOnMappingUpdate(Exception cause) {
267262
public void markOperationAsExecuted(Engine.Result result) {
268263
assertInvariants(ItemProcessingState.TRANSLATED);
269264
final BulkItemRequest current = getCurrentItem();
270-
DocWriteRequest docWriteRequest = getRequestToExecute();
265+
DocWriteRequest<?> docWriteRequest = getRequestToExecute();
271266
switch (result.getResultType()) {
272267
case SUCCESS:
273268
final DocWriteResponse response;
@@ -347,24 +342,15 @@ public void markAsCompleted(BulkItemResponse translatedResponse) {
347342
if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
348343
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
349344
}
350-
getCurrentItem().setPrimaryResponse(translatedResponse);
345+
primaryResponses[currentIndex] = translatedResponse;
351346
currentItemState = ItemProcessingState.COMPLETED;
352347
advance();
353348
}
354349

355350
/** builds the bulk shard response to return to the user */
356351
public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) {
357352
assert hasMoreOperationsToExecute() == false;
358-
return new BulkShardResponse(
359-
request.shardId(),
360-
Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new),
361-
serviceTimeEWMAInNanos,
362-
nodeQueueSize
363-
);
364-
}
365-
366-
public BulkShardResponse buildShardResponse() {
367-
return buildShardResponse(DEFAULT_SERVICE_TIME_IN_NANOS, DEFAULT_QUEUE_SIZE);
353+
return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize);
368354
}
369355

370356
private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
@@ -379,18 +365,14 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
379365
assert requestToExecute == null : requestToExecute;
380366
assert executionResult == null : executionResult;
381367
break;
382-
case TRANSLATED:
368+
case TRANSLATED, IMMEDIATE_RETRY:
383369
assert requestToExecute != null;
384370
assert executionResult == null : executionResult;
385371
break;
386372
case WAIT_FOR_MAPPING_UPDATE:
387373
assert requestToExecute == null;
388374
assert executionResult == null : executionResult;
389375
break;
390-
case IMMEDIATE_RETRY:
391-
assert requestToExecute != null;
392-
assert executionResult == null : executionResult;
393-
break;
394376
case EXECUTED:
395377
// requestToExecute can be null if the update ended up as NOOP
396378
assert executionResult != null;

server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -40,75 +40,18 @@
4040
import org.opensearch.action.index.IndexRequest;
4141
import org.opensearch.action.support.WriteRequest;
4242
import org.opensearch.action.update.UpdateRequest;
43-
import org.opensearch.core.index.AppendOnlyIndexOperationRetryException;
4443
import org.opensearch.core.index.shard.ShardId;
4544
import org.opensearch.index.engine.Engine;
4645
import org.opensearch.index.shard.IndexShard;
4746
import org.opensearch.index.translog.Translog;
4847
import org.opensearch.test.OpenSearchTestCase;
4948

50-
import java.util.ArrayList;
51-
5249
import static org.hamcrest.Matchers.equalTo;
5350
import static org.mockito.Mockito.mock;
5451
import static org.mockito.Mockito.when;
5552

5653
public class BulkPrimaryExecutionContextTests extends OpenSearchTestCase {
5754

58-
public void testAbortedSkipped() {
59-
BulkShardRequest shardRequest = generateRandomRequest();
60-
61-
ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
62-
for (BulkItemRequest request : shardRequest.items()) {
63-
if (randomBoolean()) {
64-
request.abort("index", new OpenSearchException("bla"));
65-
} else {
66-
nonAbortedRequests.add(request.request());
67-
}
68-
}
69-
70-
ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
71-
for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); context
72-
.hasMoreOperationsToExecute();) {
73-
visitedRequests.add(context.getCurrent());
74-
context.setRequestToExecute(context.getCurrent());
75-
// using failures prevents caring about types
76-
context.markOperationAsExecuted(new Engine.IndexResult(new OpenSearchException("bla"), 1));
77-
context.markAsCompleted(context.getExecutionResult());
78-
}
79-
80-
assertThat(visitedRequests, equalTo(nonAbortedRequests));
81-
}
82-
83-
public void testAppendOnlyIndexOperationRetryException() {
84-
BulkShardRequest shardRequest = generateRandomRequest();
85-
86-
final IndexShard primary = mock(IndexShard.class);
87-
when(primary.shardId()).thenReturn(shardRequest.shardId());
88-
ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
89-
for (BulkItemRequest request : shardRequest.items()) {
90-
if (randomBoolean()) {
91-
request.abort("index", new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"));
92-
} else {
93-
nonAbortedRequests.add(request.request());
94-
}
95-
}
96-
97-
ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
98-
for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); context
99-
.hasMoreOperationsToExecute();) {
100-
visitedRequests.add(context.getCurrent());
101-
context.setRequestToExecute(context.getCurrent());
102-
// using failures prevents caring about types
103-
context.markOperationAsExecuted(
104-
new Engine.IndexResult(new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"), 1)
105-
);
106-
context.markAsCompleted(context.getExecutionResult());
107-
}
108-
109-
assertThat(visitedRequests, equalTo(nonAbortedRequests));
110-
}
111-
11255
private BulkShardRequest generateRandomRequest() {
11356
BulkItemRequest[] items = new BulkItemRequest[randomInt(20)];
11457
for (int i = 0; i < items.length; i++) {

0 commit comments

Comments
 (0)