-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Use the last good NodeUsageStatsForThreadPools when a node returns an error #133896
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
640f0da
8c087f4
94d46a0
d072039
1c6e239
8b374dd
c3d3d3c
20fa020
5fdf7d1
6310e24
355f6d9
3decebd
36ab0fa
ff795ac
d5b17ab
ee9acc5
f8b9b4d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.cluster; | ||
|
|
||
| import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; | ||
| import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; | ||
| import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.CollectionUtils; | ||
| import org.elasticsearch.common.util.Maps; | ||
| import org.elasticsearch.plugins.Plugin; | ||
| import org.elasticsearch.test.ESIntegTestCase; | ||
| import org.elasticsearch.test.transport.MockTransportService; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.TestTransportChannel; | ||
|
|
||
| import java.util.Collection; | ||
| import java.util.Objects; | ||
|
|
||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.hasKey; | ||
|
|
||
| public class NodeUsageStatsForThreadPoolsCollectorIT extends ESIntegTestCase { | ||
|
|
||
| @Override | ||
| protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { | ||
| return Settings.builder() | ||
| .put(super.nodeSettings(nodeOrdinal, otherSettings)) | ||
| // Need to enable write load decider to enable node usage stats collection | ||
| .put( | ||
| WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), | ||
| WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED | ||
| ) | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
| return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); | ||
| } | ||
|
|
||
| public void testMostRecentValueIsUsedWhenNodeRequestFails() { | ||
| final var dataNodeName = internalCluster().startDataOnlyNode(); | ||
| final var dataNodeClusterService = internalCluster().getInstance(ClusterService.class, dataNodeName); | ||
| final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName); | ||
| final var threadPoolName = randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH); | ||
|
|
||
| // Intercept the node request and return some fake values | ||
| final int totalThreadPoolThreads = randomIntBetween(2, 40); | ||
| final float averageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); | ||
| final long maxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); | ||
| mockThreadPoolUsageStats( | ||
| dataNodeTransportService, | ||
| threadPoolName, | ||
| totalThreadPoolThreads, | ||
| averageThreadPoolUtilization, | ||
| maxThreadPoolQueueLatencyMillis | ||
| ); | ||
|
|
||
| // This info should contain our fake values | ||
| refreshClusterInfoAndAssertThreadPoolHasStats( | ||
| dataNodeClusterService.localNode().getId(), | ||
| threadPoolName, | ||
| totalThreadPoolThreads, | ||
| averageThreadPoolUtilization, | ||
| maxThreadPoolQueueLatencyMillis | ||
| ); | ||
|
|
||
| // Now simulate an error | ||
| dataNodeTransportService.clearInboundRules(); | ||
| dataNodeTransportService.addRequestHandlingBehavior( | ||
| TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
| (handler, request, channel, task) -> { | ||
| channel.sendResponse(new Exception("simulated error")); | ||
| } | ||
| ); | ||
|
|
||
| // The next response should also contain our fake values | ||
| refreshClusterInfoAndAssertThreadPoolHasStats( | ||
| dataNodeClusterService.localNode().getId(), | ||
| threadPoolName, | ||
| totalThreadPoolThreads, | ||
| averageThreadPoolUtilization, | ||
| maxThreadPoolQueueLatencyMillis | ||
| ); | ||
|
|
||
| // Now start returning values again | ||
| final int newTotalThreadPoolThreads = randomIntBetween(2, 40); | ||
| final float newAverageThreadPoolUtilization = randomFloatBetween(0.0f, 1.0f, true); | ||
| final long newMaxThreadPoolQueueLatencyMillis = randomLongBetween(0, 1000); | ||
| mockThreadPoolUsageStats( | ||
| dataNodeTransportService, | ||
| threadPoolName, | ||
| newTotalThreadPoolThreads, | ||
| newAverageThreadPoolUtilization, | ||
| newMaxThreadPoolQueueLatencyMillis | ||
| ); | ||
|
|
||
| // The next response should contain the current values again | ||
| refreshClusterInfoAndAssertThreadPoolHasStats( | ||
| dataNodeClusterService.localNode().getId(), | ||
| threadPoolName, | ||
| newTotalThreadPoolThreads, | ||
| newAverageThreadPoolUtilization, | ||
| newMaxThreadPoolQueueLatencyMillis | ||
| ); | ||
| } | ||
|
|
||
| private static void mockThreadPoolUsageStats( | ||
| MockTransportService dataNodeTransportService, | ||
| String threadPoolName, | ||
| int totalThreadPoolThreads, | ||
| float averageThreadPoolUtilization, | ||
| long maxThreadPoolQueueLatencyMillis | ||
| ) { | ||
| dataNodeTransportService.clearInboundRules(); | ||
| dataNodeTransportService.addRequestHandlingBehavior( | ||
| TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
| (handler, request, channel, task) -> { | ||
| NodeUsageStatsForThreadPoolsAction.NodeResponse response = safeAwait( | ||
| l -> handler.messageReceived( | ||
| request, | ||
| new TestTransportChannel(l.map(res -> (NodeUsageStatsForThreadPoolsAction.NodeResponse) res)), | ||
| task | ||
| ) | ||
| ); | ||
| final var responseStats = response.getNodeUsageStatsForThreadPools(); | ||
| channel.sendResponse( | ||
| new NodeUsageStatsForThreadPoolsAction.NodeResponse( | ||
| response.getNode(), | ||
| new NodeUsageStatsForThreadPools( | ||
| responseStats.nodeId(), | ||
| Maps.copyMapWithAddedOrReplacedEntry( | ||
| responseStats.threadPoolUsageStatsMap(), | ||
| threadPoolName, | ||
| new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
| totalThreadPoolThreads, | ||
| averageThreadPoolUtilization, | ||
| maxThreadPoolQueueLatencyMillis | ||
| ) | ||
| ) | ||
| ) | ||
| ) | ||
| ); | ||
| } | ||
| ); | ||
| } | ||
|
|
||
| private void refreshClusterInfoAndAssertThreadPoolHasStats( | ||
| String nodeId, | ||
| String threadPoolName, | ||
| int totalThreadPoolThreads, | ||
| float averageThreadPoolUtilization, | ||
| long maxThreadPoolQueueLatencyMillis | ||
| ) { | ||
| final var clusterInfo = Objects.requireNonNull(refreshClusterInfo()); | ||
| final var usageStatsMap = clusterInfo.getNodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap(); | ||
| assertThat(usageStatsMap, hasKey(threadPoolName)); | ||
| final var threadPoolStats = usageStatsMap.get(threadPoolName); | ||
| assertThat(threadPoolStats.totalThreadPoolThreads(), equalTo(totalThreadPoolThreads)); | ||
| assertThat(threadPoolStats.averageThreadPoolUtilization(), equalTo(averageThreadPoolUtilization)); | ||
| assertThat(threadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(maxThreadPoolQueueLatencyMillis)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,6 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
@@ -117,11 +116,9 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( | |
| maxQueueLatencyMillis | ||
| ); | ||
|
|
||
| Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>(); | ||
| perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats); | ||
| return new NodeUsageStatsForThreadPoolsAction.NodeResponse( | ||
| localNode, | ||
| new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool) | ||
| new NodeUsageStatsForThreadPools(localNode.getId(), Map.of(ThreadPool.Names.WRITE, threadPoolUsageStats)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needed to do this to use |
||
| ); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,6 +74,7 @@ | |
| import org.elasticsearch.client.internal.Client; | ||
| import org.elasticsearch.client.internal.ClusterAdminClient; | ||
| import org.elasticsearch.client.internal.IndicesAdminClient; | ||
| import org.elasticsearch.cluster.ClusterInfo; | ||
| import org.elasticsearch.cluster.ClusterInfoService; | ||
| import org.elasticsearch.cluster.ClusterInfoServiceUtils; | ||
| import org.elasticsearch.cluster.ClusterModule; | ||
|
|
@@ -1572,14 +1573,21 @@ private static void ensureClusterInfoServiceRunning() { | |
| } | ||
| } | ||
|
|
||
| public static void refreshClusterInfo() { | ||
| /** | ||
| * Refreshes the cluster info on the master | ||
| * | ||
| * @return The new cluster info if the refresh was executed, null if the {@link ClusterInfoService} was of an unknown type | ||
| */ | ||
| @Nullable | ||
| public static ClusterInfo refreshClusterInfo() { | ||
| final ClusterInfoService clusterInfoService = internalCluster().getInstance( | ||
| ClusterInfoService.class, | ||
| internalCluster().getMasterName() | ||
| ); | ||
| if (clusterInfoService instanceof InternalClusterInfoService) { | ||
| ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService)); | ||
| return ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService)); | ||
| } | ||
| return null; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seemed useful to return the actual |
||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
neat