1515import org .elasticsearch .action .admin .indices .stats .IndicesStatsAction ;
1616import org .elasticsearch .action .admin .indices .stats .ShardStats ;
1717import org .elasticsearch .action .admin .indices .stats .TransportIndicesStatsAction ;
18- import org .elasticsearch .cluster .ClusterInfoService ;
19- import org .elasticsearch .cluster .ClusterInfoServiceUtils ;
20- import org .elasticsearch .cluster .InternalClusterInfoService ;
2118import org .elasticsearch .cluster .NodeUsageStatsForThreadPools ;
2219import org .elasticsearch .cluster .metadata .IndexMetadata ;
2320import org .elasticsearch .cluster .node .DiscoveryNode ;
2623import org .elasticsearch .cluster .routing .ShardRouting ;
2724import org .elasticsearch .cluster .routing .UnassignedInfo ;
2825import org .elasticsearch .cluster .routing .allocation .WriteLoadConstraintSettings ;
26+ import org .elasticsearch .cluster .routing .allocation .allocator .DesiredBalanceMetrics ;
2927import org .elasticsearch .cluster .service .ClusterService ;
3028import org .elasticsearch .common .settings .Settings ;
3129import org .elasticsearch .common .util .CollectionUtils ;
3634import org .elasticsearch .index .shard .ShardPath ;
3735import org .elasticsearch .index .store .StoreStats ;
3836import org .elasticsearch .plugins .Plugin ;
37+ import org .elasticsearch .plugins .PluginsService ;
38+ import org .elasticsearch .telemetry .TestTelemetryPlugin ;
3939import org .elasticsearch .test .ClusterServiceUtils ;
4040import org .elasticsearch .test .ESIntegTestCase ;
4141import org .elasticsearch .test .transport .MockTransportService ;
4545import java .nio .file .Path ;
4646import java .util .ArrayList ;
4747import java .util .Collection ;
48+ import java .util .HashMap ;
4849import java .util .List ;
4950import java .util .Map ;
51+ import java .util .concurrent .CountDownLatch ;
5052
53+ import static java .util .stream .IntStream .range ;
5154import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
5255import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
5356import static org .elasticsearch .cluster .routing .ShardMovementWriteLoadSimulator .calculateUtilizationForWriteLoad ;
57+ import static org .hamcrest .Matchers .everyItem ;
58+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
59+ import static org .hamcrest .Matchers .hasSize ;
5460
5561@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
5662public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
5763
5864 @ Override
65+ @ SuppressWarnings ("unchecked" )
5966 protected Collection <Class <? extends Plugin >> getMockPlugins () {
60- return CollectionUtils .appendToCopy (super .nodePlugins (), MockTransportService .TestPlugin .class );
67+ return CollectionUtils .appendToCopyNoNullElements (
68+ super .nodePlugins (),
69+ MockTransportService .TestPlugin .class ,
70+ TestTelemetryPlugin .class
71+ );
6172 }
6273
6374 /**
@@ -236,11 +247,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
236247 */
237248
238249 logger .info ("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node" );
239- final InternalClusterInfoService clusterInfoService = asInstanceOf (
240- InternalClusterInfoService .class ,
241- internalCluster ().getInstance (ClusterInfoService .class , masterName )
242- );
243- ClusterInfoServiceUtils .refresh (clusterInfoService );
250+ refreshClusterInfo ();
244251
245252 logger .info (
246253 "---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
@@ -263,6 +270,57 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
263270 }));
264271 }
265272
273+ public void testMaxQueueLatencyMetricIsPublished () {
274+ final Settings settings = Settings .builder ()
275+ .put (
276+ WriteLoadConstraintSettings .WRITE_LOAD_DECIDER_ENABLED_SETTING .getKey (),
277+ WriteLoadConstraintSettings .WriteLoadDeciderStatus .ENABLED
278+ )
279+ .build ();
280+ final var dataNodes = internalCluster ().startNodes (3 , settings );
281+
282+ // Refresh cluster info (should trigger polling)
283+ refreshClusterInfo ();
284+
285+ Map <String , Long > mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics (dataNodes );
286+ assertThat (mostRecentQueueLatencyMetrics .keySet (), hasSize (dataNodes .size ()));
287+ assertThat (mostRecentQueueLatencyMetrics .values (), everyItem (greaterThanOrEqualTo (0L )));
288+
289+ final String dataNodeToDelay = randomFrom (dataNodes );
290+ final ThreadPool threadPoolToDelay = internalCluster ().getInstance (ThreadPool .class , dataNodeToDelay );
291+
292+ // Fill the write thread pool and block a task for some time
293+ final int writeThreadPoolSize = threadPoolToDelay .info (ThreadPool .Names .WRITE ).getMax ();
294+ final var latch = new CountDownLatch (1 );
295+ final var writeThreadPool = threadPoolToDelay .executor (ThreadPool .Names .WRITE );
296+ range (0 , writeThreadPoolSize + 1 ).forEach (i -> writeThreadPool .execute (() -> safeAwait (latch )));
297+ final long delayMillis = randomIntBetween (100 , 200 );
298+ safeSleep (delayMillis );
299+ // Unblock the pool
300+ latch .countDown ();
301+
302+ refreshClusterInfo ();
303+ mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics (dataNodes );
304+ assertThat (mostRecentQueueLatencyMetrics .keySet (), hasSize (dataNodes .size ()));
305+ assertThat (mostRecentQueueLatencyMetrics .get (dataNodeToDelay ), greaterThanOrEqualTo (delayMillis ));
306+ }
307+
308+ private static Map <String , Long > getMostRecentQueueLatencyMetrics (List <String > dataNodes ) {
309+ final Map <String , Long > measurements = new HashMap <>();
310+ for (String nodeName : dataNodes ) {
311+ PluginsService pluginsService = internalCluster ().getInstance (PluginsService .class , nodeName );
312+ final TestTelemetryPlugin telemetryPlugin = pluginsService .filterPlugins (TestTelemetryPlugin .class ).findFirst ().orElseThrow ();
313+ telemetryPlugin .collect ();
314+ final var maxLatencyValues = telemetryPlugin .getLongGaugeMeasurement (
315+ DesiredBalanceMetrics .WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE
316+ );
317+ if (maxLatencyValues .isEmpty () == false ) {
318+ measurements .put (nodeName , maxLatencyValues .getLast ().getLong ());
319+ }
320+ }
321+ return measurements ;
322+ }
323+
266324 /**
267325 * Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
268326 */
0 commit comments