|
27 | 27 | import org.elasticsearch.cluster.service.ClusterService; |
28 | 28 | import org.elasticsearch.common.Strings; |
29 | 29 | import org.elasticsearch.common.settings.Settings; |
30 | | -import org.elasticsearch.common.util.concurrent.EsExecutors; |
31 | 30 | import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; |
32 | 31 | import org.elasticsearch.core.TimeValue; |
33 | 32 | import org.elasticsearch.index.IndexService; |
34 | | -import org.elasticsearch.index.shard.GlobalCheckpointListeners; |
| 33 | +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; |
35 | 34 | import org.elasticsearch.index.shard.IndexShard; |
36 | 35 | import org.elasticsearch.index.shard.ShardId; |
37 | 36 | import org.elasticsearch.index.store.Store; |
|
45 | 44 | import org.elasticsearch.test.InternalTestCluster; |
46 | 45 | import org.elasticsearch.test.transport.MockTransportService; |
47 | 46 | import org.elasticsearch.threadpool.ThreadPool; |
| 47 | +import org.elasticsearch.transport.RequestHandlerRegistry; |
48 | 48 | import org.elasticsearch.transport.TransportService; |
49 | 49 | import org.hamcrest.Matchers; |
50 | 50 |
|
|
57 | 57 | import java.util.Map; |
58 | 58 | import java.util.Set; |
59 | 59 | import java.util.concurrent.CountDownLatch; |
60 | | -import java.util.concurrent.Executor; |
61 | 60 | import java.util.concurrent.atomic.AtomicBoolean; |
62 | 61 |
|
63 | 62 | import static java.util.Collections.emptySet; |
@@ -431,7 +430,27 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { |
431 | 430 | final int numShards = randomIntBetween(1, 5); |
432 | 431 | createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); |
433 | 432 | ensureGreen(indexName); |
434 | | - final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); |
| 433 | + |
| 434 | + // Global checkpoint sync actions are asynchronous. We cannot really tell exactly when they are completely off the |
| 435 | + // thread pool. To avoid busy waiting, we redirect them to the generic thread pool so that we have precise control |
| 436 | + // over the write thread pool for assertions. |
| 437 | + final MockTransportService mockTransportService = MockTransportService.getInstance(dataNodeName); |
| 438 | + final var originalRegistry = mockTransportService.transport() |
| 439 | + .getRequestHandlers() |
| 440 | + .getHandler(GlobalCheckpointSyncAction.ACTION_NAME + "[p]"); |
| 441 | + mockTransportService.transport() |
| 442 | + .getRequestHandlers() |
| 443 | + .forceRegister( |
| 444 | + new RequestHandlerRegistry<>( |
| 445 | + GlobalCheckpointSyncAction.ACTION_NAME + "[p]", |
| 446 | + in -> null, // no need to deserialize the request since it's local |
| 447 | + mockTransportService.getTaskManager(), |
| 448 | + originalRegistry.getHandler(), |
| 449 | + mockTransportService.getThreadPool().executor(ThreadPool.Names.GENERIC), |
| 450 | + true, |
| 451 | + true |
| 452 | + ) |
| 453 | + ); |
435 | 454 |
|
436 | 455 | // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads. |
437 | 456 | var barrier = blockDataNodeIndexing(dataNodeName); |
@@ -496,28 +515,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { |
496 | 515 | } |
497 | 516 | Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive())); |
498 | 517 |
|
499 | | - // Wait for async post replication actions to complete |
500 | | - final var checkpointsSyncLatch = new CountDownLatch(numShards); |
501 | | - for (int i = 0; i < numShards; ++i) { |
502 | | - final var indexShard = indexService.getShard(i); |
503 | | - final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint(); |
504 | | - logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint); |
505 | | - indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() { |
506 | | - @Override |
507 | | - public Executor executor() { |
508 | | - return EsExecutors.DIRECT_EXECUTOR_SERVICE; |
509 | | - } |
510 | | - |
511 | | - @Override |
512 | | - public void accept(long globalCheckpoint, Exception e) { |
513 | | - assertNull(e); // should have no error |
514 | | - logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint); |
515 | | - checkpointsSyncLatch.countDown(); |
516 | | - } |
517 | | - }, TimeValue.THIRTY_SECONDS); |
518 | | - } |
519 | | - safeAwait(checkpointsSyncLatch); |
520 | | - |
521 | 518 | assertThat( |
522 | 519 | "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor, |
523 | 520 | trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), |
|
0 commit comments