Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -25,9 +26,12 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT;
Expand Down Expand Up @@ -295,6 +299,7 @@ public void run() {
}
try {
executor.execute(new Runnable() {

@Override
public void run() {
// Doesn't matter is going to be rejected
Expand Down Expand Up @@ -787,8 +792,6 @@ public void testScalingWithEmptyCoreAndKeepAlive() {
}

public void testScalingWithEmptyCoreAndLargerMaxSize() {
// TODO currently the reproduction of the starvation bug does not work if max pool size > 1
// https://github.com/elastic/elasticsearch/issues/124867
testScalingWithEmptyCore(
EsExecutors.newScaling(
getTestName(),
Expand All @@ -804,8 +807,6 @@ public void testScalingWithEmptyCoreAndLargerMaxSize() {
}

public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
// TODO currently the reproduction of the starvation bug does not work if max pool size > 1
// https://github.com/elastic/elasticsearch/issues/124867
testScalingWithEmptyCore(
EsExecutors.newScaling(
getTestName(),
Expand All @@ -821,8 +822,6 @@ public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
}

public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
// https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
// if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
// the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
testScalingWithEmptyCore(
new EsThreadPoolExecutor(
Expand All @@ -840,8 +839,6 @@ public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
}

public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
// https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
// if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
// the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
testScalingWithEmptyCore(
new EsThreadPoolExecutor(
Expand All @@ -859,46 +856,68 @@ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
}

private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) {
try {
class Task extends AbstractRunnable {
private int remaining;
private final CountDownLatch doneLatch;

Task(int iterations, CountDownLatch doneLatch) {
this.remaining = iterations;
this.doneLatch = doneLatch;
}

@Override
public void onFailure(Exception e) {
fail(e);
}
class TaskScheduler extends SubscribableListener<Void> {
final ExecutorService scheduler;
final CyclicBarrier cyclicBarrier;
final Semaphore taskCompletions;
volatile int remaining;

TaskScheduler(ExecutorService scheduler, int iterations) {
this.scheduler = scheduler;
this.taskCompletions = new Semaphore(0);
this.cyclicBarrier = new CyclicBarrier(executor.getMaximumPoolSize(), () -> remaining--);
this.remaining = iterations;
}

@Override
protected void doRun() {
if (--remaining == 0) {
doneLatch.countDown();
} else {
logger.trace("--> remaining [{}]", remaining);
final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS);
new Thread(() -> {
if (keepAliveNanos > 0) {
final var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-10_000, 10_000);
while (System.nanoTime() < targetNanoTime) {
Thread.yield();
public void start() {
final var keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS);
final var tasks = executor.getMaximumPoolSize();
Runnable runnable = () -> {
try {
while (remaining > 0) {
// wait for all scheduler threads to be ready for the next attempt
var first = cyclicBarrier.await(2, TimeUnit.SECONDS) == 0;
executor.execute(taskCompletions::release);
if (first) {
// let the first scheduler (by arrival on the barrier) wait for completion of all tasks
if (taskCompletions.tryAcquire(tasks, 1, TimeUnit.SECONDS) == false) {
var msg = Strings.format(
"timed out waiting for [%s] of [%s] tasks to complete [queue size: %s, workers: %s] ",
tasks - taskCompletions.availablePermits(),
tasks,
executor.getQueue().size(),
executor.getPoolSize()
);
onFailure(new TimeoutException(msg));
return;
}
if (keepAliveNanos > 0) {
var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-1_000, 1_000);
while (System.nanoTime() < targetNanoTime) {
Thread.yield();
}
}
}
executor.execute(Task.this);
}).start();
}
} catch (Exception e) {
onFailure(e);
return;
}
onResponse(null);
};
for (int i = 0; i < tasks; i++) {
scheduler.execute(runnable);
}
}
}

for (int i = 0; i < 20; i++) {
try (var scheduler = Executors.newFixedThreadPool(executor.getMaximumPoolSize())) {
for (int i = 0; i < 100; i++) {
logger.trace("--> attempt [{}]", i);
final var doneLatch = new CountDownLatch(1);
executor.execute(new Task(between(1, 500), doneLatch));
safeAwait(doneLatch, TimeValue.ONE_MINUTE);
TaskScheduler taskScheduler = new TaskScheduler(scheduler, between(10, 200));
taskScheduler.start();
safeAwait(taskScheduler);
}
} finally {
ThreadPool.terminate(executor, 1, TimeUnit.SECONDS);
Expand Down