2727import org .elasticsearch .cluster .service .ClusterService ;
2828import org .elasticsearch .common .Strings ;
2929import org .elasticsearch .common .settings .Settings ;
30+ import org .elasticsearch .common .util .concurrent .TaskExecutionTimeTrackingEsThreadPoolExecutor ;
3031import org .elasticsearch .core .TimeValue ;
3132import org .elasticsearch .index .IndexService ;
3233import org .elasticsearch .index .shard .IndexShard ;
5960import static java .util .Collections .emptySet ;
6061import static java .util .Collections .singletonList ;
6162import static java .util .Collections .unmodifiableSet ;
63+ import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
64+ import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
6265import static org .elasticsearch .common .util .set .Sets .newHashSet ;
6366import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
6467import static org .hamcrest .CoreMatchers .equalTo ;
@@ -346,6 +349,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
346349 WriteLoadConstraintSettings .WRITE_LOAD_DECIDER_ENABLED_SETTING .getKey (),
347350 WriteLoadConstraintSettings .WriteLoadDeciderStatus .ENABLED
348351 )
352+ // Manually control cluster info refreshes
353+ .put (InternalClusterInfoService .INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING .getKey (), "60m" )
349354 .build ();
350355 var masterName = internalCluster ().startMasterOnlyNode (settings );
351356 var dataNodeName = internalCluster ().startDataOnlyNode (settings );
@@ -369,11 +374,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
369374 }
370375 );
371376
372- // Do some writes to create some write thread pool activity.
373- final String indexName = randomIdentifier ();
374- for (int i = 0 ; i < randomIntBetween (1 , 1000 ); i ++) {
375- index (indexName , Integer .toString (i ), Collections .singletonMap ("foo" , "bar" ));
376- }
377+ // Generate some writes to get some non-zero write thread pool stats.
378+ doALotOfDataNodeWrites ();
377379
378380 // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
379381 final InternalClusterInfoService masterClusterInfoService = asInstanceOf (
@@ -387,7 +389,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
387389
388390 final Map <String , NodeUsageStatsForThreadPools > usageStatsForThreadPools = clusterInfo .getNodeUsageStatsForThreadPools ();
389391 logger .info ("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools );
390- assertThat (usageStatsForThreadPools .size (), equalTo (1 )); // only stats from data nodes should be collectedg
392+ assertThat (usageStatsForThreadPools .size (), equalTo (1 )); // only stats from data nodes should be collected
391393 var dataNodeId = getNodeId (dataNodeName );
392394 var nodeUsageStatsForThreadPool = usageStatsForThreadPools .get (dataNodeId );
393395 assertNotNull (nodeUsageStatsForThreadPool );
@@ -400,4 +402,174 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
400402 assertThat (writeThreadPoolStats .averageThreadPoolUtilization (), greaterThan (0f ));
401403 assertThat (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis (), greaterThanOrEqualTo (0L ));
402404 }
405+
406+ /**
407+ * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies:
408+ * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
409+ * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The latter looks at currently queued tasks,
410+ * and the former tracks the queue latency of tasks when they are taken off of the queue to start execution.
411+ */
412+ public void testMaxQueueLatenciesInClusterInfo () throws Exception {
413+ var settings = Settings .builder ()
414+ .put (
415+ WriteLoadConstraintSettings .WRITE_LOAD_DECIDER_ENABLED_SETTING .getKey (),
416+ WriteLoadConstraintSettings .WriteLoadDeciderStatus .ENABLED
417+ )
418+ // Manually control cluster info refreshes
419+ .put (InternalClusterInfoService .INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING .getKey (), "60m" )
420+ .build ();
421+ var masterName = internalCluster ().startMasterOnlyNode (settings );
422+ var dataNodeName = internalCluster ().startDataOnlyNode (settings );
423+ ensureStableCluster (2 );
424+ assertEquals (internalCluster ().getMasterName (), masterName );
425+ assertNotEquals (internalCluster ().getMasterName (), dataNodeName );
426+ logger .info ("---> master node: " + masterName + ", data node: " + dataNodeName );
427+
428+ // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
429+ var barrier = blockDataNodeIndexing (dataNodeName );
430+ try {
431+ // Arbitrary number of tasks, which will queue because all the write threads are occupied already, greater than one: only
432+ // strictly need a single task to occupy the queue.
433+ int numberOfTasks = randomIntBetween (1 , 5 );
434+ Thread [] threadsToJoin = new Thread [numberOfTasks ];
435+ String indexName = randomIdentifier ();
436+ createIndex (
437+ indexName ,
438+ // NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool.
439+ Settings .builder ().put (SETTING_NUMBER_OF_SHARDS , randomIntBetween (1 , 5 )).put (SETTING_NUMBER_OF_REPLICAS , 0 ).build ()
440+ );
441+ for (int i = 0 ; i < numberOfTasks ; ++i ) {
442+ threadsToJoin [i ] = startParallelSingleWrite (indexName );
443+ }
444+
445+ // Reach into the data node's write thread pool to check that tasks have reached the queue.
446+ var dataNodeThreadPool = internalCluster ().getInstance (ThreadPool .class , dataNodeName );
447+ var writeExecutor = dataNodeThreadPool .executor (ThreadPool .Names .WRITE );
448+ assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor ;
449+ var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor ) writeExecutor ;
450+ assertBusy (
451+ // Wait for the parallel threads' writes to get queued in the write thread pool.
452+ () -> assertThat (
453+ "Write thread pool dump: " + trackingWriteExecutor ,
454+ trackingWriteExecutor .peekMaxQueueLatencyInQueueMillis (),
455+ greaterThan (0L )
456+ )
457+ );
458+
459+ // Force a refresh of the ClusterInfo state to collect fresh info from the data node.
460+ final InternalClusterInfoService masterClusterInfoService = asInstanceOf (
461+ InternalClusterInfoService .class ,
462+ internalCluster ().getCurrentMasterNodeInstance (ClusterInfoService .class )
463+ );
464+ final ClusterInfo clusterInfo = ClusterInfoServiceUtils .refresh (masterClusterInfoService );
465+
466+ // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue, which is called from the
467+ // TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should return a max queue
468+ // latency > 0;
469+ {
470+ final Map <String , NodeUsageStatsForThreadPools > usageStatsForThreadPools = clusterInfo .getNodeUsageStatsForThreadPools ();
471+ logger .info ("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools );
472+ assertThat (usageStatsForThreadPools .size (), equalTo (1 )); // only stats from data node should be collected
473+ var dataNodeId = getNodeId (dataNodeName );
474+ var nodeUsageStatsForThreadPool = usageStatsForThreadPools .get (dataNodeId );
475+ assertNotNull (nodeUsageStatsForThreadPool );
476+ logger .info ("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool );
477+
478+ assertEquals (dataNodeId , nodeUsageStatsForThreadPool .nodeId ());
479+ var writeThreadPoolStats = nodeUsageStatsForThreadPool .threadPoolUsageStatsMap ().get (ThreadPool .Names .WRITE );
480+ assertNotNull ("Expected to find stats for the WRITE thread pool" , writeThreadPoolStats );
481+ assertThat (writeThreadPoolStats .totalThreadPoolThreads (), greaterThan (0 ));
482+ assertThat (writeThreadPoolStats .averageThreadPoolUtilization (), greaterThanOrEqualTo (0f ));
483+ assertThat (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis (), greaterThan (0L ));
484+ }
485+
486+ // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset
487+ // by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new queue
488+ // latencies will be present in the next call. There will be nothing in the queue to peek at now, so the result of the max
489+ // queue latency result in TransportNodeUsageStatsForThreadPoolsAction will reflect
490+ // #getMaxQueueLatencyMillisSinceLastPollAndReset and not #peekMaxQueueLatencyInQueue.
491+ barrier .await ();
492+ for (int i = 0 ; i < numberOfTasks ; ++i ) {
493+ threadsToJoin [i ].join ();
494+ }
495+ assertThat (
496+ "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor ,
497+ trackingWriteExecutor .peekMaxQueueLatencyInQueueMillis (),
498+ equalTo (0L )
499+ );
500+
501+ final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils .refresh (masterClusterInfoService );
502+ {
503+ final Map <String , NodeUsageStatsForThreadPools > usageStatsForThreadPools = nextClusterInfo
504+ .getNodeUsageStatsForThreadPools ();
505+ logger .info ("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools );
506+ assertThat (usageStatsForThreadPools .size (), equalTo (1 )); // only stats from data nodes should be collected
507+ var dataNodeId = getNodeId (dataNodeName );
508+ var nodeUsageStatsForThreadPool = usageStatsForThreadPools .get (dataNodeId );
509+ assertNotNull (nodeUsageStatsForThreadPool );
510+ logger .info ("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool );
511+
512+ assertEquals (dataNodeId , nodeUsageStatsForThreadPool .nodeId ());
513+ var writeThreadPoolStats = nodeUsageStatsForThreadPool .threadPoolUsageStatsMap ().get (ThreadPool .Names .WRITE );
514+ assertNotNull ("Expected to find stats for the WRITE thread pool" , writeThreadPoolStats );
515+ assertThat (writeThreadPoolStats .totalThreadPoolThreads (), greaterThan (0 ));
516+ assertThat (writeThreadPoolStats .averageThreadPoolUtilization (), greaterThan (0f ));
517+ assertThat (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis (), greaterThanOrEqualTo (0L ));
518+ }
519+ } finally {
520+ // Ensure that the write threads have been released by signalling an interrupt on any callers waiting on the barrier. If the
521+ // callers have already all been successfully released, then there will be nothing left to interrupt.
522+ logger .info ("---> Ensuring release of the barrier on write thread pool tasks" );
523+ barrier .reset ();
524+ }
525+
526+ // Now that there's nothing in the queue, and no activity since the last ClusterInfo refresh, the max latency returned should be
527+ // zero. Verify this.
528+ final InternalClusterInfoService masterClusterInfoService = asInstanceOf (
529+ InternalClusterInfoService .class ,
530+ internalCluster ().getCurrentMasterNodeInstance (ClusterInfoService .class )
531+ );
532+ final ClusterInfo clusterInfo = ClusterInfoServiceUtils .refresh (masterClusterInfoService );
533+ {
534+ final Map <String , NodeUsageStatsForThreadPools > usageStatsForThreadPools = clusterInfo .getNodeUsageStatsForThreadPools ();
535+ logger .info ("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools );
536+ assertThat (usageStatsForThreadPools .size (), equalTo (1 )); // only stats from data nodes should be collected
537+ var dataNodeId = getNodeId (dataNodeName );
538+ var nodeUsageStatsForThreadPool = usageStatsForThreadPools .get (dataNodeId );
539+ assertNotNull (nodeUsageStatsForThreadPool );
540+ logger .info ("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool );
541+
542+ assertEquals (dataNodeId , nodeUsageStatsForThreadPool .nodeId ());
543+ var writeThreadPoolStats = nodeUsageStatsForThreadPool .threadPoolUsageStatsMap ().get (ThreadPool .Names .WRITE );
544+ assertNotNull ("Expected to find stats for the WRITE thread pool" , writeThreadPoolStats );
545+ assertThat (writeThreadPoolStats .totalThreadPoolThreads (), greaterThan (0 ));
546+ assertThat (writeThreadPoolStats .averageThreadPoolUtilization (), equalTo (0f ));
547+ assertThat (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis (), equalTo (0L ));
548+ }
549+ }
550+
551+ /**
552+ * Do some writes to create some write thread pool activity.
553+ */
554+ private void doALotOfDataNodeWrites () {
555+ final String indexName = randomIdentifier ();
556+ final int randomInt = randomIntBetween (500 , 1000 );
557+ for (int i = 0 ; i < randomInt ; i ++) {
558+ index (indexName , Integer .toString (i ), Collections .singletonMap ("foo" , "bar" ));
559+ }
560+ }
561+
562+ /**
563+ * Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually.
564+ */
565+ private Thread startParallelSingleWrite (String indexName ) {
566+ Thread running = new Thread (() -> doSingleWrite (indexName ));
567+ running .start ();
568+ return running ;
569+ }
570+
571+ private void doSingleWrite (String indexName ) {
572+ final int randomId = randomIntBetween (500 , 1000 );
573+ index (indexName , Integer .toString (randomId ), Collections .singletonMap ("foo" , "bar" ));
574+ }
403575}
0 commit comments