Skip to content

Commit 4e4698a

Browse files
committed
More
1 parent 67e6d3b commit 4e4698a

File tree

2 files changed

+25
-21
lines changed

2 files changed

+25
-21
lines changed

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,18 @@ public static class Handler implements Releasable {
9191
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
9292

9393
private final Client client;
94-
private final IndexingPressure indexingPressure;
9594
private final ActiveShardCount waitForActiveShards;
9695
private final TimeValue timeout;
9796
private final String refresh;
9897

9998
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
10099
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
100+
private final IndexingPressure.Coordinating coordinatingOperation;
101101
private boolean closed = false;
102102
private boolean globalFailure = false;
103103
private boolean incrementalRequestSubmitted = false;
104104
private boolean bulkInProgress = false;
105105
private Exception bulkActionLevelFailure = null;
106-
private IndexingPressure.Coordinating coordinatingOperation;
107106
private BulkRequest bulkRequest = null;
108107

109108
protected Handler(
@@ -114,10 +113,10 @@ protected Handler(
114113
@Nullable String refresh
115114
) {
116115
this.client = client;
117-
this.indexingPressure = indexingPressure;
118116
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
119117
this.timeout = timeout;
120118
this.refresh = refresh;
119+
this.coordinatingOperation = indexingPressure.markCoordinatingOperationStarted(0, 0, false);
121120
createNewBulkRequest(EMPTY_STATE);
122121
}
123122

@@ -152,8 +151,7 @@ public void onFailure(Exception e) {
152151
}
153152
}, () -> {
154153
bulkInProgress = false;
155-
coordinatingOperation.close();
156-
coordinatingOperation = null;
154+
coordinatingOperation.releaseCurrent();
157155
toRelease.forEach(Releasable::close);
158156
nextItems.run();
159157
}));
@@ -194,8 +192,7 @@ public void onFailure(Exception e) {
194192
errorResponse(listener);
195193
}
196194
}, () -> {
197-
coordinatingOperation.close();
198-
coordinatingOperation = null;
195+
coordinatingOperation.releaseCurrent();
199196
toRelease.forEach(Releasable::close);
200197
}));
201198
} else {
@@ -208,13 +205,13 @@ public void onFailure(Exception e) {
208205
public void close() {
209206
closed = true;
210207
coordinatingOperation.close();
211-
coordinatingOperation = null;
212208
releasables.forEach(Releasable::close);
213209
releasables.clear();
214210
}
215211

216212
private void shortCircuitDueToTopLevelFailure(List<DocWriteRequest<?>> items, Releasable releasable) {
217213
assert releasables.isEmpty();
214+
assert coordinatingOperation.currentSize() == 0;
218215
assert bulkRequest == null;
219216
if (globalFailure == false) {
220217
addItemLevelFailures(items);
@@ -263,8 +260,7 @@ private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable rele
263260
return true;
264261
} catch (EsRejectedExecutionException e) {
265262
handleBulkFailure(incrementalRequestSubmitted == false, e);
266-
coordinatingOperation.close();
267-
coordinatingOperation = null;
263+
coordinatingOperation.releaseCurrent();
268264
releasables.forEach(Releasable::close);
269265
releasables.clear();
270266
return false;
@@ -275,7 +271,6 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
275271
assert bulkRequest == null;
276272
bulkRequest = new BulkRequest();
277273
bulkRequest.incrementalState(incrementalState);
278-
coordinatingOperation = indexingPressure.markCoordinatingOperationStarted(0, 0, false);
279274

280275
if (waitForActiveShards != null) {
281276
bulkRequest.waitForActiveShards(waitForActiveShards);

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,17 @@ public Coordinating markCoordinatingOperationStarted(int operations, long bytes,
146146

147147
public class Coordinating implements Releasable {
148148

149-
private final AtomicBoolean called = new AtomicBoolean();
149+
private final AtomicBoolean closed = new AtomicBoolean();
150150
private final boolean forceExecution;
151-
private int coordinatingOperations = 0;
151+
private int currentOperations = 0;
152152
private long currentSize = 0;
153153

154154
public Coordinating(boolean forceExecution) {
155155
this.forceExecution = forceExecution;
156156
}
157157

158158
public void increment(int operations, long bytes) {
159+
assert closed.get() == false;
159160
long combinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
160161
long replicaWriteBytes = currentReplicaBytes.get();
161162
long totalBytes = combinedBytes + replicaWriteBytes;
@@ -184,7 +185,7 @@ public void increment(int operations, long bytes) {
184185
false
185186
);
186187
}
187-
coordinatingOperations += operations;
188+
currentOperations += operations;
188189
currentSize += bytes;
189190
logger.trace(() -> Strings.format("adding [%d] coordinating operations and [%d] bytes", operations, bytes));
190191
currentCoordinatingBytes.getAndAdd(bytes);
@@ -195,6 +196,18 @@ public void increment(int operations, long bytes) {
195196
totalCoordinatingRequests.getAndIncrement();
196197
}
197198

199+
public long currentSize() {
200+
return currentSize;
201+
}
202+
203+
public void releaseCurrent() {
204+
currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-currentSize);
205+
currentCoordinatingBytes.getAndAdd(-currentSize);
206+
currentCoordinatingOps.getAndAdd(-currentOperations);
207+
currentSize = 0;
208+
currentOperations = 0;
209+
}
210+
198211
public boolean shouldSplit() {
199212
long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes.get() + currentReplicaBytes.get());
200213
if (currentUsage >= highWatermark && currentSize >= highWatermarkSize) {
@@ -217,13 +230,9 @@ public boolean shouldSplit() {
217230

218231
@Override
219232
public void close() {
220-
if (called.compareAndSet(false, true)) {
221-
logger.trace(
222-
() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", coordinatingOperations, currentSize)
223-
);
224-
currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-currentSize);
225-
currentCoordinatingBytes.getAndAdd(-currentSize);
226-
currentCoordinatingOps.getAndAdd(-coordinatingOperations);
233+
if (closed.compareAndSet(false, true)) {
234+
logger.trace(() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", currentOperations, currentSize));
235+
releaseCurrent();
227236
} else {
228237
logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice"));
229238
assert false : "IndexingPressure is adjusted twice";

0 commit comments

Comments
 (0)