From b4498ac736fe05f0793445f0646e98ad964df3c4 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 31 Jul 2025 11:02:44 -0400 Subject: [PATCH] Fix combine result for `ingest_took` (#132088) If we combined two `BulkResponse`s with `ingest_took` set to `NO_INGEST_TOOK` we'd get an `ingest_took` of `-2`. Which doesn't make any sense. This fixes it to be set to `NO_INGEST_TOOK` properly. --- docs/changelog/132088.yaml | 5 ++++ .../action/bulk/BulkResponse.java | 29 +++++++++++++++++++ .../action/bulk/IncrementalBulkService.java | 24 ++------------- .../action/bulk/BulkResponseTests.java | 12 ++++++++ 4 files changed, 48 insertions(+), 22 deletions(-) create mode 100644 docs/changelog/132088.yaml diff --git a/docs/changelog/132088.yaml b/docs/changelog/132088.yaml new file mode 100644 index 0000000000000..2de281e917ff7 --- /dev/null +++ b/docs/changelog/132088.yaml @@ -0,0 +1,5 @@ +pr: 132088 +summary: Fix combine result for `ingest_took` +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index 567b433d94daf..97236b47e53aa 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; +import java.util.List; /** * A response of a bulk execution. Holding a response for each item responding (in order) of the @@ -166,4 +167,32 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.startArray(ITEMS); }), Iterators.forArray(responses), Iterators.single((builder, p) -> builder.endArray().endObject())); } + + /** + * Combine many bulk responses into one. + */ + public static BulkResponse combine(List responses) { + long tookInMillis = 0; + long ingestTookInMillis = NO_INGEST_TOOK; + int itemResponseCount = 0; + for (BulkResponse response : responses) { + tookInMillis += response.getTookInMillis(); + if (response.getIngestTookInMillis() != NO_INGEST_TOOK) { + if (ingestTookInMillis == NO_INGEST_TOOK) { + ingestTookInMillis = 0; + } + ingestTookInMillis += response.getIngestTookInMillis(); + } + itemResponseCount += response.getItems().length; + } + BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount]; + int i = 0; + for (BulkResponse response : responses) { + for (BulkItemResponse itemResponse : response.getItems()) { + bulkItemResponses[i++] = itemResponse; + } + } + + return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 81ff1925182eb..b2b4404f48d47 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -210,7 +210,7 @@ public void lastItems(List> items, Releasable releasable, Act @Override public void onResponse(BulkResponse bulkResponse) { handleBulkSuccess(bulkResponse); - listener.onResponse(combineResponses()); + listener.onResponse(BulkResponse.combine(responses)); } @Override @@ -252,7 +252,7 @@ private void errorResponse(ActionListener listener) { if (globalFailure) { listener.onFailure(bulkActionLevelFailure); } else { - listener.onResponse(combineResponses()); + listener.onResponse(BulkResponse.combine(responses)); } } @@ -311,25 +311,5 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) bulkRequest.setRefreshPolicy(refresh); } } - - private BulkResponse combineResponses() { - long tookInMillis = 0; - long ingestTookInMillis = 0; - int itemResponseCount = 0; - for (BulkResponse response : responses) { - tookInMillis += response.getTookInMillis(); - ingestTookInMillis += response.getIngestTookInMillis(); - itemResponseCount += response.getItems().length; - } - BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount]; - int i = 0; - for (BulkResponse response : responses) { - for (BulkItemResponse itemResponse : response.getItems()) { - bulkItemResponses[i++] = itemResponse; - } - } - - return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis); - } } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java index 042a8422ca64d..7c1aad1a2af54 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java @@ -144,6 +144,18 @@ public void testToXContentPlacesErrorsFirst() throws IOException { } } + public void testCombineNoIngest() { + BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK); + BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK); + assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(NO_INGEST_TOOK)); + } + + public void testCombineOneIngest() { + BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK); + BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, 2); + assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(2L)); + } + private static Tuple success( DocWriteRequest.OpType opType, XContentType xContentType