Skip to content

Commit 87d5952

Browse files
committed
Add IndexingPressureMonitor to monitor large indexing operations
Relates ES-11063
1 parent 0b09506 commit 87d5952

File tree

5 files changed

+86
-4
lines changed

5 files changed

+86
-4
lines changed

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.elasticsearch.core.Releasable;
2020
import org.elasticsearch.index.stats.IndexingPressureStats;
2121

22+
import java.util.List;
2223
import java.util.Optional;
24+
import java.util.concurrent.CopyOnWriteArrayList;
2325
import java.util.concurrent.atomic.AtomicBoolean;
2426
import java.util.concurrent.atomic.AtomicLong;
2527

26-
public class IndexingPressure {
28+
public class IndexingPressure implements IndexingPressureMonitor {
2729

2830
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES = Setting.memorySizeSetting(
2931
"indexing_pressure.memory.limit",
@@ -127,6 +129,8 @@ public class IndexingPressure {
127129
private final long replicaLimit;
128130
private final long operationLimit;
129131

132+
private final List<IndexingPressureListener> listeners = new CopyOnWriteArrayList<>();
133+
130134
public IndexingPressure(Settings settings) {
131135
this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes();
132136
this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes();
@@ -335,12 +339,14 @@ void checkLargestPrimaryOperationIsWithinLimits(
335339
long largestOperationSizeInBytes,
336340
boolean allowsOperationsBeyondSizeLimit
337341
) {
342+
listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes));
338343
if (largestOperationSizeInBytes > operationLimit) {
339344
this.largeOpsRejections.getAndIncrement();
340345
this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes);
341346
if (allowsOperationsBeyondSizeLimit == false) {
342347
this.primaryRejections.getAndIncrement();
343348
this.primaryDocumentRejections.addAndGet(operations);
349+
listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes));
344350
throw new EsRejectedExecutionException(
345351
"Request contains an operation of size ["
346352
+ largestOperationSizeInBytes
@@ -485,4 +491,14 @@ public IndexingPressureStats stats() {
485491
totalRejectedLargeOpsBytes.get()
486492
);
487493
}
494+
495+
@Override
496+
public long getMaxAllowedOperationSizeInBytes() {
497+
return operationLimit;
498+
}
499+
500+
@Override
501+
public void addListener(IndexingPressureListener listener) {
502+
listeners.add(listener);
503+
}
488504
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index;
11+
12+
/**
13+
* Monitors indexing pressure events within the system and tracks operation sizes.
14+
* This interface provides mechanisms to check maximum allowed operation sizes
15+
* and register listeners for indexing pressure events.
16+
*/
17+
public interface IndexingPressureMonitor {
18+
/**
19+
* Returns the maximum allowed size in bytes for any single indexing operation.
20+
* Operations exceeding this limit may be rejected.
21+
*
22+
* @return the maximum allowed operation size in bytes
23+
*/
24+
long getMaxAllowedOperationSizeInBytes();
25+
26+
/**
27+
* Registers a listener to be notified of indexing pressure events.
28+
* The listener will receive callbacks when operations are tracked or rejected.
29+
*
30+
* @param listener the listener to register for indexing pressure events
31+
*/
32+
void addListener(IndexingPressureListener listener);
33+
34+
/**
35+
* Listener interface for receiving notifications about indexing pressure events.
36+
* Implementations can respond to tracking of primary operations and rejections
37+
* of large indexing operations.
38+
*/
39+
interface IndexingPressureListener {
40+
/**
41+
* Called when a primary indexing operation is tracked.
42+
* The implementation should be really lightweight as this is called in a hot path.
43+
*
44+
* @param largestOperationSizeInBytes the size in bytes of the largest operation tracked
45+
*/
46+
void onPrimaryOperationTracked(long largestOperationSizeInBytes);
47+
48+
/**
49+
* Called when a large indexing operation is rejected due to exceeding size limits.
50+
* The implementation should be really lightweight as this is called in a hot path.
51+
*
52+
* @param largestOperationSizeInBytes the size in bytes of the rejected operation
53+
*/
54+
void onLargeIndexingOperationRejection(long largestOperationSizeInBytes);
55+
}
56+
}

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,8 @@ public Map<String, String> queryFields() {
930930
metadataCreateIndexService
931931
);
932932

933+
final IndexingPressure indexingLimits = new IndexingPressure(settings);
934+
933935
PluginServiceInstances pluginServices = new PluginServiceInstances(
934936
client,
935937
clusterService,
@@ -952,7 +954,8 @@ public Map<String, String> queryFields() {
952954
documentParsingProvider,
953955
taskManager,
954956
projectResolver,
955-
slowLogFieldProvider
957+
slowLogFieldProvider,
958+
indexingLimits
956959
);
957960

958961
Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
@@ -985,7 +988,6 @@ public Map<String, String> queryFields() {
985988
.map(TerminationHandlerProvider::handler);
986989
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
987990

988-
final IndexingPressure indexingLimits = new IndexingPressure(settings);
989991
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
990992

991993
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);

server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.env.Environment;
2121
import org.elasticsearch.env.NodeEnvironment;
2222
import org.elasticsearch.features.FeatureService;
23+
import org.elasticsearch.index.IndexingPressureMonitor;
2324
import org.elasticsearch.index.SlowLogFieldProvider;
2425
import org.elasticsearch.indices.IndicesService;
2526
import org.elasticsearch.indices.SystemIndices;
@@ -55,5 +56,6 @@ public record PluginServiceInstances(
5556
DocumentParsingProvider documentParsingProvider,
5657
TaskManager taskManager,
5758
ProjectResolver projectResolver,
58-
SlowLogFieldProvider slowLogFieldProvider
59+
SlowLogFieldProvider slowLogFieldProvider,
60+
IndexingPressureMonitor indexingPressureMonitor
5961
) implements Plugin.PluginServices {}

server/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.features.FeatureService;
3030
import org.elasticsearch.index.IndexModule;
3131
import org.elasticsearch.index.IndexSettingProvider;
32+
import org.elasticsearch.index.IndexingPressureMonitor;
3233
import org.elasticsearch.index.SlowLogFieldProvider;
3334
import org.elasticsearch.indices.IndicesService;
3435
import org.elasticsearch.indices.SystemIndices;
@@ -186,6 +187,11 @@ public interface PluginServices {
186187
* Provider for additional SlowLog fields
187188
*/
188189
SlowLogFieldProvider slowLogFieldProvider();
190+
191+
/**
192+
* Monitors indexing pressure events within the system and tracks operation sizes.
193+
*/
194+
IndexingPressureMonitor indexingPressureMonitor();
189195
}
190196

191197
/**

0 commit comments

Comments
 (0)