|
27 | 27 | import org.elasticsearch.cluster.service.ClusterService;
|
28 | 28 | import org.elasticsearch.common.Strings;
|
29 | 29 | import org.elasticsearch.common.settings.Settings;
|
30 |
| -import org.elasticsearch.common.util.concurrent.EsExecutors; |
31 | 30 | import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
|
32 | 31 | import org.elasticsearch.core.TimeValue;
|
33 | 32 | import org.elasticsearch.index.IndexService;
|
34 |
| -import org.elasticsearch.index.shard.GlobalCheckpointListeners; |
| 33 | +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; |
35 | 34 | import org.elasticsearch.index.shard.IndexShard;
|
36 | 35 | import org.elasticsearch.index.shard.ShardId;
|
37 | 36 | import org.elasticsearch.index.store.Store;
|
|
45 | 44 | import org.elasticsearch.test.InternalTestCluster;
|
46 | 45 | import org.elasticsearch.test.transport.MockTransportService;
|
47 | 46 | import org.elasticsearch.threadpool.ThreadPool;
|
| 47 | +import org.elasticsearch.transport.RequestHandlerRegistry; |
48 | 48 | import org.elasticsearch.transport.TransportService;
|
49 | 49 | import org.hamcrest.Matchers;
|
50 | 50 |
|
|
57 | 57 | import java.util.Map;
|
58 | 58 | import java.util.Set;
|
59 | 59 | import java.util.concurrent.CountDownLatch;
|
60 |
| -import java.util.concurrent.Executor; |
61 | 60 | import java.util.concurrent.atomic.AtomicBoolean;
|
62 | 61 |
|
63 | 62 | import static java.util.Collections.emptySet;
|
@@ -431,7 +430,27 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
|
431 | 430 | final int numShards = randomIntBetween(1, 5);
|
432 | 431 | createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
433 | 432 | ensureGreen(indexName);
|
434 |
| - final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); |
| 433 | + |
| 434 | + // Global checkpoint sync actions are asynchronous. We cannot really tell exactly when they are completely off the |
| 435 | + // thread pool. To avoid busy waiting, we redirect them to the generic thread pool so that we have precise control |
| 436 | + // over the write thread pool for assertions. |
| 437 | + final MockTransportService mockTransportService = MockTransportService.getInstance(dataNodeName); |
| 438 | + final var originalRegistry = mockTransportService.transport() |
| 439 | + .getRequestHandlers() |
| 440 | + .getHandler(GlobalCheckpointSyncAction.ACTION_NAME + "[p]"); |
| 441 | + mockTransportService.transport() |
| 442 | + .getRequestHandlers() |
| 443 | + .forceRegister( |
| 444 | + new RequestHandlerRegistry<>( |
| 445 | + GlobalCheckpointSyncAction.ACTION_NAME + "[p]", |
| 446 | + in -> null, // no need to deserialize the request since it's local |
| 447 | + mockTransportService.getTaskManager(), |
| 448 | + originalRegistry.getHandler(), |
| 449 | + mockTransportService.getThreadPool().executor(ThreadPool.Names.GENERIC), |
| 450 | + true, |
| 451 | + true |
| 452 | + ) |
| 453 | + ); |
435 | 454 |
|
436 | 455 | // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
|
437 | 456 | var barrier = blockDataNodeIndexing(dataNodeName);
|
@@ -496,28 +515,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
|
496 | 515 | }
|
497 | 516 | Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive()));
|
498 | 517 |
|
499 |
| - // Wait for async post replication actions to complete |
500 |
| - final var checkpointsSyncLatch = new CountDownLatch(numShards); |
501 |
| - for (int i = 0; i < numShards; ++i) { |
502 |
| - final var indexShard = indexService.getShard(i); |
503 |
| - final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint(); |
504 |
| - logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint); |
505 |
| - indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() { |
506 |
| - @Override |
507 |
| - public Executor executor() { |
508 |
| - return EsExecutors.DIRECT_EXECUTOR_SERVICE; |
509 |
| - } |
510 |
| - |
511 |
| - @Override |
512 |
| - public void accept(long globalCheckpoint, Exception e) { |
513 |
| - assertNull(e); // should have no error |
514 |
| - logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint); |
515 |
| - checkpointsSyncLatch.countDown(); |
516 |
| - } |
517 |
| - }, TimeValue.THIRTY_SECONDS); |
518 |
| - } |
519 |
| - safeAwait(checkpointsSyncLatch); |
520 |
| - |
521 | 518 | assertThat(
|
522 | 519 | "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
|
523 | 520 | trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
|
|
0 commit comments