Skip to content

Commit 387eef0

Browse files
authored
Enhance memory accounting for document expansion and introduce max document size limit (#123543)
This commit improves memory accounting by incorporating document expansion during shard bulk execution. Additionally, it introduces a new limit on the maximum document size, which defaults to 5% of the available heap. This limit can be configured using the new setting `indexing_pressure.memory.max_operation_size`. These changes help prevent excessive memory consumption and improve indexing stability. Closes ES-10777.
1 parent c702eb9 commit 387eef0

File tree

19 files changed

+594
-32
lines changed

19 files changed

+594
-32
lines changed

docs/changelog/123543.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123543
2+
summary: Enhance memory accounting for document expansion and introduce max document
3+
size limit
4+
area: CRUD
5+
type: enhancement
6+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void testShortCircuitFailure() throws Exception {
112112
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
113113
long memoryLimit = primaryPressure.stats().getMemoryLimit();
114114
long primaryRejections = primaryPressure.stats().getPrimaryRejections();
115-
try (Releasable ignored = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
115+
try (Releasable ignored = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
116116
while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
117117
while (nextRequested.get()) {
118118
nextRequested.set(false);

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ public void testShortCircuitShardLevelFailure() throws Exception {
399399
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
400400
long memoryLimit = primaryPressure.stats().getMemoryLimit();
401401
long primaryRejections = primaryPressure.stats().getPrimaryRejections();
402-
try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
402+
try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
403403
while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
404404
while (nextRequested.get()) {
405405
nextRequested.set(false);
@@ -497,7 +497,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
497497
assertThat(node, equalTo(dataOnlyNode));
498498
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
499499
long memoryLimit = primaryPressure.stats().getMemoryLimit();
500-
try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
500+
try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
501501
while (nextRequested.get()) {
502502
nextRequested.set(false);
503503
refCounted.incRef();

server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java

Lines changed: 238 additions & 6 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,12 @@ private static void assertThatRetentionLeaseSyncCompletesSuccessfully(String pri
9090
*/
9191
private static Releasable fullyAllocatePrimaryIndexingCapacityOnNode(String targetNode) {
9292
return internalCluster().getInstance(IndexingPressure.class, targetNode)
93-
.markPrimaryOperationStarted(
93+
.validateAndMarkPrimaryOperationStarted(
9494
1,
9595
IndexingPressure.MAX_PRIMARY_BYTES.get(internalCluster().getInstance(Settings.class, targetNode)).getBytes() + 1,
96-
true
96+
0,
97+
true,
98+
false
9799
);
98100
}
99101
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ static TransportVersion def(int id) {
185185
public static final TransportVersion RE_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(9_021_0_00);
186186
public static final TransportVersion UNASSIGENEDINFO_RESHARD_ADDED = def(9_022_0_00);
187187
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
188+
public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
188189

189190
/*
190191
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,8 @@ static class IndexPressureStats implements ToXContentFragment {
795795
long currentReplicaOps = 0;
796796
long lowWaterMarkSplits = 0;
797797
long highWaterMarkSplits = 0;
798+
long largeOpsRejections = 0;
799+
long totalLargeRejectedOpsBytes = 0;
798800
for (NodeStats nodeStat : nodeStats) {
799801
IndexingPressureStats nodeStatIndexingPressureStats = nodeStat.getIndexingPressureStats();
800802
if (nodeStatIndexingPressureStats != null) {
@@ -820,6 +822,8 @@ static class IndexPressureStats implements ToXContentFragment {
820822
totalCoordinatingRequests += nodeStatIndexingPressureStats.getTotalCoordinatingRequests();
821823
lowWaterMarkSplits += nodeStatIndexingPressureStats.getLowWaterMarkSplits();
822824
highWaterMarkSplits += nodeStatIndexingPressureStats.getHighWaterMarkSplits();
825+
largeOpsRejections += nodeStatIndexingPressureStats.getLargeOpsRejections();
826+
totalLargeRejectedOpsBytes += nodeStatIndexingPressureStats.getTotalLargeRejectedOpsBytes();
823827
}
824828
}
825829
indexingPressureStats = new IndexingPressureStats(
@@ -844,7 +848,9 @@ static class IndexPressureStats implements ToXContentFragment {
844848
primaryDocumentRejections,
845849
totalCoordinatingRequests,
846850
lowWaterMarkSplits,
847-
highWaterMarkSplits
851+
highWaterMarkSplits,
852+
largeOpsRejections,
853+
totalLargeRejectedOpsBytes
848854
);
849855
}
850856

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ public long totalSizeInBytes() {
103103
return totalSizeInBytes;
104104
}
105105

106+
public long maxOperationSizeInBytes() {
107+
long maxOperationSizeInBytes = 0;
108+
for (int i = 0; i < items.length; i++) {
109+
DocWriteRequest<?> request = items[i].request();
110+
if (request instanceof IndexRequest) {
111+
if (((IndexRequest) request).source() != null) {
112+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).source().length());
113+
}
114+
} else if (request instanceof UpdateRequest) {
115+
IndexRequest doc = ((UpdateRequest) request).doc();
116+
if (doc != null && doc.source() != null) {
117+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((UpdateRequest) request).doc().source().length());
118+
}
119+
}
120+
}
121+
return maxOperationSizeInBytes;
122+
}
123+
106124
public BulkItemRequest[] items() {
107125
return items;
108126
}
@@ -199,6 +217,14 @@ public long ramBytesUsed() {
199217
return sum;
200218
}
201219

220+
public long largestOperationSize() {
221+
long maxOperationSize = 0;
222+
for (BulkItemRequest item : items) {
223+
maxOperationSize = Math.max(maxOperationSize, item.ramBytesUsed());
224+
}
225+
return maxOperationSize;
226+
}
227+
202228
public boolean isSimulated() {
203229
return isSimulated;
204230
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
8787

8888
private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
8989

90+
// Represents the maximum memory overhead factor for an operation when processed for indexing.
91+
// This accounts for potential increases in memory usage due to document expansion, including:
92+
// 1. If the document source is not stored in a contiguous byte array, it will be copied to ensure contiguity.
93+
// 2. If the document contains strings, Jackson uses char arrays (2 bytes per character) to parse string fields, doubling memory usage.
94+
// 3. Parsed string fields create new copies of their data, further increasing memory consumption.
95+
private static final int MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR = 4;
96+
9097
private final UpdateHelper updateHelper;
9198
private final MappingUpdatedAction mappingUpdatedAction;
9299
private final Consumer<Runnable> postWriteAction;
@@ -161,8 +168,16 @@ protected void shardOperationOnPrimary(
161168
protected void dispatchedShardOperationOnPrimary(
162169
BulkShardRequest request,
163170
IndexShard primary,
164-
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
171+
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> outerListener
165172
) {
173+
var listener = ActionListener.releaseBefore(
174+
indexingPressure.trackPrimaryOperationExpansion(
175+
primaryOperationCount(request),
176+
getMaxOperationMemoryOverhead(request),
177+
force(request)
178+
),
179+
outerListener
180+
);
166181
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
167182
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
168183
assert update != null;
@@ -200,6 +215,16 @@ protected int primaryOperationCount(BulkShardRequest request) {
200215
return request.items().length;
201216
}
202217

218+
@Override
219+
protected long primaryLargestOperationSize(BulkShardRequest request) {
220+
return request.largestOperationSize();
221+
}
222+
223+
@Override
224+
protected boolean primaryAllowsOperationsBeyondSizeLimit(BulkShardRequest request) {
225+
return false;
226+
}
227+
203228
public static void performOnPrimary(
204229
BulkShardRequest request,
205230
IndexShard primary,
@@ -638,7 +663,15 @@ private static BulkItemResponse processUpdateResponse(
638663
}
639664

640665
@Override
641-
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
666+
protected void dispatchedShardOperationOnReplica(
667+
BulkShardRequest request,
668+
IndexShard replica,
669+
ActionListener<ReplicaResult> outerListener
670+
) {
671+
var listener = ActionListener.releaseBefore(
672+
indexingPressure.trackReplicaOperationExpansion(getMaxOperationMemoryOverhead(request), force(request)),
673+
outerListener
674+
);
642675
ActionListener.completeWith(listener, () -> {
643676
final long startBulkTime = System.nanoTime();
644677
final Translog.Location location = performOnReplica(request, replica);
@@ -647,6 +680,10 @@ protected void dispatchedShardOperationOnReplica(BulkShardRequest request, Index
647680
});
648681
}
649682

683+
private static long getMaxOperationMemoryOverhead(BulkShardRequest request) {
684+
return request.maxOperationSizeInBytes() * MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR;
685+
}
686+
650687
@Override
651688
protected long replicaOperationSize(BulkShardRequest request) {
652689
return request.ramBytesUsed();

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ protected int primaryOperationCount(ResyncReplicationRequest request) {
138138
return request.getOperations().length;
139139
}
140140

141+
@Override
142+
protected long primaryLargestOperationSize(ResyncReplicationRequest request) {
143+
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).max().orElse(0);
144+
}
145+
141146
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
142147
return request;
143148
}

0 commit comments

Comments
 (0)