Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126372.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126372
summary: Add `IndexingPressureMonitor` to monitor large indexing operations
area: CRUD
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.stats.IndexingPressureStats;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class IndexingPressure {
public class IndexingPressure implements IndexingPressureMonitor {

public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES = Setting.memorySizeSetting(
"indexing_pressure.memory.limit",
Expand Down Expand Up @@ -127,6 +129,8 @@ public class IndexingPressure {
private final long replicaLimit;
private final long operationLimit;

private final List<IndexingPressureListener> listeners = new CopyOnWriteArrayList<>();

public IndexingPressure(Settings settings) {
this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes();
this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes();
Expand Down Expand Up @@ -335,12 +339,14 @@ void checkLargestPrimaryOperationIsWithinLimits(
long largestOperationSizeInBytes,
boolean allowsOperationsBeyondSizeLimit
) {
listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes));
if (largestOperationSizeInBytes > operationLimit) {
this.largeOpsRejections.getAndIncrement();
this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes);
if (allowsOperationsBeyondSizeLimit == false) {
this.primaryRejections.getAndIncrement();
this.primaryDocumentRejections.addAndGet(operations);
listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes));
throw new EsRejectedExecutionException(
"Request contains an operation of size ["
+ largestOperationSizeInBytes
Expand Down Expand Up @@ -485,4 +491,14 @@ public IndexingPressureStats stats() {
totalRejectedLargeOpsBytes.get()
);
}

@Override
public long getMaxAllowedOperationSizeInBytes() {
return operationLimit;
}

@Override
public void addListener(IndexingPressureListener listener) {
listeners.add(listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index;

/**
* Monitors indexing pressure events within the system and tracks operation sizes.
* This interface provides mechanisms to check maximum allowed operation sizes
* and register listeners for indexing pressure events.
*/
public interface IndexingPressureMonitor {
/**
* Returns the maximum allowed size in bytes for any single indexing operation.
* Operations exceeding this limit may be rejected.
*
* @return the maximum allowed operation size in bytes
*/
long getMaxAllowedOperationSizeInBytes();

/**
* Registers a listener to be notified of indexing pressure events.
* The listener will receive callbacks when operations are tracked or rejected.
*
* @param listener the listener to register for indexing pressure events
*/
void addListener(IndexingPressureListener listener);

/**
* Listener interface for receiving notifications about indexing pressure events.
* Implementations can respond to tracking of primary operations and rejections
* of large indexing operations.
*/
interface IndexingPressureListener {
/**
* Called when a primary indexing operation is tracked.
* The implementation should be really lightweight as this is called in a hot path.
*
* @param largestOperationSizeInBytes the size in bytes of the largest operation tracked
*/
void onPrimaryOperationTracked(long largestOperationSizeInBytes);

/**
* Called when a large indexing operation is rejected due to exceeding size limits.
* The implementation should be really lightweight as this is called in a hot path.
*
* @param largestOperationSizeInBytes the size in bytes of the rejected operation
*/
void onLargeIndexingOperationRejection(long largestOperationSizeInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,8 @@ public Map<String, String> queryFields() {
metadataCreateIndexService
);

final IndexingPressure indexingLimits = new IndexingPressure(settings);

PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
clusterService,
Expand All @@ -950,7 +952,8 @@ public Map<String, String> queryFields() {
documentParsingProvider,
taskManager,
projectResolver,
slowLogFieldProvider
slowLogFieldProvider,
indexingLimits
);

Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
Expand Down Expand Up @@ -983,7 +986,6 @@ public Map<String, String> queryFields() {
.map(TerminationHandlerProvider::handler);
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);

final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);

final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexingPressureMonitor;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -55,5 +56,6 @@ public record PluginServiceInstances(
DocumentParsingProvider documentParsingProvider,
TaskManager taskManager,
ProjectResolver projectResolver,
SlowLogFieldProvider slowLogFieldProvider
SlowLogFieldProvider slowLogFieldProvider,
IndexingPressureMonitor indexingPressureMonitor
) implements Plugin.PluginServices {}
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.index.IndexingPressureMonitor;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -186,6 +187,11 @@ public interface PluginServices {
* Provider for additional SlowLog fields
*/
SlowLogFieldProvider slowLogFieldProvider();

/**
* Monitors indexing pressure events within the system and tracks operation sizes.
*/
IndexingPressureMonitor indexingPressureMonitor();
}

/**
Expand Down
Loading