Skip to content

Commit e8a2b0d

Browse files
committed
Fixing a race condition in the BulkInferenceExecutor.
1 parent 1d1c4a4 commit e8a2b0d

File tree

3 files changed

+304
-181
lines changed

3 files changed

+304
-181
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutionState.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ public class BulkInferenceExecutionState {
2727
private final Map<Long, InferenceAction.Response> bufferedResponses;
2828
private final AtomicBoolean finished = new AtomicBoolean(false);
2929

30-
public BulkInferenceExecutionState(int bufferSize) {
31-
this.bufferedResponses = new ConcurrentHashMap<>(bufferSize);
30+
public BulkInferenceExecutionState() {
31+
this.bufferedResponses = new ConcurrentHashMap<>();
3232
}
3333

3434
/**
@@ -125,7 +125,7 @@ public void addFailure(Exception e) {
125125
* Indicates whether the entire bulk execution is marked as finished and all responses have been successfully persisted.
126126
*/
127127
public boolean finished() {
128-
return finished.get() && getMaxSeqNo() == getPersistedCheckpoint();
128+
return hasFailure() || (finished.get() && getMaxSeqNo() == getPersistedCheckpoint());
129129
}
130130

131131
/**

0 commit comments

Comments
 (0)