2727import org .elasticsearch .cluster .service .ClusterService ;
2828import org .elasticsearch .common .Strings ;
2929import org .elasticsearch .common .settings .Settings ;
30+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
3031import org .elasticsearch .common .util .concurrent .TaskExecutionTimeTrackingEsThreadPoolExecutor ;
3132import org .elasticsearch .core .TimeValue ;
3233import org .elasticsearch .index .IndexService ;
34+ import org .elasticsearch .index .shard .GlobalCheckpointListeners ;
3335import org .elasticsearch .index .shard .IndexShard ;
3436import org .elasticsearch .index .shard .ShardId ;
3537import org .elasticsearch .index .store .Store ;
5557import java .util .Map ;
5658import java .util .Set ;
5759import java .util .concurrent .CountDownLatch ;
60+ import java .util .concurrent .Executor ;
5861import java .util .concurrent .atomic .AtomicBoolean ;
5962
6063import static java .util .Collections .emptySet ;
@@ -423,9 +426,12 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
423426 var masterName = internalCluster ().startMasterOnlyNode (settings );
424427 var dataNodeName = internalCluster ().startDataOnlyNode (settings );
425428 ensureStableCluster (2 );
426- assertEquals (internalCluster ().getMasterName (), masterName );
427- assertNotEquals (internalCluster ().getMasterName (), dataNodeName );
428- logger .info ("---> master node: " + masterName + ", data node: " + dataNodeName );
429+
430+ String indexName = randomIdentifier ();
431+ final int numShards = randomIntBetween (1 , 5 );
432+ createIndex (indexName , Settings .builder ().put (SETTING_NUMBER_OF_SHARDS , numShards ).put (SETTING_NUMBER_OF_REPLICAS , 0 ).build ());
433+ ensureGreen (indexName );
434+ final var indexService = internalCluster ().getInstance (IndicesService .class , dataNodeName ).iterator ().next ();
429435
430436 // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
431437 var barrier = blockDataNodeIndexing (dataNodeName );
@@ -434,12 +440,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
434440 // strictly need a single task to occupy the queue.
435441 int numberOfTasks = randomIntBetween (1 , 5 );
436442 Thread [] threadsToJoin = new Thread [numberOfTasks ];
437- String indexName = randomIdentifier ();
438- createIndex (
439- indexName ,
440- // NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool.
441- Settings .builder ().put (SETTING_NUMBER_OF_SHARDS , randomIntBetween (1 , 5 )).put (SETTING_NUMBER_OF_REPLICAS , 0 ).build ()
442- );
443443 for (int i = 0 ; i < numberOfTasks ; ++i ) {
444444 threadsToJoin [i ] = startParallelSingleWrite (indexName );
445445 }
@@ -494,6 +494,30 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
494494 for (int i = 0 ; i < numberOfTasks ; ++i ) {
495495 threadsToJoin [i ].join ();
496496 }
497+ Arrays .stream (threadsToJoin ).forEach (thread -> assertFalse (thread .isAlive ()));
498+
499+ // Wait for async post replication actions to complete
500+ final var checkpointsSyncLatch = new CountDownLatch (numShards );
501+ for (int i = 0 ; i < numShards ; ++i ) {
502+ final var indexShard = indexService .getShard (i );
503+ final long expectedGlobalCheckpoint = indexShard .seqNoStats ().getGlobalCheckpoint ();
504+ logger .info ("--> shard [{}] waiting for global checkpoint {}" , i , expectedGlobalCheckpoint );
505+ indexShard .addGlobalCheckpointListener (expectedGlobalCheckpoint , new GlobalCheckpointListeners .GlobalCheckpointListener () {
506+ @ Override
507+ public Executor executor () {
508+ return EsExecutors .DIRECT_EXECUTOR_SERVICE ;
509+ }
510+
511+ @ Override
512+ public void accept (long globalCheckpoint , Exception e ) {
513+ assertNull (e ); // should have no error
514+ logger .info ("--> shard [{}] global checkpoint updated to {}" , indexShard .shardId ().id (), globalCheckpoint );
515+ checkpointsSyncLatch .countDown ();
516+ }
517+ }, TimeValue .THIRTY_SECONDS );
518+ }
519+ safeAwait (checkpointsSyncLatch );
520+
497521 assertThat (
498522 "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor ,
499523 trackingWriteExecutor .peekMaxQueueLatencyInQueueMillis (),
0 commit comments