diff --git a/docs/changelog/126372.yaml b/docs/changelog/126372.yaml new file mode 100644 index 0000000000000..75345296d8392 --- /dev/null +++ b/docs/changelog/126372.yaml @@ -0,0 +1,5 @@ +pr: 126372 +summary: Add `IndexingPressureMonitor` to monitor large indexing operations +area: CRUD +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 491fb425964e7..3976e8640a2bb 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -19,11 +19,13 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.index.stats.IndexingPressureStats; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class IndexingPressure { +public class IndexingPressure implements IndexingPressureMonitor { public static final Setting MAX_INDEXING_BYTES = Setting.memorySizeSetting( "indexing_pressure.memory.limit", @@ -127,6 +129,8 @@ public class IndexingPressure { private final long replicaLimit; private final long operationLimit; + private final List listeners = new CopyOnWriteArrayList<>(); + public IndexingPressure(Settings settings) { this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes(); this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes(); @@ -339,12 +343,14 @@ void checkLargestPrimaryOperationIsWithinLimits( long largestOperationSizeInBytes, boolean allowsOperationsBeyondSizeLimit ) { + listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes)); if (largestOperationSizeInBytes > operationLimit) { this.largeOpsRejections.getAndIncrement(); this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes); if (allowsOperationsBeyondSizeLimit == false) { this.primaryRejections.getAndIncrement(); this.primaryDocumentRejections.addAndGet(operations); + listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes)); throw new EsRejectedExecutionException( "Request contains an operation of size [" + largestOperationSizeInBytes @@ -489,4 +495,14 @@ public IndexingPressureStats stats() { totalRejectedLargeOpsBytes.get() ); } + + @Override + public long getMaxAllowedOperationSizeInBytes() { + return operationLimit; + } + + @Override + public void addListener(IndexingPressureListener listener) { + listeners.add(listener); + } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java b/server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java new file mode 100644 index 0000000000000..d6979d35a0247 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index; + +/** + * Monitors indexing pressure events within the system and tracks operation sizes. + * This interface provides mechanisms to check maximum allowed operation sizes + * and register listeners for indexing pressure events. + */ +public interface IndexingPressureMonitor { + /** + * Returns the maximum allowed size in bytes for any single indexing operation. + * Operations exceeding this limit may be rejected. + * + * @return the maximum allowed operation size in bytes + */ + long getMaxAllowedOperationSizeInBytes(); + + /** + * Registers a listener to be notified of indexing pressure events. + * The listener will receive callbacks when operations are tracked or rejected. + * + * @param listener the listener to register for indexing pressure events + */ + void addListener(IndexingPressureListener listener); + + /** + * Listener interface for receiving notifications about indexing pressure events. + * Implementations can respond to tracking of primary operations and rejections + * of large indexing operations. + */ + interface IndexingPressureListener { + /** + * Called when a primary indexing operation is tracked. + * The implementation should be really lightweight as this is called in a hot path. + * + * @param largestOperationSizeInBytes the size in bytes of the largest operation tracked + */ + void onPrimaryOperationTracked(long largestOperationSizeInBytes); + + /** + * Called when a large indexing operation is rejected due to exceeding size limits. + * The implementation should be really lightweight as this is called in a hot path. + * + * @param largestOperationSizeInBytes the size in bytes of the rejected operation + */ + void onLargeIndexingOperationRejection(long largestOperationSizeInBytes); + } +}