2525import org .elasticsearch .cluster .routing .RoutingNodes ;
2626import org .elasticsearch .cluster .routing .ShardRouting ;
2727import org .elasticsearch .cluster .routing .UnassignedInfo ;
28+ import org .elasticsearch .cluster .routing .allocation .AllocationDeciderMetrics ;
2829import org .elasticsearch .cluster .routing .allocation .WriteLoadConstraintSettings ;
2930import org .elasticsearch .cluster .service .ClusterService ;
3031import org .elasticsearch .common .settings .Settings ;
3637import org .elasticsearch .index .shard .ShardPath ;
3738import org .elasticsearch .index .store .StoreStats ;
3839import org .elasticsearch .plugins .Plugin ;
40+ import org .elasticsearch .plugins .PluginsService ;
41+ import org .elasticsearch .telemetry .TestTelemetryPlugin ;
3942import org .elasticsearch .test .ClusterServiceUtils ;
4043import org .elasticsearch .test .ESIntegTestCase ;
4144import org .elasticsearch .test .transport .MockTransportService ;
4548import java .nio .file .Path ;
4649import java .util .ArrayList ;
4750import java .util .Collection ;
51+ import java .util .HashMap ;
4852import java .util .List ;
4953import java .util .Map ;
54+ import java .util .concurrent .CyclicBarrier ;
5055
5156import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
5257import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
58+ import static org .hamcrest .Matchers .everyItem ;
59+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
60+ import static org .hamcrest .Matchers .hasSize ;
5361
5462@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
5563public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
5664
5765 @ Override
66+ @ SuppressWarnings ("unchecked" )
5867 protected Collection <Class <? extends Plugin >> getMockPlugins () {
59- return CollectionUtils .appendToCopy (super .nodePlugins (), MockTransportService .TestPlugin .class );
68+ return CollectionUtils .appendToCopyNoNullElements (
69+ super .nodePlugins (),
70+ MockTransportService .TestPlugin .class ,
71+ TestTelemetryPlugin .class
72+ );
6073 }
6174
6275 /**
@@ -227,11 +240,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
227240 */
228241
229242 logger .info ("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node" );
230- final InternalClusterInfoService clusterInfoService = asInstanceOf (
231- InternalClusterInfoService .class ,
232- internalCluster ().getInstance (ClusterInfoService .class , masterName )
233- );
234- ClusterInfoServiceUtils .refresh (clusterInfoService );
243+ refreshClusterInfo (masterName );
235244
236245 logger .info (
237246 "---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
@@ -254,6 +263,76 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
254263 }));
255264 }
256265
266+ public void testMaxQueueLatencyMetricIsPublished () {
267+ final Settings settings = Settings .builder ()
268+ .put (
269+ WriteLoadConstraintSettings .WRITE_LOAD_DECIDER_ENABLED_SETTING .getKey (),
270+ WriteLoadConstraintSettings .WriteLoadDeciderStatus .ENABLED
271+ )
272+ .build ();
273+ final String masterName = internalCluster ().startMasterOnlyNode (settings );
274+ final var dataNodes = internalCluster ().startDataOnlyNodes (2 , settings );
275+ ensureStableCluster (3 );
276+
277+ // Refresh cluster info (should trigger polling)
278+ refreshClusterInfo (masterName );
279+
280+ Map <String , Long > mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics (dataNodes );
281+ assertThat (mostRecentQueueLatencyMetrics .keySet (), hasSize (dataNodes .size ()));
282+ assertThat (mostRecentQueueLatencyMetrics .values (), everyItem (greaterThanOrEqualTo (0L )));
283+
284+ final String dataNodeToDelay = randomFrom (dataNodes );
285+ final ThreadPool threadPoolToDelay = internalCluster ().getInstance (ThreadPool .class , dataNodeToDelay );
286+
287+ // Fill the write thread pool
288+ final int writeThreadPoolSize = threadPoolToDelay .info (ThreadPool .Names .WRITE ).getMax ();
289+ final CyclicBarrier delayLatch = new CyclicBarrier (writeThreadPoolSize + 1 );
290+ for (int i = 0 ; i < writeThreadPoolSize ; i ++) {
291+ threadPoolToDelay .executor (ThreadPool .Names .WRITE ).execute (() -> {
292+ safeAwait (delayLatch );
293+ safeAwait (delayLatch );
294+ });
295+ }
296+ safeAwait (delayLatch );
297+ // Submit a task that will be delayed
298+ threadPoolToDelay .executor (ThreadPool .Names .WRITE ).execute (() -> {
299+ // Doesn't need to do anything
300+ });
301+ final long delayMillis = randomIntBetween (100 , 200 );
302+ safeSleep (delayMillis );
303+ // Unblock the pool
304+ safeAwait (delayLatch );
305+
306+ refreshClusterInfo (masterName );
307+ mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics (dataNodes );
308+ assertThat (mostRecentQueueLatencyMetrics .keySet (), hasSize (dataNodes .size ()));
309+ assertThat (mostRecentQueueLatencyMetrics .get (dataNodeToDelay ), greaterThanOrEqualTo (delayMillis ));
310+ }
311+
312+ private static void refreshClusterInfo (String masterName ) {
313+ final InternalClusterInfoService clusterInfoService = asInstanceOf (
314+ InternalClusterInfoService .class ,
315+ internalCluster ().getInstance (ClusterInfoService .class , masterName )
316+ );
317+ ClusterInfoServiceUtils .refresh (clusterInfoService );
318+ }
319+
320+ private static Map <String , Long > getMostRecentQueueLatencyMetrics (List <String > dataNodes ) {
321+ final Map <String , Long > measurements = new HashMap <>();
322+ for (String nodeName : dataNodes ) {
323+ PluginsService pluginsService = internalCluster ().getInstance (PluginsService .class , nodeName );
324+ final TestTelemetryPlugin telemetryPlugin = pluginsService .filterPlugins (TestTelemetryPlugin .class ).findFirst ().orElseThrow ();
325+ telemetryPlugin .collect ();
326+ final var maxLatencyValues = telemetryPlugin .getLongGaugeMeasurement (
327+ AllocationDeciderMetrics .WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE
328+ );
329+ if (maxLatencyValues .isEmpty () == false ) {
330+ measurements .put (nodeName , maxLatencyValues .getLast ().getLong ());
331+ }
332+ }
333+ return measurements ;
334+ }
335+
257336 /**
258337 * Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
259338 */
0 commit comments