WriteLoadConstraintDeciderIT.java
@@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationDeciderMetrics;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@@ -36,6 +37,8 @@
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@@ -45,18 +48,28 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {

@Override
@SuppressWarnings("unchecked")
protected Collection<Class<? extends Plugin>> getMockPlugins() {
- return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
+ return CollectionUtils.appendToCopyNoNullElements(
+     super.nodePlugins(),
+     MockTransportService.TestPlugin.class,
+     TestTelemetryPlugin.class
+ );
}

/**
@@ -227,11 +240,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
*/

logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node");
- final InternalClusterInfoService clusterInfoService = asInstanceOf(
-     InternalClusterInfoService.class,
-     internalCluster().getInstance(ClusterInfoService.class, masterName)
- );
- ClusterInfoServiceUtils.refresh(clusterInfoService);
+ refreshClusterInfo(masterName);

logger.info(
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
@@ -254,6 +263,76 @@
}));
}

public void testMaxQueueLatencyMetricIsPublished() {
final Settings settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
final String masterName = internalCluster().startMasterOnlyNode(settings);
final var dataNodes = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);

// Refresh cluster info (should trigger polling)
refreshClusterInfo(masterName);

Map<String, Long> mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics(dataNodes);
assertThat(mostRecentQueueLatencyMetrics.keySet(), hasSize(dataNodes.size()));
assertThat(mostRecentQueueLatencyMetrics.values(), everyItem(greaterThanOrEqualTo(0L)));

final String dataNodeToDelay = randomFrom(dataNodes);
final ThreadPool threadPoolToDelay = internalCluster().getInstance(ThreadPool.class, dataNodeToDelay);

// Fill the write thread pool
final int writeThreadPoolSize = threadPoolToDelay.info(ThreadPool.Names.WRITE).getMax();
final CyclicBarrier delayLatch = new CyclicBarrier(writeThreadPoolSize + 1);
for (int i = 0; i < writeThreadPoolSize; i++) {
threadPoolToDelay.executor(ThreadPool.Names.WRITE).execute(() -> {
safeAwait(delayLatch);
safeAwait(delayLatch);
@mhl-b (Contributor) commented on Sep 2, 2025:

Why do you need a barrier? I think a countdown latch is enough to block the thread pool and let one task sit in the queue.

        var latch = new CountDownLatch(1);
        range(0, threads + 1).forEach(i -> pool.execute(() -> safeAwait(latch)));
        var queueTimeMs = between(100,200);
        safeSleep(queueTimeMs);
        latch.countDown();

});
}
safeAwait(delayLatch);
// Submit a task that will be delayed
threadPoolToDelay.executor(ThreadPool.Names.WRITE).execute(() -> {
// Doesn't need to do anything
});
final long delayMillis = randomIntBetween(100, 200);
safeSleep(delayMillis);
// Unblock the pool
safeAwait(delayLatch);

refreshClusterInfo(masterName);
mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics(dataNodes);
assertThat(mostRecentQueueLatencyMetrics.keySet(), hasSize(dataNodes.size()));
assertThat(mostRecentQueueLatencyMetrics.get(dataNodeToDelay), greaterThanOrEqualTo(delayMillis));
}
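
For reference, the reviewer's CountDownLatch suggestion above could look roughly like the following inside this test. This is only a sketch and not part of the change; it assumes the test's existing threadPoolToDelay and writeThreadPoolSize variables, the ESTestCase helpers safeAwait and safeSleep, and an import of java.util.concurrent.CountDownLatch.

    // Sketch of the suggested CountDownLatch variant: saturate the write pool plus one extra task
    // so that task sits in the queue, wait a while, then release everything at once.
    final CountDownLatch latch = new CountDownLatch(1);
    final var writeExecutor = threadPoolToDelay.executor(ThreadPool.Names.WRITE);
    for (int i = 0; i < writeThreadPoolSize + 1; i++) {
        writeExecutor.execute(() -> safeAwait(latch));
    }
    final long queueTimeMillis = randomIntBetween(100, 200);
    safeSleep(queueTimeMillis);
    latch.countDown();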

private static void refreshClusterInfo(String masterName) {
final InternalClusterInfoService clusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getInstance(ClusterInfoService.class, masterName)
);
ClusterInfoServiceUtils.refresh(clusterInfoService);
}

private static Map<String, Long> getMostRecentQueueLatencyMetrics(List<String> dataNodes) {
final Map<String, Long> measurements = new HashMap<>();
for (String nodeName : dataNodes) {
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, nodeName);
final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();
telemetryPlugin.collect();
final var maxLatencyValues = telemetryPlugin.getLongGaugeMeasurement(
AllocationDeciderMetrics.WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE
);
if (maxLatencyValues.isEmpty() == false) {
measurements.put(nodeName, maxLatencyValues.getLast().getLong());
}
}
return measurements;
}

/**
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
*/
TransportNodeUsageStatsForThreadPoolsAction.java
@@ -9,27 +9,30 @@

package org.elasticsearch.action.admin.cluster.node.usage;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationDeciderMetrics;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
* Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in
@@ -42,20 +45,21 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA
NodeUsageStatsForThreadPoolsAction.NodeResponse,
Void> {

private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class);

public static final String NAME = "internal:monitor/thread_pool/stats";
public static final ActionType<NodeUsageStatsForThreadPoolsAction.Response> TYPE = new ActionType<>(NAME);
private static final int NO_VALUE = -1;

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final AtomicLong lastMaxQueueLatencyMillis = new AtomicLong(NO_VALUE);

@Inject
public TransportNodeUsageStatsForThreadPoolsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
- ActionFilters actionFilters
+ ActionFilters actionFilters,
+ AllocationDeciderMetrics allocationDeciderMetrics
) {
super(
NAME,
@@ -67,6 +71,7 @@ public TransportNodeUsageStatsForThreadPoolsAction(
);
this.threadPool = threadPool;
this.clusterService = clusterService;
allocationDeciderMetrics.registerWriteLoadDeciderMaxLatencyGauge(this::getMaxQueueLatencyMetric);
}

@Override
@@ -99,15 +104,17 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;

+ long maxQueueLatencyMillis = Math.max(
+     trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
+     trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
+ );
+ lastMaxQueueLatencyMillis.set(maxQueueLatencyMillis);
ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats(
trackingForWriteExecutor.getMaximumPoolSize(),
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
-     Math.max(
-         trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
-         trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
-     )
+     maxQueueLatencyMillis
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
@@ -117,4 +124,13 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
);
}

// Gauge callback registered with AllocationDeciderMetrics: publishes the most recently recorded max
// queue latency once, then resets it so the same value is not re-reported on later collections.
private Collection<LongWithAttributes> getMaxQueueLatencyMetric() {
long maxQueueLatencyValue = lastMaxQueueLatencyMillis.getAndSet(NO_VALUE);
if (maxQueueLatencyValue != NO_VALUE) {
return Set.of(new LongWithAttributes(maxQueueLatencyValue));
} else {
return Set.of();
}
}
}
AllocationDeciderMetrics.java
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Collection;
import java.util.function.Supplier;

/**
* A place where metrics related to allocation deciders can live
*/
public class AllocationDeciderMetrics {

public static final String WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE = "es.allocator.deciders.write_load.max_latency_value.current";

private final MeterRegistry meterRegistry;

public AllocationDeciderMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

public void registerWriteLoadDeciderMaxLatencyGauge(Supplier<Collection<LongWithAttributes>> maxLatencySupplier) {
meterRegistry.registerLongsGauge(
WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE,
"max latency for write load decider",
"ms",
maxLatencySupplier
);
}
}
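
For illustration, a minimal usage sketch of this class (not part of the change; it mirrors the registration done in TransportNodeUsageStatsForThreadPoolsAction above, and MeterRegistry.NOOP, the AtomicLong, and the -1 sentinel are placeholder choices, with java.util.List and java.util.concurrent.atomic.AtomicLong assumed imported):

    AllocationDeciderMetrics metrics = new AllocationDeciderMetrics(MeterRegistry.NOOP);
    AtomicLong lastLatencyMillis = new AtomicLong(-1);
    metrics.registerWriteLoadDeciderMaxLatencyGauge(() -> {
        // Read-and-reset so each recorded sample is reported at most once per collection.
        long value = lastLatencyMillis.getAndSet(-1);
        if (value < 0) {
            return List.of();
        }
        return List.of(new LongWithAttributes(value));
    });

Returning an empty collection when there is nothing new lets the gauge skip a sample rather than repeat a stale value.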
Node construction wiring (file name not shown in the diff)
@@ -58,6 +58,7 @@
import org.elasticsearch.cluster.project.ProjectResolverFactory;
import org.elasticsearch.cluster.routing.BatchedRerouteService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationDeciderMetrics;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor;
@@ -1234,6 +1235,8 @@ public Map<String, String> queryFields() {

final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);

final AllocationDeciderMetrics allocationDeciderMetrics = new AllocationDeciderMetrics(telemetryProvider.getMeterRegistry());

modules.add(loadPersistentTasksService(settingsModule, clusterService, threadPool, clusterModule.getIndexNameExpressionResolver()));

modules.add(
@@ -1324,6 +1327,7 @@ public Map<String, String> queryFields() {
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService);
b.bind(MergeMetrics.class).toInstance(mergeMetrics);
b.bind(AllocationDeciderMetrics.class).toInstance(allocationDeciderMetrics);
});

if (ReadinessService.enabled(environment)) {