Skip to content

Commit f62c5d0

Browse files
committed
Release before
1 parent 94a8870 commit f62c5d0

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.common.settings.Settings;
4141
import org.elasticsearch.common.xcontent.XContentHelper;
4242
import org.elasticsearch.core.Nullable;
43+
import org.elasticsearch.core.Releasables;
4344
import org.elasticsearch.core.Strings;
4445
import org.elasticsearch.core.TimeValue;
4546
import org.elasticsearch.core.Tuple;
@@ -167,13 +168,15 @@ protected void dispatchedShardOperationOnPrimary(
167168
IndexShard primary,
168169
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> outerListener
169170
) {
170-
var listener = ActionListener.releaseAfter(
171+
var primaryOperationExpansionReleasable = indexingPressure.trackPrimaryOperationExpansion(
172+
primaryOperationCount(request),
173+
getMaxOperationMemoryOverhead(request),
174+
force(request)
175+
);
176+
// Ensure that we release the accounted memory for document expansion as soon as we're done processing the operations.
177+
var listener = ActionListener.runBefore(
171178
outerListener,
172-
indexingPressure.trackPrimaryOperationExpansion(
173-
primaryOperationCount(request),
174-
getMaxOperationMemoryOverhead(request),
175-
force(request)
176-
)
179+
() -> Releasables.closeExpectNoException(primaryOperationExpansionReleasable)
177180
);
178181
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
179182
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
@@ -664,9 +667,14 @@ protected void dispatchedShardOperationOnReplica(
664667
IndexShard replica,
665668
ActionListener<ReplicaResult> outerListener
666669
) {
667-
var listener = ActionListener.releaseAfter(
670+
var replicaOperationExpansionReleasable = indexingPressure.trackReplicaOperationExpansion(
671+
getMaxOperationMemoryOverhead(request),
672+
force(request)
673+
);
674+
// Ensure that we release the accounted memory for document expansion as soon as we're done processing the operations.
675+
var listener = ActionListener.runBefore(
668676
outerListener,
669-
indexingPressure.trackReplicaOperationExpansion(getMaxOperationMemoryOverhead(request), force(request))
677+
() -> Releasables.closeExpectNoException(replicaOperationExpansionReleasable)
670678
);
671679
ActionListener.completeWith(listener, () -> {
672680
final long startBulkTime = System.nanoTime();

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,8 @@ private Releasable markPrimaryOperationStarted(int operations, long bytes, boole
301301
}
302302

303303
public Releasable trackReplicaOperationExpansion(long expandedBytes, boolean forceExecution) {
304+
// Operations are already tracked by the initial call to #markReplicaStarted.
305+
// This method only increments the in-flight bytes to account for operation expansion during indexing.
304306
return markReplicaOperationStarted(0, expandedBytes, forceExecution);
305307
}
306308

0 commit comments

Comments
 (0)