Skip to content

Commit 52a4ab7

Browse files
committed
Fix high water mark test which submits data early (#114159)
It is possible in the incremental high watermark test that the data is submitted causing a corruption of the bulk request. This commit fixes the issue to ensure we only send new data after it has been requested. Additionally, it adds an assertion to prevent this error from happening again.
1 parent c4698c6 commit 52a4ab7

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
234234

235235
handlers.add(handlerThrottled);
236236

237+
// Wait until we are ready for the next page
238+
assertBusy(() -> assertTrue(nextPage.get()));
239+
237240
for (IncrementalBulkService.Handler h : handlers) {
238241
refCounted.incRef();
239242
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public static class Handler implements Releasable {
105105
private boolean closed = false;
106106
private boolean globalFailure = false;
107107
private boolean incrementalRequestSubmitted = false;
108+
private boolean bulkInProgress = false;
108109
private ThreadContext.StoredContext requestContext;
109110
private Exception bulkActionLevelFailure = null;
110111
private long currentBulkSize = 0L;
@@ -130,6 +131,7 @@ protected Handler(
130131

131132
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
132133
assert closed == false;
134+
assert bulkInProgress == false;
133135
if (bulkActionLevelFailure != null) {
134136
shortCircuitDueToTopLevelFailure(items, releasable);
135137
nextItems.run();
@@ -143,6 +145,7 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
143145
requestContext.restore();
144146
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
145147
releasables.clear();
148+
bulkInProgress = true;
146149
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
147150

148151
@Override
@@ -158,6 +161,7 @@ public void onFailure(Exception e) {
158161
handleBulkFailure(isFirstRequest, e);
159162
}
160163
}, () -> {
164+
bulkInProgress = false;
161165
requestContext = threadContext.newStoredContext();
162166
toRelease.forEach(Releasable::close);
163167
nextItems.run();
@@ -177,6 +181,7 @@ private boolean shouldBackOff() {
177181
}
178182

179183
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
184+
assert bulkInProgress == false;
180185
if (bulkActionLevelFailure != null) {
181186
shortCircuitDueToTopLevelFailure(items, releasable);
182187
errorResponse(listener);
@@ -187,7 +192,9 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
187192
requestContext.restore();
188193
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
189194
releasables.clear();
190-
client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() {
195+
// We do not need to set this back to false as this will be the last request.
196+
bulkInProgress = true;
197+
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
191198

192199
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
193200

0 commit comments

Comments
 (0)