- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
[Test] Use generic for GlobalCheckpointSyncAction #134180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -27,11 +27,10 @@ | |
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
| import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.index.IndexService; | ||
| import org.elasticsearch.index.shard.GlobalCheckpointListeners; | ||
| import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; | ||
| import org.elasticsearch.index.shard.IndexShard; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
| import org.elasticsearch.index.store.Store; | ||
|  | @@ -45,6 +44,7 @@ | |
| import org.elasticsearch.test.InternalTestCluster; | ||
| import org.elasticsearch.test.transport.MockTransportService; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.RequestHandlerRegistry; | ||
| import org.elasticsearch.transport.TransportService; | ||
| import org.hamcrest.Matchers; | ||
|  | ||
|  | @@ -57,7 +57,6 @@ | |
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.Executor; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|  | ||
| import static java.util.Collections.emptySet; | ||
|  | @@ -431,7 +430,27 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { | |
| final int numShards = randomIntBetween(1, 5); | ||
| createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); | ||
| ensureGreen(indexName); | ||
| final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); | ||
|  | ||
| // Global checkpoint sync actions are asynchronous. We cannot really tell exactly when they are completely off the | ||
| // thread pool. To avoid busy waiting, we redirect them to the generic thread pool so that we have precise control | ||
| // over the write thread pool for assertions. | ||
| final MockTransportService mockTransportService = MockTransportService.getInstance(dataNodeName); | ||
| final var originalRegistry = mockTransportService.transport() | ||
| .getRequestHandlers() | ||
| .getHandler(GlobalCheckpointSyncAction.ACTION_NAME + "[p]"); | ||
| mockTransportService.transport() | ||
| .getRequestHandlers() | ||
| .forceRegister( | ||
| new RequestHandlerRegistry<>( | ||
| GlobalCheckpointSyncAction.ACTION_NAME + "[p]", | ||
| in -> null, // no need to deserialize the request since it's local | ||
| mockTransportService.getTaskManager(), | ||
| originalRegistry.getHandler(), | ||
| mockTransportService.getThreadPool().executor(ThreadPool.Names.GENERIC), | ||
| true, | ||
| true | ||
| ) | ||
| ); | ||
|  | ||
| // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads. | ||
| var barrier = blockDataNodeIndexing(dataNodeName); | ||
|  | @@ -496,28 +515,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { | |
| } | ||
| Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive())); | ||
|  | ||
| // Wait for async post replication actions to complete | ||
| final var checkpointsSyncLatch = new CountDownLatch(numShards); | ||
| for (int i = 0; i < numShards; ++i) { | ||
| final var indexShard = indexService.getShard(i); | ||
| final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint(); | ||
| logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint); | ||
| indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() { | ||
| @Override | ||
| public Executor executor() { | ||
| return EsExecutors.DIRECT_EXECUTOR_SERVICE; | ||
| } | ||
|  | ||
| @Override | ||
| public void accept(long globalCheckpoint, Exception e) { | ||
| assertNull(e); // should have no error | ||
| logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint); | ||
| checkpointsSyncLatch.countDown(); | ||
| } | ||
| }, TimeValue.THIRTY_SECONDS); | ||
| } | ||
| safeAwait(checkpointsSyncLatch); | ||
| 
      Comment on lines -499 to -519
 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was the previous attempt at fixing the issue. Unfortunately it does not work because: 
 I tried different ways to determine when the sync actions are completely off the thread pool, but didn't manage to find a solution. Therefore, I went with redirecting them to a different thread pool — hence this PR. | ||
|  | ||
| assertThat( | ||
| "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor, | ||
| trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible we're fixing the wrong problem here, i.e. if the test is sensitive to other things happening in the write pool, is it likely to flap any time someone does some new work on the write pool? Perhaps we could instead isolate the thing we're measuring, or change the assertion somehow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is not very feasible to "isolate the thing we're measuring" at the level of a single thread pool. The alternative is assertBusy, but based on the original discussion, the preference is to avoid it and to be explicit about the other activities. Therefore, I would consider it somewhat of a "feature" if this fails in the future for new "write" activities — that is, we should know exactly what's going on with the write thread pool, since every action matters in production.