Skip to content

Commit 5d36d39

Browse files
authored
[ML] Reset retryable index requests after failures (#109320) (#109396)
Fixes the `autoGeneratedTimestamp should not be set externally` error
1 parent ef9accd commit 5d36d39

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

docs/changelog/109320.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 109320
2+
summary: Reset retryable index requests after failures
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ private static boolean isIrrecoverable(Exception ex) {
325325
}
326326

327327
@SuppressWarnings("NonAtomicOperationOnVolatileField")
328-
private static class BulkRequestRewriter {
328+
static class BulkRequestRewriter {
329329
private volatile BulkRequest bulkRequest;
330330

331331
BulkRequestRewriter(BulkRequest initialRequest) {
@@ -533,7 +533,7 @@ public void cancel(Exception e) {
533533
}
534534
}
535535

536-
private static BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) {
536+
static BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) {
537537
// If we failed, lets set the bulkRequest to be a collection of the failed requests
538538
BulkRequest bulkRequestOfFailures = new BulkRequest();
539539
Set<String> failedDocIds = Arrays.stream(bulkResponse.getItems())
@@ -542,6 +542,9 @@ private static BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest,
542542
.collect(Collectors.toSet());
543543
bulkRequest.requests().forEach(docWriteRequest -> {
544544
if (failedDocIds.contains(docWriteRequest.id())) {
545+
if (docWriteRequest instanceof IndexRequest ir) {
546+
ir.reset();
547+
}
545548
bulkRequestOfFailures.add(docWriteRequest);
546549
}
547550
});

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import static org.hamcrest.Matchers.empty;
6161
import static org.hamcrest.Matchers.equalTo;
6262
import static org.hamcrest.Matchers.hasSize;
63+
import static org.hamcrest.Matchers.instanceOf;
6364
import static org.hamcrest.Matchers.is;
6465
import static org.hamcrest.Matchers.nullValue;
6566
import static org.mockito.ArgumentMatchers.any;
@@ -375,6 +376,34 @@ public void testBulkRequestRetriesMsgHandlerIsCalled() {
375376
assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again"));
376377
}
377378

379+
public void testBuildNewRequestFromFailures_resetsId() {
380+
var bulkRequest = new BulkRequest();
381+
var indexRequestAutoGeneratedId = new IndexRequest("index-foo");
382+
indexRequestAutoGeneratedId.autoGenerateId();
383+
var autoGenId = indexRequestAutoGeneratedId.id();
384+
var plainIndexRequest = new IndexRequest("index-foo2").id("id-set");
385+
386+
bulkRequest.add(indexRequestAutoGeneratedId);
387+
bulkRequest.add(plainIndexRequest);
388+
389+
var bulkResponse = mock(BulkResponse.class);
390+
391+
var failed = mock(BulkItemResponse.class);
392+
when(failed.isFailed()).thenReturn(Boolean.TRUE);
393+
when(failed.getId()).thenReturn(autoGenId);
394+
395+
var sucessful = mock(BulkItemResponse.class);
396+
when(sucessful.isFailed()).thenReturn(Boolean.FALSE);
397+
398+
when(bulkResponse.getItems()).thenReturn(new BulkItemResponse[] { failed, sucessful });
399+
400+
var modifiedRequestForRetry = ResultsPersisterService.buildNewRequestFromFailures(bulkRequest, bulkResponse);
401+
assertThat(modifiedRequestForRetry.requests(), hasSize(1)); // only the failed item is in the new request
402+
assertThat(modifiedRequestForRetry.requests().get(0), instanceOf(IndexRequest.class));
403+
var ir = (IndexRequest) modifiedRequestForRetry.requests().get(0);
404+
assertEquals(ir.getAutoGeneratedTimestamp(), -1L); // failed request was reset
405+
}
406+
378407
private static <Response> Stubber doAnswerWithResponses(Response response1, Response response2) {
379408
return doAnswer(withResponse(response1)).doAnswer(withResponse(response2));
380409
}

0 commit comments

Comments
 (0)