Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
Expand All @@ -36,6 +37,8 @@
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -45,18 +48,29 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static java.util.stream.IntStream.range;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {

/**
 * Returns the plugins from {@code super.nodePlugins()} with {@link MockTransportService.TestPlugin} and
 * {@link TestTelemetryPlugin} appended, so the test can read metrics through the telemetry plugin.
 */
@Override
@SuppressWarnings("unchecked")
protected Collection<Class<? extends Plugin>> getMockPlugins() {
    // A single return: the earlier appendToCopy(...) return made this statement unreachable.
    return CollectionUtils.appendToCopyNoNullElements(
        super.nodePlugins(),
        MockTransportService.TestPlugin.class,
        TestTelemetryPlugin.class
    );
}

/**
Expand Down Expand Up @@ -227,11 +241,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
*/

logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node");
final InternalClusterInfoService clusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getInstance(ClusterInfoService.class, masterName)
);
ClusterInfoServiceUtils.refresh(clusterInfoService);
refreshClusterInfo(masterName);

logger.info(
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
Expand All @@ -254,6 +264,67 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
}));
}

/**
 * Checks that, after a cluster info refresh, every data node has published a
 * {@link DesiredBalanceMetrics#WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE} measurement, and that the value published by a node whose
 * write thread pool was blocked is at least as large as the time the pool was blocked for.
 */
public void testMaxQueueLatencyMetricIsPublished() {
// Enable the write load decider on every node started by this test
final Settings settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
final String masterName = internalCluster().startMasterOnlyNode(settings);
final var dataNodes = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);

// Refresh cluster info (should trigger polling)
refreshClusterInfo(masterName);

// Baseline: each data node must have reported one non-negative value
Map<String, Long> mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics(dataNodes);
assertThat(mostRecentQueueLatencyMetrics.keySet(), hasSize(dataNodes.size()));
assertThat(mostRecentQueueLatencyMetrics.values(), everyItem(greaterThanOrEqualTo(0L)));

final String dataNodeToDelay = randomFrom(dataNodes);
final ThreadPool threadPoolToDelay = internalCluster().getInstance(ThreadPool.class, dataNodeToDelay);

// Fill the write thread pool and block a task for some time
final int writeThreadPoolSize = threadPoolToDelay.info(ThreadPool.Names.WRITE).getMax();
final var latch = new CountDownLatch(1);
final var writeThreadPool = threadPoolToDelay.executor(ThreadPool.Names.WRITE);
// Submit one more task than the pool's maximum size; every task waits on the latch, so the extra one is expected to sit in the queue
range(0, writeThreadPoolSize + 1).forEach(i -> writeThreadPool.execute(() -> safeAwait(latch)));
final long delayMillis = randomIntBetween(100, 200);
// Keep the latch closed for the chosen delay
safeSleep(delayMillis);
// Unblock the pool
latch.countDown();

// Refresh again and check that the delayed node reports a latency of at least the delay
refreshClusterInfo(masterName);
mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics(dataNodes);
assertThat(mostRecentQueueLatencyMetrics.keySet(), hasSize(dataNodes.size()));
assertThat(mostRecentQueueLatencyMetrics.get(dataNodeToDelay), greaterThanOrEqualTo(delayMillis));
}

/**
 * Refreshes the cluster info held by the {@link ClusterInfoService} of the given node.
 *
 * @param masterName name of the node whose {@link ClusterInfoService} instance is refreshed; the instance must be an
 *                   {@link InternalClusterInfoService}
 */
private static void refreshClusterInfo(String masterName) {
    ClusterInfoServiceUtils.refresh(
        asInstanceOf(InternalClusterInfoService.class, internalCluster().getInstance(ClusterInfoService.class, masterName))
    );
}

/**
 * Reads the latest {@link DesiredBalanceMetrics#WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE} gauge measurement from the
 * {@link TestTelemetryPlugin} of each given node.
 *
 * @param dataNodes names of the nodes to read the gauge from
 * @return the last recorded gauge value keyed by node name; a node with no measurements has no entry
 */
private static Map<String, Long> getMostRecentQueueLatencyMetrics(List<String> dataNodes) {
    final Map<String, Long> latestByNode = new HashMap<>();
    for (String dataNode : dataNodes) {
        final TestTelemetryPlugin telemetry = internalCluster().getInstance(PluginsService.class, dataNode)
            .filterPlugins(TestTelemetryPlugin.class)
            .findFirst()
            .orElseThrow();
        // Ask the plugin to collect before the gauge measurements are read
        telemetry.collect();
        final var gaugeValues = telemetry.getLongGaugeMeasurement(DesiredBalanceMetrics.WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE);
        if (gaugeValues.isEmpty()) {
            continue;
        }
        latestByNode.put(dataNode, gaugeValues.getLast().getLong());
    }
    return latestByNode;
}

/**
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,30 @@

package org.elasticsearch.action.admin.cluster.node.usage;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
* Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in
Expand All @@ -42,20 +45,21 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA
NodeUsageStatsForThreadPoolsAction.NodeResponse,
Void> {

private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class);

public static final String NAME = "internal:monitor/thread_pool/stats";
public static final ActionType<NodeUsageStatsForThreadPoolsAction.Response> TYPE = new ActionType<>(NAME);
private static final int NO_VALUE = -1;

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final AtomicLong lastMaxQueueLatencyMillis = new AtomicLong(NO_VALUE);

@Inject
public TransportNodeUsageStatsForThreadPoolsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters
ActionFilters actionFilters,
DesiredBalanceMetrics desiredBalanceMetrics
) {
super(
NAME,
Expand All @@ -67,6 +71,7 @@ public TransportNodeUsageStatsForThreadPoolsAction(
);
this.threadPool = threadPool;
this.clusterService = clusterService;
desiredBalanceMetrics.registerWriteLoadDeciderMaxLatencyGauge(this::getMaxQueueLatencyMetric);
}

@Override
Expand Down Expand Up @@ -99,15 +104,17 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;

long maxQueueLatencyMillis = Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
);
lastMaxQueueLatencyMillis.set(maxQueueLatencyMillis);
ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats(
trackingForWriteExecutor.getMaximumPoolSize(),
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
)
maxQueueLatencyMillis
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
Expand All @@ -117,4 +124,13 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
);
}

/**
 * Supplier for the write load decider max queue latency gauge. Reads the value last stored in
 * {@code lastMaxQueueLatencyMillis} and resets the holder to {@code NO_VALUE} in the same step, so a stored value is
 * returned by at most one call.
 *
 * @return a single measurement holding the stored latency, or an empty collection when the holder contained {@code NO_VALUE}
 */
private Collection<LongWithAttributes> getMaxQueueLatencyMetric() {
    final long latestLatencyMillis = lastMaxQueueLatencyMillis.getAndSet(NO_VALUE);
    if (latestLatencyMillis == NO_VALUE) {
        return Set.of();
    }
    return Set.of(new LongWithAttributes(latestLatencyMillis));
}
}
16 changes: 10 additions & 6 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
Expand Down Expand Up @@ -134,6 +135,7 @@ public class ClusterModule extends AbstractModule {
private final ShardRoutingRoleStrategy shardRoutingRoleStrategy;
private final AllocationStatsService allocationStatsService;
private final TelemetryProvider telemetryProvider;
private final DesiredBalanceMetrics desiredBalanceMetrics;

public ClusterModule(
Settings settings,
Expand All @@ -160,6 +162,7 @@ public ClusterModule(
writeLoadForecaster,
balancingWeightsFactory
);
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.shardsAllocator = createShardsAllocator(
settings,
clusterService.getClusterSettings(),
Expand All @@ -170,8 +173,8 @@ public ClusterModule(
clusterService,
this::reconcile,
writeLoadForecaster,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
desiredBalanceMetrics
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
Expand Down Expand Up @@ -488,8 +491,8 @@ private static ShardsAllocator createShardsAllocator(
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler,
WriteLoadForecaster writeLoadForecaster,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
DesiredBalanceMetrics desiredBalanceMetrics
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
Expand All @@ -504,8 +507,8 @@ private static ShardsAllocator createShardsAllocator(
threadPool,
clusterService,
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
desiredBalanceMetrics
)
);

Expand Down Expand Up @@ -550,6 +553,7 @@ protected void configure() {
bind(ShardRoutingRoleStrategy.class).toInstance(shardRoutingRoleStrategy);
bind(AllocationStatsService.class).toInstance(allocationStatsService);
bind(TelemetryProvider.class).toInstance(telemetryProvider);
bind(DesiredBalanceMetrics.class).toInstance(desiredBalanceMetrics);
bind(MetadataRolloverService.class).asEagerSingleton();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

/**
Expand All @@ -31,6 +33,8 @@
*/
public class DesiredBalanceMetrics {

/**
 * Instance backed by {@link MeterRegistry#NOOP}. Declared {@code final} so this shared instance cannot be reassigned.
 */
public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP);

/**
* @param unassignedShards Shards that are not assigned to any node.
* @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role}
Expand Down Expand Up @@ -124,8 +128,12 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";

// Decider metrics
public static final String WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE = "es.allocator.deciders.write_load.max_latency_value.current";

public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(0, Map.of());

private final MeterRegistry meterRegistry;
private volatile boolean nodeIsMaster = false;

/**
Expand Down Expand Up @@ -153,6 +161,7 @@ public void updateMetrics(
}

public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
meterRegistry.registerLongsGauge(
UNASSIGNED_SHARDS_METRIC_NAME,
"Current number of unassigned shards",
Expand Down Expand Up @@ -260,6 +269,15 @@ public AllocationStats allocationStats() {
return lastReconciliationAllocationStats;
}

/**
 * Registers a longs gauge named {@link #WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE} with this instance's meter registry.
 *
 * @param maxLatencySupplier supplies the measurements reported by the gauge
 */
public void registerWriteLoadDeciderMaxLatencyGauge(Supplier<Collection<LongWithAttributes>> maxLatencySupplier) {
    final String description = "max latency for write load decider";
    final String unit = "ms";
    meterRegistry.registerLongsGauge(WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE, description, unit, maxLatencySupplier);
}

private List<LongWithAttributes> getUnassignedShardsMetrics() {
return getIfPublishing(AllocationStats::unassignedShards);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -115,17 +114,17 @@ public DesiredBalanceShardsAllocator(
ThreadPool threadPool,
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
DesiredBalanceMetrics desiredBalanceMetrics
) {
this(
delegateAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
desiredBalanceMetrics
);
}

Expand All @@ -135,10 +134,10 @@ public DesiredBalanceShardsAllocator(
ClusterService clusterService,
DesiredBalanceComputer desiredBalanceComputer,
DesiredBalanceReconcilerAction reconciler,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
DesiredBalanceMetrics desiredBalanceMetrics
) {
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.desiredBalanceMetrics = desiredBalanceMetrics;
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings());
this.delegateAllocator = delegateAllocator;
Expand Down
Loading