|
41 | 41 | import org.elasticsearch.common.settings.Settings; |
42 | 42 | import org.elasticsearch.common.xcontent.XContentHelper; |
43 | 43 | import org.elasticsearch.core.Nullable; |
44 | | -import org.elasticsearch.core.Releasables; |
45 | 44 | import org.elasticsearch.core.Strings; |
46 | 45 | import org.elasticsearch.core.TimeValue; |
47 | 46 | import org.elasticsearch.core.Tuple; |
@@ -171,15 +170,13 @@ protected void dispatchedShardOperationOnPrimary( |
171 | 170 | IndexShard primary, |
172 | 171 | ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> outerListener |
173 | 172 | ) { |
174 | | - var primaryOperationExpansionReleasable = indexingPressure.trackPrimaryOperationExpansion( |
175 | | - primaryOperationCount(request), |
176 | | - getMaxOperationMemoryOverhead(request), |
177 | | - force(request) |
178 | | - ); |
179 | | - // Ensure that we release the accounted memory for document expansion as soon as we're done processing the operations. |
180 | | - var listener = ActionListener.runBefore( |
181 | | - outerListener, |
182 | | - () -> Releasables.closeExpectNoException(primaryOperationExpansionReleasable) |
| 173 | + var listener = ActionListener.releaseBefore( |
| 174 | + indexingPressure.trackPrimaryOperationExpansion( |
| 175 | + primaryOperationCount(request), |
| 176 | + getMaxOperationMemoryOverhead(request), |
| 177 | + force(request) |
| 178 | + ), |
| 179 | + outerListener |
183 | 180 | ); |
184 | 181 | ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); |
185 | 182 | performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> { |
@@ -671,14 +668,9 @@ protected void dispatchedShardOperationOnReplica( |
671 | 668 | IndexShard replica, |
672 | 669 | ActionListener<ReplicaResult> outerListener |
673 | 670 | ) { |
674 | | - var replicaOperationExpansionReleasable = indexingPressure.trackReplicaOperationExpansion( |
675 | | - getMaxOperationMemoryOverhead(request), |
676 | | - force(request) |
677 | | - ); |
678 | | - // Ensure that we release the accounted memory for document expansion as soon as we're done processing the operations. |
679 | | - var listener = ActionListener.runBefore( |
680 | | - outerListener, |
681 | | - () -> Releasables.closeExpectNoException(replicaOperationExpansionReleasable) |
| 671 | + var listener = ActionListener.releaseBefore( |
| 672 | + indexingPressure.trackReplicaOperationExpansion(getMaxOperationMemoryOverhead(request), force(request)), |
| 673 | + outerListener |
682 | 674 | ); |
683 | 675 | ActionListener.completeWith(listener, () -> { |
684 | 676 | final long startBulkTime = System.nanoTime(); |
|
0 commit comments