Skip to content

Commit 937fb54

Browse files
authored
Fix combine result for ingest_took (#132088) (#132277)
If we combined two `BulkResponse`s with `ingest_took` set to `NO_INGEST_TOOK` we'd get an `ingest_took` of `-2`. Which doesn't make any sense. This fixes it to be set to `NO_INGEST_TOOK` properly.
1 parent 1f8beed commit 937fb54

File tree

4 files changed

+48
-22
lines changed

4 files changed

+48
-22
lines changed

docs/changelog/132088.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132088
2+
summary: Fix combine result for `ingest_took`
3+
area: ES|QL
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.util.Iterator;
23+
import java.util.List;
2324

2425
/**
2526
* A response of a bulk execution. Holding a response for each item responding (in order) of the
@@ -166,4 +167,32 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
166167
return builder.startArray(ITEMS);
167168
}), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject()));
168169
}
170+
171+
/**
172+
* Combine many bulk responses into one.
173+
*/
174+
public static BulkResponse combine(List<BulkResponse> responses) {
175+
long tookInMillis = 0;
176+
long ingestTookInMillis = NO_INGEST_TOOK;
177+
int itemResponseCount = 0;
178+
for (BulkResponse response : responses) {
179+
tookInMillis += response.getTookInMillis();
180+
if (response.getIngestTookInMillis() != NO_INGEST_TOOK) {
181+
if (ingestTookInMillis == NO_INGEST_TOOK) {
182+
ingestTookInMillis = 0;
183+
}
184+
ingestTookInMillis += response.getIngestTookInMillis();
185+
}
186+
itemResponseCount += response.getItems().length;
187+
}
188+
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount];
189+
int i = 0;
190+
for (BulkResponse response : responses) {
191+
for (BulkItemResponse itemResponse : response.getItems()) {
192+
bulkItemResponses[i++] = itemResponse;
193+
}
194+
}
195+
196+
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
197+
}
169198
}

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
210210
@Override
211211
public void onResponse(BulkResponse bulkResponse) {
212212
handleBulkSuccess(bulkResponse);
213-
listener.onResponse(combineResponses());
213+
listener.onResponse(BulkResponse.combine(responses));
214214
}
215215

216216
@Override
@@ -252,7 +252,7 @@ private void errorResponse(ActionListener<BulkResponse> listener) {
252252
if (globalFailure) {
253253
listener.onFailure(bulkActionLevelFailure);
254254
} else {
255-
listener.onResponse(combineResponses());
255+
listener.onResponse(BulkResponse.combine(responses));
256256
}
257257
}
258258

@@ -311,25 +311,5 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
311311
bulkRequest.setRefreshPolicy(refresh);
312312
}
313313
}
314-
315-
private BulkResponse combineResponses() {
316-
long tookInMillis = 0;
317-
long ingestTookInMillis = 0;
318-
int itemResponseCount = 0;
319-
for (BulkResponse response : responses) {
320-
tookInMillis += response.getTookInMillis();
321-
ingestTookInMillis += response.getIngestTookInMillis();
322-
itemResponseCount += response.getItems().length;
323-
}
324-
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount];
325-
int i = 0;
326-
for (BulkResponse response : responses) {
327-
for (BulkItemResponse itemResponse : response.getItems()) {
328-
bulkItemResponses[i++] = itemResponse;
329-
}
330-
}
331-
332-
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
333-
}
334314
}
335315
}

server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,18 @@ public void testToXContentPlacesErrorsFirst() throws IOException {
144144
}
145145
}
146146

147+
public void testCombineNoIngest() {
148+
BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
149+
BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
150+
assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(NO_INGEST_TOOK));
151+
}
152+
153+
public void testCombineOneIngest() {
154+
BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
155+
BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, 2);
156+
assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(2L));
157+
}
158+
147159
private static Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> success(
148160
DocWriteRequest.OpType opType,
149161
XContentType xContentType

0 commit comments

Comments
 (0)