Skip to content

Commit 39670d9

Browse files
authored
Add IndexingPressureMonitor to monitor large indexing operations (#126372)
Relates ES-11063
1 parent 0033de9 commit 39670d9

File tree

3 files changed

+78
-1
lines changed

3 files changed

+78
-1
lines changed

docs/changelog/126372.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126372
2+
summary: Add `IndexingPressureMonitor` to monitor large indexing operations
3+
area: CRUD
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.elasticsearch.core.Releasable;
2020
import org.elasticsearch.index.stats.IndexingPressureStats;
2121

22+
import java.util.List;
2223
import java.util.Optional;
24+
import java.util.concurrent.CopyOnWriteArrayList;
2325
import java.util.concurrent.atomic.AtomicBoolean;
2426
import java.util.concurrent.atomic.AtomicLong;
2527

26-
public class IndexingPressure {
28+
public class IndexingPressure implements IndexingPressureMonitor {
2729

2830
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES = Setting.memorySizeSetting(
2931
"indexing_pressure.memory.limit",
@@ -127,6 +129,8 @@ public class IndexingPressure {
127129
private final long replicaLimit;
128130
private final long operationLimit;
129131

132+
private final List<IndexingPressureListener> listeners = new CopyOnWriteArrayList<>();
133+
130134
public IndexingPressure(Settings settings) {
131135
this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes();
132136
this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes();
@@ -339,12 +343,14 @@ void checkLargestPrimaryOperationIsWithinLimits(
339343
long largestOperationSizeInBytes,
340344
boolean allowsOperationsBeyondSizeLimit
341345
) {
346+
listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes));
342347
if (largestOperationSizeInBytes > operationLimit) {
343348
this.largeOpsRejections.getAndIncrement();
344349
this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes);
345350
if (allowsOperationsBeyondSizeLimit == false) {
346351
this.primaryRejections.getAndIncrement();
347352
this.primaryDocumentRejections.addAndGet(operations);
353+
listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes));
348354
throw new EsRejectedExecutionException(
349355
"Request contains an operation of size ["
350356
+ largestOperationSizeInBytes
@@ -489,4 +495,14 @@ public IndexingPressureStats stats() {
489495
totalRejectedLargeOpsBytes.get()
490496
);
491497
}
498+
499+
@Override
500+
public long getMaxAllowedOperationSizeInBytes() {
501+
return operationLimit;
502+
}
503+
504+
@Override
505+
public void addListener(IndexingPressureListener listener) {
506+
listeners.add(listener);
507+
}
492508
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index;
11+
12+
/**
13+
* Monitors indexing pressure events within the system and tracks operation sizes.
14+
* This interface provides mechanisms to check maximum allowed operation sizes
15+
* and register listeners for indexing pressure events.
16+
*/
17+
public interface IndexingPressureMonitor {
18+
/**
19+
* Returns the maximum allowed size in bytes for any single indexing operation.
20+
* Operations exceeding this limit may be rejected.
21+
*
22+
* @return the maximum allowed operation size in bytes
23+
*/
24+
long getMaxAllowedOperationSizeInBytes();
25+
26+
/**
27+
* Registers a listener to be notified of indexing pressure events.
28+
* The listener will receive callbacks when operations are tracked or rejected.
29+
*
30+
* @param listener the listener to register for indexing pressure events
31+
*/
32+
void addListener(IndexingPressureListener listener);
33+
34+
/**
35+
* Listener interface for receiving notifications about indexing pressure events.
36+
* Implementations can respond to tracking of primary operations and rejections
37+
* of large indexing operations.
38+
*/
39+
interface IndexingPressureListener {
40+
/**
41+
* Called when a primary indexing operation is tracked.
42+
* The implementation should be really lightweight as this is called in a hot path.
43+
*
44+
* @param largestOperationSizeInBytes the size in bytes of the largest operation tracked
45+
*/
46+
void onPrimaryOperationTracked(long largestOperationSizeInBytes);
47+
48+
/**
49+
* Called when a large indexing operation is rejected due to exceeding size limits.
50+
* The implementation should be really lightweight as this is called in a hot path.
51+
*
52+
* @param largestOperationSizeInBytes the size in bytes of the rejected operation
53+
*/
54+
void onLargeIndexingOperationRejection(long largestOperationSizeInBytes);
55+
}
56+
}

0 commit comments

Comments
 (0)