Skip to content

Commit 378c22a

Browse files
authored
Better handling of async processor failures (#104289) (#104768)
1 parent 0b6cd20 commit 378c22a

File tree

4 files changed

+157
-52
lines changed

4 files changed

+157
-52
lines changed

docs/changelog/104289.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 104289
2+
summary: Better handling of async processor failures
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 101921

server/src/internalClusterTest/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77
*/
88
package org.elasticsearch.action.ingest;
99

10+
import org.elasticsearch.action.DocWriteResponse;
11+
import org.elasticsearch.action.bulk.BulkItemResponse;
1012
import org.elasticsearch.action.bulk.BulkRequest;
1113
import org.elasticsearch.action.bulk.BulkResponse;
1214
import org.elasticsearch.action.get.GetRequest;
1315
import org.elasticsearch.action.get.GetResponse;
1416
import org.elasticsearch.action.index.IndexRequest;
17+
import org.elasticsearch.action.update.UpdateResponse;
1518
import org.elasticsearch.client.Client;
1619
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1720
import org.elasticsearch.cluster.service.ClusterService;
@@ -40,18 +43,26 @@
4043
import java.util.function.BiConsumer;
4144
import java.util.function.Supplier;
4245

46+
import static org.hamcrest.Matchers.containsString;
4347
import static org.hamcrest.Matchers.equalTo;
4448

4549
/**
4650
* The purpose of this test is to verify that when a processor executes an operation asynchronously
4751
* the expected result is the same as if the same operation happens synchronously.
48-
*
49-
* In this test two test processor are defined that basically do the same operation, but a single processor
52+
* <p>
53+
* In this test two test processors are defined that basically do the same operation, but a single processor
5054
* executes asynchronously. The result of the operation should be the same and also the order in which the
5155
* bulk responses are returned should be the same as how the corresponding index requests were defined.
56+
* <p>
57+
* As a further test, one document is dropped by the synchronous processor, and one document causes
58+
* the asynchronous processor to throw an exception.
5259
*/
5360
public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
5461

62+
private static final int DROPPED = 3;
63+
64+
private static final int ERROR = 7;
65+
5566
@Override
5667
protected Collection<Class<? extends Plugin>> getPlugins() {
5768
return Collections.singleton(TestPlugin.class);
@@ -72,11 +83,21 @@ public void testAsyncProcessorImplementation() {
7283
for (int i = 0; i < numDocs; i++) {
7384
String id = Integer.toString(i);
7485
assertThat(bulkResponse.getItems()[i].getId(), equalTo(id));
75-
GetResponse getResponse = client().get(new GetRequest("foobar", id)).actionGet();
76-
// The expected result of async test processor:
77-
assertThat(getResponse.getSource().get("foo"), equalTo("bar-" + id));
78-
// The expected result of sync test processor:
79-
assertThat(getResponse.getSource().get("bar"), equalTo("baz-" + id));
86+
if (i == DROPPED) {
87+
UpdateResponse dropped = bulkResponse.getItems()[i].getResponse();
88+
assertThat(dropped.getId(), equalTo(id));
89+
assertThat(dropped.getResult(), equalTo(DocWriteResponse.Result.NOOP));
90+
} else if (i == ERROR) {
91+
BulkItemResponse failure = bulkResponse.getItems()[i];
92+
assertThat(failure.getFailure().getId(), equalTo(id));
93+
assertThat(failure.getFailure().getMessage(), containsString("lucky number seven"));
94+
} else {
95+
GetResponse getResponse = client().get(new GetRequest("foobar", id)).actionGet();
96+
// The expected result of async test processor:
97+
assertThat(getResponse.getSource().get("foo"), equalTo("bar-" + id));
98+
// The expected result of sync test processor:
99+
assertThat(getResponse.getSource().get("bar"), equalTo("baz-" + id));
100+
}
80101
}
81102
}
82103

@@ -112,15 +133,20 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
112133
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
113134
threadPool.generic().execute(() -> {
114135
String id = (String) ingestDocument.getSourceAndMetadata().get("_id");
115-
if (usually()) {
116-
try {
117-
Thread.sleep(10);
118-
} catch (InterruptedException e) {
119-
// ignore
136+
if (id.equals(String.valueOf(ERROR))) {
137+
// lucky number seven always fails
138+
handler.accept(ingestDocument, new RuntimeException("lucky number seven"));
139+
} else {
140+
if (usually()) {
141+
try {
142+
Thread.sleep(10);
143+
} catch (InterruptedException e) {
144+
// ignore
145+
}
120146
}
147+
ingestDocument.setFieldValue("foo", "bar-" + id);
148+
handler.accept(ingestDocument, null);
121149
}
122-
ingestDocument.setFieldValue("foo", "bar-" + id);
123-
handler.accept(ingestDocument, null);
124150
});
125151
}
126152

@@ -140,8 +166,13 @@ public String getType() {
140166
@Override
141167
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
142168
String id = (String) ingestDocument.getSourceAndMetadata().get("_id");
143-
ingestDocument.setFieldValue("bar", "baz-" + id);
144-
return ingestDocument;
169+
if (id.equals(String.valueOf(DROPPED))) {
170+
// lucky number three is always dropped
171+
return null;
172+
} else {
173+
ingestDocument.setFieldValue("bar", "baz-" + id);
174+
return ingestDocument;
175+
}
145176
}
146177

147178
@Override

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.inject.Inject;
4848
import org.elasticsearch.common.unit.ByteSizeUnit;
4949
import org.elasticsearch.common.util.concurrent.AtomicArray;
50+
import org.elasticsearch.common.util.set.Sets;
5051
import org.elasticsearch.core.Releasable;
5152
import org.elasticsearch.core.TimeValue;
5253
import org.elasticsearch.index.Index;
@@ -79,6 +80,7 @@
7980
import java.util.concurrent.atomic.AtomicIntegerArray;
8081
import java.util.function.LongSupplier;
8182
import java.util.stream.Collectors;
83+
import java.util.stream.IntStream;
8284

8385
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
8486
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -893,19 +895,57 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
893895
);
894896
} else {
895897
return actionListener.map(response -> {
896-
BulkItemResponse[] items = response.getItems();
897-
for (int i = 0; i < items.length; i++) {
898-
itemResponses.add(originalSlots.get(i), response.getItems()[i]);
898+
// these items are the responses from the subsequent bulk request, their 'slots'
899+
// are not correct for this response we're building
900+
final BulkItemResponse[] bulkResponses = response.getItems();
901+
902+
final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
903+
904+
// the item responses are from the original request, so their slots are correct.
905+
// these are the responses for requests that failed early and were not passed on to the subsequent bulk.
906+
for (BulkItemResponse item : itemResponses) {
907+
allResponses[item.getItemId()] = item;
899908
}
900-
return new BulkResponse(
901-
itemResponses.toArray(new BulkItemResponse[0]),
902-
response.getTook().getMillis(),
903-
ingestTookInMillis
904-
);
909+
910+
// use the original slots for the responses from the bulk
911+
for (int i = 0; i < bulkResponses.length; i++) {
912+
allResponses[originalSlots.get(i)] = bulkResponses[i];
913+
}
914+
915+
if (Assertions.ENABLED) {
916+
assertResponsesAreCorrect(bulkResponses, allResponses);
917+
}
918+
919+
return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis);
905920
});
906921
}
907922
}
908923

924+
private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) {
925+
// check for an empty intersection between the ids
926+
final Set<Integer> failedIds = itemResponses.stream().map(BulkItemResponse::getItemId).collect(Collectors.toSet());
927+
final Set<Integer> responseIds = IntStream.range(0, bulkResponses.length)
928+
.map(originalSlots::get) // resolve subsequent bulk ids back to the original slots
929+
.boxed()
930+
.collect(Collectors.toSet());
931+
assert Sets.haveEmptyIntersection(failedIds, responseIds)
932+
: "bulk item response slots cannot have failed and been processed in the subsequent bulk request, failed ids: "
933+
+ failedIds
934+
+ ", response ids: "
935+
+ responseIds;
936+
937+
// check for the correct number of responses
938+
final int expectedResponseCount = bulkRequest.requests.size();
939+
final int actualResponseCount = failedIds.size() + responseIds.size();
940+
assert expectedResponseCount == actualResponseCount
941+
: "Expected [" + expectedResponseCount + "] responses, but found [" + actualResponseCount + "]";
942+
943+
// check that every response is present
944+
for (int i = 0; i < allResponses.length; i++) {
945+
assert allResponses[i] != null : "BulkItemResponse at index [" + i + "] was null";
946+
}
947+
}
948+
909949
synchronized void markItemAsDropped(int slot) {
910950
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
911951
failedSlots.set(slot);
@@ -942,7 +982,7 @@ synchronized void markItemAsFailed(int slot, Exception e) {
942982
);
943983

944984
// We hit an error while preprocessing a request, so we:
945-
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
985+
// 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
946986
// 2) Add a bulk item failure for this request
947987
// 3) Continue with the next request in the bulk.
948988
failedSlots.set(slot);

server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
import org.elasticsearch.index.shard.ShardId;
1616
import org.elasticsearch.test.ESTestCase;
1717
import org.elasticsearch.xcontent.XContentType;
18-
import org.hamcrest.Matchers;
1918

2019
import java.util.ArrayList;
2120
import java.util.Arrays;
21+
import java.util.Collections;
2222
import java.util.HashSet;
2323
import java.util.List;
2424
import java.util.Set;
2525

2626
import static org.hamcrest.Matchers.equalTo;
2727
import static org.hamcrest.Matchers.instanceOf;
2828
import static org.hamcrest.Matchers.is;
29-
import static org.hamcrest.Matchers.nullValue;
29+
import static org.hamcrest.Matchers.sameInstance;
3030

3131
public class BulkRequestModifierTests extends ESTestCase {
3232

@@ -36,38 +36,53 @@ public void testBulkRequestModifier() {
3636
for (int i = 0; i < numRequests; i++) {
3737
bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}", XContentType.JSON));
3838
}
39-
CaptureActionListener actionListener = new CaptureActionListener();
40-
TransportBulkAction.BulkRequestModifier bulkRequestModifier = new TransportBulkAction.BulkRequestModifier(bulkRequest);
4139

42-
int i = 0;
40+
// wrap the bulk request and fail some of the item requests at random
41+
TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(bulkRequest);
4342
Set<Integer> failedSlots = new HashSet<>();
44-
while (bulkRequestModifier.hasNext()) {
45-
bulkRequestModifier.next();
43+
for (int i = 0; modifier.hasNext(); i++) {
44+
modifier.next();
4645
if (randomBoolean()) {
47-
bulkRequestModifier.markItemAsFailed(i, new RuntimeException());
46+
modifier.markItemAsFailed(i, new RuntimeException());
4847
failedSlots.add(i);
4948
}
50-
i++;
49+
}
50+
assertThat(modifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
51+
52+
// populate the non-failed responses
53+
BulkRequest subsequentBulkRequest = modifier.getBulkRequest();
54+
assertThat(subsequentBulkRequest.requests().size(), equalTo(numRequests - failedSlots.size()));
55+
List<BulkItemResponse> responses = new ArrayList<>();
56+
for (int j = 0; j < subsequentBulkRequest.requests().size(); j++) {
57+
IndexRequest indexRequest = (IndexRequest) subsequentBulkRequest.requests().get(j);
58+
IndexResponse indexResponse = new IndexResponse(new ShardId("_index", "_na_", 0), "_type", indexRequest.id(), 1, 17, 1, true);
59+
responses.add(BulkItemResponse.success(j, indexRequest.opType(), indexResponse));
5160
}
5261

53-
assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
54-
// simulate that we actually executed the modified bulk request:
62+
// simulate that we actually executed the modified bulk request
5563
long ingestTook = randomLong();
56-
ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTook, actionListener);
57-
result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
64+
CaptureActionListener actionListener = new CaptureActionListener();
65+
ActionListener<BulkResponse> result = modifier.wrapActionListenerIfNeeded(ingestTook, actionListener);
66+
result.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[0]), 0));
5867

68+
// check the results for successes and failures
5969
BulkResponse bulkResponse = actionListener.getResponse();
6070
assertThat(bulkResponse.getIngestTookInMillis(), equalTo(ingestTook));
61-
for (int j = 0; j < bulkResponse.getItems().length; j++) {
62-
if (failedSlots.contains(j)) {
63-
BulkItemResponse item = bulkResponse.getItems()[j];
71+
for (int i = 0; i < bulkResponse.getItems().length; i++) {
72+
BulkItemResponse item = bulkResponse.getItems()[i];
73+
if (failedSlots.contains(i)) {
6474
assertThat(item.isFailed(), is(true));
65-
assertThat(item.getFailure().getIndex(), equalTo("_index"));
66-
assertThat(item.getFailure().getType(), equalTo("_type"));
67-
assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
68-
assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
75+
BulkItemResponse.Failure failure = item.getFailure();
76+
assertThat(failure.getIndex(), equalTo("_index"));
77+
assertThat(failure.getType(), equalTo("_type"));
78+
assertThat(failure.getId(), equalTo(String.valueOf(i)));
79+
assertThat(failure.getMessage(), equalTo("java.lang.RuntimeException"));
6980
} else {
70-
assertThat(bulkResponse.getItems()[j], nullValue());
81+
assertThat(item.isFailed(), is(false));
82+
IndexResponse success = item.getResponse();
83+
assertThat(success.getIndex(), equalTo("_index"));
84+
assertThat(success.getType(), equalTo("_type"));
85+
assertThat(success.getId(), equalTo(String.valueOf(i)));
7186
}
7287
}
7388
}
@@ -79,16 +94,29 @@ public void testPipelineFailures() {
7994
}
8095

8196
TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(originalBulkRequest);
97+
98+
final List<Integer> failures = new ArrayList<>();
99+
// iterate the requests in order, recording that half of them should be failures
82100
for (int i = 0; modifier.hasNext(); i++) {
83101
modifier.next();
84102
if (i % 2 == 0) {
85-
modifier.markItemAsFailed(i, new RuntimeException());
103+
failures.add(i);
86104
}
87105
}
88106

107+
// with async processors, the failures can come back 'out of order' so sometimes we'll shuffle the list
108+
if (randomBoolean()) {
109+
Collections.shuffle(failures, random());
110+
}
111+
112+
// actually mark the failures
113+
for (int i : failures) {
114+
modifier.markItemAsFailed(i, new RuntimeException());
115+
}
116+
89117
// Half of the requests have "failed", so only the successful requests are left:
90118
BulkRequest bulkRequest = modifier.getBulkRequest();
91-
assertThat(bulkRequest.requests().size(), Matchers.equalTo(16));
119+
assertThat(bulkRequest.requests().size(), equalTo(16));
92120

93121
List<BulkItemResponse> responses = new ArrayList<>();
94122
ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<BulkResponse>() {
@@ -115,11 +143,11 @@ public void onFailure(Exception e) {}
115143
);
116144
originalResponses.add(BulkItemResponse.success(Integer.parseInt(indexRequest.id()), indexRequest.opType(), indexResponse));
117145
}
118-
bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0));
146+
bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[0]), 0));
119147

120-
assertThat(responses.size(), Matchers.equalTo(32));
148+
assertThat(responses.size(), equalTo(32));
121149
for (int i = 0; i < 32; i++) {
122-
assertThat(responses.get(i).getId(), Matchers.equalTo(String.valueOf(i)));
150+
assertThat(responses.get(i).getId(), equalTo(String.valueOf(i)));
123151
}
124152
}
125153

@@ -135,7 +163,7 @@ public void testNoFailures() {
135163
}
136164

137165
BulkRequest bulkRequest = modifier.getBulkRequest();
138-
assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
166+
assertThat(bulkRequest, sameInstance(originalBulkRequest));
139167
ActionListener<BulkResponse> actionListener = ActionListener.wrap(() -> {});
140168
assertThat(modifier.wrapActionListenerIfNeeded(1L, actionListener), instanceOf(ActionListener.MappedActionListener.class));
141169
}

0 commit comments

Comments
 (0)