muted-tests.yml (3 changes: 0 additions & 3 deletions)
@@ -510,9 +510,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
method: testWithCustomFeatureProcessors
issue: https://github.com/elastic/elasticsearch/issues/134001
- class: org.elasticsearch.cluster.ClusterInfoServiceIT
method: testMaxQueueLatenciesInClusterInfo
issue: https://github.com/elastic/elasticsearch/issues/134088
- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
method: test {csv-spec:fork.ForkBeforeStats}
issue: https://github.com/elastic/elasticsearch/issues/134100
ClusterInfoServiceIT.java
@@ -27,11 +27,10 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.GlobalCheckpointListeners;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@@ -45,6 +44,7 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

@@ -57,7 +57,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptySet;
@@ -431,7 +430,27 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
final int numShards = randomIntBetween(1, 5);
createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(indexName);
final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();

// Global checkpoint sync actions are asynchronous. We cannot really tell exactly when they are completely off the
// thread pool. To avoid busy waiting, we redirect them to the generic thread pool so that we have precise control
// over the write thread pool for assertions.
final MockTransportService mockTransportService = MockTransportService.getInstance(dataNodeName);
final var originalRegistry = mockTransportService.transport()
.getRequestHandlers()
.getHandler(GlobalCheckpointSyncAction.ACTION_NAME + "[p]");
mockTransportService.transport()
.getRequestHandlers()
.forceRegister(
new RequestHandlerRegistry<>(
GlobalCheckpointSyncAction.ACTION_NAME + "[p]",
in -> null, // no need to deserialize the request since it's local
mockTransportService.getTaskManager(),
originalRegistry.getHandler(),
mockTransportService.getThreadPool().executor(ThreadPool.Names.GENERIC),
true,
true
)
);
Contributor:

Is it possible we're fixing the wrong problem here? That is, if the test is sensitive to other things happening in the write pool, is it likely to flap any time someone does some new work on the write pool? Perhaps we could instead isolate the thing we're measuring, or change the assertion somehow?

Member Author:

I think it is not very feasible to "isolate the thing we're measuring" at the level of a single thread pool. The alternative is assertBusy, but based on the original discussion the preference is to avoid it and be explicit about the other activities. Therefore I would consider it somewhat of a "feature" if this fails in the future because of new "write" activities: we should know exactly what is going on with the write thread pool, since every action matters in production.
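For reference, the assertBusy alternative mentioned above amounts to busy polling until the pool has drained. Below is a minimal, self-contained sketch of that pattern using only plain JDK executors, not Elasticsearch's tracking write executor or the test framework's helper; the class and method names are invented for illustration.

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative only: poll until the pool has no active or queued tasks, failing after a
// timeout. This is the kind of polling the PR avoids by redirecting the checkpoint sync
// actions to the generic thread pool instead.
public class BusyWaitSketch {
    static void awaitDrained(ThreadPoolExecutor executor, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (executor.getActiveCount() > 0 || executor.getQueue().isEmpty() == false) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("executor did not drain in time: " + executor);
            }
            Thread.sleep(10); // poll
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        pool.submit(() -> {});
        awaitDrained(pool, 5_000);
        pool.shutdown();
    }
}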


// Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
var barrier = blockDataNodeIndexing(dataNodeName);
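The blockDataNodeIndexing helper referenced in the comment above is not shown in this diff. As a rough, self-contained sketch of the technique it describes, under the assumption that it works roughly like the following (plain JDK executors and invented names, not the node's real write thread pool):

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: occupy every thread of a fixed-size pool with a barrier-waiting task,
// so any further submission has to sit in the queue and accrues queue latency.
public class BlockPoolSketch {
    public static void main(String[] args) throws Exception {
        int poolSize = 4; // stands in for the write thread pool size
        ThreadPoolExecutor writePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);

        // poolSize blocker tasks plus this thread meet at the barrier when it is time to release them.
        CyclicBarrier barrier = new CyclicBarrier(poolSize + 1);
        for (int i = 0; i < poolSize; i++) {
            writePool.submit(() -> {
                barrier.await(); // hold the thread until released
                return null;
            });
        }

        // Anything submitted now queues behind the blockers.
        Future<?> queued = writePool.submit(() -> {});

        barrier.await();                  // release the blockers
        queued.get(10, TimeUnit.SECONDS); // the queued task can now run
        writePool.shutdown();
    }
}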
@@ -496,28 +515,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
}
Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive()));

// Wait for async post replication actions to complete
final var checkpointsSyncLatch = new CountDownLatch(numShards);
for (int i = 0; i < numShards; ++i) {
final var indexShard = indexService.getShard(i);
final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint();
logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint);
indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
}

@Override
public void accept(long globalCheckpoint, Exception e) {
assertNull(e); // should have no error
logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint);
checkpointsSyncLatch.countDown();
}
}, TimeValue.THIRTY_SECONDS);
}
safeAwait(checkpointsSyncLatch);
Comment on lines -499 to -519
Member Author:

This was the previous attempt at fixing the issue. Unfortunately it does not work, because:

  1. The listener is called while the task is still running on the write thread pool, and there is no guarantee about when the task completely finishes its lifecycle, i.e. goes through the afterExecute phase (see the sketch after this comment).
  2. An earlier checkpoint update action can sometimes see the latest in-memory value and update to it. When this happens, the listener is called while later checkpoint update actions are still queued, and those later actions are basically no-ops. This is fine for the checkpoint update itself, but it breaks our test assumption.

I tried different ways to determine when the sync actions are completely off the thread pool but did not manage to find a solution, so I went with redirecting them to a different thread pool. Hence this PR.
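A small, JDK-only illustration of point 1 above (names invented for this sketch, nothing Elasticsearch-specific): a callback fired from inside a task runs before the executor's afterExecute hook for that task, so "callback fired" does not mean the task is completely off the pool.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AfterExecuteSketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch callbackFired = new CountDownLatch(1);
        CountDownLatch afterExecuteRan = new CountDownLatch(1);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                afterExecuteRan.countDown(); // the task's lifecycle on the pool ends only here
            }
        };

        pool.execute(() -> {
            callbackFired.countDown(); // "listener" notified while the task is still running
            try {
                Thread.sleep(200);     // the task keeps its thread busy after the callback
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        callbackFired.await();
        // The callback has fired, but afterExecute has almost certainly not run yet.
        System.out.println("afterExecute already ran? " + (afterExecuteRan.getCount() == 0));
        afterExecuteRan.await();
        pool.shutdown();
    }
}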


assertThat(
"Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),