Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.GlobalCheckpointListeners;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
Expand All @@ -55,6 +57,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -423,9 +426,12 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(2);
assertEquals(internalCluster().getMasterName(), masterName);
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);

String indexName = randomIdentifier();
final int numShards = randomIntBetween(1, 5);
createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(indexName);
final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();

// Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
var barrier = blockDataNodeIndexing(dataNodeName);
Expand All @@ -434,12 +440,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
// strictly need a single task to occupy the queue.
int numberOfTasks = randomIntBetween(1, 5);
Thread[] threadsToJoin = new Thread[numberOfTasks];
String indexName = randomIdentifier();
createIndex(
indexName,
// NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool.
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
for (int i = 0; i < numberOfTasks; ++i) {
threadsToJoin[i] = startParallelSingleWrite(indexName);
}
Expand Down Expand Up @@ -494,6 +494,30 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
for (int i = 0; i < numberOfTasks; ++i) {
threadsToJoin[i].join();
}
Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive()));

// Wait for async post replication actions to complete
final var checkpointsSyncLatch = new CountDownLatch(numShards);
Comment on lines +499 to +500
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to this comment on the original PR. The GlobalCheckpointSyncAction runs even when there are no replica shards, since it also needs to update the primary's persisted global checkpoint from the in-memory copy.

for (int i = 0; i < numShards; ++i) {
final var indexShard = indexService.getShard(i);
final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint();
logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint);
indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
}

@Override
public void accept(long globalCheckpoint, Exception e) {
assertNull(e); // should have no error
logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint);
checkpointsSyncLatch.countDown();
}
}, TimeValue.THIRTY_SECONDS);
}
safeAwait(checkpointsSyncLatch);

assertThat(
"Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
Expand Down