Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
Expand All @@ -25,9 +27,12 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT;
Expand Down Expand Up @@ -295,6 +300,7 @@ public void run() {
}
try {
executor.execute(new Runnable() {

@Override
public void run() {
// Doesn't matter, this is going to be rejected
Expand Down Expand Up @@ -757,7 +763,7 @@ public void onRejection(Exception e) {
}

public void testScalingWithEmptyCore() {
testScalingWithEmptyCore(
testScalingWithEmptyCoreAndMaxSingleThread(
EsExecutors.newScaling(
getTestName(),
0,
Expand All @@ -772,7 +778,7 @@ public void testScalingWithEmptyCore() {
}

public void testScalingWithEmptyCoreAndKeepAlive() {
testScalingWithEmptyCore(
testScalingWithEmptyCoreAndMaxSingleThread(
EsExecutors.newScaling(
getTestName(),
0,
Expand All @@ -787,13 +793,11 @@ public void testScalingWithEmptyCoreAndKeepAlive() {
}

public void testScalingWithEmptyCoreAndLargerMaxSize() {
// TODO currently the reproduction of the starvation bug does not work if max pool size > 1
// https://github.com/elastic/elasticsearch/issues/124867
testScalingWithEmptyCore(
testScalingWithEmptyCoreAndMaxMultipleThreads(
EsExecutors.newScaling(
getTestName(),
0,
between(2, 5),
2,// between(2, 5),
0,
TimeUnit.MILLISECONDS,
true,
Expand All @@ -804,13 +808,11 @@ public void testScalingWithEmptyCoreAndLargerMaxSize() {
}

public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
// TODO currently the reproduction of the starvation bug does not work if max pool size > 1
// https://github.com/elastic/elasticsearch/issues/124867
testScalingWithEmptyCore(
testScalingWithEmptyCoreAndMaxMultipleThreads(
EsExecutors.newScaling(
getTestName(),
0,
between(2, 5),
2,//
1,
TimeUnit.MILLISECONDS,
true,
Expand All @@ -821,10 +823,8 @@ public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
}

public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
// https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
// if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
// the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
testScalingWithEmptyCore(
testScalingWithEmptyCoreAndMaxSingleThread(
new EsThreadPoolExecutor(
getTestName(),
0,
Expand All @@ -840,10 +840,8 @@ public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
}

public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
// https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
// if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
// the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
testScalingWithEmptyCore(
testScalingWithEmptyCoreAndMaxSingleThread(
new EsThreadPoolExecutor(
getTestName(),
0,
Expand All @@ -858,11 +856,13 @@ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
);
}

private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) {
private void testScalingWithEmptyCoreAndMaxSingleThread(EsThreadPoolExecutor testSubject) {
try {
final var keepAliveNanos = testSubject.getKeepAliveTime(TimeUnit.NANOSECONDS);

class Task extends AbstractRunnable {
private int remaining;
private final CountDownLatch doneLatch;
private int remaining;

Task(int iterations, CountDownLatch doneLatch) {
this.remaining = iterations;
Expand All @@ -879,29 +879,108 @@ protected void doRun() {
if (--remaining == 0) {
doneLatch.countDown();
} else {
logger.trace("--> remaining [{}]", remaining);
final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS);
new Thread(() -> {
if (keepAliveNanos > 0) {
final var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-10_000, 10_000);
while (System.nanoTime() < targetNanoTime) {
Thread.yield();
}
waitUntilKeepAliveTime(keepAliveNanos);
}
executor.execute(Task.this);
testSubject.execute(Task.this);
}).start();
}
}
}

for (int i = 0; i < 20; i++) {
logger.trace("--> attempt [{}]", i);
final var doneLatch = new CountDownLatch(1);
executor.execute(new Task(between(1, 500), doneLatch));
testSubject.execute(new Task(between(1, 500), doneLatch));
safeAwait(doneLatch, TimeValue.ONE_MINUTE);
}
} finally {
ThreadPool.terminate(executor, 1, TimeUnit.SECONDS);
ThreadPool.terminate(testSubject, 1, TimeUnit.SECONDS);
}
}

/**
 * Exercises {@code testSubject} with rounds of concurrent submissions issued from a dedicated pool of scheduler threads.
 * <p>
 * In every round, {@code schedulerTasks} scheduler tasks meet on a {@link CyclicBarrier} and then each submits one task to
 * {@code testSubject} that releases a single semaphore permit. The scheduler task that arrived first at the barrier then
 * waits for all {@code schedulerTasks} permits and reports a {@link TimeoutException} through the result listener if they
 * are not released within {@code SAFE_AWAIT_TIMEOUT}. If a keep-alive time is configured, that same scheduler task delays
 * its own submission via {@link #waitUntilKeepAliveTime(long)}.
 *
 * @param testSubject the executor under test; {@code ThreadPool.terminate} is invoked on it before this method returns
 */
private void testScalingWithEmptyCoreAndMaxMultipleThreads(EsThreadPoolExecutor testSubject) {
final var keepAliveNanos = testSubject.getKeepAliveTime(TimeUnit.NANOSECONDS);
// Use max pool size with one additional scheduler task if a keep alive time is set.
final var schedulerTasks = testSubject.getMaximumPoolSize() + (keepAliveNanos > 0 ? 1 : 0);

// Drives one run of `iterations` rounds against testSubject and exposes the outcome through `result`.
class TaskScheduler {
// NOTE(review): every scheduler task calls result.onResponse(null) on a normal exit (and result.onFailure on error), so this
// presumably relies on SubscribableListener keeping only its first completion - confirm. If so, `result` can complete while
// the "first" scheduler task of the final round is still waiting for permits.
final SubscribableListener<Void> result = new SubscribableListener<>();
final ExecutorService scheduler;
final CyclicBarrier cyclicBarrier;
// Receives one permit per task executed by testSubject; starts with zero permits.
final Semaphore taskCompletions;
// Rounds left to run; initialised from `iterations` and decremented only by the barrier action. It is a plain field, so
// readers depend on the happens-before guarantees documented for CyclicBarrier to observe the decrement.
private int remaining;

TaskScheduler(ExecutorService scheduler, int iterations) {
this.scheduler = scheduler;
this.taskCompletions = new Semaphore(0);
// The barrier action runs once per round, when all `schedulerTasks` parties have arrived.
this.cyclicBarrier = new CyclicBarrier(schedulerTasks, () -> remaining--);
this.remaining = iterations;
}

public void start() {
// The scheduler tasks are running on the dedicated scheduler thread pool. Each task submits
// a test task on the EsThreadPoolExecutor (`testSubject`) releasing one `taskCompletions` permit.
final Runnable schedulerTask = () -> {
try {
while (remaining > 0) {
// Wait for all scheduler threads to be ready for the next attempt.
// await() returns the arrival index; `schedulerTasks - 1` (parties - 1) identifies the first thread to arrive.
var first = cyclicBarrier.await(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS) == schedulerTasks - 1;
if (first && keepAliveNanos > 0) {
// The task submitted by the first scheduler task (after reaching the keep alive time) is the task
// that might starve without any worker available unless an additional worker probe is submitted.
waitUntilKeepAliveTime(keepAliveNanos);
}
// Test EsThreadPoolExecutor by submitting a task that releases one permit.
testSubject.execute(taskCompletions::release);
if (first) {
// Let the first scheduler task (by arrival on the barrier) wait for all permits.
var success = taskCompletions.tryAcquire(
schedulerTasks,
SAFE_AWAIT_TIMEOUT.millis(),
TimeUnit.MILLISECONDS
);
if (success == false) {
// Include the executor's queue size and pool size in the failure to help diagnose a starved task.
var msg = Strings.format(
"timed out waiting for [%s] of [%s] tasks to complete [queue size: %s, workers: %s] ",
schedulerTasks - taskCompletions.availablePermits(),
schedulerTasks,
testSubject.getQueue().size(),
testSubject.getPoolSize()
);
result.onFailure(new TimeoutException(msg));
return;
}
}
}
} catch (Exception e) {
// Covers barrier failures (timeout, broken barrier, interruption) as well as exceptions thrown by execute().
result.onFailure(e);
return;
}
result.onResponse(null);
};
// Run scheduler tasks on the dedicated scheduler thread pool.
for (int i = 0; i < schedulerTasks; i++) {
scheduler.execute(schedulerTask);
}
}
}

// 100 runs, each with a fresh TaskScheduler and a random number of rounds, all sharing one fixed scheduler pool that has
// one thread per scheduler task.
try (var scheduler = Executors.newFixedThreadPool(schedulerTasks)) {
for (int i = 0; i < 100; i++) {
TaskScheduler taskScheduler = new TaskScheduler(scheduler, between(10, 200));
taskScheduler.start();
safeAwait(taskScheduler.result);
}
} finally {
ThreadPool.terminate(testSubject, 1, TimeUnit.SECONDS);
}
}

/**
 * Spins, yielding the current thread between checks, until approximately {@code keepAliveNanos} nanoseconds have elapsed
 * since the call. A random offset drawn from {@code between(-1_000, 1_000)} nanoseconds is added to the wait, presumably so
 * that callers reach the executor just before or just after an idle worker hits its keep-alive timeout.
 *
 * @param keepAliveNanos the keep-alive time of the executor under test, in nanoseconds
 */
private void waitUntilKeepAliveTime(long keepAliveNanos) {
    final long startNanoTime = System.nanoTime();
    final long waitNanos = keepAliveNanos + between(-1_000, 1_000);
    // Compare elapsed time rather than absolute nanoTime values: nanoTime has an arbitrary origin and may wrap around,
    // so only differences between two readings are meaningful.
    while (System.nanoTime() - startNanoTime < waitNanos) {
        Thread.yield();
    }
}
}