Skip to content

Commit b785bd8

Browse files
committed
Add memory accounting for document expansion and impose max document size limit
1 parent b1e6908 commit b785bd8

File tree

16 files changed

+575
-23
lines changed

16 files changed

+575
-23
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void testShortCircuitFailure() throws Exception {
112112
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
113113
long memoryLimit = primaryPressure.stats().getMemoryLimit();
114114
long primaryRejections = primaryPressure.stats().getPrimaryRejections();
115-
try (Releasable ignored = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
115+
try (Releasable ignored = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
116116
while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
117117
while (nextRequested.get()) {
118118
nextRequested.set(false);

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ public void testShortCircuitShardLevelFailure() throws Exception {
399399
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
400400
long memoryLimit = primaryPressure.stats().getMemoryLimit();
401401
long primaryRejections = primaryPressure.stats().getPrimaryRejections();
402-
try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
402+
try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
403403
while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
404404
while (nextRequested.get()) {
405405
nextRequested.set(false);
@@ -497,7 +497,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
497497
assertThat(node, equalTo(dataOnlyNode));
498498
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
499499
long memoryLimit = primaryPressure.stats().getMemoryLimit();
500-
try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
500+
try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
501501
while (nextRequested.get()) {
502502
nextRequested.set(false);
503503
refCounted.incRef();

server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java

Lines changed: 238 additions & 6 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,12 @@ private static void assertThatRetentionLeaseSyncCompletesSuccessfully(String pri
9090
*/
9191
private static Releasable fullyAllocatePrimaryIndexingCapacityOnNode(String targetNode) {
9292
return internalCluster().getInstance(IndexingPressure.class, targetNode)
93-
.markPrimaryOperationStarted(
93+
.validateAndMarkPrimaryOperationStarted(
9494
1,
9595
IndexingPressure.MAX_PRIMARY_BYTES.get(internalCluster().getInstance(Settings.class, targetNode)).getBytes() + 1,
96-
true
96+
0,
97+
true,
98+
false
9799
);
98100
}
99101
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ static TransportVersion def(int id) {
205205
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES = def(9_015_0_00);
206206
public static final TransportVersion ESQL_SERIALIZE_SOURCE_FUNCTIONS_WARNINGS = def(9_016_0_00);
207207
public static final TransportVersion ESQL_DRIVER_NODE_DESCRIPTION = def(9_017_0_00);
208+
public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_018_0_00);
208209

209210
/*
210211
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,8 @@ static class IndexPressureStats implements ToXContentFragment {
795795
long currentReplicaOps = 0;
796796
long lowWaterMarkSplits = 0;
797797
long highWaterMarkSplits = 0;
798+
long largeOpsRejections = 0;
799+
long totalLargeRejectedOpsBytes = 0;
798800
for (NodeStats nodeStat : nodeStats) {
799801
IndexingPressureStats nodeStatIndexingPressureStats = nodeStat.getIndexingPressureStats();
800802
if (nodeStatIndexingPressureStats != null) {
@@ -820,6 +822,8 @@ static class IndexPressureStats implements ToXContentFragment {
820822
totalCoordinatingRequests += nodeStatIndexingPressureStats.getTotalCoordinatingRequests();
821823
lowWaterMarkSplits += nodeStatIndexingPressureStats.getLowWaterMarkSplits();
822824
highWaterMarkSplits += nodeStatIndexingPressureStats.getHighWaterMarkSplits();
825+
largeOpsRejections += nodeStatIndexingPressureStats.getLargeOpsRejections();
826+
totalLargeRejectedOpsBytes += nodeStatIndexingPressureStats.getTotalLargeRejectedOpsBytes();
823827
}
824828
}
825829
indexingPressureStats = new IndexingPressureStats(
@@ -844,7 +848,9 @@ static class IndexPressureStats implements ToXContentFragment {
844848
primaryDocumentRejections,
845849
totalCoordinatingRequests,
846850
lowWaterMarkSplits,
847-
highWaterMarkSplits
851+
highWaterMarkSplits,
852+
largeOpsRejections,
853+
totalLargeRejectedOpsBytes
848854
);
849855
}
850856

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ public long totalSizeInBytes() {
103103
return totalSizeInBytes;
104104
}
105105

106+
public long maxOperationSizeInBytes() {
107+
long maxOperationSizeInBytes = 0;
108+
for (int i = 0; i < items.length; i++) {
109+
DocWriteRequest<?> request = items[i].request();
110+
if (request instanceof IndexRequest) {
111+
if (((IndexRequest) request).source() != null) {
112+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).source().length());
113+
}
114+
} else if (request instanceof UpdateRequest) {
115+
IndexRequest doc = ((UpdateRequest) request).doc();
116+
if (doc != null && doc.source() != null) {
117+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((UpdateRequest) request).doc().source().length());
118+
}
119+
}
120+
}
121+
return maxOperationSizeInBytes;
122+
}
123+
106124
public BulkItemRequest[] items() {
107125
return items;
108126
}
@@ -199,6 +217,14 @@ public long ramBytesUsed() {
199217
return sum;
200218
}
201219

220+
public long largestOperationSize() {
221+
long maxOperationSize = 0;
222+
for (BulkItemRequest item : items) {
223+
maxOperationSize = Math.max(maxOperationSize, item.ramBytesUsed());
224+
}
225+
return maxOperationSize;
226+
}
227+
202228
public boolean isSimulated() {
203229
return isSimulated;
204230
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
8686

8787
private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
8888

89+
// Represents the maximum memory overhead factor for an operation when processed for indexing.
90+
// This accounts for potential increases in memory usage due to document expansion, including:
91+
// 1. If the document source is not stored in a contiguous byte array, it will be copied to ensure contiguity.
92+
// 2. If the document contains strings, Jackson uses char arrays (2 bytes per character) to parse string fields, doubling memory usage.
93+
// 3. Parsed string fields create new copies of their data, further increasing memory consumption.
94+
private static final int MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR = 4;
95+
8996
private final UpdateHelper updateHelper;
9097
private final MappingUpdatedAction mappingUpdatedAction;
9198
private final Consumer<Runnable> postWriteAction;
@@ -158,8 +165,16 @@ protected void shardOperationOnPrimary(
158165
protected void dispatchedShardOperationOnPrimary(
159166
BulkShardRequest request,
160167
IndexShard primary,
161-
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
168+
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> outerListener
162169
) {
170+
var listener = ActionListener.releaseAfter(
171+
outerListener,
172+
indexingPressure.trackPrimaryOperationExpansion(
173+
primaryOperationCount(request),
174+
getMaxOperationMemoryOverhead(request),
175+
force(request)
176+
)
177+
);
163178
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
164179
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
165180
assert update != null;
@@ -196,6 +211,16 @@ protected int primaryOperationCount(BulkShardRequest request) {
196211
return request.items().length;
197212
}
198213

214+
@Override
215+
protected long primaryLargestOperationSize(BulkShardRequest request) {
216+
return request.largestOperationSize();
217+
}
218+
219+
@Override
220+
protected boolean primaryAllowsOperationsBeyondSizeLimit(BulkShardRequest request) {
221+
return false;
222+
}
223+
199224
public static void performOnPrimary(
200225
BulkShardRequest request,
201226
IndexShard primary,
@@ -634,7 +659,15 @@ private static BulkItemResponse processUpdateResponse(
634659
}
635660

636661
@Override
637-
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
662+
protected void dispatchedShardOperationOnReplica(
663+
BulkShardRequest request,
664+
IndexShard replica,
665+
ActionListener<ReplicaResult> outerListener
666+
) {
667+
var listener = ActionListener.releaseAfter(
668+
outerListener,
669+
indexingPressure.trackReplicaOperationExpansion(getMaxOperationMemoryOverhead(request), force(request))
670+
);
638671
ActionListener.completeWith(listener, () -> {
639672
final long startBulkTime = System.nanoTime();
640673
final Translog.Location location = performOnReplica(request, replica);
@@ -643,6 +676,10 @@ protected void dispatchedShardOperationOnReplica(BulkShardRequest request, Index
643676
});
644677
}
645678

679+
private static long getMaxOperationMemoryOverhead(BulkShardRequest request) {
680+
return request.maxOperationSizeInBytes() * MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR;
681+
}
682+
646683
@Override
647684
protected long replicaOperationSize(BulkShardRequest request) {
648685
return request.ramBytesUsed();

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ protected int primaryOperationCount(ResyncReplicationRequest request) {
135135
return request.getOperations().length;
136136
}
137137

138+
@Override
139+
protected long primaryLargestOperationSize(ResyncReplicationRequest request) {
140+
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).max().orElse(0);
141+
}
142+
138143
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
139144
return request;
140145
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,13 @@ protected Executor executor(IndexShard shard) {
113113

114114
@Override
115115
protected Releasable checkOperationLimits(Request request) {
116-
return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request), force(request));
116+
return indexingPressure.validateAndMarkPrimaryOperationStarted(
117+
primaryOperationCount(request),
118+
primaryOperationSize(request),
119+
primaryLargestOperationSize(request),
120+
force(request),
121+
primaryAllowsOperationsBeyondSizeLimit(request)
122+
);
117123
}
118124

119125
protected boolean force(ReplicatedWriteRequest<?> request) {
@@ -131,6 +137,11 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
131137
// If this primary request was received from a local reroute initiated by the node client, we
132138
// must mark a new primary operation local to the coordinating node.
133139
if (localRerouteInitiatedByNodeClient) {
140+
indexingPressure.checkLargestPrimaryOperationIsWithinLimits(
141+
primaryOperationCount(request),
142+
primaryLargestOperationSize(request),
143+
primaryAllowsOperationsBeyondSizeLimit(request)
144+
);
134145
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(
135146
primaryOperationCount(request),
136147
primaryOperationSize(request)
@@ -142,11 +153,14 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
142153
// If this primary request was received directly from the network, we must mark a new primary
143154
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
144155
// primary delegation, after the primary relocation hand-off.
145-
return indexingPressure.markPrimaryOperationStarted(
156+
return indexingPressure.validateAndMarkPrimaryOperationStarted(
146157
primaryOperationCount(request),
147158
primaryOperationSize(request),
148-
force(request)
159+
primaryLargestOperationSize(request),
160+
force(request),
161+
primaryAllowsOperationsBeyondSizeLimit(request)
149162
);
163+
150164
}
151165
}
152166

@@ -158,6 +172,14 @@ protected int primaryOperationCount(Request request) {
158172
return 0;
159173
}
160174

175+
protected long primaryLargestOperationSize(Request request) {
176+
return 0;
177+
}
178+
179+
protected boolean primaryAllowsOperationsBeyondSizeLimit(Request request) {
180+
return true;
181+
}
182+
161183
@Override
162184
protected Releasable checkReplicaLimits(ReplicaRequest request) {
163185
return indexingPressure.markReplicaOperationStarted(replicaOperationCount(request), replicaOperationSize(request), force(request));

0 commit comments

Comments
 (0)