Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
cb875bc
Sort of working
nicktindall Aug 5, 2025
2410925
Delete tests I can't be bothered fixing
nicktindall Aug 5, 2025
1c2a4aa
Fix some warnings that broke the build (inheritDoc doesn't work for @…
nicktindall Aug 5, 2025
ed10eac
More test hacks
nicktindall Aug 5, 2025
a38c29f
Fix precedence
nicktindall Aug 5, 2025
c927899
Populate RandomizedContext for virtual thread group
nicktindall Aug 5, 2025
53de7ff
Undo other hacks
nicktindall Aug 5, 2025
88526bc
Fix pool types
nicktindall Aug 5, 2025
c78698c
Populate RandomizedContext for virtual thread group
nicktindall Aug 5, 2025
d766c1e
Disable some tests/assertions
nicktindall Aug 5, 2025
d4db058
Populate RandomizedContext for virtual thread group
nicktindall Aug 5, 2025
54c1bc6
decrement outstanding when execution is rejected
nicktindall Aug 5, 2025
fd329b1
Remove noisy logging
nicktindall Aug 6, 2025
bb05622
Dummy thread pool usage stats for virtual thread pool
nicktindall Aug 6, 2025
929b4dc
Add dummy settings to allow more tests to run
nicktindall Aug 6, 2025
b14651f
Re-enable test
nicktindall Aug 6, 2025
da848a1
Fix disable description
nicktindall Aug 6, 2025
3f45f14
Re-enable test
nicktindall Aug 6, 2025
113b161
Fix descriptions
nicktindall Aug 6, 2025
25afbf6
Remove noisy logging
nicktindall Aug 6, 2025
00ae0c6
Don't assume bulks have finished before we call refresh
nicktindall Aug 6, 2025
376bfcf
Merge branch 'main' into virtual_threads
nicktindall Aug 6, 2025
573eb1c
Skip EvilThreadPoolTests for virtual thread pools
nicktindall Aug 6, 2025
da496f3
Fix BWC issue with thread pool types
nicktindall Aug 6, 2025
f8d7bc2
Fix BWC issue with thread pool types
nicktindall Aug 6, 2025
134cd02
Ignore test that depends on blocking the write pool
nicktindall Aug 6, 2025
41df480
Fix bwc types
nicktindall Aug 6, 2025
9389abe
Expand test hack to work with virtual threads
nicktindall Aug 6, 2025
b456107
Fix bwc again
nicktindall Aug 6, 2025
7c3b1a5
Fix bwc again
nicktindall Aug 6, 2025
16f69c3
Disable tests that try to fill the write coordination thread pool
nicktindall Aug 7, 2025
9c80e82
Disable test that depends on filling the thread pool
nicktindall Aug 7, 2025
ba99be9
Disable test that depends on write executor being a pool
nicktindall Aug 7, 2025
0908763
Disable test that depends on write pool publishing completed stats
nicktindall Aug 7, 2025
b1d8df3
Disable another test that won't work
nicktindall Aug 7, 2025
cf24699
Merge remote-tracking branch 'origin/main' into virtual_threads
nicktindall Aug 7, 2025
7fba206
Disable/skip tests that won't work
nicktindall Aug 7, 2025
c89aa0d
Disable/skip tests that won't work
nicktindall Aug 7, 2025
8e8ae45
Disable/skip tests that won't work
nicktindall Aug 7, 2025
368eb09
Disable/skip tests that won't work
nicktindall Aug 7, 2025
a748eec
Disable/skip tests that won't work
nicktindall Aug 7, 2025
3bb4a31
Generate different seeds for each virtual thread
nicktindall Aug 8, 2025
9691bb9
Re-enable test that was broken due to lack of randomness
nicktindall Aug 8, 2025
eef48c2
Get rid of redundant assume
nicktindall Aug 8, 2025
cc62e12
Revert "Fix some warnings that broke the build (inheritDoc doesn't wo…
nicktindall Aug 8, 2025
9868170
Reduce dead code
nicktindall Aug 8, 2025
b81d5f4
Tidy
nicktindall Aug 8, 2025
af6e576
Docs
nicktindall Aug 8, 2025
163a8dc
Merge remote-tracking branch 'origin/main' into virtual_threads
nicktindall Aug 8, 2025
64e82a1
Fix broken(?) test
nicktindall Aug 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void testKibanaThreadPoolByPassesBlockedThreadPools() throws Exception {
});
}

@AwaitsFix(bugUrl = "depends on being able to fill the write thread pool")
public void testBlockedThreadPoolsRejectUserRequests() throws Exception {
assertAcked(
client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void setUpCluster() {
ensureStableCluster(3);
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testUpdateByQuery() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final boolean scriptEnabled = randomBoolean();
Expand All @@ -115,6 +116,7 @@ public void testUpdateByQuery() throws Exception {
);
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testReindex() throws Exception {
final String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String targetIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand All @@ -139,6 +141,7 @@ public void testReindex() throws Exception {
);
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testDeleteByQuery() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
executeConcurrentUpdatesOnSubsetOfDocs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.reindex;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
Expand Down Expand Up @@ -57,6 +58,7 @@
* Integration test for bulk retry behavior. Useful because retrying relies on the way that the
* rest of Elasticsearch throws exceptions and unit tests won't verify that.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "depends on being able to fill the write queue")
public class RetryTests extends ESIntegTestCase {

private static final int DOC_COUNT = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public void tearDownThreadPool() {

public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
// Won't work for virtual thread pools.
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.VIRTUAL) {
continue;
}
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
checkExecutionError(getScheduleRunner(threadPool.executor(executor)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
assertFalse(refCounted.hasReferences());
}

@AwaitsFix(bugUrl = "depends on being able to block the write coordination pool")
public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
String index = "test";
createIndex(index);
Expand Down Expand Up @@ -283,6 +284,7 @@ public void testMultipleBulkPartsWithBackoff() {
}
}

@AwaitsFix(bugUrl = "depends on being able to block the write coordination pool")
public void testGlobalBulkFailure() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
CountDownLatch blockingLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -314,6 +316,7 @@ public void testGlobalBulkFailure() throws InterruptedException {
}
}

@AwaitsFix(bugUrl = "depends on being able to block the write coordination pool")
public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected int numberOfShards() {
return 1;
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testWriteIndexingPressureMetricsAreIncremented() throws Exception {
assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -244,6 +245,7 @@ public void testWriteIndexingPressureMetricsAreIncremented() throws Exception {
}
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
final BulkRequest bulkRequest = new BulkRequest();
long totalRequestSize = 0;
Expand Down Expand Up @@ -309,6 +311,7 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
}
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
final BulkRequest bulkRequest = new BulkRequest();
long totalRequestSize = 0;
Expand Down Expand Up @@ -367,6 +370,7 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
}
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testWritesWillSucceedIfBelowThreshold() throws Exception {
restartNodesWithSettings(
Settings.builder()
Expand Down Expand Up @@ -402,6 +406,7 @@ public void testWritesWillSucceedIfBelowThreshold() throws Exception {
}
}

@AwaitsFix(bugUrl = "Depends on being able to block the write pool")
public void testWriteCanRejectOnPrimaryBasedOnMaxOperationSize() throws Exception {
final BulkRequest bulkRequest = new BulkRequest();
long firstInFlightRequestSizeInBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public void testThreadPoolMetrics() throws Exception {
registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER));

tps[0].forEach(stats -> {
if (tp.info(stats.name()).getThreadPoolType() == ThreadPool.ThreadPoolType.VIRTUAL) {
return;
}

Map<String, MetricDefinition<?>> metricDefinitions = Map.of(
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
new MetricDefinition<>(stats.completed(), TestTelemetryPlugin::getLongAsyncCounterMeasurement, Measurement::getLong),
Expand Down Expand Up @@ -234,6 +238,7 @@ public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSu
}
}

@AwaitsFix(bugUrl = "Write thread pool is now virtual")
public void testWriteThreadpoolsEwmaAlphaSetting() {
Settings settings = Settings.EMPTY;
var executionEwmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_AI21_COMPLETION_ADDED = def(9_134_0_00);
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
public static final TransportVersion VIRTUAL_THREADS = def(9_137_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,18 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
) {
DiscoveryNode localNode = clusterService.localNode();
var writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;

ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats(
trackingForWriteExecutor.getMaximumPoolSize(),
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
);
final ThreadPoolUsageStats threadPoolUsageStats;
if (writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor trackingForWriteExecutor) {
threadPoolUsageStats = new ThreadPoolUsageStats(
trackingForWriteExecutor.getMaximumPoolSize(),
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
);
} else {
threadPoolUsageStats = new ThreadPoolUsageStats(999, 0.5f, 999);
}

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
// https://github.com/elastic/elasticsearch/issues/124667
// note: this is intentionally not a lambda, to avoid it ever being turned into a compile-time constant
// matching similar lambdas coming from other places
static final Runnable WORKER_PROBE = new Runnable() {
public static final Runnable WORKER_PROBE = new Runnable() {
@Override
public void run() {}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
}

table.addCell(entry.getKey());
table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType().getType());
table.addCell(
poolInfo == null ? null : ThreadPool.ThreadPoolType.bwcType(poolInfo.getName(), poolInfo.getThreadPoolType()).getType()
);
table.addCell(poolStats == null ? null : poolStats.active());
table.addCell(poolStats == null ? null : poolStats.threads());
table.addCell(poolStats == null ? null : poolStats.queue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,12 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
final int halfProc = ThreadPool.halfAllocatedProcessors(allocatedProcessors);
final int halfProcMaxAt5 = ThreadPool.halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512);
final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings);

Map<String, ExecutorBuilder> result = new HashMap<>();
result.put(
ThreadPool.Names.GENERIC,
new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
);
result.put(
ThreadPool.Names.WRITE_COORDINATION,
new FixedExecutorBuilder(
settings,
ThreadPool.Names.WRITE_COORDINATION,
allocatedProcessors,
10000,
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
)
);
result.put(
ThreadPool.Names.WRITE,
new FixedExecutorBuilder(
settings,
ThreadPool.Names.WRITE,
allocatedProcessors,
// 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores.
Math.max(allocatedProcessors * 750, 10000),
EsExecutors.TaskTrackingConfig.builder()
.trackOngoingTasks()
.trackMaxQueueLatency()
.trackExecutionTime(indexAutoscalingEWMA)
.build()
)
);
result.put(ThreadPool.Names.GENERIC, new VirtualThreadsExecutorBuilder(ThreadPool.Names.GENERIC, false));
result.put(ThreadPool.Names.WRITE_COORDINATION, new VirtualThreadsExecutorBuilder(ThreadPool.Names.WRITE_COORDINATION, true));
result.put(ThreadPool.Names.WRITE, new VirtualThreadsExecutorBuilder(ThreadPool.Names.WRITE, true));
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);
result.put(
ThreadPool.Names.GET,
Expand Down
Loading