Skip to content

Commit c8662eb

Browse files
authored
Fix combine result for ingest_took (#132088) (#132278)
If we combined two `BulkResponse`s with `ingest_took` set to `NO_INGEST_TOOK` we'd get an `ingest_took` of `-2`. Which doesn't make any sense. This fixes it to be set to `NO_INGEST_TOOK` properly.
1 parent 8656cc6 commit c8662eb

File tree

4 files changed

+48
-22
lines changed

4 files changed

+48
-22
lines changed

docs/changelog/132088.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132088
2+
summary: Fix combine result for `ingest_took`
3+
area: ES|QL
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.util.Iterator;
24+
import java.util.List;
2425

2526
/**
2627
* A response of a bulk execution. Holding a response for each item responding (in order) of the
@@ -167,4 +168,32 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
167168
return b;
168169
}).array(ITEMS, Iterators.forArray(responses)));
169170
}
171+
172+
/**
173+
* Combine many bulk responses into one.
174+
*/
175+
public static BulkResponse combine(List<BulkResponse> responses) {
176+
long tookInMillis = 0;
177+
long ingestTookInMillis = NO_INGEST_TOOK;
178+
int itemResponseCount = 0;
179+
for (BulkResponse response : responses) {
180+
tookInMillis += response.getTookInMillis();
181+
if (response.getIngestTookInMillis() != NO_INGEST_TOOK) {
182+
if (ingestTookInMillis == NO_INGEST_TOOK) {
183+
ingestTookInMillis = 0;
184+
}
185+
ingestTookInMillis += response.getIngestTookInMillis();
186+
}
187+
itemResponseCount += response.getItems().length;
188+
}
189+
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount];
190+
int i = 0;
191+
for (BulkResponse response : responses) {
192+
for (BulkItemResponse itemResponse : response.getItems()) {
193+
bulkItemResponses[i++] = itemResponse;
194+
}
195+
}
196+
197+
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
198+
}
170199
}

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
191191
@Override
192192
public void onResponse(BulkResponse bulkResponse) {
193193
handleBulkSuccess(bulkResponse);
194-
listener.onResponse(combineResponses());
194+
listener.onResponse(BulkResponse.combine(responses));
195195
}
196196

197197
@Override
@@ -233,7 +233,7 @@ private void errorResponse(ActionListener<BulkResponse> listener) {
233233
if (globalFailure) {
234234
listener.onFailure(bulkActionLevelFailure);
235235
} else {
236-
listener.onResponse(combineResponses());
236+
listener.onResponse(BulkResponse.combine(responses));
237237
}
238238
}
239239

@@ -292,25 +292,5 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
292292
bulkRequest.setRefreshPolicy(refresh);
293293
}
294294
}
295-
296-
private BulkResponse combineResponses() {
297-
long tookInMillis = 0;
298-
long ingestTookInMillis = 0;
299-
int itemResponseCount = 0;
300-
for (BulkResponse response : responses) {
301-
tookInMillis += response.getTookInMillis();
302-
ingestTookInMillis += response.getIngestTookInMillis();
303-
itemResponseCount += response.getItems().length;
304-
}
305-
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount];
306-
int i = 0;
307-
for (BulkResponse response : responses) {
308-
for (BulkItemResponse itemResponse : response.getItems()) {
309-
bulkItemResponses[i++] = itemResponse;
310-
}
311-
}
312-
313-
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
314-
}
315295
}
316296
}

server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,18 @@ public void testToXContentPlacesErrorsFirst() throws IOException {
144144
}
145145
}
146146

147+
public void testCombineNoIngest() {
148+
BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
149+
BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
150+
assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(NO_INGEST_TOOK));
151+
}
152+
153+
public void testCombineOneIngest() {
154+
BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
155+
BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, 2);
156+
assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(2L));
157+
}
158+
147159
private static Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> success(
148160
DocWriteRequest.OpType opType,
149161
XContentType xContentType

0 commit comments

Comments
 (0)